Skip to content

Commit

Permalink
fix(cluster): robust solution for pub/sub in cluster
Browse files Browse the repository at this point in the history
Previously (v3 & v4.0.0), ioredis reuse the existing connection
for subscription, which will cause problem when executing commands
on the reused connection.

From now on, a specialized connection will be created when any
subscription has made. This solves the problem above perfectly.

Close #696
  • Loading branch information
luin committed Oct 6, 2018
1 parent 19947b3 commit 4c6ffeb
Show file tree
Hide file tree
Showing 14 changed files with 482 additions and 363 deletions.
122 changes: 122 additions & 0 deletions lib/cluster/ClusterSubscriber.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import {EventEmitter} from 'events'
import ConnectionPool from './ConnectionPool'
import {sample, noop} from '../utils/lodash'
import {getNodeKey} from './util'

const Redis = require('../redis')
const debug = require('../utils/debug')('ioredis:cluster:subscriber')

const SUBSCRIBER_CONNECTION_NAME = 'ioredisClusterSubscriber'

export default class ClusterSubscriber {
private started: boolean = false
private subscriber: any = null
private lastActiveSubscriber: any

constructor (private connectionPool: ConnectionPool, private emitter: EventEmitter) {
this.connectionPool.on('-node', (_, key: string) => {
if (!this.started || !this.subscriber) {
return
}
if (getNodeKey(this.subscriber.options) === key) {
debug('subscriber has left, selecting a new one...')
this.selectSubscriber()
}
})
this.connectionPool.on('+node', () => {
if (!this.started || this.subscriber) {
return
}
debug('a new node is discovered and there is no subscriber, selecting a new one...')
this.selectSubscriber()
})
}

getInstance (): any {
return this.subscriber
}

private selectSubscriber () {
const lastActiveSubscriber = this.lastActiveSubscriber

// Disconnect the previous subscriber even if there
// will not be a new one.
if (lastActiveSubscriber) {
lastActiveSubscriber.disconnect()
}

const sampleNode = sample(this.connectionPool.getNodes())
if (!sampleNode) {
debug('selecting subscriber failed since there is no node discovered in the cluster yet')
this.subscriber = null
return
}

const {port, host} = sampleNode.options
debug('selected a subscriber %s:%s', host, port)

// Create a specialized Redis connection for the subscription.
// Note that auto reconnection is enabled here.
// `enableReadyCheck` is disabled because subscription is allowed
// when redis is loading data from the disk.
this.subscriber = new Redis({
port,
host,
enableReadyCheck: false,
connectionName: SUBSCRIBER_CONNECTION_NAME,
lazyConnect: true
})

// Re-subscribe previous channels
var previousChannels = { subscribe: [], psubscribe: [] }
if (lastActiveSubscriber) {
const condition = lastActiveSubscriber.condition || lastActiveSubscriber.prevCondition
if (condition && condition.subscriber) {
previousChannels.subscribe = condition.subscriber.channels('subscribe')
previousChannels.psubscribe = condition.subscriber.channels('psubscribe')
}
}
if (previousChannels.subscribe.length || previousChannels.psubscribe.length) {
var pending = 0
for (const type of ['subscribe', 'psubscribe']) {
var channels = previousChannels[type]
if (channels.length) {
pending += 1
debug('%s %d channels', type, channels.length)
this.subscriber[type](channels).then(() => {
if (!--pending) {
this.lastActiveSubscriber = this.subscriber
}
}).catch(noop)
}
}
} else {
this.lastActiveSubscriber = this.subscriber
}
for (const event of ['message', 'messageBuffer']) {
this.subscriber.on(event, (arg1, arg2) => {
this.emitter.emit(event, arg1, arg2)
})
}
for (const event of ['pmessage', 'pmessageBuffer']) {
this.subscriber.on(event, (arg1, arg2, arg3) => {
this.emitter.emit(event, arg1, arg2, arg3)
})
}
}

start (): void {
this.started = true
this.selectSubscriber()
debug('started')
}

stop (): void {
this.started = false
if (this.subscriber) {
this.subscriber.disconnect()
this.subscriber = null
}
debug('stopped')
}
}
81 changes: 36 additions & 45 deletions lib/cluster/ConnectionPool.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,13 @@
import {parseURL} from '../utils'
import {EventEmitter} from 'events'
import {noop, defaults} from '../utils/lodash'
import {noop, defaults, values} from '../utils/lodash'
import {IRedisOptions, getNodeKey} from './util'

const Redis = require('../redis')
const debug = require('../utils/debug')('ioredis:cluster:connectionPool')

type NODE_TYPE = 'all' | 'master' | 'slave'

interface IRedisOptions {
[key: string]: any
}

interface IRedisOptionsWithKey extends IRedisOptions {
key: string
}

