Skip to content

Commit

Permalink
fix(cluster): remove node immediately when slots are redistributed
Browse files Browse the repository at this point in the history
Close: #930
  • Loading branch information
luin committed Jul 16, 2019
1 parent ec9acea commit b430d0e
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 3 deletions.
18 changes: 15 additions & 3 deletions lib/cluster/ConnectionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,9 +93,7 @@ export default class ConnectionPool extends EventEmitter {
this.nodes[readOnly ? "slave" : "master"][key] = redis;

redis.once("end", () => {
delete this.nodes.all[key];
delete this.nodes.master[key];
delete this.nodes.slave[key];
this.removeNode(key);
this.emit("-node", redis, key);
if (!Object.keys(this.nodes.all).length) {
this.emit("drain");
Expand All @@ -112,6 +110,19 @@ export default class ConnectionPool extends EventEmitter {
return redis;
}

/**
* Remove a node from the pool.
*/
private removeNode(key: string): void {
const { nodes } = this;
if (nodes.all[key]) {
debug("Remove %s from the pool", key);
delete nodes.all[key];
}
delete nodes.master[key];
delete nodes.slave[key];
}

/**
* Reset the pool with a set of nodes.
* The old node will be removed.
Expand All @@ -136,6 +147,7 @@ export default class ConnectionPool extends EventEmitter {
if (!newNodes[key]) {
debug("Disconnect %s because the node does not hold any slot", key);
this.nodes.all[key].disconnect();
this.removeNode(key);
}
});
Object.keys(newNodes).forEach(key => {
Expand Down
9 changes: 9 additions & 0 deletions test/unit/clusters/ConnectionPool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,14 @@ describe("ConnectionPool", () => {
expect(stub.callCount).to.eql(2);
expect(stub.firstCall.args[1]).to.eql(false);
});

it("remove the node immediately instead of waiting for 'end' event", () => {
const pool = new ConnectionPool({});
pool.reset([{ host: "127.0.0.1", port: 300001 }]);
expect(pool.getNodes().length).to.eql(1);

pool.reset([]);
expect(pool.getNodes().length).to.eql(0);
});
});
});

0 comments on commit b430d0e

Please sign in to comment.