Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix autopipeling for buffer function + added built in commands #1231

Merged
merged 8 commits into from
May 22, 2021
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions lib/autoPipelining.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@ export const notAllowedAutoPipelineCommands = [
"unpsubscribe",
];

function findAutoPipeline(client, _commandName, ...args: Array<string>): string {
function findAutoPipeline(
client,
_commandName,
...args: Array<string>
): string {
if (!client.isCluster) {
return "main";
}
Expand Down Expand Up @@ -68,8 +72,13 @@ function executeAutoPipeline(client, slotKey: string) {
});
}

export function shouldUseAutoPipelining(client, commandName: string): boolean {
export function shouldUseAutoPipelining(
client,
functionName: string,
commandName: string
): boolean {
return (
functionName &&
adamnoakes marked this conversation as resolved.
Show resolved Hide resolved
client.options.enableAutoPipelining &&
!client.isPipeline &&
!notAllowedAutoPipelineCommands.includes(commandName) &&
Expand All @@ -79,6 +88,7 @@ export function shouldUseAutoPipelining(client, commandName: string): boolean {

export function executeWithAutoPipelining(
client,
functionName: string,
commandName: string,
args: string[],
callback
Expand All @@ -94,10 +104,13 @@ export function executeWithAutoPipelining(
return;
}

executeWithAutoPipelining(client, commandName, args, callback).then(
resolve,
reject
);
executeWithAutoPipelining(
client,
functionName,
commandName,
args,
callback
).then(resolve, reject);
});
});
}
Expand Down Expand Up @@ -138,7 +151,7 @@ export function executeWithAutoPipelining(
resolve(value);
});

pipeline[commandName](...args);
pipeline[functionName](...args);
});

return asCallback(autoPipelinePromise, callback);
Expand Down
83 changes: 68 additions & 15 deletions lib/commander.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export default function Commander() {
showFriendlyErrorStack: false,
});
this.scriptsSet = {};
this.addedBuiltinSet = new Set();
}

