Skip to content

Commit

Permalink
fix(cluster): make blocking commands works with cluster (#867)
Browse files Browse the repository at this point in the history
As discussed in #850, we should use a separate connection for refreshing slots to avoid conflict with blocking commands invoked by users.

Close #850
  • Loading branch information
luin authored May 15, 2019
1 parent b323c1c commit 68db71b
Show file tree
Hide file tree
Showing 5 changed files with 49 additions and 20 deletions.
45 changes: 32 additions & 13 deletions lib/cluster/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,27 @@ class Cluster extends EventEmitter {
this.offlineQueue = new Deque()
}

clearNodesRefreshInterval() {
if (this.slotsTimer) {
clearTimeout(this.slotsTimer)
this.slotsTimer = null
}
}

resetNodesRefreshInterval() {
if (this.slotsTimer) {
return
}
this.slotsTimer = setInterval(function () {
this.refreshSlotsCache()
}.bind(this), this.options.slotsRefreshInterval)
const nextRound = () => {
this.slotsTimer = setTimeout(() => {
debug('refreshing slot caches... (triggered by "slotsRefreshInterval" option)')
this.refreshSlotsCache(() => {
nextRound()
})
}, this.options.slotsRefreshInterval)
}

nextRound()
}

/**
Expand Down Expand Up @@ -245,10 +259,7 @@ class Cluster extends EventEmitter {
this.reconnectTimeout = null
debug('Canceled reconnecting attempts')
}
if (this.slotsTimer) {
clearInterval(this.slotsTimer)
this.slotsTimer = null
}
this.clearNodesRefreshInterval()

this.subscriber.stop()
if (status === 'wait') {
Expand Down Expand Up @@ -276,10 +287,7 @@ class Cluster extends EventEmitter {
clearTimeout(this.reconnectTimeout)
this.reconnectTimeout = null
}
if (this.slotsTimer) {
clearInterval(this.slotsTimer)
this.slotsTimer = null
}
this.clearNodesRefreshInterval()

this.subscriber.stop()

Expand Down Expand Up @@ -471,6 +479,7 @@ class Cluster extends EventEmitter {
}
_this.connectionPool.findOrCreate(_this.natMapper(key))
tryConnection()
debug('refreshing slot caches... (triggered by MOVED error)')
_this.refreshSlotsCache()
},
ask: function (slot, key) {
Expand Down Expand Up @@ -608,9 +617,19 @@ class Cluster extends EventEmitter {
if (!redis) {
return callback(new Error('Node is disconnected'))
}
redis.cluster('slots', timeout((err, result) => {

// Use a duplication of the connection to avoid
// timeouts when the connection is in the blocking
// mode (e.g. waiting for BLPOP).
const duplicatedConnection = redis.duplicate({
enableOfflineQueue: true,
enableReadyCheck: false,
retryStrategy: null,
connectionName: 'ioredisClusterRefresher'
})
duplicatedConnection.cluster('slots', timeout((err, result) => {
duplicatedConnection.disconnect()
if (err) {
redis.disconnect()
return callback(err)
}
if (this.status === 'disconnecting' || this.status === 'close' || this.status === 'end') {
Expand Down
12 changes: 9 additions & 3 deletions test/functional/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,12 @@ describe('cluster', function () {
return 0;
}
});
let hasDone = false
new MockServer(30002, function () {
if (hasDone) {
return
}
hasDone = true
client.disconnect();
done();
});
Expand Down Expand Up @@ -250,8 +255,8 @@ describe('cluster', function () {
describe('#nodes()', function () {
it('should return the corrent nodes', function (done) {
var slotTable = [
[0, 5460, ['127.0.0.1', 30001], ['127.0.0.1', 30003]],
[5461, 10922, ['127.0.0.1', 30002]]
[0, 16381, ['127.0.0.1', 30001], ['127.0.0.1', 30003]],
[16382, 16383, ['127.0.0.1', 30002]]
];
var node = new MockServer(30001, function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
Expand All @@ -271,7 +276,8 @@ describe('cluster', function () {
});

var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]);
cluster.on('ready', function () {
// Make sure 30001 has been connected
cluster.get('foo', function () {
expect(cluster.nodes()).to.have.lengthOf(3);
expect(cluster.nodes('all')).to.have.lengthOf(3);
expect(cluster.nodes('master')).to.have.lengthOf(2);
Expand Down
4 changes: 4 additions & 0 deletions test/functional/cluster/moved.js
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ describe('cluster:MOVED', function () {
if (argv[0] === 'get' && argv[1] === 'foo') {
expect(moved).to.eql(false);
moved = true;
slotTable[0][1] = 16381
slotTable[1][0] = 16382
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001');
}
});
Expand Down Expand Up @@ -103,6 +105,8 @@ describe('cluster:MOVED', function () {
if (argv[0] === 'get' && argv[1] === 'foo') {
expect(moved).to.eql(false);
moved = true;
slotTable[0][1] = 16381
slotTable[1][0] = 16382
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001');
}
});
Expand Down
6 changes: 3 additions & 3 deletions test/functional/cluster/quit.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ describe('cluster:quit', () => {
it('failed when quit returns error', function (done) {
const ERROR_MESSAGE = 'quit random error'
const slotTable = [
[0, 1000, ['127.0.0.1', 30001]],
[1001, 16383, ['127.0.0.1', 30002]]
[0, 16381, ['127.0.0.1', 30001]],
[16382, 16383, ['127.0.0.1', 30002]]
]
new MockServer(30001, function (argv, c) {
if (argv[0] === 'quit') {
Expand All @@ -49,7 +49,7 @@ describe('cluster:quit', () => {
const cluster = new Redis.Cluster([
{ host: '127.0.0.1', port: '30001' }
])
cluster.on('ready', () => {
cluster.get('foo', () => {
cluster.quit((err) => {
expect(err.message).to.eql(ERROR_MESSAGE)
cluster.disconnect()
Expand Down
2 changes: 1 addition & 1 deletion test/helpers/mock_server.js
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ MockServer.prototype.write = function (c, data) {

MockServer.prototype.findClientByName = function (name) {
for (const client of this.clients) {
if (client._connectionName === name) {
if (client && client._connectionName === name) {
return client
}
}
Expand Down

0 comments on commit 68db71b

Please sign in to comment.