Skip to content

Commit

Permalink
perf(cluster): improve the performance of calculating slots (#323)
Browse files Browse the repository at this point in the history
  • Loading branch information
luin authored Jun 12, 2016
1 parent 1caeb51 commit 3ab4e8a
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 153 deletions.
3 changes: 2 additions & 1 deletion lib/command.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ var Promise = require('bluebird');
var fbuffer = require('flexbuffer');
var utils = require('./utils');
var commands = require('redis-commands');
var calculateSlot = require('cluster-key-slot');

/**
* Command instance
Expand Down Expand Up @@ -84,7 +85,7 @@ Command.prototype.getSlot = function () {
if (typeof this._slot === 'undefined') {
var key = this.getKeys()[0];
if (key) {
this.slot = utils.calcSlot(key);
this.slot = calculateSlot(key);
} else {
this.slot = null;
}
Expand Down
48 changes: 27 additions & 21 deletions lib/pipeline.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ var Commander = require('./commander');
var Command = require('./command');
var fbuffer = require('flexbuffer');
var Promise = require('bluebird');
var utils = require('./utils');
var util = require('util');
var commands = require('redis-commands');
var calculateSlot = require('cluster-key-slot');

function Pipeline(redis) {
Commander.call(this);
Expand Down Expand Up @@ -210,25 +210,33 @@ Pipeline.prototype.exec = function (callback) {
if (_.isEmpty(this._queue)) {
this.resolve([]);
}
var pipelineSlot;
// Check whether scripts exists and get a sampleKey.
var scripts = [];
for (var i = 0; i < this._queue.length; ++i) {
var item = this._queue[i];
if (this.isCluster) {
var keys = item.getKeys();
for (var j = 0; j < keys.length; ++j) {
var slot = utils.calcSlot(keys[j]);
if (typeof pipelineSlot === 'undefined') {
pipelineSlot = slot;
}
if (pipelineSlot !== slot) {
this.reject(new Error('All keys in the pipeline should belong to the same slot(expect "' +
keys[j] + '" belongs to slot ' + pipelineSlot + ').'));
return this.promise;
}
var pipelineSlot, i;
if (this.isCluster) {
// List of the first key for each command
var sampleKeys = [];
for (i = 0; i < this._queue.length; i++) {
var keys = this._queue[i].getKeys();
if (keys.length) {
sampleKeys.push(keys[0]);
}
}

if (sampleKeys.length) {
pipelineSlot = calculateSlot.generateMulti(sampleKeys);
if (pipelineSlot < 0) {
this.reject(new Error('All keys in the pipeline should belong to the same slot'));
return this.promise;
}
} else {
// Send the pipeline to a random node
pipelineSlot = Math.random() * 16384 | 0;
}
}

// Check whether scripts exists
var scripts = [];
for (i = 0; i < this._queue.length; ++i) {
var item = this._queue[i];
if (this.isCluster && item.isCustomCommand) {
this.reject(new Error('Sending custom commands in pipeline is not supported in Cluster mode.'));
return this.promise;
Expand All @@ -242,14 +250,12 @@ Pipeline.prototype.exec = function (callback) {
}
scripts.push(script);
}
if (this.isCluster && typeof pipelineSlot === 'undefined') {
pipelineSlot = Math.random() * 16384 | 0;
}

var _this = this;
if (!scripts.length) {
return execPipeline();
}

return this.redis.script('exists', scripts.map(function (item) {
return item.sha;
})).then(function (results) {
Expand Down
91 changes: 0 additions & 91 deletions lib/utils/crc.js

This file was deleted.

18 changes: 0 additions & 18 deletions lib/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -213,24 +213,6 @@ exports.toArg = function (arg) {
return String(arg);
};

var crc16 = require('./crc');
/**
* Calculate slot by key
*
* @param {string} key
* @return {number}
*/
exports.calcSlot = function (key) {
var s = key.indexOf('{');
if (s !== -1) {
var e = key.indexOf('}', s + 2);
if (e !== -1) {
key = key.slice(s + 1, e);
}
}
return crc16(key) & 16383;
};

/**
* Optimize error stack
*
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"license": "MIT",
"dependencies": {
"bluebird": "^3.3.4",
"cluster-key-slot": "^1.0.5",
"debug": "^2.2.0",
"double-ended-queue": "^2.1.0-0",
"flexbuffer": "0.0.6",
Expand Down
21 changes: 11 additions & 10 deletions test/functional/cluster.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

var utils = require('../../lib/utils');
var calculateSlot = require('cluster-key-slot');
var Promise = require('bluebird');

describe('cluster', function () {
Expand Down Expand Up @@ -325,7 +326,7 @@ describe('cluster', function () {
if (argv[0] === 'get' && argv[1] === 'foo') {
expect(moved).to.eql(false);
moved = true;
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001');
}
});

Expand All @@ -345,7 +346,7 @@ describe('cluster', function () {
];
}
if (argv[0] === 'get' && argv[1] === 'foo') {
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30002');
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30002');
}
});
var node2 = new MockServer(30002, function (argv) {
Expand Down Expand Up @@ -397,7 +398,7 @@ describe('cluster', function () {
if (argv[0] === 'get' && argv[1] === 'foo') {
expect(moved).to.eql(false);
moved = true;
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001');
}
});

Expand Down Expand Up @@ -439,7 +440,7 @@ describe('cluster', function () {
disconnect([node1, node2], done);
});
} else {
return new Error('ASK ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
return new Error('ASK ' + calculateSlot('foo') + ' 127.0.0.1:30001');
}
}
});
Expand Down Expand Up @@ -530,7 +531,7 @@ describe('cluster', function () {
];
} else if (argv[0] === 'get' && argv[1] === 'foo') {
redirectTimes += 1;
return new Error('ASK ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
return new Error('ASK ' + calculateSlot('foo') + ' 127.0.0.1:30001');
}
};
var node1 = new MockServer(30001, argvHandler);
Expand Down Expand Up @@ -650,7 +651,7 @@ describe('cluster', function () {
expect(moved).to.eql(false);
moved = true;
}
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001');
}
});

Expand Down Expand Up @@ -693,7 +694,7 @@ describe('cluster', function () {
return slotTable;
}
if (argv[1] === 'foo') {
return new Error('ASK ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
return new Error('ASK ' + calculateSlot('foo') + ' 127.0.0.1:30001');
}
});

Expand Down Expand Up @@ -755,7 +756,7 @@ describe('cluster', function () {
return slotTable;
}
if (argv[0] === 'get' && argv[1] === 'foo') {
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001');
}
});

Expand Down Expand Up @@ -836,7 +837,7 @@ describe('cluster', function () {
}
if (argv[0] === 'get' && argv[1] === 'foo') {
moved = true;
return new Error('MOVED ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
return new Error('MOVED ' + calculateSlot('foo') + ' 127.0.0.1:30001');
}
if (argv[0] === 'exec') {
return new Error('EXECABORT Transaction discarded because of previous errors.');
Expand Down Expand Up @@ -889,7 +890,7 @@ describe('cluster', function () {
return slotTable;
}
if (argv[0] === 'get' && argv[1] === 'foo') {
return new Error('ASK ' + utils.calcSlot('foo') + ' 127.0.0.1:30001');
return new Error('ASK ' + calculateSlot('foo') + ' 127.0.0.1:30001');
}
if (argv[0] === 'exec') {
return new Error('EXECABORT Transaction discarded because of previous errors.');
Expand Down
16 changes: 16 additions & 0 deletions test/unit/command.js
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,22 @@ describe('Command', function () {
});
});

describe('#getSlot()', function () {
it('should return correctly', function () {
expectSlot('123', 5970);
expectSlot('ab{c', 4619);
expectSlot('ab{c}2', 7365);
expectSlot('ab{{c}2', 2150);
expectSlot('ab{qq}{c}2', 5598);
expectSlot('ab}', 11817);
expectSlot('encoding', 3060);

function expectSlot(key, slot) {
expect(new Command('get', [key]).getSlot()).to.eql(slot);
}
});
});

describe('.checkFlag()', function () {
it('should return correct result', function () {
expect(Command.checkFlag('VALID_IN_SUBSCRIBER_MODE', 'ping')).to.eql(true);
Expand Down
13 changes: 1 addition & 12 deletions test/unit/utils.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict';

var utils = require('../../lib/utils');

describe('utils', function () {
describe('.bufferEqual', function () {
it('should return correctly', function () {
Expand Down Expand Up @@ -97,18 +98,6 @@ describe('utils', function () {
});
});

describe('.calcSlot', function () {
it('should return correctly', function () {
expect(utils.calcSlot('123')).to.eql(5970);
expect(utils.calcSlot('ab{c')).to.eql(4619);
expect(utils.calcSlot('ab{c}2')).to.eql(7365);
expect(utils.calcSlot('ab{{c}2')).to.eql(2150);
expect(utils.calcSlot('ab{qq}{c}2')).to.eql(5598);
expect(utils.calcSlot('ab}')).to.eql(11817);
expect(utils.calcSlot('encoding')).to.eql(3060);
});
});

describe('.toArg', function () {
it('should return correctly', function () {
expect(utils.toArg(null)).to.eql('');
Expand Down

0 comments on commit 3ab4e8a

Please sign in to comment.