Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(redis): go-redis v9 regression missing metrics and reconnect hook (#13415) (cherry-pick #15275) #17023

Merged
merged 1 commit into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ require (
github.com/pjbgf/sha1cd v0.3.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/client_model v0.3.0
github.com/prometheus/common v0.42.0 // indirect
github.com/prometheus/procfs v0.10.1 // indirect
github.com/rivo/uniseg v0.4.4 // indirect
Expand Down
41 changes: 14 additions & 27 deletions util/cache/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"time"

ioutil "github.com/argoproj/argo-cd/v2/util/io"
Expand Down Expand Up @@ -155,41 +156,27 @@ type MetricsRegistry interface {
ObserveRedisRequestDuration(duration time.Duration)
}

var metricStartTimeKey = struct{}{}

type redisHook struct {
registry MetricsRegistry
}

func (rh *redisHook) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
return context.WithValue(ctx, metricStartTimeKey, time.Now()), nil
}

func (rh *redisHook) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
cmdErr := cmd.Err()
rh.registry.IncRedisRequest(cmdErr != nil && cmdErr != redis.Nil)

startTime := ctx.Value(metricStartTimeKey).(time.Time)
duration := time.Since(startTime)
rh.registry.ObserveRedisRequestDuration(duration)

return nil
}

func (redisHook) BeforeProcessPipeline(ctx context.Context, _ []redis.Cmder) (context.Context, error) {
return ctx, nil
func (rh *redisHook) DialHook(next redis.DialHook) redis.DialHook {
return func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := next(ctx, network, addr)
return conn, err
}
}

func (redisHook) AfterProcessPipeline(_ context.Context, _ []redis.Cmder) error {
return nil
}
func (rh *redisHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
return func(ctx context.Context, cmd redis.Cmder) error {
startTime := time.Now()

func (redisHook) DialHook(next redis.DialHook) redis.DialHook {
return nil
}
err := next(ctx, cmd)
rh.registry.IncRedisRequest(err != nil && err != redis.Nil)
rh.registry.ObserveRedisRequestDuration(time.Since(startTime))

func (redisHook) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
return nil
return err
}
}

func (redisHook) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
Expand Down
40 changes: 15 additions & 25 deletions util/cache/redis_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,13 @@ package cache

import (
"context"
"strings"
"errors"
"net"

"github.com/redis/go-redis/v9"
log "github.com/sirupsen/logrus"
)

const NoSuchHostErr = "no such host"

type argoRedisHooks struct {
reconnectCallback func()
}
Expand All @@ -18,32 +17,23 @@ func NewArgoRedisHook(reconnectCallback func()) *argoRedisHooks {
return &argoRedisHooks{reconnectCallback: reconnectCallback}
}

func (hook *argoRedisHooks) BeforeProcess(ctx context.Context, cmd redis.Cmder) (context.Context, error) {
return ctx, nil
}

func (hook *argoRedisHooks) AfterProcess(ctx context.Context, cmd redis.Cmder) error {
if cmd.Err() != nil && strings.Contains(cmd.Err().Error(), NoSuchHostErr) {
log.Warnf("Reconnect to redis because error: \"%v\"", cmd.Err())
hook.reconnectCallback()
}
return nil
}

func (hook *argoRedisHooks) BeforeProcessPipeline(ctx context.Context, cmds []redis.Cmder) (context.Context, error) {
return ctx, nil
}

func (hook *argoRedisHooks) AfterProcessPipeline(ctx context.Context, cmds []redis.Cmder) error {
return nil
}

func (hook *argoRedisHooks) DialHook(next redis.DialHook) redis.DialHook {
return nil
return func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := next(ctx, network, addr)
return conn, err
}
}

func (hook *argoRedisHooks) ProcessHook(next redis.ProcessHook) redis.ProcessHook {
return nil
return func(ctx context.Context, cmd redis.Cmder) error {
var dnsError *net.DNSError
err := next(ctx, cmd)
if err != nil && errors.As(err, &dnsError) {
log.Warnf("Reconnect to redis because error: \"%v\"", err)
hook.reconnectCallback()
}
return err
}
}