export default class ConnectionPool extends EventEmitter {
// master + slave = all
private nodes: {[key in NODE_TYPE]: {[key: string]: any}} = {
Expand All @@ -29,6 +22,10 @@ export default class ConnectionPool extends EventEmitter {
super()
}

public getNodes(role: 'all' | 'master' | 'slave' = 'all'): any[] {
return values(this.nodes[role])
}

/**
* Find or create a connection to the node
*
Expand All @@ -37,33 +34,34 @@ export default class ConnectionPool extends EventEmitter {
* @returns {*}
* @memberof ConnectionPool
*/
public findOrCreate (node: IRedisOptions, readOnly: boolean = false): any {
setKey(node)
public findOrCreate(node: IRedisOptions, readOnly: boolean = false): any {
fillDefaultOptions(node)
const key = getNodeKey(node)
readOnly = Boolean(readOnly)

if (this.specifiedOptions[node.key]) {
Object.assign(node, this.specifiedOptions[node.key])
if (this.specifiedOptions[key]) {
Object.assign(node, this.specifiedOptions[key])
} else {
this.specifiedOptions[node.key] = node
this.specifiedOptions[key] = node
}

let redis
if (this.nodes.all[node.key]) {
redis = this.nodes.all[node.key]
if (this.nodes.all[key]) {
redis = this.nodes.all[key]
if (redis.options.readOnly !== readOnly) {
redis.options.readOnly = readOnly
debug('Change role of %s to %s', node.key, readOnly ? 'slave' : 'master')
debug('Change role of %s to %s', key, readOnly ? 'slave' : 'master')
redis[readOnly ? 'readonly' : 'readwrite']().catch(noop)
if (readOnly) {
delete this.nodes.master[node.key]
this.nodes.slave[node.key] = redis
delete this.nodes.master[key]
this.nodes.slave[key] = redis
} else {
delete this.nodes.slave[node.key]
this.nodes.master[node.key] = redis
delete this.nodes.slave[key]
this.nodes.master[key] = redis
}
}
} else {
debug('Connecting to %s as %s', node.key, readOnly ? 'slave' : 'master')
debug('Connecting to %s as %s', key, readOnly ? 'slave' : 'master')
redis = new Redis(defaults({
// Never try to reconnect when a node is lose,
// instead, waiting for a `MOVED` error and
Expand All @@ -75,23 +73,23 @@ export default class ConnectionPool extends EventEmitter {
enableOfflineQueue: true,
readOnly: readOnly
}, node, this.redisOptions, { lazyConnect: true }))
this.nodes.all[node.key] = redis
this.nodes[readOnly ? 'slave' : 'master'][node.key] = redis
this.nodes.all[key] = redis
this.nodes[readOnly ? 'slave' : 'master'][key] = redis

redis.once('end', () => {
delete this.nodes.all[node.key]
delete this.nodes.master[node.key]
delete this.nodes.slave[node.key]
this.emit('-node', redis)
delete this.nodes.all[key]
delete this.nodes.master[key]
delete this.nodes.slave[key]
this.emit('-node', redis, key)
if (!Object.keys(this.nodes.all).length) {
this.emit('drain')
}
})

this.emit('+node', redis)
this.emit('+node', redis, key)

redis.on('error', function (error) {
this.emit('nodeError', error)
this.emit('nodeError', error, key)
})
}

Expand All @@ -105,14 +103,15 @@ export default class ConnectionPool extends EventEmitter {
* @param {(Array<string | number | object>)} nodes
* @memberof ConnectionPool
*/
public reset (nodes: Array<string | number | object>): void {
public reset(nodes: Array<string | number | object>): void {
debug('Reset with %O', nodes);
const newNodes = {}
nodes.forEach((node) => {
const options: {port?: number | string, db?: number, key?: string} = {}
const options: IRedisOptions = {}
if (typeof node === 'object') {
defaults(options, node)
Object.assign(options, node)
} else if (typeof node === 'string') {
defaults(options, parseURL(node))
Object.assign(options, parseURL(node))
} else if (typeof node === 'number') {
options.port = node
} else {
Expand All @@ -123,8 +122,8 @@ export default class ConnectionPool extends EventEmitter {
}
delete options.db

setKey(options)
newNodes[options.key] = options
fillDefaultOptions(options)
newNodes[getNodeKey(options)] = options
}, this)

Object.keys(this.nodes.all).forEach((key) => {
Expand All @@ -140,15 +139,7 @@ export default class ConnectionPool extends EventEmitter {
}
}

/**
* Set key property
*
* @private
*/
function setKey(node: IRedisOptions): IRedisOptionsWithKey {
node = node || {}
function fillDefaultOptions(node: IRedisOptions): void {
node.port = node.port || 6379
node.host = node.host || '127.0.0.1'
node.key = node.key || node.host + ':' + node.port
return <IRedisOptionsWithKey>node
}
Loading

0 comments on commit 4c6ffeb

Please sign in to comment.