-
Notifications
You must be signed in to change notification settings - Fork 107
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix: redis connect with promise race (#2107)
* fix: redis connect with promise race timeoutPromise should be a function so that it returns a new promise everytime. * fix: redis connect with promise race timeoutPromise should be a function so that it returns a new promise everytime. Also we should use reject so that it will error if timed out. * added extra padding for redis failure and passing events if redis error comes up * refactor: checkAndConnectConnection method * feat: tested redis * refactor redis for incorporating end signal * comments addressed * test coverage increased * test coverage increased * introducing redis_error stat * introducing redis_latency stat * comments addressed+1 * stat fix * chore: comments addressed+2 * fix: discard duplicate cart event (#2091) * fix: discard duplicate cart event * Removed cart_create event * Removed cart_create event * chore: utils test cases added * fix: updated design flow for cart duplication * chore: test cases added * fix: directory path * fix: directory path * feat: added some more redis metric * Update src/util/redisConnector.js Co-authored-by: Sudip Paul <67197965+ItsSudip@users.noreply.github.com> * comments addressed * chore: conflicts resolved * chore: comments addressed * chore: ideas incorporated * chore: comments addressed * chore: comments addressed * fix: redis mock * fix: redis --------- Co-authored-by: Sudip Paul <67197965+ItsSudip@users.noreply.github.com> * chore: update import statement * chore: comments addressed * feat: redis return type changes * chore: add metrics in prometheus.js * replaced hmget with hget * added padding * removed use of isObject * console removes * added error logs * chore: addressed comments * chore: addressed comments * solve build issue * chore: addressed comments --------- Co-authored-by: Anant Jain <anantjain45823@gmail.com> Co-authored-by: Anant Jain <62471433+anantjain45823@users.noreply.github.com> Co-authored-by: Sudip Paul <67197965+ItsSudip@users.noreply.github.com>
- Loading branch information
1 parent
c126f7b
commit d439485
Showing
18 changed files
with
2,576 additions
and
2,216 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,154 @@ | ||
const Redis = require('ioredis'); | ||
const { isDefinedAndNotNull } = require('../../v0/util'); | ||
const { RedisError } = require('../../v0/util/errorTypes'); | ||
const log = require('../../logger'); | ||
const stats = require('../stats'); | ||
|
||
const timeoutPromise = () => new Promise((_, reject) => { | ||
setTimeout( | ||
() => reject(new Error("Timeout while connecting to redis")), | ||
1000 | ||
); | ||
}); | ||
|
||
const RedisDB = { | ||
init() { | ||
if (process.env.USE_REDIS_DB && process.env.USE_REDIS_DB !== 'false') { | ||
this.host = process.env.REDIS_HOST || 'localhost'; | ||
this.port = parseInt(process.env.REDIS_PORT, 10) || 6379; | ||
this.password = process.env.REDIS_PASSWORD; | ||
this.userName = process.env.REDIS_USERNAME; | ||
this.maxRetries = parseInt(process.env.REDIS_MAX_RETRIES, 10) || 5; | ||
this.timeAfterRetry = parseInt(process.env.REDIS_TIME_AFTER_RETRY_IN_MS, 10) || 500; | ||
this.client = new Redis({ | ||
host: this.host, | ||
port: this.port, | ||
password: this.password, | ||
username: this.userName, | ||
enableReadyCheck: true, | ||
retryStrategy: (times) => { | ||
if (times <= this.maxRetries) { | ||
return (1 + times) * this.timeAfterRetry; // reconnect after | ||
} | ||
stats.increment("redis_error", { | ||
operation: 'redis_down', | ||
}); | ||
log.error(`Redis is down at ${this.host}:${this.port}`); | ||
return false; // stop retrying | ||
}, | ||
}); | ||
this.client.on('ready', () => { | ||
log.info(`Connected to redis at ${this.host}:${this.port}`); | ||
}); | ||
} | ||
}, | ||
|
||
|
||
async checkRedisConnectionReadyState() { | ||
try { | ||
await this.client.connect(); | ||
} catch (error) { | ||
return new Promise((resolve) => { | ||
this.client.on('ready', () => { | ||
resolve(); | ||
}); | ||
}); | ||
} | ||
return Promise.resolve(); | ||
}, | ||
|
||
/** | ||
* Checks connection with redis and if not connected, tries to connect and throws error if connection request fails | ||
*/ | ||
async checkAndConnectConnection() { | ||
if (!this.client || this.client.status === "end") { | ||
this.init(); | ||
} | ||
if (this.client.status !== 'ready') { | ||
await Promise.race([this.checkRedisConnectionReadyState(), timeoutPromise()]); | ||
} | ||
}, | ||
/** | ||
* Used to get value from redis depending on the key and the expected value type | ||
* @param {*} hashKey parent key | ||
* @param {*} isObjExpected false if fetched value can not be json | ||
* @param {*} key key for which value needs to be extracted, required if isObjExpected is true | ||
* @returns value which can be json or string or number | ||
* storage of data in case isObjExpected is true | ||
* hashKey:{ | ||
* key1: {internalKey1:val1}, | ||
* key2: {internalKey2:val2}, | ||
* } | ||
*/ | ||
async getVal(hashKey, key, isObjExpected = true) { | ||
try { | ||
await this.checkAndConnectConnection(); // check if redis is connected and if not, connect | ||
let redisVal; | ||
if (isObjExpected === true) { | ||
redisVal = await this.client.hget(hashKey, key); | ||
if (isDefinedAndNotNull(redisVal)) { | ||
try { | ||
return JSON.parse(redisVal); | ||
} catch (e) { | ||
// do nothing | ||
return redisVal; | ||
} | ||
} | ||
return redisVal; | ||
} | ||
return this.client.get(hashKey); | ||
} catch (e) { | ||
stats.increment("redis_error", { | ||
operation: "get" | ||
}); | ||
log.error(`Error getting value from Redis: ${e}`); | ||
throw new RedisError(`Error getting value from Redis: ${e}`); | ||
} | ||
}, | ||
/** | ||
* Used to set value in redis depending on the key and the value type | ||
* @param {*} key key for which value needs to be stored | ||
* @param {*} value to be stored in redis send it in array format [field1, value1, field2, value2] | ||
* if Value is an object | ||
* @param {*} expiryTimeInSec expiry time of data in redis by default 1 hr | ||
* @param {*} isValJson set to false if value is not a json object | ||
* | ||
*/ | ||
async setVal(key, value, expiryTimeInSec = 60 * 60) { | ||
try { | ||
await this.checkAndConnectConnection(); // check if redis is connected and if not, connect | ||
if (typeof value === "object") { | ||
const valueToStore = value.map(element => { | ||
if (typeof element === "object") { | ||
return JSON.stringify(element); | ||
} | ||
return element; | ||
}) | ||
await this.client.multi() | ||
.hmset(key, ...valueToStore) | ||
.expire(key, expiryTimeInSec) | ||
.exec(); | ||
} else { | ||
await this.client.multi() | ||
.set(key, value) | ||
.expire(key, expiryTimeInSec) | ||
.exec(); | ||
|
||
} | ||
} catch (e) { | ||
stats.increment("redis_error", { | ||
operation: "set" | ||
}); | ||
log.error(`Error setting value in Redis due ${e}`); | ||
throw new RedisError(`Error setting value in Redis due ${e}`); | ||
} | ||
}, | ||
async disconnect() { | ||
if (process.env.USE_REDIS_DB && process.env.USE_REDIS_DB !== 'false') { | ||
log.info(`Disconnecting from redis at ${this.host}:${this.port}`); | ||
this.client.disconnect(); | ||
} | ||
} | ||
}; | ||
|
||
module.exports = { RedisDB }; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,110 @@ | ||
const fs = require("fs"); | ||
const path = require("path"); | ||
const version = "v0"; | ||
const { RedisDB } = require('./redisConnector'); | ||
jest.mock('ioredis', () => require('../../../test/__mocks__/redis')); | ||
const sourcesList = ['shopify'] | ||
const destList = []; | ||
process.env.USE_REDIS_DB = 'true'; | ||
|
||
const timeoutPromise = () => new Promise((resolve, _) => { | ||
setTimeout( | ||
() => resolve(), | ||
100 | ||
); | ||
}); | ||
|
||
describe('checkRedisConnectionReadyState', () => { | ||
RedisDB.init(); | ||
it('should resolve if client connects after initial connection error', async () => { | ||
RedisDB.client.end(3); | ||
await Promise.race([RedisDB.checkRedisConnectionReadyState(), timeoutPromise()]); | ||
expect(RedisDB.client.status).toBe('ready'); | ||
}); | ||
it('should resolve if client is already connected', async () => { | ||
await RedisDB.checkRedisConnectionReadyState(); | ||
expect(RedisDB.client.status).toBe('ready'); | ||
}); | ||
}); | ||
describe('checkAndConnectConnection', () => { | ||
it('Status is end', async () => { | ||
RedisDB.client.end(11); | ||
await Promise.race([RedisDB.checkAndConnectConnection(), timeoutPromise()]); | ||
expect(RedisDB.client.status).toBe('ready'); | ||
}); | ||
it('should resolve if client is already connected', async () => { | ||
await RedisDB.checkAndConnectConnection(); | ||
expect(RedisDB.client.status).toBe('ready'); | ||
}); | ||
}); | ||
describe(`Source Tests`, () => { | ||
sourcesList.forEach((source) => { | ||
const testDataFile = fs.readFileSync( | ||
path.resolve(__dirname, `./testData/${source}_source.json`) | ||
); | ||
const data = JSON.parse(testDataFile); | ||
const transformer = require(`../../${version}/sources/${source}/transform`); | ||
|
||
data.forEach((dataPoint, index) => { | ||
it(`${index}. ${source} - ${dataPoint.description}`, async () => { | ||
try { | ||
const output = await transformer.process(dataPoint.input); | ||
expect(output).toEqual(dataPoint.output); | ||
} catch (error) { | ||
expect(error.message).toEqual(dataPoint.output.error); | ||
} | ||
}); | ||
}); | ||
}) | ||
}); | ||
|
||
describe(`Redis Class Get Tests`, () => { | ||
const testDataFile = fs.readFileSync( | ||
path.resolve(__dirname, `./testData/redisConnector.json`) | ||
); | ||
const data = JSON.parse(testDataFile); | ||
data.forEach((dataPoint, index) => { | ||
it(`${index}. Redis Get- ${dataPoint.description}`, async () => { | ||
try { | ||
const output = await RedisDB.getVal(dataPoint.input.value, isObjExpected = false); | ||
expect(output).toEqual(dataPoint.output); | ||
} catch (error) { | ||
expect(error.message).toEqual(dataPoint.output.error); | ||
} | ||
}); | ||
}); | ||
it(`Redis Get- Nothing Found in redis - return null`, async () => { | ||
const dataPoint = { | ||
input: { | ||
value: "not_in_redis", | ||
}, | ||
output: { | ||
value: null | ||
} | ||
}; | ||
const output = await RedisDB.getVal(dataPoint.input.value, "key1"); | ||
expect(output).toEqual(dataPoint.output.value); | ||
}); | ||
}); | ||
|
||
describe(`Redis Class Set Test`, () => { | ||
it(`Redis Set Fail Case Test`, async () => { | ||
try { | ||
await RedisDB.setVal("error", "test"); | ||
} catch (error) { | ||
expect(error.message).toEqual("Error setting value in Redis due Error: Connection is Closed"); | ||
} | ||
}); | ||
it(`Redis Set Fail Case Test`, async () => { | ||
const result = "OK" | ||
await RedisDB.setVal("Key", "test"); | ||
expect(result).toEqual("OK"); | ||
}); | ||
}); | ||
describe(`Redis Disconnect`, () => { | ||
it(`Redis Disconnect Test`, async () => { | ||
const result = "OK" | ||
await RedisDB.disconnect(); | ||
expect(result).toEqual("OK"); | ||
}); | ||
}); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.