func (hook *argoRedisHooks) ProcessPipelineHook(next redis.ProcessPipelineHook) redis.ProcessPipelineHook {
Expand Down
33 changes: 24 additions & 9 deletions util/cache/redis_hook_test.go
Original file line number Diff line number Diff line change
@@ -1,38 +1,53 @@
package cache

import (
"context"
"errors"
"testing"
"time"

"github.com/alicebob/miniredis/v2"
"github.com/stretchr/testify/assert"

"github.com/redis/go-redis/v9"
)

func Test_ReconnectCallbackHookCalled(t *testing.T) {
mr, err := miniredis.Run()
if err != nil {
panic(err)
}
defer mr.Close()

called := false
hook := NewArgoRedisHook(func() {
called = true
})

cmd := &redis.StringCmd{}
cmd.SetErr(errors.New("Failed to resync revoked tokens. retrying again in 1 minute: dial tcp: lookup argocd-redis on 10.179.0.10:53: no such host"))

_ = hook.AfterProcess(context.Background(), cmd)
faultyDNSRedisClient := redis.NewClient(&redis.Options{Addr: "invalidredishost.invalid:12345"})
faultyDNSRedisClient.AddHook(hook)

faultyDNSClient := NewRedisCache(faultyDNSRedisClient, 60*time.Second, RedisCompressionNone)
err = faultyDNSClient.Set(&Item{Key: "baz", Object: "foo"})
assert.Equal(t, called, true)
assert.Error(t, err)
}

func Test_ReconnectCallbackHookNotCalled(t *testing.T) {
mr, err := miniredis.Run()
if err != nil {
panic(err)
}
defer mr.Close()

called := false
hook := NewArgoRedisHook(func() {
called = true
})
cmd := &redis.StringCmd{}
cmd.SetErr(errors.New("Something wrong"))

_ = hook.AfterProcess(context.Background(), cmd)
redisClient := redis.NewClient(&redis.Options{Addr: mr.Addr()})
redisClient.AddHook(hook)
client := NewRedisCache(redisClient, 60*time.Second, RedisCompressionNone)

err = client.Set(&Item{Key: "foo", Object: "bar"})
assert.Equal(t, called, false)
assert.NoError(t, err)
}
92 changes: 92 additions & 0 deletions util/cache/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,59 @@ package cache

import (
"context"
"strconv"
"testing"
"time"

promcm "github.com/prometheus/client_model/go"

"github.com/alicebob/miniredis/v2"
"github.com/prometheus/client_golang/prometheus"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/assert"
)

var (
redisRequestCounter = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "argocd_redis_request_total",
},
[]string{"initiator", "failed"},
)
redisRequestHistogram = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: "argocd_redis_request_duration",
Buckets: []float64{0.1, 0.25, .5, 1, 2},
},
[]string{"initiator"},
)
)

type MockMetricsServer struct {
registry *prometheus.Registry
redisRequestCounter *prometheus.CounterVec
redisRequestHistogram *prometheus.HistogramVec
}

func NewMockMetricsServer() *MockMetricsServer {
registry := prometheus.NewRegistry()
registry.MustRegister(redisRequestCounter)
registry.MustRegister(redisRequestHistogram)
return &MockMetricsServer{
registry: registry,
redisRequestCounter: redisRequestCounter,
redisRequestHistogram: redisRequestHistogram,
}
}

func (m *MockMetricsServer) IncRedisRequest(failed bool) {
m.redisRequestCounter.WithLabelValues("mock", strconv.FormatBool(failed)).Inc()
}

func (m *MockMetricsServer) ObserveRedisRequestDuration(duration time.Duration) {
m.redisRequestHistogram.WithLabelValues("mock").Observe(duration.Seconds())
}

func TestRedisSetCache(t *testing.T) {
mr, err := miniredis.Run()
if err != nil {
Expand Down Expand Up @@ -70,3 +115,50 @@ func TestRedisSetCacheCompressed(t *testing.T) {

assert.Equal(t, testValue, result)
}

func TestRedisMetrics(t *testing.T) {
mr, err := miniredis.Run()
if err != nil {
panic(err)
}
defer mr.Close()

metric := &promcm.Metric{}
ms := NewMockMetricsServer()
redisClient := redis.NewClient(&redis.Options{Addr: mr.Addr()})
faultyRedisClient := redis.NewClient(&redis.Options{Addr: "invalidredishost.invalid:12345"})
CollectMetrics(redisClient, ms)
CollectMetrics(faultyRedisClient, ms)

client := NewRedisCache(redisClient, 60*time.Second, RedisCompressionNone)
faultyClient := NewRedisCache(faultyRedisClient, 60*time.Second, RedisCompressionNone)
var res string

//client successful request
err = client.Set(&Item{Key: "foo", Object: "bar"})
assert.NoError(t, err)
err = client.Get("foo", &res)
assert.NoError(t, err)

c, err := ms.redisRequestCounter.GetMetricWithLabelValues("mock", "false")
assert.NoError(t, err)
err = c.Write(metric)
assert.NoError(t, err)
assert.Equal(t, metric.Counter.GetValue(), float64(2))

//faulty client failed request
err = faultyClient.Get("foo", &res)
assert.Error(t, err)
c, err = ms.redisRequestCounter.GetMetricWithLabelValues("mock", "true")
assert.NoError(t, err)
err = c.Write(metric)
assert.NoError(t, err)
assert.Equal(t, metric.Counter.GetValue(), float64(1))

//both clients histogram count
o, err := ms.redisRequestHistogram.GetMetricWithLabelValues("mock")
assert.NoError(t, err)
err = o.(prometheus.Metric).Write(metric)
assert.NoError(t, err)
assert.Equal(t, int(metric.Histogram.GetSampleCount()), 3)
}
Loading