Skip to content

Commit

Permalink
feat(cluster): add enableReadyCheck option for cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
luin committed Mar 5, 2016
1 parent 433a2d0 commit b63cdc7
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 13 deletions.
2 changes: 2 additions & 0 deletions API.md
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,13 @@ Creates a Redis Cluster instance
| startupNodes | <code>Array.&lt;Object&gt;</code> | | An array of nodes in the cluster, [{ port: number, host: string }] |
| options | <code>Object</code> | | |
| [options.enableOfflineQueue] | <code>boolean</code> | <code>true</code> | See Redis class |
| [options.enableReadyCheck] | <code>boolean</code> | <code>true</code> | When enabled, ioredis only emits "ready" event when `CLUSTER INFO` command reporting the cluster is ready for handling commands. |
| [options.scaleReads] | <code>string</code> | <code>&quot;master&quot;</code> | Scale reads to the node with the specified role. Available values are "master", "slave" and "all". |
| [options.maxRedirections] | <code>number</code> | <code>16</code> | When a MOVED or ASK error is received, client will redirect the command to another node. This option limits the max redirections allowed to send a command. |
| [options.clusterRetryStrategy] | <code>function</code> | | See "Quick Start" section |
| [options.retryDelayOnFailover] | <code>number</code> | <code>100</code> | When an error is received when sending a command(e.g. "Connection is closed." when the target Redis node is down), |
| [options.retryDelayOnClusterDown] | <code>number</code> | <code>100</code> | When a CLUSTERDOWN error is received, client will retry if `retryDelayOnClusterDown` is valid delay time. |
| [options.redisOptions] | <code>Object</code> | | Passed to the constructor of `Redis`. |

