From a46415187d32bfdc974072403edb8aca2df282d6 Mon Sep 17 00:00:00 2001 From: Andres Kalle Date: Sat, 24 Apr 2021 12:41:30 +0300 Subject: [PATCH] feat(sentinel): detect failover from +switch-master messages (#1328) --- .../SentinelConnector/FailoverDetector.ts | 65 ++++ lib/connectors/SentinelConnector/index.ts | 95 ++++- lib/connectors/SentinelConnector/types.ts | 27 ++ lib/redis/RedisOptions.ts | 9 + lib/redis/index.ts | 5 +- test/functional/sentinel.ts | 326 ++++++++++++++++-- test/helpers/mock_server.ts | 4 + test/helpers/once.ts | 28 ++ 8 files changed, 525 insertions(+), 34 deletions(-) create mode 100644 lib/connectors/SentinelConnector/FailoverDetector.ts create mode 100644 test/helpers/once.ts diff --git a/lib/connectors/SentinelConnector/FailoverDetector.ts b/lib/connectors/SentinelConnector/FailoverDetector.ts new file mode 100644 index 00000000..2b3a9d9e --- /dev/null +++ b/lib/connectors/SentinelConnector/FailoverDetector.ts @@ -0,0 +1,65 @@ +import { Debug } from "../../utils"; +import SentinelConnector from "./index"; +import { ISentinel } from "./types"; + +const debug = Debug("FailoverDetector"); + +const CHANNEL_NAME = "+switch-master"; + +export class FailoverDetector { + private connector: SentinelConnector; + private sentinels: ISentinel[]; + private isDisconnected = false; + + // sentinels can't be used for regular commands after this + constructor(connector: SentinelConnector, sentinels: ISentinel[]) { + this.connector = connector; + this.sentinels = sentinels; + } + + public cleanup() { + this.isDisconnected = true; + + for (const sentinel of this.sentinels) { + sentinel.client.disconnect(); + } + } + + public async subscribe() { + debug("Starting FailoverDetector"); + + const promises: Promise[] = []; + + for (const sentinel of this.sentinels) { + const promise = sentinel.client.subscribe(CHANNEL_NAME).catch((err) => { + debug( + "Failed to subscribe to failover messages on sentinel %s:%s (%s)", + sentinel.address.host || "127.0.0.1", + 
sentinel.address.port || 26379, + err.message + ); + }); + + promises.push(promise); + + sentinel.client.on("message", (channel: string) => { + if (!this.isDisconnected && channel === CHANNEL_NAME) { + this.disconnect(); + } + }); + } + + await Promise.all(promises); + } + + private disconnect() { + // Avoid disconnecting more than once per failover. + // A new FailoverDetector will be created after reconnecting. + this.isDisconnected = true; + + debug("Failover detected, disconnecting"); + + // Will call this.cleanup() + this.connector.disconnect(); + } +} diff --git a/lib/connectors/SentinelConnector/index.ts b/lib/connectors/SentinelConnector/index.ts index 26d24bf0..8a92417c 100644 --- a/lib/connectors/SentinelConnector/index.ts +++ b/lib/connectors/SentinelConnector/index.ts @@ -1,3 +1,4 @@ +import { EventEmitter } from "events"; import { createConnection } from "net"; import { INatMap } from "../../cluster/ClusterOptions"; import { @@ -12,10 +13,12 @@ import { isIIpcConnectionOptions, } from "../StandaloneConnector"; import SentinelIterator from "./SentinelIterator"; -import { ISentinelAddress } from "./types"; +import { IRedisClient, ISentinelAddress, ISentinel } from "./types"; import AbstractConnector, { ErrorEmitter } from "../AbstractConnector"; import { NetStream } from "../../types"; import Redis from "../../redis"; +import { IRedisOptions } from "../../redis/RedisOptions"; +import { FailoverDetector } from "./FailoverDetector"; const debug = Debug("SentinelConnector"); @@ -39,6 +42,7 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions { sentinelPassword?: string; sentinels: Array<Partial<ISentinelAddress>>; sentinelRetryStrategy?: (retryAttempts: number) => number | void | null; + sentinelReconnectStrategy?: (retryAttempts: number) => number | void | null; preferredSlaves?: PreferredSlaves; connectTimeout?: number; disconnectTimeout?: number; @@ -47,11 +51,14 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions { sentinelTLS?: 
ConnectionOptions; natMap?: INatMap; updateSentinels?: boolean; + sentinelMaxConnections?: number; } export default class SentinelConnector extends AbstractConnector { private retryAttempts: number; + private failoverDetector: FailoverDetector | null = null; protected sentinelIterator: SentinelIterator; + public emitter: EventEmitter | null = null; constructor(protected options: ISentinelConnectionOptions) { super(options.disconnectTimeout); @@ -84,6 +91,14 @@ export default class SentinelConnector extends AbstractConnector { return roleMatches; } + public disconnect(): void { + super.disconnect(); + + if (this.failoverDetector) { + this.failoverDetector.cleanup(); + } + } + public connect(eventEmitter: ErrorEmitter): Promise { this.connecting = true; this.retryAttempts = 0; @@ -134,8 +149,15 @@ export default class SentinelConnector extends AbstractConnector { throw new Error(CONNECTION_CLOSED_ERROR_MSG); } + const endpointAddress = endpoint.value.host + ":" + endpoint.value.port; + if (resolved) { - debug("resolved: %s:%s", resolved.host, resolved.port); + debug( + "resolved: %s:%s from sentinel %s", + resolved.host, + resolved.port, + endpointAddress + ); if (this.options.enableTLSForSentinelMode && this.options.tls) { Object.assign(resolved, this.options.tls); this.stream = createTLSConnection(resolved); @@ -143,14 +165,14 @@ export default class SentinelConnector extends AbstractConnector { this.stream = createConnection(resolved); } + this.stream.once("connect", () => this.initFailoverDetector()); + this.stream.once("error", (err) => { this.firstError = err; }); - this.sentinelIterator.reset(true); return this.stream; } else { - const endpointAddress = endpoint.value.host + ":" + endpoint.value.port; const errorMsg = err ? 
"failed to connect to sentinel " + endpointAddress + @@ -176,7 +198,7 @@ export default class SentinelConnector extends AbstractConnector { return connectToNext(); } - private async updateSentinels(client): Promise { + private async updateSentinels(client: IRedisClient): Promise { if (!this.options.updateSentinels) { return; } @@ -209,7 +231,9 @@ export default class SentinelConnector extends AbstractConnector { debug("Updated internal sentinels: %s", this.sentinelIterator); } - private async resolveMaster(client): Promise { + private async resolveMaster( + client: IRedisClient + ): Promise { const result = await client.sentinel( "get-master-addr-by-name", this.options.name @@ -224,7 +248,9 @@ export default class SentinelConnector extends AbstractConnector { ); } - private async resolveSlave(client): Promise { + private async resolveSlave( + client: IRedisClient + ): Promise { const result = await client.sentinel("slaves", this.options.name); if (!Array.isArray(result)) { @@ -251,8 +277,11 @@ export default class SentinelConnector extends AbstractConnector { return this.options.natMap[`${item.host}:${item.port}`] || item; } - private async resolve(endpoint): Promise { - const client = new Redis({ + private connectToSentinel( + endpoint: Partial, + options?: Partial + ): IRedisClient { + return new Redis({ port: endpoint.port || 26379, host: endpoint.host, username: this.options.sentinelUsername || null, @@ -268,7 +297,14 @@ export default class SentinelConnector extends AbstractConnector { connectTimeout: this.options.connectTimeout, commandTimeout: this.options.sentinelCommandTimeout, dropBufferSupport: true, + ...options, }); + } + + private async resolve( + endpoint: Partial + ): Promise { + const client = this.connectToSentinel(endpoint); // ignore the errors since resolve* methods will handle them client.on("error", noop); @@ -283,6 +319,47 @@ export default class SentinelConnector extends AbstractConnector { client.disconnect(); } } + + private async 
initFailoverDetector(): Promise { + // Move the current sentinel to the first position + this.sentinelIterator.reset(true); + + const sentinels: ISentinel[] = []; + + // In case of a large amount of sentinels, limit the number of concurrent connections + while (sentinels.length < this.options.sentinelMaxConnections) { + const { done, value } = this.sentinelIterator.next(); + + if (done) { + break; + } + + const client = this.connectToSentinel(value, { + lazyConnect: true, + retryStrategy: this.options.sentinelReconnectStrategy, + }); + + client.on("reconnecting", () => { + // Tests listen to this event + this.emitter?.emit("sentinelReconnecting"); + }); + + sentinels.push({ address: value, client }); + } + + this.sentinelIterator.reset(false); + + if (this.failoverDetector) { + // Clean up previous detector + this.failoverDetector.cleanup(); + } + + this.failoverDetector = new FailoverDetector(this, sentinels); + await this.failoverDetector.subscribe(); + + // Tests listen to this event + this.emitter?.emit("failoverSubscribed"); + } } function selectPreferredSentinel( diff --git a/lib/connectors/SentinelConnector/types.ts b/lib/connectors/SentinelConnector/types.ts index 14f0fba8..6d7ef7f4 100644 --- a/lib/connectors/SentinelConnector/types.ts +++ b/lib/connectors/SentinelConnector/types.ts @@ -1,4 +1,31 @@ +import { IRedisOptions } from "../../redis/RedisOptions"; + export interface ISentinelAddress { port: number; host: string; + family?: number; +} + +// TODO: A proper typedef. This one only declares a small subset of all the members. 
+export interface IRedisClient { + options: IRedisOptions; + sentinel(subcommand: "sentinels", name: string): Promise; + sentinel( + subcommand: "get-master-addr-by-name", + name: string + ): Promise; + sentinel(subcommand: "slaves", name: string): Promise; + subscribe(...channelNames: string[]): Promise; + on( + event: "message", + callback: (channel: string, message: string) => void + ): void; + on(event: "error", callback: (error: Error) => void): void; + on(event: "reconnecting", callback: () => void): void; + disconnect(): void; +} + +export interface ISentinel { + address: Partial; + client: IRedisClient; } diff --git a/lib/redis/RedisOptions.ts b/lib/redis/RedisOptions.ts index 33971d14..3e813e42 100644 --- a/lib/redis/RedisOptions.ts +++ b/lib/redis/RedisOptions.ts @@ -52,6 +52,14 @@ export const DEFAULT_REDIS_OPTIONS: IRedisOptions = { sentinelRetryStrategy: function (times) { return Math.min(times * 10, 1000); }, + sentinelReconnectStrategy: function () { + // This strategy only applies when sentinels are used for detecting + // a failover, not during initial master resolution. + // The deployment can still function when some of the sentinels are down + // for a long period of time, so we may not want to attempt reconnection + // very often. Therefore the default interval is fairly long (1 minute). 
+ return 60000; + }, natMap: null, enableTLSForSentinelMode: false, updateSentinels: true, @@ -75,4 +83,5 @@ export const DEFAULT_REDIS_OPTIONS: IRedisOptions = { enableAutoPipelining: false, autoPipeliningIgnoredCommands: [], maxScriptsCachingTime: 60000, + sentinelMaxConnections: 10, }; diff --git a/lib/redis/index.ts b/lib/redis/index.ts index 57459990..88828e7b 100644 --- a/lib/redis/index.ts +++ b/lib/redis/index.ts @@ -153,7 +153,10 @@ function Redis() { if (this.options.Connector) { this.connector = new this.options.Connector(this.options); } else if (this.options.sentinels) { - this.connector = new SentinelConnector(this.options); + const sentinelConnector = new SentinelConnector(this.options); + sentinelConnector.emitter = this; + + this.connector = sentinelConnector; } else { this.connector = new StandaloneConnector(this.options); } diff --git a/test/functional/sentinel.ts b/test/functional/sentinel.ts index 204f35cb..68834610 100644 --- a/test/functional/sentinel.ts +++ b/test/functional/sentinel.ts @@ -1,8 +1,17 @@ +import { Socket } from "net"; + import Redis from "../../lib/redis"; import MockServer from "../helpers/mock_server"; +import { once } from "../helpers/once"; import { expect } from "chai"; import * as sinon from "sinon"; +function triggerParseError(socket: Socket) { + // Valid first characters: '$', '+', '*', ':', '-' + // To trigger an error, we need to write a different character + socket.write("A"); +} + describe("sentinel", function () { describe("connect", function () { it("should connect to sentinel successfully", function (done) { @@ -47,6 +56,41 @@ describe("sentinel", function () { }); }); + it("should skip an unresponsive sentinel", async function () { + const sentinel1 = new MockServer(27379, function (argv, socket, flags) { + flags.hang = true; + }); + + const sentinel2 = new MockServer(27380, function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17380"]; + } + }); + + 
const master = new MockServer(17380); + const clock = sinon.useFakeTimers(); + + const redis = new Redis({ + sentinels: [ + { host: "127.0.0.1", port: 27379 }, + { host: "127.0.0.1", port: 27380 }, + ], + name: "master", + sentinelCommandTimeout: 1000, + }); + + clock.tick(1000); + clock.restore(); + await once(master, "connect"); + + redis.disconnect(); + await Promise.all([ + sentinel1.disconnectPromise(), + sentinel2.disconnectPromise(), + master.disconnectPromise(), + ]); + }); + it("should call sentinelRetryStrategy when all sentinels are unreachable", function (done) { let t = 0; var redis = new Redis({ @@ -100,18 +144,11 @@ describe("sentinel", function () { } }); - it("should close the connection to the sentinel when resolving successfully", function (done) { - const sentinel = new MockServer(27379, function (argv) { - if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { - return ["127.0.0.1", "17380"]; - } - }); - const master = new MockServer(17380); + it("should close the connection to the sentinel when resolving unsuccessfully", function (done) { + const sentinel = new MockServer(27379); // Does not respond properly to get-master-addr-by-name sentinel.once("disconnect", function () { redis.disconnect(); - master.disconnect(function () { - sentinel.disconnect(done); - }); + sentinel.disconnect(done); }); var redis = new Redis({ @@ -140,18 +177,19 @@ describe("sentinel", function () { } }); const master = new MockServer(17380); - sentinel.once("disconnect", function () { + + const redis = new Redis({ + sentinels: sentinels, + name: "master", + }); + + redis.on("ready", function () { redis.disconnect(); master.disconnect(function () { expect(cloned.length).to.eql(2); sentinel.disconnect(done); }); }); - - var redis = new Redis({ - sentinels: sentinels, - name: "master", - }); }); it("should skip additionally discovered sentinels even if they are resolved successfully", function (done) { @@ -168,7 +206,14 @@ describe("sentinel", function 
() { } }); const master = new MockServer(17380); - sentinel.once("disconnect", function () { + + const redis = new Redis({ + sentinels: sentinels, + updateSentinels: false, + name: "master", + }); + + redis.on("ready", function () { redis.disconnect(); master.disconnect(function () { expect(sentinels.length).to.eql(1); @@ -176,13 +221,8 @@ describe("sentinel", function () { sentinel.disconnect(done); }); }); - - var redis = new Redis({ - sentinels: sentinels, - updateSentinels: false, - name: "master", - }); }); + it("should connect to sentinel with authentication successfully", function (done) { let authed = false; var redisServer = new MockServer(17380, function (argv) { @@ -488,7 +528,7 @@ describe("sentinel", function () { } }); const master = new MockServer(17380); - master.on("connect", function (c) { + master.on("connect", function (c: Socket) { c.destroy(); master.disconnect(); redis.get("foo", function (err, res) { @@ -515,5 +555,243 @@ describe("sentinel", function () { name: "master", }); }); + + it("should connect to new master after +switch-master", async function () { + const sentinel = new MockServer(27379, function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17380"]; + } + }); + const master = new MockServer(17380); + const newMaster = new MockServer(17381); + + const redis = new Redis({ + sentinels: [{ host: "127.0.0.1", port: 27379 }], + name: "master", + }); + + await Promise.all([ + once(master, "connect"), + once(redis, "failoverSubscribed"), + ]); + + sentinel.handler = function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17381"]; + } + }; + + sentinel.broadcast([ + "message", + "+switch-master", + "master 127.0.0.1 17380 127.0.0.1 17381", + ]); + + await Promise.all([ + once(redis, "close"), // Wait until disconnects from old master + once(master, "disconnect"), + once(newMaster, "connect"), + ]); + + redis.disconnect(); 
// Disconnect from new master + + await Promise.all([ + sentinel.disconnectPromise(), + master.disconnectPromise(), + newMaster.disconnectPromise(), + ]); + }); + + it("should detect failover from secondary sentinel", async function () { + const sentinel1 = new MockServer(27379, function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17380"]; + } + }); + const sentinel2 = new MockServer(27380); + const master = new MockServer(17380); + const newMaster = new MockServer(17381); + + const redis = new Redis({ + sentinels: [ + { host: "127.0.0.1", port: 27379 }, + { host: "127.0.0.1", port: 27380 }, + ], + name: "master", + }); + + await Promise.all([ + once(master, "connect"), + once(redis, "failoverSubscribed"), + ]); + + // In this test, only the first sentinel is used to resolve the master + sentinel1.handler = function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17381"]; + } + }; + + // But only the second sentinel broadcasts +switch-master + sentinel2.broadcast([ + "message", + "+switch-master", + "master 127.0.0.1 17380 127.0.0.1 17381", + ]); + + await Promise.all([ + once(redis, "close"), // Wait until disconnects from old master + once(master, "disconnect"), + once(newMaster, "connect"), + ]); + + redis.disconnect(); // Disconnect from new master + + await Promise.all([ + sentinel1.disconnectPromise(), + sentinel2.disconnectPromise(), + master.disconnectPromise(), + newMaster.disconnectPromise(), + ]); + }); + + it("should detect failover when some sentinels fail", async function () { + // Will disconnect before failover + const sentinel1 = new MockServer(27379, function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17380"]; + } + }); + + // Will emit an error before failover + let sentinel2Socket: Socket | null = null; + const sentinel2 = new MockServer(27380, function (argv, 
socket) { + sentinel2Socket = socket; + }); + + // Fails to subscribe + const sentinel3 = new MockServer(27381, function (argv, socket, flags) { + if (argv[0] === "subscribe") { + triggerParseError(socket); + } + }); + + // The only sentinel that can successfully publish the failover message + const sentinel4 = new MockServer(27382); + + const master = new MockServer(17380); + const newMaster = new MockServer(17381); + + const redis = new Redis({ + sentinels: [ + { host: "127.0.0.1", port: 27379 }, + { host: "127.0.0.1", port: 27380 }, + { host: "127.0.0.1", port: 27381 }, + { host: "127.0.0.1", port: 27382 }, + ], + name: "master", + }); + + await Promise.all([ + once(master, "connect"), + + // Must resolve even though subscribing to sentinel3 fails + once(redis, "failoverSubscribed"), + ]); + + // Fail sentinels 1 and 2 + await sentinel1.disconnectPromise(); + triggerParseError(sentinel2Socket); + + sentinel4.handler = function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17381"]; + } + }; + + sentinel4.broadcast([ + "message", + "+switch-master", + "master 127.0.0.1 17380 127.0.0.1 17381", + ]); + + await Promise.all([ + once(redis, "close"), // Wait until disconnects from old master + once(master, "disconnect"), + once(newMaster, "connect"), + ]); + + redis.disconnect(); // Disconnect from new master + + await Promise.all([ + // sentinel1 is already disconnected + sentinel2.disconnectPromise(), + sentinel3.disconnectPromise(), + sentinel4.disconnectPromise(), + master.disconnectPromise(), + newMaster.disconnectPromise(), + ]); + }); + + it("should detect failover after sentinel disconnects and reconnects", async function () { + const sentinel = new MockServer(27379, function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17380"]; + } + }); + + const master = new MockServer(17380); + const newMaster = new MockServer(17381); + + const redis = 
new Redis({ + sentinels: [{ host: "127.0.0.1", port: 27379 }], + name: "master", + sentinelReconnectStrategy: () => 1000, + }); + + await Promise.all([ + once(master, "connect"), + once(redis, "failoverSubscribed"), + ]); + + await sentinel.disconnectPromise(); + + sentinel.handler = function (argv) { + if (argv[0] === "sentinel" && argv[1] === "get-master-addr-by-name") { + return ["127.0.0.1", "17381"]; + } + if (argv[0] === "subscribe") { + sentinel.emit("test:resubscribed"); // Custom event only used in tests + } + }; + + sentinel.connect(); + + const clock = sinon.useFakeTimers(); + await once(redis, "sentinelReconnecting"); // Wait for the timeout to be set + clock.tick(1000); + clock.restore(); + await once(sentinel, "test:resubscribed"); + + sentinel.broadcast([ + "message", + "+switch-master", + "master 127.0.0.1 17380 127.0.0.1 17381", + ]); + + await Promise.all([ + once(redis, "close"), // Wait until disconnects from old master + once(master, "disconnect"), + once(newMaster, "connect"), + ]); + + redis.disconnect(); // Disconnect from new master + + await Promise.all([ + sentinel.disconnectPromise(), + master.disconnectPromise(), + newMaster.disconnectPromise(), + ]); + }); }); }); diff --git a/test/helpers/mock_server.ts b/test/helpers/mock_server.ts index 459bb62a..979f2306 100644 --- a/test/helpers/mock_server.ts +++ b/test/helpers/mock_server.ts @@ -124,6 +124,10 @@ export default class MockServer extends EventEmitter { this.socket.destroy(callback); } + disconnectPromise() { + return new Promise((resolve) => this.disconnect(resolve)); + } + broadcast(data: any) { this.clients .filter((c) => c) diff --git a/test/helpers/once.ts b/test/helpers/once.ts new file mode 100644 index 00000000..106524f0 --- /dev/null +++ b/test/helpers/once.ts @@ -0,0 +1,28 @@ +// TODO: use 'import { once } from "events";' instead of this +// after upgrading minimum Node.js version to 10.16+ + +// This polyfill is from https://github.com/davidmarkclements/events.once + 
import { EventEmitter } from "events";

/**
 * Promise-based wait for a single emission of `name` on `emitter`.
 *
 * - For any event other than "error", the promise resolves with the array of
 *   arguments passed to `emit`. If the emitter emits "error" first, the
 *   promise rejects with that error instead.
 * - When `name` is "error" itself, the promise resolves (it does not reject)
 *   with the first emitted argument, because `resolve` is registered directly
 *   as the listener.
 *
 * Whichever listener fires removes its counterpart, so no listeners added by
 * this helper remain on the emitter once the promise settles.
 *
 * @param emitter - Emitter to listen on.
 * @param name - Event name to wait for.
 * @returns Promise settled by the first matching emission.
 */
export const once = <T>(emitter: EventEmitter, name: string): Promise<T> => {
  return new Promise<T>((resolve, reject) => {
    if (name === "error") {
      // Waiting for "error" itself: no separate rejection path is installed.
      emitter.once(name, resolve);
      return;
    }

    // `any[]` mirrors EventEmitter's own listener signature; the caller
    // chooses the concrete tuple type through `T`.
    const onEvent = (...args: any[]): void => {
      emitter.removeListener("error", onError);
      resolve(args as T);
    };

    const onError = (err: unknown): void => {
      emitter.removeListener(name, onEvent);
      reject(err);
    };

    emitter.once(name, onEvent);
    emitter.once("error", onError);
  });
};