From fdc93403f40c10b6503e9336ce743d841effc78f Mon Sep 17 00:00:00 2001 From: pasha-codefresh Date: Wed, 15 Sep 2021 19:05:48 +0300 Subject: [PATCH] fix: Redis should reconnect on connectivity issue (#7207) fix: Redis should reconnect on connectivity issue (#7207) Signed-off-by: pashavictorovich Signed-off-by: viktorplakida --- util/cache/cache.go | 54 +++++++++++++++++++++++++++++----------- util/cache/redis_hook.go | 39 +++++++++++++++++++++++++++++ util/session/state.go | 4 ++- 3 files changed, 81 insertions(+), 16 deletions(-) create mode 100644 util/cache/redis_hook.go diff --git a/util/cache/cache.go b/util/cache/cache.go index ef9bd3a6c0f94..4865fae6fb2f3 100644 --- a/util/cache/cache.go +++ b/util/cache/cache.go @@ -31,6 +31,43 @@ func NewCache(client CacheClient) *Cache { return &Cache{client} } +func buildRedisClient(redisAddress, password string, redisDB, maxRetries int, tlsConfig *tls.Config) *redis.Client { + opts := &redis.Options{ + Addr: redisAddress, + Password: password, + DB: redisDB, + MaxRetries: maxRetries, + TLSConfig: tlsConfig, + } + + client := redis.NewClient(opts) + + client.AddHook(redis.Hook(NewArgoRedisHook(func() { + *client = *buildRedisClient(redisAddress, password, redisDB, maxRetries, tlsConfig) + }))) + + return client +} + +func buildFailoverRedisClient(sentinelMaster, password string, redisDB, maxRetries int, tlsConfig *tls.Config, sentinelAddresses []string) *redis.Client { + opts := &redis.FailoverOptions{ + MasterName: sentinelMaster, + SentinelAddrs: sentinelAddresses, + DB: redisDB, + Password: password, + MaxRetries: maxRetries, + TLSConfig: tlsConfig, + } + + client := redis.NewFailoverClient(opts) + + client.AddHook(redis.Hook(NewArgoRedisHook(func() { + *client = *buildFailoverRedisClient(sentinelMaster, password, redisDB, maxRetries, tlsConfig, sentinelAddresses) + }))) + + return client +} + // AddCacheFlagsToCmd adds flags which control caching to the specified command func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...func(client *redis.Client)) func() (*Cache, error) { redisAddress := "" @@ -84,14 +121,7 @@ func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...func(client *redis.Client)) password := os.Getenv(envRedisPassword) maxRetries := env.ParseNumFromEnv(envRedisRetryCount, defaultRedisRetryCount, 0, math.MaxInt32) if len(sentinelAddresses) > 0 { - client := redis.NewFailoverClient(&redis.FailoverOptions{ - MasterName: sentinelMaster, - SentinelAddrs: sentinelAddresses, - DB: redisDB, - Password: password, - MaxRetries: maxRetries, - TLSConfig: tlsConfig, - }) + client := buildFailoverRedisClient(sentinelMaster, password, redisDB, maxRetries, tlsConfig, sentinelAddresses) for i := range opts { opts[i](client) } @@ -101,13 +131,7 @@ func AddCacheFlagsToCmd(cmd *cobra.Command, opts ...func(client *redis.Client)) redisAddress = common.DefaultRedisAddr } - client := redis.NewClient(&redis.Options{ - Addr: redisAddress, - Password: password, - DB: redisDB, - MaxRetries: maxRetries, - TLSConfig: tlsConfig, - }) + client := buildRedisClient(redisAddress, password, redisDB, maxRetries, tlsConfig) for i := range opts { opts[i](client) } diff --git a/util/cache/redis_hook.go b/util/cache/redis_hook.go new file mode 100644 index 0000000000000..c378dd398c44a --- /dev/null +++ b/util/cache/redis_hook.go @@ -0,0 +1,39 @@ +package cache + +import ( + "context" + "strings" + + "github.com/go-redis/redis/v8" + log "github.com/sirupsen/logrus" +) + +const NoSuchHostErr = "no such host" + +type argoRedisHooks struct { + reconnectCallback func() +} + +func NewArgoRedisHook(reconnectCallback func()) *argoRedisHooks { + return &argoRedisHooks{reconnectCallback: reconnectCallback} +} + +func (hook *argoRedisHooks) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) { + return ctx, nil +} + +func (hook *argoRedisHooks) AfterProcess(ctx context.Context, cmd redis.Cmder) error { + if cmd.Err() != nil && strings.Contains(cmd.Err().Error(), NoSuchHostErr) { + log.Warnf("Reconnect to redis because error: \"%v\"", cmd.Err()) + hook.reconnectCallback() + } + return nil +} + +func (hook *argoRedisHooks) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) { + return ctx, nil +} + +func (hook *argoRedisHooks) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error { + return nil +} diff --git a/util/session/state.go b/util/session/state.go index de39ec85140d3..f0032d89200fd 100644 --- a/util/session/state.go +++ b/util/session/state.go @@ -69,9 +69,11 @@ func (storage *userStateStorage) watchRevokedTokens(ctx context.Context) { } func (storage *userStateStorage) loadRevokedTokensSafe() { - for err := storage.loadRevokedTokens(); err != nil; { + err := storage.loadRevokedTokens() + for err != nil { log.Warnf("Failed to resync revoked tokens. retrying again in 1 minute: %v", err) time.Sleep(time.Minute) + err = storage.loadRevokedTokens() } }