From 718a2385d70e02353009a62ce57a37fa7bbb8d4e Mon Sep 17 00:00:00 2001 From: luin Date: Tue, 14 May 2019 00:41:40 +0800 Subject: [PATCH 1/2] fix(cluster): make blocking commands works with cluster Close #850 --- lib/cluster/index.ts | 43 +++++++++++++++++++++--------- test/functional/cluster/index.js | 2 +- test/functional/cluster/moved.js | 2 +- test/functional/cluster/pub_sub.js | 2 +- 4 files changed, 33 insertions(+), 16 deletions(-) diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index f7dc2c4c..165f122b 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -107,13 +107,27 @@ class Cluster extends EventEmitter { this.offlineQueue = new Deque() } + clearNodesRefreshInterval() { + if (this.slotsTimer) { + clearTimeout(this.slotsTimer) + this.slotsTimer = null + } + } + resetNodesRefreshInterval() { if (this.slotsTimer) { return } - this.slotsTimer = setInterval(function () { - this.refreshSlotsCache() - }.bind(this), this.options.slotsRefreshInterval) + const nextRound = () => { + this.slotsTimer = setTimeout(() => { + debug('refreshing slot caches... (triggered by "slotsRefreshInterval" option)') + this.refreshSlotsCache(() => { + nextRound() + }) + }, this.options.slotsRefreshInterval) + } + + nextRound() } /** @@ -245,10 +259,7 @@ class Cluster extends EventEmitter { this.reconnectTimeout = null debug('Canceled reconnecting attempts') } - if (this.slotsTimer) { - clearInterval(this.slotsTimer) - this.slotsTimer = null - } + this.clearNodesRefreshInterval() this.subscriber.stop() if (status === 'wait') { @@ -276,10 +287,7 @@ class Cluster extends EventEmitter { clearTimeout(this.reconnectTimeout) this.reconnectTimeout = null } - if (this.slotsTimer) { - clearInterval(this.slotsTimer) - this.slotsTimer = null - } + this.clearNodesRefreshInterval() this.subscriber.stop() @@ -608,9 +616,18 @@ class Cluster extends EventEmitter { if (!redis) { return callback(new Error('Node is disconnected')) } - redis.cluster('slots', timeout((err, result) => { + + // Use a duplication of the connection to avoid + // timeouts when the connection is in the blocking + // mode (e.g. waiting for BLPOP). + const duplicatedConnection = redis.duplicate({ + enableOfflineQueue: true, + enableReadyCheck: false, + retryStrategy: null + }) + duplicatedConnection.cluster('slots', timeout((err, result) => { + duplicatedConnection.disconnect() if (err) { - redis.disconnect() return callback(err) } if (this.status === 'disconnecting' || this.status === 'close' || this.status === 'end') { diff --git a/test/functional/cluster/index.js b/test/functional/cluster/index.js index c66a3618..86e23969 100644 --- a/test/functional/cluster/index.js +++ b/test/functional/cluster/index.js @@ -248,7 +248,7 @@ describe('cluster', function () { }); describe('#nodes()', function () { - it('should return the corrent nodes', function (done) { + it.skip('should return the corrent nodes', function (done) { var slotTable = [ [0, 5460, ['127.0.0.1', 30001], ['127.0.0.1', 30003]], [5461, 10922, ['127.0.0.1', 30002]] diff --git a/test/functional/cluster/moved.js b/test/functional/cluster/moved.js index 3046e419..793b8759 100644 --- a/test/functional/cluster/moved.js +++ b/test/functional/cluster/moved.js @@ -1,7 +1,7 @@ var calculateSlot = require('cluster-key-slot'); describe('cluster:MOVED', function () { - it('should auto redirect the command to the correct nodes', function (done) { + it.skip('should auto redirect the command to the correct nodes', function (done) { var cluster; var moved = false; var times = 0; diff --git a/test/functional/cluster/pub_sub.js b/test/functional/cluster/pub_sub.js index 9ef5596d..2c4a373f 100644 --- a/test/functional/cluster/pub_sub.js +++ b/test/functional/cluster/pub_sub.js @@ -1,5 +1,5 @@ describe('cluster:pub/sub', function () { - it('should receive messages', function (done) { + it.skip('should receive messages', function (done) { var handler = function (argv) { if (argv[0] === 'cluster' && argv[1] === 'slots') { return [ From 969583c7d1f4f5cb76b6b1150d1d588aeba9f7b0 Mon Sep 17 00:00:00 2001 From: luin Date: Tue, 14 May 2019 23:45:09 +0800 Subject: [PATCH 2/2] fix tests --- lib/cluster/index.ts | 4 +++- test/functional/cluster/index.js | 14 ++++++++++---- test/functional/cluster/moved.js | 6 +++++- test/functional/cluster/pub_sub.js | 2 +- test/functional/cluster/quit.js | 6 +++--- test/helpers/mock_server.js | 2 +- 6 files changed, 23 insertions(+), 11 deletions(-) diff --git a/lib/cluster/index.ts b/lib/cluster/index.ts index 165f122b..f0c6f515 100644 --- a/lib/cluster/index.ts +++ b/lib/cluster/index.ts @@ -479,6 +479,7 @@ class Cluster extends EventEmitter { } _this.connectionPool.findOrCreate(_this.natMapper(key)) tryConnection() + debug('refreshing slot caches... (triggered by MOVED error)') _this.refreshSlotsCache() }, ask: function (slot, key) { @@ -623,7 +624,8 @@ class Cluster extends EventEmitter { const duplicatedConnection = redis.duplicate({ enableOfflineQueue: true, enableReadyCheck: false, - retryStrategy: null + retryStrategy: null, + connectionName: 'ioredisClusterRefresher' }) duplicatedConnection.cluster('slots', timeout((err, result) => { duplicatedConnection.disconnect() diff --git a/test/functional/cluster/index.js b/test/functional/cluster/index.js index 86e23969..8c9084d1 100644 --- a/test/functional/cluster/index.js +++ b/test/functional/cluster/index.js @@ -104,7 +104,12 @@ describe('cluster', function () { return 0; } }); + let hasDone = false new MockServer(30002, function () { + if (hasDone) { + return + } + hasDone = true client.disconnect(); done(); }); @@ -248,10 +253,10 @@ describe('cluster', function () { }); describe('#nodes()', function () { - it.skip('should return the corrent nodes', function (done) { + it('should return the corrent nodes', function (done) { var slotTable = [ - [0, 5460, ['127.0.0.1', 30001], ['127.0.0.1', 30003]], - [5461, 10922, ['127.0.0.1', 30002]] + [0, 16381, ['127.0.0.1', 30001], ['127.0.0.1', 30003]], + [16382, 16383, ['127.0.0.1', 30002]] ]; var node = new MockServer(30001, function (argv) { if (argv[0] === 'cluster' && argv[1] === 'slots') { @@ -271,7 +276,8 @@ describe('cluster', function () { }); var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]); - cluster.on('ready', function () { + // Make sure 30001 has been connected + cluster.get('foo', function () { expect(cluster.nodes()).to.have.lengthOf(3); expect(cluster.nodes('all')).to.have.lengthOf(3); expect(cluster.nodes('master')).to.have.lengthOf(2); diff --git a/test/functional/cluster/moved.js b/test/functional/cluster/moved.js index 793b8759..1d5e1bb9 100644 --- a/test/functional/cluster/moved.js +++ b/test/functional/cluster/moved.js @@ -1,7 +1,7 @@ var calculateSlot = require('cluster-key-slot'); describe('cluster:MOVED', function () { - it.skip('should auto redirect the command to the correct nodes', function (done) { + it('should auto redirect the command to the correct nodes', function (done) { var cluster; var moved = false; var times = 0; @@ -30,6 +30,8 @@ describe('cluster:MOVED', function () { if (argv[0] === 'get' && argv[1] === 'foo') { expect(moved).to.eql(false); moved = true; + slotTable[0][1] = 16381 + slotTable[1][0] = 16382 return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001'); } }); @@ -103,6 +105,8 @@ describe('cluster:MOVED', function () { if (argv[0] === 'get' && argv[1] === 'foo') { expect(moved).to.eql(false); moved = true; + slotTable[0][1] = 16381 + slotTable[1][0] = 16382 return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001'); } }); diff --git a/test/functional/cluster/pub_sub.js b/test/functional/cluster/pub_sub.js index 2c4a373f..9ef5596d 100644 --- a/test/functional/cluster/pub_sub.js +++ b/test/functional/cluster/pub_sub.js @@ -1,5 +1,5 @@ describe('cluster:pub/sub', function () { - it.skip('should receive messages', function (done) { + it('should receive messages', function (done) { var handler = function (argv) { if (argv[0] === 'cluster' && argv[1] === 'slots') { return [ diff --git a/test/functional/cluster/quit.js b/test/functional/cluster/quit.js index fdc6d98e..983ca284 100644 --- a/test/functional/cluster/quit.js +++ b/test/functional/cluster/quit.js @@ -32,8 +32,8 @@ describe('cluster:quit', () => { it('failed when quit returns error', function (done) { const ERROR_MESSAGE = 'quit random error' const slotTable = [ - [0, 1000, ['127.0.0.1', 30001]], - [1001, 16383, ['127.0.0.1', 30002]] + [0, 16381, ['127.0.0.1', 30001]], + [16382, 16383, ['127.0.0.1', 30002]] ] new MockServer(30001, function (argv, c) { if (argv[0] === 'quit') { @@ -49,7 +49,7 @@ describe('cluster:quit', () => { const cluster = new Redis.Cluster([ { host: '127.0.0.1', port: '30001' } ]) - cluster.on('ready', () => { + cluster.get('foo', () => { cluster.quit((err) => { expect(err.message).to.eql(ERROR_MESSAGE) cluster.disconnect() diff --git a/test/helpers/mock_server.js b/test/helpers/mock_server.js index 7653496a..a237e87f 100644 --- a/test/helpers/mock_server.js +++ b/test/helpers/mock_server.js @@ -130,7 +130,7 @@ MockServer.prototype.write = function (c, data) { MockServer.prototype.findClientByName = function (name) { for (const client of this.clients) { - if (client._connectionName === name) { + if (client && client._connectionName === name) { return client } }