From bffa8c9e2f67a0aa4264664721bcb99b288bcc02 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Thu, 18 Jan 2024 23:48:14 +0000 Subject: [PATCH] Make notify interval configurable Signed-off-by: Brad Davidson --- main.go | 6 ++++++ pkg/endpoint/endpoint.go | 4 +++- pkg/server/limited.go | 6 ++++-- pkg/server/server.go | 9 ++++++--- pkg/server/watch.go | 13 ++++++------- 5 files changed, 25 insertions(+), 13 deletions(-) diff --git a/main.go b/main.go index 5b4f944f..6a05e904 100644 --- a/main.go +++ b/main.go @@ -96,6 +96,12 @@ func main() { Usage: "Enable net/http/pprof handlers on the metrics bind address. Default is false.", Destination: &metricsConfig.EnableProfiling, }, + cli.DurationFlag{ + Name: "watch-progress-notify-interval", + Usage: "Interval between periodic watch progress notifications. Default is 10m.", + Destination: &config.NotifyInterval, + Value: time.Minute * 10, + }, cli.BoolFlag{Name: "debug"}, } app.Action = run diff --git a/pkg/endpoint/endpoint.go b/pkg/endpoint/endpoint.go index 0dfa7b1f..2a6abad7 100644 --- a/pkg/endpoint/endpoint.go +++ b/pkg/endpoint/endpoint.go @@ -6,6 +6,7 @@ import ( "net" "os" "strings" + "time" "github.com/k3s-io/kine/pkg/drivers/dqlite" "github.com/k3s-io/kine/pkg/drivers/generic" @@ -44,6 +45,7 @@ type Config struct { ServerTLSConfig tls.Config BackendTLSConfig tls.Config MetricsRegisterer prometheus.Registerer + NotifyInterval time.Duration } type ETCDConfig struct { @@ -80,7 +82,7 @@ func Listen(ctx context.Context, config Config) (ETCDConfig, error) { } // set up GRPC server and register services - b := server.New(backend, endpointScheme(config)) + b := server.New(backend, endpointScheme(config), config.NotifyInterval) grpcServer, err := grpcServer(config) if err != nil { return ETCDConfig{}, errors.Wrap(err, "creating GRPC server") diff --git a/pkg/server/limited.go b/pkg/server/limited.go index 5675a0b6..7957c84d 100644 --- a/pkg/server/limited.go +++ b/pkg/server/limited.go @@ -2,13 +2,15 @@ package server import ( "context" + "time" "go.etcd.io/etcd/api/v3/etcdserverpb" ) type LimitedServer struct { - backend Backend - scheme string + notifyInterval time.Duration + backend Backend + scheme string } func (l *LimitedServer) Range(ctx context.Context, r *etcdserverpb.RangeRequest) (*RangeResponse, error) { diff --git a/pkg/server/server.go b/pkg/server/server.go index 4823c62e..5aca5e57 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -1,6 +1,8 @@ package server import ( + "time" + "go.etcd.io/etcd/api/v3/etcdserverpb" "google.golang.org/grpc" "google.golang.org/grpc/health" @@ -12,11 +14,12 @@ type KVServerBridge struct { limited *LimitedServer } -func New(backend Backend, scheme string) *KVServerBridge { +func New(backend Backend, scheme string, notifyInterval time.Duration) *KVServerBridge { return &KVServerBridge{ limited: &LimitedServer{ - backend: backend, - scheme: scheme, + notifyInterval: notifyInterval, + backend: backend, + scheme: scheme, }, } } diff --git a/pkg/server/watch.go b/pkg/server/watch.go index c7239759..9db56222 100644 --- a/pkg/server/watch.go +++ b/pkg/server/watch.go @@ -15,18 +15,17 @@ import ( ) var watchID int64 -var progressReportInterval = 10 * time.Minute // explicit interface check var _ etcdserverpb.WatchServer = (*KVServerBridge)(nil) -// GetProgressReportInterval returns the current progress report interval, with some jitter -func GetProgressReportInterval() time.Duration { - // add rand(1/10*progressReportInterval) as jitter so that kine will not +// getProgressReportInterval returns the configured progress report interval, with some jitter +func (s *KVServerBridge) getProgressReportInterval() time.Duration { + // add rand(1/10*notifyInterval) as jitter so that kine will not // send progress notifications to watchers at the same time even when watchers // are created at the same time. - jitter := time.Duration(rand.Int63n(int64(progressReportInterval) / 10)) - return progressReportInterval + jitter + jitter := time.Duration(rand.Int63n(int64(s.limited.notifyInterval) / 10)) + return s.limited.notifyInterval + jitter } func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { @@ -40,7 +39,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { logrus.Tracef("WATCH SERVER CREATE") - go wait.PollInfiniteWithContext(ws.Context(), GetProgressReportInterval(), w.ProgressIfSynced) + go wait.PollInfiniteWithContext(ws.Context(), s.getProgressReportInterval(), w.ProgressIfSynced) for { msg, err := ws.Recv()