From 97c1de1f13bd00bb2be89b285713796f872a8f55 Mon Sep 17 00:00:00 2001 From: luin Date: Sat, 25 Jul 2015 23:29:07 +0800 Subject: [PATCH 1/5] Auto-resubscribe channels in cluster mode. --- lib/cluster.js | 29 +++++++++++++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/lib/cluster.js b/lib/cluster.js index 8f4463d3..2d494f0c 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -207,8 +207,33 @@ Cluster.prototype.selectSubscriber = function () { if (this.subscriber === null) { return; } - if (this.subscriber.status === 'wait') { - this.subscriber.connect().catch(noop); + if (this.lastActiveSubscriber && this.lastActiveSubscriber.prevCondition && this.lastActiveSubscriber.prevCondition.subscriber) { + var pending = 0; + var subscribeChannels = this.lastActiveSubscriber.prevCondition.subscriber.channels('subscribe'); + if (subscribeChannels.length) { + pending += 1; + debug('subscribe %d channels', subscribeChannels.length); + this.subscriber.subscribe(subscribeChannels).then(function () { + if (!--pending) { + this.lastActiveSubscriber = this.subscriber; + } + }.bind(this)).catch(noop); + } + var psubscribeChannels = this.lastActiveSubscriber.prevCondition.subscriber.channels('psubscribe'); + if (psubscribeChannels.length) { + pending += 1; + debug('psubscribe %d channels', psubscribeChannels.length); + this.subscriber.psubscribe(psubscribeChannels).then(function () { + if (!--pending) { + this.lastActiveSubscriber = this.subscriber; + } + }.bind(this)).catch(noop); + } + } else { + if (this.subscriber.status === 'wait') { + this.subscriber.connect().catch(noop); + } + this.lastActiveSubscriber = this.subscriber; } var _this = this; ['message', 'messageBuffer'].forEach(function (event) { From 145d8585218718098e826a31a7380d2212baa478 Mon Sep 17 00:00:00 2001 From: luin Date: Sun, 26 Jul 2015 11:42:01 +0800 Subject: [PATCH 2/5] Refactor the code --- lib/cluster.js | 47 ++++++++++++++++++++++++----------------------- 1 file changed, 24 insertions(+), 23 deletions(-) diff --git a/lib/cluster.js b/lib/cluster.js index 2d494f0c..7d4b3972 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -207,28 +207,29 @@ Cluster.prototype.selectSubscriber = function () { if (this.subscriber === null) { return; } - if (this.lastActiveSubscriber && this.lastActiveSubscriber.prevCondition && this.lastActiveSubscriber.prevCondition.subscriber) { - var pending = 0; - var subscribeChannels = this.lastActiveSubscriber.prevCondition.subscriber.channels('subscribe'); - if (subscribeChannels.length) { - pending += 1; - debug('subscribe %d channels', subscribeChannels.length); - this.subscriber.subscribe(subscribeChannels).then(function () { - if (!--pending) { - this.lastActiveSubscriber = this.subscriber; - } - }.bind(this)).catch(noop); - } - var psubscribeChannels = this.lastActiveSubscriber.prevCondition.subscriber.channels('psubscribe'); - if (psubscribeChannels.length) { - pending += 1; - debug('psubscribe %d channels', psubscribeChannels.length); - this.subscriber.psubscribe(psubscribeChannels).then(function () { - if (!--pending) { - this.lastActiveSubscriber = this.subscriber; - } - }.bind(this)).catch(noop); + // Re-subscribe previous channels + var previousChannels = { subscribe: [], psubscribe: [] }; + if (this.lastActiveSubscriber && this.lastActiveSubscriber.prevCondition) { + var subscriber = this.lastActiveSubscriber.prevCondition.subscriber; + if (subscriber) { + previousChannels.subscribe = subscriber.channels('subscribe'); + previousChannels.psubscribe = subscriber.channels('psubscribe'); } + } + if (previousChannels.subscribe.length || previousChannels.psubscribe.length) { + var pending = 0; + _.forEach(['subscribe', 'psubscribe'], function (type) { + var channels = previousChannels[type]; + if (channels.length) { + pending += 1; + debug('%s %d channels', type, channels.length); + this.subscriber[type](channels).then(function () { + if (!--pending) { + this.lastActiveSubscriber = this.subscriber; + } + }.bind(this)).catch(noop); + } + }, this); } else { if (this.subscriber.status === 'wait') { this.subscriber.connect().catch(noop); @@ -236,12 +237,12 @@ Cluster.prototype.selectSubscriber = function () { this.lastActiveSubscriber = this.subscriber; } var _this = this; - ['message', 'messageBuffer'].forEach(function (event) { + _.forEach(['message', 'messageBuffer'], function (event) { _this.subscriber.on(event, function (arg1, arg2) { _this.emit(event, arg1, arg2); }); }); - ['pmessage', 'pmessageBuffer'].forEach(function (event) { + _.forEach(['pmessage', 'pmessageBuffer'], function (event) { _this.subscriber.on(event, function (arg1, arg2, arg3) { _this.emit(event, arg1, arg2, arg3); }); From dc044d63548989d4782ab46c67f2f34fbbd0c289 Mon Sep 17 00:00:00 2001 From: luin Date: Sun, 26 Jul 2015 11:43:55 +0800 Subject: [PATCH 3/5] Better performance --- lib/cluster.js | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/lib/cluster.js b/lib/cluster.js index 7d4b3972..23058518 100644 --- a/lib/cluster.js +++ b/lib/cluster.js @@ -236,17 +236,18 @@ Cluster.prototype.selectSubscriber = function () { } this.lastActiveSubscriber = this.subscriber; } - var _this = this; _.forEach(['message', 'messageBuffer'], function (event) { - _this.subscriber.on(event, function (arg1, arg2) { + var _this = this; + this.subscriber.on(event, function (arg1, arg2) { _this.emit(event, arg1, arg2); }); - }); + }, this); _.forEach(['pmessage', 'pmessageBuffer'], function (event) { - _this.subscriber.on(event, function (arg1, arg2, arg3) { + var _this = this; + this.subscriber.on(event, function (arg1, arg2, arg3) { _this.emit(event, arg1, arg2, arg3); }); - }); + }, this); }; Cluster.prototype.setStatus = function (status) { From 15f50e4168addfd88bf3c318293b5e7d7882c029 Mon Sep 17 00:00:00 2001 From: luin Date: Sun, 26 Jul 2015 11:49:55 +0800 Subject: [PATCH 4/5] Update README --- README.md | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index cf712461..6ad7b51a 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ [![Dependency Status](https://david-dm.org/luin/ioredis.svg)](https://david-dm.org/luin/ioredis) [![Join the chat at https://gitter.im/luin/ioredis](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/luin/ioredis?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) -A delightful, performance-focused Redis client for Node and io.js +A robust, performance-focused and full-featured Redis client for Node and io.js. Support Redis >= 2.6.12 and (Node.js >= 0.10.16 or io.js). @@ -502,6 +502,8 @@ close | client will emit `close` when an established Redis server connection reconnecting | client will emit `reconnecting` after `close` when a reconnection would be made. The argument of the event is the time(ms) before reconnecting. end | client will emit `end` after `close` when no more reconnections would be made. +You can also check out the `Redis#status` property to get the current connection status. + ## Offline Queue When a command can't be processed by Redis(being sent before `ready` event), by default it's added to the offline queue and will be executed when it can be processed. You can disable this feature by set `enableOfflineQueue` @@ -515,7 +517,7 @@ var redis = new Redis({ enableOfflineQueue: false }); ## Sentinel ioredis supports Sentinel out of the box. It works transparently as all features that work when -you connect to a single node also work when you connect to a sentinel group. Make sure to run Redis 2.8+ if you want to use this feature. +you connect to a single node also work when you connect to a sentinel group. Make sure to run Redis >= 2.8.12 if you want to use this feature. To connect using Sentinel, use: From 687d3f3f07569f6384b43893d87dbda560e4afcf Mon Sep 17 00:00:00 2001 From: luin Date: Sun, 26 Jul 2015 12:35:47 +0800 Subject: [PATCH 5/5] Add tests for re-subscribing --- test/functional/cluster.js | 78 +++++++++++++++++++++++++++++++------- 1 file changed, 64 insertions(+), 14 deletions(-) diff --git a/test/functional/cluster.js b/test/functional/cluster.js index a6045387..57e63c53 100644 --- a/test/functional/cluster.js +++ b/test/functional/cluster.js @@ -753,24 +753,19 @@ describe('cluster', function () { describe('pub/sub', function () { it('should receive messages', function (done) { - var slotTable = [ - [0, 1, ['127.0.0.1', 30001]], - [2, 16383, ['127.0.0.1', 30002]] - ]; - var node1 = new MockServer(30001, function (argv) { - if (argv[0] === 'cluster' && argv[1] === 'slots') { - return slotTable; - } - }); - var node2 = new MockServer(30002, function (argv) { + var handler = function (argv) { if (argv[0] === 'cluster' && argv[1] === 'slots') { - return slotTable; + return [ + [0, 1, ['127.0.0.1', 30001]], + [2, 16383, ['127.0.0.1', 30002]] + ]; } - }); + }; + var node1 = new MockServer(30001, handler); + var node2 = new MockServer(30002, handler); var options = [ { host: '127.0.0.1', port: '30001' } ]; var sub = new Redis.Cluster(options); - var pub = new Redis.Cluster(options); sub.subscribe('test cluster', function () { node1.write(node1.clients[0], ['message', 'test channel', 'hi']); @@ -779,10 +774,65 @@ describe('cluster', function () { expect(channel).to.eql('test channel'); expect(message).to.eql('hi'); sub.disconnect(); - pub.disconnect(); disconnect([node1, node2], done); }); }); + + it('should re-subscribe after reconnection', function (done) { + var server = new MockServer(30001, function (argv) { + if (argv[0] === 'cluster' && argv[1] === 'slots') { + return [ + [0, 16383, ['127.0.0.1', 30001]] + ]; + } else if (argv[0] === 'subscribe' || argv[0] === 'psubscribe') { + return [argv[0], argv[1]]; + } + }); + var client = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]); + + client.subscribe('test cluster', function () { + var subscribe = Redis.prototype.subscribe; + stub(Redis.prototype, 'subscribe', function (channels) { + expect(channels).to.eql(['test cluster']); + Redis.prototype.subscribe.restore(); + client.disconnect(); + disconnect([server], done); + return Redis.prototype.subscribe.apply(this, arguments); + }); + client.on('end', function () { + client.connect(); + }); + client.disconnect(); + }); + }); + + it('should re-psubscribe after reconnection', function (done) { + var server = new MockServer(30001, function (argv) { + if (argv[0] === 'cluster' && argv[1] === 'slots') { + return [ + [0, 16383, ['127.0.0.1', 30001]] + ]; + } else if (argv[0] === 'subscribe' || argv[0] === 'psubscribe') { + return [argv[0], argv[1]]; + } + }); + var client = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]); + + client.psubscribe('test?', function () { + var psubscribe = Redis.prototype.psubscribe; + stub(Redis.prototype, 'psubscribe', function (channels) { + expect(channels).to.eql(['test?']); + Redis.prototype.psubscribe.restore(); + client.disconnect(); + disconnect([server], done); + return Redis.prototype.psubscribe.apply(this, arguments); + }); + client.on('end', function () { + client.connect(); + }); + client.disconnect(); + }); + }); }); describe('readonly', function() {