Skip to content

Commit

Permalink
Merge pull request #112 from luin/cluster/auto-resubscribe
Browse files Browse the repository at this point in the history
Auto-resubscribe channels in cluster mode. Close #110
  • Loading branch information
luin committed Jul 26, 2015
2 parents 53ccb38 + 687d3f3 commit 05a0d12
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 25 deletions.
6 changes: 4 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
[![Dependency Status](https://david-dm.org/luin/ioredis.svg)](https://david-dm.org/luin/ioredis)
[![Join the chat at https://gitter.im/luin/ioredis](https://badges.gitter.im/Join%20Chat.svg)](https://gitter.im/luin/ioredis?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge)

A delightful, performance-focused Redis client for Node and io.js
A robust, performance-focused and full-featured Redis client for Node and io.js.

Support Redis >= 2.6.12 and (Node.js >= 0.10.16 or io.js).

Expand Down Expand Up @@ -502,6 +502,8 @@ close | client will emit `close` when an established Redis server connection
reconnecting | client will emit `reconnecting` after `close` when a reconnection would be made. The argument of the event is the time(ms) before reconnecting.
end | client will emit `end` after `close` when no more reconnections would be made.

You can also check out the `Redis#status` property to get the current connection status.

## Offline Queue
When a command can't be processed by Redis(being sent before `ready` event), by default it's added to the offline queue and will be
executed when it can be processed. You can disable this feature by set `enableOfflineQueue`
Expand All @@ -515,7 +517,7 @@ var redis = new Redis({ enableOfflineQueue: false });

## Sentinel
ioredis supports Sentinel out of the box. It works transparently as all features that work when
you connect to a single node also work when you connect to a sentinel group. Make sure to run Redis 2.8+ if you want to use this feature.
you connect to a single node also work when you connect to a sentinel group. Make sure to run Redis >= 2.8.12 if you want to use this feature.

To connect using Sentinel, use:

Expand Down
45 changes: 36 additions & 9 deletions lib/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -207,20 +207,47 @@ Cluster.prototype.selectSubscriber = function () {
if (this.subscriber === null) {
return;
}
if (this.subscriber.status === 'wait') {
this.subscriber.connect().catch(noop);
// Re-subscribe previous channels
var previousChannels = { subscribe: [], psubscribe: [] };
if (this.lastActiveSubscriber && this.lastActiveSubscriber.prevCondition) {
var subscriber = this.lastActiveSubscriber.prevCondition.subscriber;
if (subscriber) {
previousChannels.subscribe = subscriber.channels('subscribe');
previousChannels.psubscribe = subscriber.channels('psubscribe');
}
}
var _this = this;
['message', 'messageBuffer'].forEach(function (event) {
_this.subscriber.on(event, function (arg1, arg2) {
if (previousChannels.subscribe.length || previousChannels.psubscribe.length) {
var pending = 0;
_.forEach(['subscribe', 'psubscribe'], function (type) {
var channels = previousChannels[type];
if (channels.length) {
pending += 1;
debug('%s %d channels', type, channels.length);
this.subscriber[type](channels).then(function () {
if (!--pending) {
this.lastActiveSubscriber = this.subscriber;
}
}.bind(this)).catch(noop);
}
}, this);
} else {
if (this.subscriber.status === 'wait') {
this.subscriber.connect().catch(noop);
}
this.lastActiveSubscriber = this.subscriber;
}
_.forEach(['message', 'messageBuffer'], function (event) {
var _this = this;
this.subscriber.on(event, function (arg1, arg2) {
_this.emit(event, arg1, arg2);
});
});
['pmessage', 'pmessageBuffer'].forEach(function (event) {
_this.subscriber.on(event, function (arg1, arg2, arg3) {
}, this);
_.forEach(['pmessage', 'pmessageBuffer'], function (event) {
var _this = this;
this.subscriber.on(event, function (arg1, arg2, arg3) {
_this.emit(event, arg1, arg2, arg3);
});
});
}, this);
};

Cluster.prototype.setStatus = function (status) {
Expand Down
78 changes: 64 additions & 14 deletions test/functional/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -753,24 +753,19 @@ describe('cluster', function () {

describe('pub/sub', function () {
it('should receive messages', function (done) {
var slotTable = [
[0, 1, ['127.0.0.1', 30001]],
[2, 16383, ['127.0.0.1', 30002]]
];
var node1 = new MockServer(30001, function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
return slotTable;
}
});
var node2 = new MockServer(30002, function (argv) {
var handler = function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
return slotTable;
return [
[0, 1, ['127.0.0.1', 30001]],
[2, 16383, ['127.0.0.1', 30002]]
];
}
});
};
var node1 = new MockServer(30001, handler);
var node2 = new MockServer(30002, handler);

var options = [ { host: '127.0.0.1', port: '30001' } ];
var sub = new Redis.Cluster(options);
var pub = new Redis.Cluster(options);

sub.subscribe('test cluster', function () {
node1.write(node1.clients[0], ['message', 'test channel', 'hi']);
Expand All @@ -779,10 +774,65 @@ describe('cluster', function () {
expect(channel).to.eql('test channel');
expect(message).to.eql('hi');
sub.disconnect();
pub.disconnect();
disconnect([node1, node2], done);
});
});

it('should re-subscribe after reconnection', function (done) {
var server = new MockServer(30001, function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
return [
[0, 16383, ['127.0.0.1', 30001]]
];
} else if (argv[0] === 'subscribe' || argv[0] === 'psubscribe') {
return [argv[0], argv[1]];
}
});
var client = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]);

client.subscribe('test cluster', function () {
var subscribe = Redis.prototype.subscribe;
stub(Redis.prototype, 'subscribe', function (channels) {
expect(channels).to.eql(['test cluster']);
Redis.prototype.subscribe.restore();
client.disconnect();
disconnect([server], done);
return Redis.prototype.subscribe.apply(this, arguments);
});
client.on('end', function () {
client.connect();
});
client.disconnect();
});
});

it('should re-psubscribe after reconnection', function (done) {
var server = new MockServer(30001, function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
return [
[0, 16383, ['127.0.0.1', 30001]]
];
} else if (argv[0] === 'subscribe' || argv[0] === 'psubscribe') {
return [argv[0], argv[1]];
}
});
var client = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }]);

client.psubscribe('test?', function () {
var psubscribe = Redis.prototype.psubscribe;
stub(Redis.prototype, 'psubscribe', function (channels) {
expect(channels).to.eql(['test?']);
Redis.prototype.psubscribe.restore();
client.disconnect();
disconnect([server], done);
return Redis.prototype.psubscribe.apply(this, arguments);
});
client.on('end', function () {
client.connect();
});
client.disconnect();
});
});
});

describe('readonly', function() {
Expand Down

0 comments on commit 05a0d12

Please sign in to comment.