const commands = require("redis-commands").list.filter(function (command) {
Expand All @@ -57,21 +58,43 @@ Commander.prototype.getBuiltinCommands = function () {
*/
Commander.prototype.createBuiltinCommand = function (commandName) {
return {
string: generateFunction(commandName, "utf8"),
buffer: generateFunction(commandName, null),
string: generateFunction(null, commandName, "utf8"),
buffer: generateFunction(null, commandName, null),
};
};

/**
* Create add builtin command
*
* @param {string} commandName - command name
* @return {object} functions
* @public
*/
Commander.prototype.addBuiltinCommand = function (commandName) {
adamnoakes marked this conversation as resolved.
Show resolved Hide resolved
this.addedBuiltinSet.add(commandName);
this[commandName] = generateFunction(commandName, commandName, "utf8");
this[commandName + "Buffer"] = generateFunction(
commandName + "Buffer",
commandName,
null
);
};

commands.forEach(function (commandName) {
Commander.prototype[commandName] = generateFunction(commandName, "utf8");
Commander.prototype[commandName] = generateFunction(
commandName,
commandName,
"utf8"
);
Commander.prototype[commandName + "Buffer"] = generateFunction(
commandName + "Buffer",
commandName,
null
);
});

Commander.prototype.call = generateFunction("utf8");
Commander.prototype.callBuffer = generateFunction(null);
Commander.prototype.call = generateFunction("call", "utf8");
Commander.prototype.callBuffer = generateFunction("callBuffer", null);
// eslint-disable-next-line @typescript-eslint/camelcase
Commander.prototype.send_command = Commander.prototype.call;

Expand All @@ -93,8 +116,13 @@ Commander.prototype.defineCommand = function (name, definition) {
definition.readOnly
);
this.scriptsSet[name] = script;
this[name] = generateScriptingFunction(name, script, "utf8");
this[name + "Buffer"] = generateScriptingFunction(name, script, null);
this[name] = generateScriptingFunction(name, name, script, "utf8");
this[name + "Buffer"] = generateScriptingFunction(
name + "Buffer",
name,
script,
null
);
};

/**
Expand All @@ -105,9 +133,17 @@ Commander.prototype.defineCommand = function (name, definition) {
*/
Commander.prototype.sendCommand = function () {};

function generateFunction(_encoding: string);
function generateFunction(_commandName: string | void, _encoding: string);
function generateFunction(_commandName?: string, _encoding?: string) {
function generateFunction(functionName: string | null, _encoding: string);
function generateFunction(
functionName: string | null,
_commandName: string | void,
_encoding: string
);
function generateFunction(
functionName: string | null,
_commandName?: string,
_encoding?: string
) {
if (typeof _encoding === "undefined") {
_encoding = _commandName;
_commandName = null;
Expand Down Expand Up @@ -139,18 +175,29 @@ function generateFunction(_commandName?: string, _encoding?: string) {
}

// No auto pipeline, use regular command sending
if (!shouldUseAutoPipelining(this, commandName)) {
if (!shouldUseAutoPipelining(this, functionName, commandName)) {
return this.sendCommand(
new Command(commandName, args, options, callback)
);
}

// Create a new pipeline and make sure it's scheduled
return executeWithAutoPipelining(this, commandName, args, callback);
return executeWithAutoPipelining(
this,
functionName,
commandName,
args,
callback
);
};
}

function generateScriptingFunction(name, script, encoding) {
function generateScriptingFunction(
functionName,
commandName,
script,
encoding
) {
return function () {
let length = arguments.length;
const lastArgIndex = length - 1;
Expand Down Expand Up @@ -183,11 +230,17 @@ function generateScriptingFunction(name, script, encoding) {
}

// No auto pipeline, use regular command sending
if (!shouldUseAutoPipelining(this, name)) {
if (!shouldUseAutoPipelining(this, functionName, commandName)) {
return script.execute(this, args, options, callback);
}

// Create a new pipeline and make sure it's scheduled
return executeWithAutoPipelining(this, name, args, callback);
return executeWithAutoPipelining(
this,
functionName,
commandName,
args,
callback
);
};
}
5 changes: 5 additions & 0 deletions lib/pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,11 @@ export default function Pipeline(redis) {
this[name + "Buffer"] = redis[name + "Buffer"];
});

redis.addedBuiltinSet.forEach((name) => {
this[name] = redis[name];
this[name + "Buffer"] = redis[name + "Buffer"];
});

const Promise = PromiseContainer.get();
this.promise = new Promise((resolve, reject) => {
this.resolve = resolve;
Expand Down
11 changes: 11 additions & 0 deletions test/functional/autopipelining.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { expect, use } from "chai";
import Redis from "../../lib/redis";
import { ReplyError } from "redis-errors";
import * as sinon from "sinon";

use(require("chai-as-promised"));

Expand Down Expand Up @@ -44,6 +46,15 @@ describe("autoPipelining for single node", function () {
await promise;
});

it("should support buffer commands", async () => {
const redis = new Redis({ enableAutoPipelining: true });
const buffer = Buffer.from("bar");
await redis.setBuffer("foo", buffer);
const promise = redis.getBuffer("foo");
expect(redis.autoPipelineQueueSize).to.eql(1);
expect(await promise).to.eql(buffer);
});

it("should support custom commands", async () => {
const redis = new Redis({ enableAutoPipelining: true });

Expand Down
20 changes: 10 additions & 10 deletions test/functional/cluster/autopipelining.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { expect, use } from "chai";
import * as calculateKeySlot from 'cluster-key-slot';
import * as calculateKeySlot from "cluster-key-slot";

import { default as Cluster } from "../../../lib/cluster";
import MockServer from "../../helpers/mock_server";
Expand Down Expand Up @@ -400,11 +400,11 @@ describe("autoPipelining for cluster", function () {
const promise2 = cluster.set("foo5", "bar");
const promise3 = cluster.set("foo2", "bar");
const promise4 = cluster.set("foo6", "bar");

// Override slots to induce a failure
const key1Slot = calculateKeySlot('foo1');
const key2Slot = calculateKeySlot('foo2');
const key5Slot = calculateKeySlot('foo5');
const key1Slot = calculateKeySlot("foo1");
const key2Slot = calculateKeySlot("foo2");
const key5Slot = calculateKeySlot("foo5");
cluster.slots[key1Slot] = cluster.slots[key2Slot];
cluster.slots[key2Slot] = cluster.slots[key5Slot];

Expand Down Expand Up @@ -492,9 +492,9 @@ describe("autoPipelining for cluster", function () {
expect(cluster.autoPipelineQueueSize).to.eql(4);

// Override slots to induce a failure
const key1Slot = calculateKeySlot('foo1');
const key2Slot = calculateKeySlot('foo2');
const key5Slot = calculateKeySlot('foo5');
const key1Slot = calculateKeySlot("foo1");
const key2Slot = calculateKeySlot("foo2");
const key5Slot = calculateKeySlot("foo5");
cluster.slots[key1Slot] = cluster.slots[key2Slot];
cluster.slots[key2Slot] = cluster.slots[key5Slot];
});
Expand Down Expand Up @@ -541,8 +541,8 @@ describe("autoPipelining for cluster", function () {

expect(cluster.autoPipelineQueueSize).to.eql(3);

const key1Slot = calculateKeySlot('foo1');
const key2Slot = calculateKeySlot('foo2');
const key1Slot = calculateKeySlot("foo1");
const key2Slot = calculateKeySlot("foo2");
cluster.slots[key1Slot] = cluster.slots[key2Slot];
});
});
Expand Down
11 changes: 11 additions & 0 deletions test/functional/pipeline.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import Redis from "../../lib/redis";
import { expect } from "chai";
import * as sinon from "sinon";

describe("pipeline", function () {
it("should return correct result", function (done) {
Expand Down Expand Up @@ -138,6 +139,16 @@ describe("pipeline", function () {
});
});

it("should include added built in commands", async () => {
const redis = new Redis({ keyPrefix: "foo:" });
redis.addBuiltinCommand("someCommand");
sinon.stub(redis, "sendCommand").callsFake((command) => {
command.resolve(Buffer.from("OK"));
});
const result = await redis.pipeline().someCommand().exec();
expect(result).to.eql([[null, "OK"]]);
});

describe("custom commands", function () {
let redis;

Expand Down
22 changes: 22 additions & 0 deletions test/unit/commander.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,28 @@ describe("Commander", function () {
});
});

describe("#addBuiltinCommand()", () => {
beforeEach(() => sinon.spy(Commander.prototype, "sendCommand"));
afterEach(() => sinon.restore());
it("adds string command", () => {
const c = new Commander();
c.addBuiltinCommand("someCommand");
c.someCommand();
const command = Commander.prototype.sendCommand.getCall(0).args[0];
expect(command.name).to.eql("someCommand");
expect(command.replyEncoding).to.eql("utf8");
});

it("adds buffer command", () => {
const c = new Commander();
c.addBuiltinCommand("someCommand");
c.someCommandBuffer();
const command = Commander.prototype.sendCommand.getCall(0).args[0];
expect(command.name).to.eql("someCommand");
expect(command.replyEncoding).to.eql(null);
});
});

it("should pass the correct arguments", function () {
sinon.stub(Commander.prototype, "sendCommand").callsFake((command) => {
return command;
Expand Down