<a name="Cluster+connect"></a>
### cluster.connect() ⇒ <code>Promise</code>
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -751,7 +751,7 @@ sub.subscribe('news', function () {
Event | Description
:------------- | :-------------
connect | emits when a connection is established to the Redis server.
ready | emits immediately after `connect` event.
ready | emits when `CLUSTER INFO` reporting the cluster is able to receive commands (if `enableReadyCheck` is `true`) or immediately after `connect` event (if `enableReadyCheck` is false).
error | emits when an error occurs while connecting with a property of `lastNodeError` representing the last node error received. This event is emitted silently (only emitting if there's at least one listener).
close | emits when an established Redis server connection has closed.
reconnecting | emits after `close` when a reconnection will be made. The argument of the event is the time (in ms) before reconnecting.
Expand Down
64 changes: 58 additions & 6 deletions lib/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ var ConnectionPool = require('./connection_pool');
* @param {Object[]} startupNodes - An array of nodes in the cluster, [{ port: number, host: string }]
* @param {Object} options
* @param {boolean} [options.enableOfflineQueue=true] - See Redis class
* @param {boolean} [options.enableReadyCheck=true] - When enabled, ioredis only emits "ready" event when `CLUSTER INFO`
* command reporting the cluster is ready for handling commands.
* @param {string} [options.scaleReads=master] - Scale reads to the node with the specified role.
* Available values are "master", "slave" and "all".
* @param {number} [options.maxRedirections=16] - When a MOVED or ASK error is received, client will redirect the
Expand All @@ -30,6 +32,7 @@ var ConnectionPool = require('./connection_pool');
* "Connection is closed." when the target Redis node is down),
* @param {number} [options.retryDelayOnClusterDown=100] - When a CLUSTERDOWN error is received, client will retry
* if `retryDelayOnClusterDown` is valid delay time.
* @param {Object} [options.redisOptions] - Passed to the constructor of `Redis`.
* @extends [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)
* @extends Commander
*/
Expand Down Expand Up @@ -103,10 +106,11 @@ function Cluster(startupNodes, options) {
*/
Cluster.defaultOptions = {
maxRedirections: 16,
enableOfflineQueue: true,
enableReadyCheck: true,
retryDelayOnFailover: 100,
retryDelayOnClusterDown: 100,
scaleReads: 'master',
enableOfflineQueue: true,
clusterRetryStrategy: function (times) {
return Math.min(100 + times * 2, 2000);
}
Expand Down Expand Up @@ -134,6 +138,12 @@ Cluster.prototype.resetClusterDownQueue = function () {
* @public
*/
Cluster.prototype.connect = function () {
function readyHandler() {
this.setStatus('ready');
this.retryAttempts = 0;
this.executeOfflineCommands();
}

return new Promise(function (resolve, reject) {
if (this.status === 'connecting' || this.status === 'connect' || this.status === 'ready') {
reject(new Error('Redis is already connecting/connected'));
Expand All @@ -146,11 +156,19 @@ Cluster.prototype.connect = function () {
var closeListener;
var refreshListener = function () {
this.removeListener('close', closeListener);
this.retryAttempts = 0;
this.manuallyClosing = false;
this.setStatus('connect');
this.setStatus('ready');
this.executeOfflineCommands();
if (this.options.enableReadyCheck) {
this._readyCheck(function (err, fail) {
if (err || fail) {
this.disconnect(true);
} else {
readyHandler.call(this);
}
}.bind(this));
} else {
readyHandler.call(this);
}
resolve();
};

Expand Down Expand Up @@ -444,7 +462,7 @@ Cluster.prototype.sendCommand = function (command, stream, node) {
return;
}
var redis;
if (_this.status === 'ready') {
if (_this.status === 'ready' || (command.name === 'cluster')) {
if (node && node.redis) {
redis = node.redis;
} else if (_.includes(Command.FLAGS.ENTER_SUBSCRIBER_MODE, command.name) ||
Expand All @@ -457,7 +475,7 @@ Cluster.prototype.sendCommand = function (command, stream, node) {
if (typeof to === 'function') {
var nodes =
nodeKeys
.map(function(key) {
.map(function (key) {
return _this.connectionPool.nodes.all[key];
});
redis = to(nodes, command);
Expand Down Expand Up @@ -582,6 +600,40 @@ Cluster.prototype.getInfoFromNode = function (redis, callback) {
}, 1000));
};

/**
* Check whether Cluster is able to process commands
*
* @param {Function} callback
* @private
*/
Cluster.prototype._readyCheck = function (callback) {
this.cluster('info', function (err, res) {
if (err) {
return callback(err);
}
if (typeof res !== 'string') {
return callback();
}

var state;
var lines = res.split('\r\n');
for (var i = 0; i < lines.length; ++i) {
var parts = lines[i].split(':');
if (parts[0] === 'cluster_state') {
state = parts[1];
break;
}
}

if (state === 'fail') {
debug('cluster state not ok (%s)', state);
callback(null, state);
} else {
callback();
}
});
};

['sscan', 'hscan', 'zscan', 'sscanBuffer', 'hscanBuffer', 'zscanBuffer']
.forEach(function (command) {
Cluster.prototype[command + 'Stream'] = function (key, options) {
Expand Down
41 changes: 35 additions & 6 deletions test/functional/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,37 @@ describe('cluster', function () {
});
});

describe('enableReadyCheck', function () {
it('should reconnect when cluster state is not ok', function (done) {
var state = 'fail';
var server = new MockServer(30001, function (argv) {
if (argv[0] === 'cluster' && argv[1] === 'slots') {
return [
[0, 16383, ['127.0.0.1', 30001]]
];
} else if (argv[0] === 'cluster' && argv[1] === 'info') {
return 'cluster_state:' + state;
}
});
var count = 0;
var client = new Redis.Cluster([{
host: '127.0.0.1', port: '30001'
}], {
clusterRetryStrategy: function (times) {
expect(++count).to.eql(times);
if (count === 3) {
state = 'ok';
}
return 0;
}
});
client.on('ready', function () {
client.disconnect();
disconnect([server], done);
});
});
});

describe('scaleReads', function () {
beforeEach(function () {
function handler(port, argv) {
Expand Down Expand Up @@ -1010,12 +1041,11 @@ describe('cluster', function () {
context('custom', function () {
it('should send to selected slave', function (done) {
var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], {
scaleReads: function(node, command) {
scaleReads: function (node, command) {
if (command.name === 'get') {
return node[1];
} else {
return node[2];
}
return node[2];
}
});
cluster.on('ready', function () {
Expand All @@ -1035,12 +1065,11 @@ describe('cluster', function () {

it('should send writes to masters', function (done) {
var cluster = new Redis.Cluster([{ host: '127.0.0.1', port: '30001' }], {
scaleReads: function(node, command) {
scaleReads: function (node, command) {
if (command.name === 'get') {
return node[1];
} else {
return node[2];
}
return node[2];
}
});
cluster.on('ready', function () {
Expand Down

0 comments on commit b63cdc7

Please sign in to comment.