diff --git a/lib/cluster/ClusterSubscriber.ts b/lib/cluster/ClusterSubscriber.ts new file mode 100644 index 00000000..eadb4d1e --- /dev/null +++ b/lib/cluster/ClusterSubscriber.ts @@ -0,0 +1,122 @@ +import {EventEmitter} from 'events' +import ConnectionPool from './ConnectionPool' +import {sample, noop} from '../utils/lodash' +import {getNodeKey} from './util' + +const Redis = require('../redis') +const debug = require('../utils/debug')('ioredis:cluster:subscriber') + +const SUBSCRIBER_CONNECTION_NAME = 'ioredisClusterSubscriber' + +export default class ClusterSubscriber { + private started: boolean = false + private subscriber: any = null + private lastActiveSubscriber: any + + constructor (private connectionPool: ConnectionPool, private emitter: EventEmitter) { + this.connectionPool.on('-node', (_, key: string) => { + if (!this.started || !this.subscriber) { + return + } + if (getNodeKey(this.subscriber.options) === key) { + debug('subscriber has left, selecting a new one...') + this.selectSubscriber() + } + }) + this.connectionPool.on('+node', () => { + if (!this.started || this.subscriber) { + return + } + debug('a new node is discovered and there is no subscriber, selecting a new one...') + this.selectSubscriber() + }) + } + + getInstance (): any { + return this.subscriber + } + + private selectSubscriber () { + const lastActiveSubscriber = this.lastActiveSubscriber + + // Disconnect the previous subscriber even if there + // will not be a new one. + if (lastActiveSubscriber) { + lastActiveSubscriber.disconnect() + } + + const sampleNode = sample(this.connectionPool.getNodes()) + if (!sampleNode) { + debug('selecting subscriber failed since there is no node discovered in the cluster yet') + this.subscriber = null + return + } + + const {port, host} = sampleNode.options + debug('selected a subscriber %s:%s', host, port) + + // Create a specialized Redis connection for the subscription. + // Note that auto reconnection is enabled here. + // `enableReadyCheck` is disabled because subscription is allowed + // when redis is loading data from the disk. + this.subscriber = new Redis({ + port, + host, + enableReadyCheck: false, + connectionName: SUBSCRIBER_CONNECTION_NAME, + lazyConnect: true + }) + + // Re-subscribe previous channels + var previousChannels = { subscribe: [], psubscribe: [] } + if (lastActiveSubscriber) { + const condition = lastActiveSubscriber.condition || lastActiveSubscriber.prevCondition + if (condition && condition.subscriber) { + previousChannels.subscribe = condition.subscriber.channels('subscribe') + previousChannels.psubscribe = condition.subscriber.channels('psubscribe') + } + } + if (previousChannels.subscribe.length || previousChannels.psubscribe.length) { + var pending = 0 + for (const type of ['subscribe', 'psubscribe']) { + var channels = previousChannels[type] + if (channels.length) { + pending += 1 + debug('%s %d channels', type, channels.length) + this.subscriber[type](channels).then(() => { + if (!--pending) { + this.lastActiveSubscriber = this.subscriber + } + }).catch(noop) + } + } + } else { + this.lastActiveSubscriber = this.subscriber + } + for (const event of ['message', 'messageBuffer']) { + this.subscriber.on(event, (arg1, arg2) => { + this.emitter.emit(event, arg1, arg2) + }) + } + for (const event of ['pmessage', 'pmessageBuffer']) { + this.subscriber.on(event, (arg1, arg2, arg3) => { + this.emitter.emit(event, arg1, arg2, arg3) + }) + } + } + + start (): void { + this.started = true + this.selectSubscriber() + debug('started') + } + + stop (): void { + this.started = false + if (this.subscriber) { + this.subscriber.disconnect() + this.subscriber = null + } + debug('stopped') + } +} diff --git a/lib/cluster/ConnectionPool.ts b/lib/cluster/ConnectionPool.ts index ebc035c0..e6b98e09 100644 --- a/lib/cluster/ConnectionPool.ts +++ b/lib/cluster/ConnectionPool.ts @@ -1,20 +1,13 @@ import {parseURL} from '../utils' import {EventEmitter} from 'events' -import {noop, defaults} from '../utils/lodash' +import {noop, defaults, values} from '../utils/lodash' +import {IRedisOptions, getNodeKey} from './util' const Redis = require('../redis') const debug = require('../utils/debug')('ioredis:cluster:connectionPool') type NODE_TYPE = 'all' | 'master' | 'slave' -interface IRedisOptions { - [key: string]: any -} - -interface IRedisOptionsWithKey extends IRedisOptions { - key: string -} - export default class ConnectionPool extends EventEmitter { // master + slave = all private nodes: {[key in NODE_TYPE]: {[key: string]: any}} = { @@ -29,6 +22,10 @@ export default class ConnectionPool extends EventEmitter { super() } + public getNodes(role: 'all' | 'master' | 'slave' = 'all'): any[] { + return values(this.nodes[role]) + } + /** * Find or create a connection to the node * @@ -37,33 +34,34 @@ export default class ConnectionPool extends EventEmitter { * @returns {*} * @memberof ConnectionPool */ - public findOrCreate (node: IRedisOptions, readOnly: boolean = false): any { - setKey(node) + public findOrCreate(node: IRedisOptions, readOnly: boolean = false): any { + fillDefaultOptions(node) + const key = getNodeKey(node) readOnly = Boolean(readOnly) - if (this.specifiedOptions[node.key]) { - Object.assign(node, this.specifiedOptions[node.key]) + if (this.specifiedOptions[key]) { + Object.assign(node, this.specifiedOptions[key]) } else { - this.specifiedOptions[node.key] = node + this.specifiedOptions[key] = node } let redis - if (this.nodes.all[node.key]) { - redis = this.nodes.all[node.key] + if (this.nodes.all[key]) { + redis = this.nodes.all[key] if (redis.options.readOnly !== readOnly) { redis.options.readOnly = readOnly - debug('Change role of %s to %s', node.key, readOnly ? 'slave' : 'master') + debug('Change role of %s to %s', key, readOnly ? 'slave' : 'master') redis[readOnly ? 'readonly' : 'readwrite']().catch(noop) if (readOnly) { - delete this.nodes.master[node.key] - this.nodes.slave[node.key] = redis + delete this.nodes.master[key] + this.nodes.slave[key] = redis } else { - delete this.nodes.slave[node.key] - this.nodes.master[node.key] = redis + delete this.nodes.slave[key] + this.nodes.master[key] = redis } } } else { - debug('Connecting to %s as %s', node.key, readOnly ? 'slave' : 'master') + debug('Connecting to %s as %s', key, readOnly ? 'slave' : 'master') redis = new Redis(defaults({ // Never try to reconnect when a node is lose, // instead, waiting for a `MOVED` error and @@ -75,23 +73,23 @@ export default class ConnectionPool extends EventEmitter { enableOfflineQueue: true, readOnly: readOnly }, node, this.redisOptions, { lazyConnect: true })) - this.nodes.all[node.key] = redis - this.nodes[readOnly ? 'slave' : 'master'][node.key] = redis + this.nodes.all[key] = redis + this.nodes[readOnly ? 'slave' : 'master'][key] = redis redis.once('end', () => { - delete this.nodes.all[node.key] - delete this.nodes.master[node.key] - delete this.nodes.slave[node.key] - this.emit('-node', redis) + delete this.nodes.all[key] + delete this.nodes.master[key] + delete this.nodes.slave[key] + this.emit('-node', redis, key) if (!Object.keys(this.nodes.all).length) { this.emit('drain') } }) - this.emit('+node', redis) + this.emit('+node', redis, key) redis.on('error', function (error) { - this.emit('nodeError', error) + this.emit('nodeError', error, key) }) } @@ -105,14 +103,15 @@ export default class ConnectionPool extends EventEmitter { * @param {(Array)} nodes * @memberof ConnectionPool */ - public reset (nodes: Array): void { + public reset(nodes: Array): void { + debug('Reset with %O', nodes); const newNodes = {} nodes.forEach((node) => { - const options: {port?: number | string, db?: number, key?: string} = {} + const options: IRedisOptions = {} if (typeof node === 'object') { - defaults(options, node) + Object.assign(options, node) } else if (typeof node === 'string') { - defaults(options, parseURL(node)) + Object.assign(options, parseURL(node)) } else if (typeof node === 'number') { options.port = node } else { @@ -123,8 +122,8 @@ export default class ConnectionPool extends EventEmitter { } delete options.db - setKey(options) - newNodes[options.key] = options + fillDefaultOptions(options) + newNodes[getNodeKey(options)] = options }, this) Object.keys(this.nodes.all).forEach((key) => { @@ -140,15 +139,7 @@ export default class ConnectionPool extends EventEmitter { } } -/** - * Set key property - * - * @private - */ -function setKey(node: IRedisOptions): IRedisOptionsWithKey { - node = node || {} +function fillDefaultOptions(node: IRedisOptions): void { node.port = node.port || 6379 node.host = node.host || '127.0.0.1' - node.key = node.key || node.host + ':' + node.port - return node } diff --git a/lib/cluster/index.js b/lib/cluster/index.js index c7f7e590..8b4a471b 100644 --- a/lib/cluster/index.js +++ b/lib/cluster/index.js @@ -13,8 +13,10 @@ var Command = require('../command'); var commands = require('redis-commands'); var asCallback = require('standard-as-callback'); var ConnectionPool = require('./ConnectionPool').default; +var ClusterSubscriber = require('./ClusterSubscriber').default; var DelayQueue = require('./DelayQueue').default; var PromiseContainer = require('../promiseContainer'); +var {AbortError} = require('redis-errors'); /** * Creates a Redis Cluster instance @@ -59,21 +61,17 @@ function Cluster(startupNodes, options) { this.connectionPool = new ConnectionPool(this.options.redisOptions); this.startupNodes = startupNodes; - var _this = this; - this.connectionPool.on('-node', function (redis) { - if (_this.status !== 'disconnecting' && _this.subscriber === redis) { - _this.selectSubscriber(); - } - _this.emit('-node', redis); + this.connectionPool.on('-node', (redis, key) => { + this.emit('-node', redis); }); - this.connectionPool.on('+node', function (redis) { - _this.emit('+node', redis); + this.connectionPool.on('+node', (redis) => { + this.emit('+node', redis); }); - this.connectionPool.on('drain', function () { - _this.setStatus('close'); + this.connectionPool.on('drain', () => { + this.setStatus('close'); }); - this.connectionPool.on('nodeError', function (error) { - _this.emit('node error', error); + this.connectionPool.on('nodeError', (error) => { + this.emit('node error', error); }); this.slots = []; @@ -82,12 +80,14 @@ function Cluster(startupNodes, options) { this.resetOfflineQueue(); this.delayQueue = new DelayQueue(); - this.subscriber = null; + this.subscriber = new ClusterSubscriber(this.connectionPool, this) if (this.options.lazyConnect) { this.setStatus('wait'); } else { - this.connect().catch(_.noop); + this.connect().catch((err) => { + debug('connecting failed: %s', err) + }); } } @@ -193,7 +193,7 @@ Cluster.prototype.connect = function () { this.connectionPool.reset([]); } }.bind(this)); - this.selectSubscriber(); + this.subscriber.start(); }.bind(this)); }; @@ -244,6 +244,7 @@ Cluster.prototype.disconnect = function (reconnect) { this.slotsTimer = null; } + this.subscriber.stop(); if (status === 'wait') { this.setStatus('close'); this._handleCloseEvent(); @@ -309,59 +310,7 @@ Cluster.prototype.nodes = function (role) { if (role !== 'all' && role !== 'master' && role !== 'slave') { throw new Error('Invalid role "' + role + '". Expected "all", "master" or "slave"'); } - return _.values(this.connectionPool.nodes[role]); -}; - -/** - * Select a subscriber from the cluster - * - * @private - */ -Cluster.prototype.selectSubscriber = function () { - this.subscriber = _.sample(this.nodes()); - if (!this.subscriber) { - return; - } - // Re-subscribe previous channels - var previousChannels = { subscribe: [], psubscribe: [] }; - if (this.lastActiveSubscriber && this.lastActiveSubscriber.prevCondition) { - var subscriber = this.lastActiveSubscriber.prevCondition.subscriber; - if (subscriber) { - previousChannels.subscribe = subscriber.channels('subscribe'); - previousChannels.psubscribe = subscriber.channels('psubscribe'); - } - } - var _this = this; - if (previousChannels.subscribe.length || previousChannels.psubscribe.length) { - var pending = 0; - _.forEach(['subscribe', 'psubscribe'], function (type) { - var channels = previousChannels[type]; - if (channels.length) { - pending += 1; - debug('%s %d channels', type, channels.length); - _this.subscriber[type](channels).then(function () { - if (!--pending) { - _this.lastActiveSubscriber = _this.subscriber; - } - }).catch(_.noop); - } - }); - } else { - if (this.subscriber.status === 'wait') { - this.subscriber.connect().catch(_.noop); - } - this.lastActiveSubscriber = this.subscriber; - } - _.forEach(['message', 'messageBuffer'], function (event) { - _this.subscriber.on(event, function (arg1, arg2) { - _this.emit(event, arg1, arg2); - }); - }); - _.forEach(['pmessage', 'pmessageBuffer'], function (event) { - _this.subscriber.on(event, function (arg1, arg2, arg3) { - _this.emit(event, arg1, arg2, arg3); - }); - }); + return this.connectionPool.getNodes(role) }; /** @@ -513,7 +462,7 @@ Cluster.prototype.sendCommand = function (command, stream, node) { function tryConnection(random, asking) { if (_this.status === 'end') { - command.reject(new Error('Cluster is ended.')); + command.reject(new AbortError('Cluster is ended.')); return; } var redis; @@ -522,7 +471,11 @@ Cluster.prototype.sendCommand = function (command, stream, node) { redis = node.redis; } else if (Command.checkFlag('ENTER_SUBSCRIBER_MODE', command.name) || Command.checkFlag('EXIT_SUBSCRIBER_MODE', command.name)) { - redis = _this.subscriber; + redis = _this.subscriber.getInstance(); + if (!redis) { + command.reject(new AbortError('No subscriber for the cluster')); + return; + } } else { if (!random) { if (typeof targetSlot === 'number' && _this.slots[targetSlot]) { @@ -621,14 +574,15 @@ Cluster.prototype.getInfoFromNode = function (redis, callback) { if (!redis) { return callback(new Error('Node is disconnected')); } - var _this = this; - redis.cluster('slots', utils.timeout(function (err, result) { + redis.cluster('slots', utils.timeout((err, result) => { if (err) { redis.disconnect(); return callback(err); } var nodes = []; + debug('cluster slots result count: %d', result.length) + for (var i = 0; i < result.length; ++i) { var items = result[i]; var slotRangeStart = items[0]; @@ -642,12 +596,14 @@ Cluster.prototype.getInfoFromNode = function (redis, callback) { keys.push(items[j].host + ':' + items[j].port); } + debug('cluster slots result [%d]: slots %d~%d served by %s', i, slotRangeStart, slotRangeEnd, keys) + for (var slot = slotRangeStart; slot <= slotRangeEnd; slot++) { - _this.slots[slot] = keys; + this.slots[slot] = keys; } } - _this.connectionPool.reset(nodes); + this.connectionPool.reset(nodes); callback(); }, this.options.slotsRefreshTimeout)); }; diff --git a/lib/cluster/util.ts b/lib/cluster/util.ts new file mode 100644 index 00000000..9ae9e837 --- /dev/null +++ b/lib/cluster/util.ts @@ -0,0 +1,9 @@ +export interface IRedisOptions { + [key: string]: any +} + +export function getNodeKey(node: IRedisOptions): string { + node.port = node.port || 6379 + node.host = node.host || '127.0.0.1' + return node.host + ':' + node.port +} diff --git a/lib/errors/MaxRetriesPerRequestError.js b/lib/errors/MaxRetriesPerRequestError.js deleted file mode 100644 index 35ea3bf5..00000000 --- a/lib/errors/MaxRetriesPerRequestError.js +++ /dev/null @@ -1,15 +0,0 @@ -const {AbortError} = require('redis-errors') - -module.exports = class MaxRetriesPerRequestError extends AbortError { - constructor (maxRetriesPerRequest) { - var message = `Reached the max retries per request limit (which is ${maxRetriesPerRequest}). Refer to "maxRetriesPerRequest" option for details.`; - - super(message); - Error.captureStackTrace(this, this.constructor); - } - - get name () { - return this.constructor.name; - } -}; - diff --git a/lib/errors/MaxRetriesPerRequestError.ts b/lib/errors/MaxRetriesPerRequestError.ts new file mode 100644 index 00000000..d38b503c --- /dev/null +++ b/lib/errors/MaxRetriesPerRequestError.ts @@ -0,0 +1,14 @@ +import {AbortError} from 'redis-errors' + +export default class MaxRetriesPerRequestError extends AbortError { + constructor (maxRetriesPerRequest: number) { + const message = `Reached the max retries per request limit (which is ${maxRetriesPerRequest}). Refer to "maxRetriesPerRequest" option for details.` + + super(message) + Error.captureStackTrace(this, this.constructor) + } + + get name (): string { + return this.constructor.name + } +} diff --git a/lib/errors/index.js b/lib/errors/index.js deleted file mode 100644 index 6a342fd4..00000000 --- a/lib/errors/index.js +++ /dev/null @@ -1 +0,0 @@ -exports.MaxRetriesPerRequestError = require('./MaxRetriesPerRequestError') diff --git a/lib/errors/index.ts b/lib/errors/index.ts new file mode 100644 index 00000000..52f86ab7 --- /dev/null +++ b/lib/errors/index.ts @@ -0,0 +1,5 @@ +import MaxRetriesPerRequestError from './MaxRetriesPerRequestError' + +export { + MaxRetriesPerRequestError +} diff --git a/lib/redis.js b/lib/redis.js index 93517148..1737326a 100644 --- a/lib/redis.js +++ b/lib/redis.js @@ -228,16 +228,9 @@ Redis.prototype.parseOptions = function () { * @private */ Redis.prototype.setStatus = function (status, arg) { - var address; - if (this.options.path) { - address = this.options.path; - } else if (this.stream && this.stream.remoteAddress && this.stream.remotePort) { - address = this.stream.remoteAddress + ':' + this.stream.remotePort; - } else { - address = this.options.host + ':' + this.options.port; + if (debug.enabled) { + debug('status[%s]: %s -> %s', this._getDescription(), this.status || '[empty]', status); } - - debug('status[%s]: %s -> %s', address, this.status || '[empty]', status); this.status = status; process.nextTick(this.emit.bind(this, status, arg)); }; @@ -602,7 +595,9 @@ Redis.prototype.sendCommand = function (command, stream) { } if (writable) { - debug('write command[%d] -> %s(%s)', this.condition.select, command.name, command.args); + if (debug.enabled) { + debug('write command[%s]: %d -> %s(%o)', this._getDescription(), this.condition.select, command.name, command.args); + } (stream || this.stream).write(command.toWritable()); this.commandQueue.push({ @@ -615,7 +610,9 @@ Redis.prototype.sendCommand = function (command, stream) { this.manuallyClosing = true; } } else if (this.options.enableOfflineQueue) { - debug('queue command[%d] -> %s(%s)', this.condition.select, command.name, command.args); + if (debug.enabled) { + debug('queue command[%s]: %d -> %s(%o)', this._getDescription(), this.condition.select, command.name, command.args); + } this.offlineQueue.push({ command: command, stream: stream, @@ -635,6 +632,25 @@ Redis.prototype.sendCommand = function (command, stream) { return command.promise; }; +/** + * Get description of the connection. Used for debugging. + * @private + */ +Redis.prototype._getDescription = function () { + let description; + if (this.options.path) { + description = this.options.path; + } else if (this.stream && this.stream.remoteAddress && this.stream.remotePort) { + description = this.stream.remoteAddress + ':' + this.stream.remotePort; + } else { + description = this.options.host + ':' + this.options.port; + } + if (this.options.connectionName) { + description += ` (${this.options.connectionName})` + } + return description +}; + ['scan', 'sscan', 'hscan', 'zscan', 'scanBuffer', 'sscanBuffer', 'hscanBuffer', 'zscanBuffer'] .forEach(function (command) { Redis.prototype[command + 'Stream'] = function (key, options) { diff --git a/lib/redis/event_handler.js b/lib/redis/event_handler.js index 77805594..e7e2140e 100644 --- a/lib/redis/event_handler.js +++ b/lib/redis/event_handler.js @@ -157,7 +157,7 @@ exports.readyHandler = function (self) { if (self.options.connectionName) { debug('set the connection name [%s]', self.options.connectionName); - self.client('setname', self.options.connectionName); + self.client('setname', self.options.connectionName).catch(_.noop); } if (self.options.readOnly) { diff --git a/package-lock.json b/package-lock.json index d4b30a19..c54be721 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1339,9 +1339,9 @@ "dev": true }, "typescript": { - "version": "2.9.2", - "resolved": "https://registry.npmjs.org/typescript/-/typescript-2.9.2.tgz", - "integrity": "sha512-Gr4p6nFNaoufRIY4NMdpQRNmgxVIGMs4Fcu/ujdYk3nAZqk7supzBE9idmvfZIlH/Cuj//dvi+019qEue9lV0w==", + "version": "3.1.1", + "resolved": "http://registry.npm.taobao.org/typescript/download/typescript-3.1.1.tgz", + "integrity": "sha1-M2K6ndHkguuyNVsC3+i80ZosfJY=", "dev": true }, "typical": { diff --git a/package.json b/package.json index a1b20f83..6d74dfca 100644 --- a/package.json +++ b/package.json @@ -62,7 +62,7 @@ "server-destroy": "^1.0.1", "sinon": "^1.17.3", "ts-node": "^7.0.0", - "typescript": "^2.9.2" + "typescript": "^3.1.1" }, "engines": { "node": ">=6" diff --git a/test/functional/cluster/pub_sub.js b/test/functional/cluster/pub_sub.js index 95143283..86535d56 100644 --- a/test/functional/cluster/pub_sub.js +++ b/test/functional/cluster/pub_sub.js @@ -15,7 +15,7 @@ describe('cluster:pub/sub', function () { var sub = new Redis.Cluster(options); sub.subscribe('test cluster', function () { - node1.write(node1.clients[0], ['message', 'test channel', 'hi']); + node1.write(node1.findClientByName('ioredisClusterSubscriber'), ['message', 'test channel', 'hi']); }); sub.on('message', function (channel, message) { expect(channel).to.eql('test channel'); @@ -25,6 +25,26 @@ describe('cluster:pub/sub', function () { }); }); + it('should works when sending regular commands', function (done) { + var handler = function (argv) { + if (argv[0] === 'cluster' && argv[1] === 'slots') { + return [ + [0, 16383, ['127.0.0.1', 30001]] + ]; + } + }; + new MockServer(30001, handler); + + var sub = new Redis.Cluster([{port: '30001'}]); + + sub.subscribe('test cluster', function () { + sub.set('foo', 'bar').then((res) => { + expect(res).to.eql('OK') + done(); + }); + }); + }); + it('should re-subscribe after reconnection', function (done) { new MockServer(30001, function (argv) { if (argv[0] === 'cluster' && argv[1] === 'slots') { @@ -79,4 +99,3 @@ describe('cluster:pub/sub', function () { }); }); }); - diff --git a/test/helpers/mock_server.js b/test/helpers/mock_server.js index 99130187..08f7032b 100644 --- a/test/helpers/mock_server.js +++ b/test/helpers/mock_server.js @@ -56,6 +56,9 @@ MockServer.prototype.connect = function () { returnBuffers: true, returnReply: function (reply) { reply = utils.convertBufferToString(reply); + if (reply.length === 3 && reply[0].toLowerCase() === 'client' && reply[1].toLowerCase() === 'setname') { + c._connectionName = reply[2] + } _this.write(c, _this.handler && _this.handler(reply)); }, returnError: function () { } @@ -119,6 +122,14 @@ MockServer.prototype.write = function (c, data) { } }; +MockServer.prototype.findClientByName = function (name) { + for (const client of this.clients) { + if (client._connectionName === name) { + return client + } + } +} + MockServer.REDIS_OK = '+OK'; module.exports = MockServer;