From 1cf2ac1755cca389e513cad2271e1aaa355e5439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Zihua=20=E5=AD=90=E9=AA=85?= Date: Mon, 2 May 2016 22:41:58 +0800 Subject: [PATCH] fix: reconnect when getting fatal error (#292) * fix: reconnect when getting fatal error * test: add test for Redis#flushQueue * test: add test for fatal error --- lib/redis.js | 24 +++++++++++++++++------- lib/redis/parser.js | 4 +++- test/functional/fatal_error.js | 21 +++++++++++++++++++++ test/unit/redis.js | 34 +++++++++++++++++++++++++++++++++- 4 files changed, 74 insertions(+), 9 deletions(-) create mode 100644 test/functional/fatal_error.js diff --git a/lib/redis.js b/lib/redis.js index 3fcd7a00..b810807f 100644 --- a/lib/redis.js +++ b/lib/redis.js @@ -353,18 +353,28 @@ Redis.prototype.duplicate = function (override) { * Flush offline queue and command queue with error. * * @param {Error} error - The error object to send to the commands + * @param {object} options * @private */ -Redis.prototype.flushQueue = function (error) { +Redis.prototype.flushQueue = function (error, options) { + options = _.defaults({}, options, { + offlineQueue: true, + commandQueue: true + }); + var item; - while (this.offlineQueue.length > 0) { - item = this.offlineQueue.shift(); - item.command.reject(error); + if (options.offlineQueue) { + while (this.offlineQueue.length > 0) { + item = this.offlineQueue.shift(); + item.command.reject(error); + } } - while (this.commandQueue.length > 0) { - item = this.commandQueue.shift(); - item.command.reject(error); + if (options.commandQueue) { + while (this.commandQueue.length > 0) { + item = this.commandQueue.shift(); + item.command.reject(error); + } } }; diff --git a/lib/redis/parser.js b/lib/redis/parser.js index 39c3ec66..6137cc4c 100644 --- a/lib/redis/parser.js +++ b/lib/redis/parser.js @@ -28,7 +28,9 @@ exports.initParser = function () { _this.returnReply(reply); }, returnFatalError: function (err) { - _this.emit('error', new Error('Redis reply parser error: ' + err.stack)); + _this.flushQueue(err, { offlineQueue: false }); + _this.silentEmit('error', new Error('Redis parser fatal error: ' + err.stack)); + _this.disconnect(true); } }); }; diff --git a/test/functional/fatal_error.js b/test/functional/fatal_error.js new file mode 100644 index 00000000..b2b3671f --- /dev/null +++ b/test/functional/fatal_error.js @@ -0,0 +1,21 @@ +'use strict'; + +describe('fatal_error', function () { + it('should handle fatal error of parser', function (done) { + var redis = new Redis(); + redis.once('ready', function () { + var execute = redis.replyParser.execute; + redis.replyParser.execute = function () { + execute.call(redis.replyParser, '&'); + }; + redis.get('foo', function (err) { + expect(err.message).to.match(/Protocol error/); + redis.replyParser.execute = execute; + redis.get('bar', function (err) { + expect(err).to.eql(null); + done(); + }); + }); + }); + }); +}); diff --git a/test/unit/redis.js b/test/unit/redis.js index 35f2427a..ed3dfe8e 100644 --- a/test/unit/redis.js +++ b/test/unit/redis.js @@ -84,7 +84,7 @@ describe('Redis', function () { } Redis.prototype.connect.restore(); - function getOption () { + function getOption() { var redis = Redis.apply(null, arguments); return redis.options; } @@ -115,4 +115,36 @@ describe('Redis', function () { redis.end(); }); }); + + describe('#flushQueue', function () { + it('should flush all queues by default', function () { + var flushQueue = Redis.prototype.flushQueue; + var redis = { + offlineQueue: [{ command: { reject: function () {} } }], + commandQueue: [{ command: { reject: function () {} } }] + }; + var offline = mock(redis.offlineQueue[0].command); + var command = mock(redis.commandQueue[0].command); + offline.expects('reject').once(); + command.expects('reject').once(); + flushQueue.call(redis); + offline.verify(); + command.verify(); + }); + + it('should be able to ignore a queue', function () { + var flushQueue = Redis.prototype.flushQueue; + var redis = { + offlineQueue: [{ command: { reject: function () {} } }], + commandQueue: [{ command: { reject: function () {} } }] + }; + var offline = mock(redis.offlineQueue[0].command); + var command = mock(redis.commandQueue[0].command); + offline.expects('reject').once(); + command.expects('reject').never(); + flushQueue.call(redis, new Error(), { commandQueue: false }); + offline.verify(); + command.verify(); + }); + }); });