Skip to content

Commit

Permalink
feat(sentinel): detect failover from +switch-master messages (#1328)
Browse files Browse the repository at this point in the history
  • Loading branch information
mjomble authored Apr 24, 2021
1 parent 6b821af commit a464151
Show file tree
Hide file tree
Showing 8 changed files with 525 additions and 34 deletions.
65 changes: 65 additions & 0 deletions lib/connectors/SentinelConnector/FailoverDetector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
import { Debug } from "../../utils";
import SentinelConnector from "./index";
import { ISentinel } from "./types";

const debug = Debug("FailoverDetector");

const CHANNEL_NAME = "+switch-master";

export class FailoverDetector {
private connector: SentinelConnector;
private sentinels: ISentinel[];
private isDisconnected = false;

// sentinels can't be used for regular commands after this
constructor(connector: SentinelConnector, sentinels: ISentinel[]) {
this.connector = connector;
this.sentinels = sentinels;
}

public cleanup() {
this.isDisconnected = true;

for (const sentinel of this.sentinels) {
sentinel.client.disconnect();
}
}

public async subscribe() {
debug("Starting FailoverDetector");

const promises: Promise<unknown>[] = [];

for (const sentinel of this.sentinels) {
const promise = sentinel.client.subscribe(CHANNEL_NAME).catch((err) => {
debug(
"Failed to subscribe to failover messages on sentinel %s:%s (%s)",
sentinel.address.host || "127.0.0.1",
sentinel.address.port || 26739,
err.message
);
});

promises.push(promise);

sentinel.client.on("message", (channel: string) => {
if (!this.isDisconnected && channel === CHANNEL_NAME) {
this.disconnect();
}
});
}

await Promise.all(promises);
}

private disconnect() {
// Avoid disconnecting more than once per failover.
// A new FailoverDetector will be created after reconnecting.
this.isDisconnected = true;

debug("Failover detected, disconnecting");

// Will call this.cleanup()
this.connector.disconnect();
}
}
95 changes: 86 additions & 9 deletions lib/connectors/SentinelConnector/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { EventEmitter } from "events";
import { createConnection } from "net";
import { INatMap } from "../../cluster/ClusterOptions";
import {
Expand All @@ -12,10 +13,12 @@ import {
isIIpcConnectionOptions,
} from "../StandaloneConnector";
import SentinelIterator from "./SentinelIterator";
import { ISentinelAddress } from "./types";
import { IRedisClient, ISentinelAddress, ISentinel } from "./types";
import AbstractConnector, { ErrorEmitter } from "../AbstractConnector";
import { NetStream } from "../../types";
import Redis from "../../redis";
import { IRedisOptions } from "../../redis/RedisOptions";
import { FailoverDetector } from "./FailoverDetector";

const debug = Debug("SentinelConnector");

Expand All @@ -39,6 +42,7 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions {
sentinelPassword?: string;
sentinels: Array<Partial<ISentinelAddress>>;
sentinelRetryStrategy?: (retryAttempts: number) => number | void | null;
sentinelReconnectStrategy?: (retryAttempts: number) => number | void | null;
preferredSlaves?: PreferredSlaves;
connectTimeout?: number;
disconnectTimeout?: number;
Expand All @@ -47,11 +51,14 @@ export interface ISentinelConnectionOptions extends ITcpConnectionOptions {
sentinelTLS?: ConnectionOptions;
natMap?: INatMap;
updateSentinels?: boolean;
sentinelMaxConnections?: number;
}

export default class SentinelConnector extends AbstractConnector {
private retryAttempts: number;
private failoverDetector: FailoverDetector | null = null;
protected sentinelIterator: SentinelIterator;
public emitter: EventEmitter | null = null;

constructor(protected options: ISentinelConnectionOptions) {
super(options.disconnectTimeout);
Expand Down Expand Up @@ -84,6 +91,14 @@ export default class SentinelConnector extends AbstractConnector {
return roleMatches;
}

public disconnect(): void {
super.disconnect();

if (this.failoverDetector) {
this.failoverDetector.cleanup();
}
}

public connect(eventEmitter: ErrorEmitter): Promise<NetStream> {
this.connecting = true;
this.retryAttempts = 0;
Expand Down Expand Up @@ -134,23 +149,30 @@ export default class SentinelConnector extends AbstractConnector {
throw new Error(CONNECTION_CLOSED_ERROR_MSG);
}

const endpointAddress = endpoint.value.host + ":" + endpoint.value.port;

if (resolved) {
debug("resolved: %s:%s", resolved.host, resolved.port);
debug(
"resolved: %s:%s from sentinel %s",
resolved.host,
resolved.port,
endpointAddress
);
if (this.options.enableTLSForSentinelMode && this.options.tls) {
Object.assign(resolved, this.options.tls);
this.stream = createTLSConnection(resolved);
} else {
this.stream = createConnection(resolved);
}

this.stream.once("connect", () => this.initFailoverDetector());

this.stream.once("error", (err) => {
this.firstError = err;
});

this.sentinelIterator.reset(true);
return this.stream;
} else {
const endpointAddress = endpoint.value.host + ":" + endpoint.value.port;
const errorMsg = err
? "failed to connect to sentinel " +
endpointAddress +
Expand All @@ -176,7 +198,7 @@ export default class SentinelConnector extends AbstractConnector {
return connectToNext();
}

private async updateSentinels(client): Promise<void> {
private async updateSentinels(client: IRedisClient): Promise<void> {
if (!this.options.updateSentinels) {
return;
}
Expand Down Expand Up @@ -209,7 +231,9 @@ export default class SentinelConnector extends AbstractConnector {
debug("Updated internal sentinels: %s", this.sentinelIterator);
}

private async resolveMaster(client): Promise<ITcpConnectionOptions | null> {
private async resolveMaster(
client: IRedisClient
): Promise<ITcpConnectionOptions | null> {
const result = await client.sentinel(
"get-master-addr-by-name",
this.options.name
Expand All @@ -224,7 +248,9 @@ export default class SentinelConnector extends AbstractConnector {
);
}

private async resolveSlave(client): Promise<ITcpConnectionOptions | null> {
private async resolveSlave(
client: IRedisClient
): Promise<ITcpConnectionOptions | null> {
const result = await client.sentinel("slaves", this.options.name);

if (!Array.isArray(result)) {
Expand All @@ -251,8 +277,11 @@ export default class SentinelConnector extends AbstractConnector {
return this.options.natMap[`${item.host}:${item.port}`] || item;
}

private async resolve(endpoint): Promise<ITcpConnectionOptions | null> {
const client = new Redis({
private connectToSentinel(
endpoint: Partial<ISentinelAddress>,
options?: Partial<IRedisOptions>
): IRedisClient {
return new Redis({
port: endpoint.port || 26379,
host: endpoint.host,
username: this.options.sentinelUsername || null,
Expand All @@ -268,7 +297,14 @@ export default class SentinelConnector extends AbstractConnector {
connectTimeout: this.options.connectTimeout,
commandTimeout: this.options.sentinelCommandTimeout,
dropBufferSupport: true,
...options,
});
}

private async resolve(
endpoint: Partial<ISentinelAddress>
): Promise<ITcpConnectionOptions | null> {
const client = this.connectToSentinel(endpoint);

// ignore the errors since resolve* methods will handle them
client.on("error", noop);
Expand All @@ -283,6 +319,47 @@ export default class SentinelConnector extends AbstractConnector {
client.disconnect();
}
}

private async initFailoverDetector(): Promise<void> {
// Move the current sentinel to the first position
this.sentinelIterator.reset(true);

const sentinels: ISentinel[] = [];

// In case of a large amount of sentinels, limit the number of concurrent connections
while (sentinels.length < this.options.sentinelMaxConnections) {
const { done, value } = this.sentinelIterator.next();

if (done) {
break;
}

const client = this.connectToSentinel(value, {
lazyConnect: true,
retryStrategy: this.options.sentinelReconnectStrategy,
});

client.on("reconnecting", () => {
// Tests listen to this event
this.emitter?.emit("sentinelReconnecting");
});

sentinels.push({ address: value, client });
}

this.sentinelIterator.reset(false);

if (this.failoverDetector) {
// Clean up previous detector
this.failoverDetector.cleanup();
}

this.failoverDetector = new FailoverDetector(this, sentinels);
await this.failoverDetector.subscribe();

// Tests listen to this event
this.emitter?.emit("failoverSubscribed");
}
}

function selectPreferredSentinel(
Expand Down
27 changes: 27 additions & 0 deletions lib/connectors/SentinelConnector/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,31 @@
import { IRedisOptions } from "../../redis/RedisOptions";

export interface ISentinelAddress {
port: number;
host: string;
family?: number;
}

// TODO: A proper typedef. This one only declares a small subset of all the members.
export interface IRedisClient {
options: IRedisOptions;
sentinel(subcommand: "sentinels", name: string): Promise<string[]>;
sentinel(
subcommand: "get-master-addr-by-name",
name: string
): Promise<string[]>;
sentinel(subcommand: "slaves", name: string): Promise<string[]>;
subscribe(...channelNames: string[]): Promise<number>;
on(
event: "message",
callback: (channel: string, message: string) => void
): void;
on(event: "error", callback: (error: Error) => void): void;
on(event: "reconnecting", callback: () => void): void;
disconnect(): void;
}

export interface ISentinel {
address: Partial<ISentinelAddress>;
client: IRedisClient;
}
9 changes: 9 additions & 0 deletions lib/redis/RedisOptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ export const DEFAULT_REDIS_OPTIONS: IRedisOptions = {
sentinelRetryStrategy: function (times) {
return Math.min(times * 10, 1000);
},
sentinelReconnectStrategy: function () {
// This strategy only applies when sentinels are used for detecting
// a failover, not during initial master resolution.
// The deployment can still function when some of the sentinels are down
// for a long period of time, so we may not want to attempt reconnection
// very often. Therefore the default interval is fairly long (1 minute).
return 60000;
},
natMap: null,
enableTLSForSentinelMode: false,
updateSentinels: true,
Expand All @@ -75,4 +83,5 @@ export const DEFAULT_REDIS_OPTIONS: IRedisOptions = {
enableAutoPipelining: false,
autoPipeliningIgnoredCommands: [],
maxScriptsCachingTime: 60000,
sentinelMaxConnections: 10,
};
5 changes: 4 additions & 1 deletion lib/redis/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,10 @@ function Redis() {
if (this.options.Connector) {
this.connector = new this.options.Connector(this.options);
} else if (this.options.sentinels) {
this.connector = new SentinelConnector(this.options);
const sentinelConnector = new SentinelConnector(this.options);
sentinelConnector.emitter = this;

this.connector = sentinelConnector;
} else {
this.connector = new StandaloneConnector(this.options);
}
Expand Down
Loading

0 comments on commit a464151

Please sign in to comment.