From 5cab6d2bab54faf45de4ec7d4110088aadc2a7b7 Mon Sep 17 00:00:00 2001 From: Brad Davidson Date: Tue, 16 Jan 2024 00:36:58 +0000 Subject: [PATCH] Add support for WatchProgressRequest Signed-off-by: Brad Davidson --- go.mod | 2 +- pkg/server/watch.go | 73 +++++++++++++++++++++++++++++---------------- 2 files changed, 48 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index 82a83897..91a83a1c 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( go.etcd.io/etcd/client/v3 v3.5.9 go.etcd.io/etcd/server/v3 v3.5.9 google.golang.org/grpc v1.56.3 + k8s.io/apimachinery v0.25.4 k8s.io/client-go v0.25.4 ) @@ -90,7 +91,6 @@ require ( google.golang.org/protobuf v1.30.0 // indirect gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect - k8s.io/apimachinery v0.25.4 // indirect k8s.io/klog/v2 v2.70.1 // indirect k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect sigs.k8s.io/yaml v1.3.0 // indirect diff --git a/pkg/server/watch.go b/pkg/server/watch.go index b20bbb8f..86841a06 100644 --- a/pkg/server/watch.go +++ b/pkg/server/watch.go @@ -11,6 +11,7 @@ import ( "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" + "k8s.io/apimachinery/pkg/util/wait" ) var watchID int64 @@ -32,10 +33,6 @@ func GetProgressReportInterval() time.Duration { } func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { - interval := GetProgressReportInterval() - progressTicker := time.NewTicker(interval) - defer progressTicker.Stop() - w := watcher{ server: ws, backend: s.limited.backend, @@ -46,7 +43,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { logrus.Tracef("WATCH SERVER CREATE") - go w.DoProgress(ws.Context(), progressTicker) + go wait.NonSlidingUntilWithContext(ws.Context(), w.ProgressIfSynced, GetProgressReportInterval()) for { msg, err := ws.Recv() @@ -61,11 +58,14 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error { logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId) w.Cancel(cr.WatchId, 0, 0, nil) } + if pr := msg.GetProgressRequest(); pr != nil { + w.Progress(ws.Context()) + } } } type watcher struct { - sync.Mutex + sync.RWMutex wg sync.WaitGroup backend Backend @@ -226,28 +226,49 @@ func (w *watcher) Close() { w.wg.Wait() } -func (w *watcher) DoProgress(ctx context.Context, ticker *time.Ticker) { - for { - select { - case <-ctx.Done(): +// Progress sends a progress report if all watchers are synced. +// Ref: https://github.com/etcd-io/etcd/blob/v3.5.11/server/mvcc/watchable_store.go#L500-L504 +func (w *watcher) Progress(ctx context.Context) { + w.RLock() + defer w.RUnlock() + + // Are any watchers unsynced? + for _, r := range w.progress { + if r != w.progressRev { return - case <-ticker.C: - rev, err := w.backend.CurrentRevision(ctx) - if err != nil { - logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) - continue - } + } + } - w.Lock() - for id, r := range w.progress { - if r == w.progressRev { - logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev) - go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id}) - } - w.progress[id] = rev - } - w.progressRev = rev - w.Unlock() + // If all watchers are synced, send a progress notification with the latest revision. + id := int64(clientv3.InvalidWatchID) + rev, err := w.backend.CurrentRevision(ctx) + if err != nil { + logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) + return + } + + logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev) + go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id}) +} + +// ProgressIfSynced sends a progress report on any channels that are synced. +func (w *watcher) ProgressIfSynced(ctx context.Context) { + rev, err := w.backend.CurrentRevision(ctx) + if err != nil { + logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err) + return + } + + w.Lock() + defer w.Unlock() + + for id, r := range w.progress { + if r == w.progressRev { + logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev) + go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id}) } + w.progress[id] = rev } + w.progressRev = rev + return }