Skip to content

Commit

Permalink
feat(cluster): support update startupNodes in clusterRetryStrategy
Browse files Browse the repository at this point in the history
  • Loading branch information
luin committed Mar 5, 2016
1 parent 4d04fc2 commit 4a46766
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 42 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,14 @@ but a few so that if one is unreachable the client will try the next one, and th
return delay;
}
```
It' possible to modify the `startupNodes` property in order to switch to another set of nodes here:

```javascript
function (times) {
this.startupNodes = [{ port: 6790, host: '127.0.0.1' }];
return Math.min(100 + times * 2, 2000);
}
```

* `enableOfflineQueue`: Similar to the `enableOfflineQueue` option of `Redis` class.
* `enableReadyCheck`: When enabled, "ready" event will only be emitted when `CLUSTER INFO` command
Expand Down
28 changes: 22 additions & 6 deletions lib/cluster/connection_pool.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

var util = require('util');
var utils = require('../utils');
var EventEmitter = require('events').EventEmitter;
var _ = require('lodash');
var Redis = require('../redis');
Expand Down Expand Up @@ -77,7 +78,7 @@ ConnectionPool.prototype.findOrCreate = function (node, readOnly) {
this.emit('+node', redis);
}

return this.nodes.all[node.key];
return redis;
};

/**
Expand All @@ -89,11 +90,26 @@ ConnectionPool.prototype.findOrCreate = function (node, readOnly) {
*/
ConnectionPool.prototype.reset = function (nodes) {
var newNodes = {};
for (var i = 0; i < nodes.length; i++) {
var node = nodes[i];
node.key = node.host + ':' + node.port;
newNodes[node.key] = node;
}
nodes.forEach(function (node) {
var options = {};
if (typeof node === 'object') {
_.defaults(options, node);
} else if (typeof node === 'string') {
_.defaults(options, utils.parseURL(node));
} else if (typeof node === 'number') {
options.port = node;
} else {
throw new Error('Invalid argument ' + node);
}
if (typeof options.port === 'string') {
options.port = parseInt(options.port, 10);
}
delete options.db;

options.key = options.host + ':' + options.port;
newNodes[options.key] = options;
});

var _this = this;
Object.keys(this.nodes.all).forEach(function (key) {
if (!newNodes[key]) {
Expand Down
28 changes: 6 additions & 22 deletions lib/cluster/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,28 +49,8 @@ function Cluster(startupNodes, options) {
'". Expected "all", "master", "slave" or a custom function');
}

if (!Array.isArray(startupNodes) || startupNodes.length === 0) {
throw new Error('`startupNodes` should contain at least one node.');
}

this.connectionPool = new ConnectionPool(this.options.redisOptions);
this.startupNodes = startupNodes.map(function (node) {
var options = {};
if (typeof node === 'object') {
_.defaults(options, node);
} else if (typeof node === 'string') {
_.defaults(options, utils.parseURL(node));
} else if (typeof node === 'number') {
options.port = node;
} else {
throw new Error('Invalid argument ' + node);
}
if (typeof options.port === 'string') {
options.port = parseInt(options.port, 10);
}
delete options.db;
return options;
});
this.startupNodes = startupNodes;

var _this = this;
this.connectionPool.on('-node', function (redis) {
Expand Down Expand Up @@ -151,6 +131,10 @@ Cluster.prototype.connect = function () {
}
this.setStatus('connecting');

if (!Array.isArray(this.startupNodes) || this.startupNodes.length === 0) {
throw new Error('`startupNodes` should contain at least one node.');
}

this.connectionPool.reset(this.startupNodes);

var closeListener;
Expand Down Expand Up @@ -183,7 +167,7 @@ Cluster.prototype.connect = function () {
this.once('close', function () {
var retryDelay;
if (!this.manuallyClosing && typeof this.options.clusterRetryStrategy === 'function') {
retryDelay = this.options.clusterRetryStrategy(++this.retryAttempts);
retryDelay = this.options.clusterRetryStrategy.call(this, ++this.retryAttempts);
}
if (typeof retryDelay === 'number') {
this.setStatus('reconnecting');
Expand Down
14 changes: 0 additions & 14 deletions test/unit/cluster.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,6 @@ describe('cluster', function () {
expect(cluster.options).to.have.property('scaleReads', 'master');
});

it('should throw when startupNodes is not an array or is empty', function () {
expect(function () {
new Cluster();
}).to.throw(/startupNodes/);

expect(function () {
new Cluster([]);
}).to.throw(/startupNodes/);

expect(function () {
new Cluster([{}]);
}).to.not.throw(/startupNodes/);
});

describe('#executeFailoverCommands', function () {
it('should execute the commands', function (done) {
var cluster = {
Expand Down

0 comments on commit 4a46766

Please sign in to comment.