Skip to content

Commit

Permalink
Add support for WatchProgressRequest
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Davidson <brad.davidson@rancher.com>
  • Loading branch information
brandond committed Jan 16, 2024
1 parent 4929f3e commit 5cab6d2
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 27 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
go.etcd.io/etcd/client/v3 v3.5.9
go.etcd.io/etcd/server/v3 v3.5.9
google.golang.org/grpc v1.56.3
k8s.io/apimachinery v0.25.4
k8s.io/client-go v0.25.4
)

Expand Down Expand Up @@ -90,7 +91,6 @@ require (
google.golang.org/protobuf v1.30.0 // indirect
gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
k8s.io/apimachinery v0.25.4 // indirect
k8s.io/klog/v2 v2.70.1 // indirect
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
Expand Down
73 changes: 47 additions & 26 deletions pkg/server/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"go.etcd.io/etcd/api/v3/etcdserverpb"
"go.etcd.io/etcd/api/v3/mvccpb"
clientv3 "go.etcd.io/etcd/client/v3"
"k8s.io/apimachinery/pkg/util/wait"
)

var watchID int64
Expand All @@ -32,10 +33,6 @@ func GetProgressReportInterval() time.Duration {
}

func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {
interval := GetProgressReportInterval()
progressTicker := time.NewTicker(interval)
defer progressTicker.Stop()

w := watcher{
server: ws,
backend: s.limited.backend,
Expand All @@ -46,7 +43,7 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {

logrus.Tracef("WATCH SERVER CREATE")

go w.DoProgress(ws.Context(), progressTicker)
go wait.NonSlidingUntilWithContext(ws.Context(), w.ProgressIfSynced, GetProgressReportInterval())

for {
msg, err := ws.Recv()
Expand All @@ -61,11 +58,14 @@ func (s *KVServerBridge) Watch(ws etcdserverpb.Watch_WatchServer) error {
logrus.Tracef("WATCH CANCEL REQ id=%d", cr.WatchId)
w.Cancel(cr.WatchId, 0, 0, nil)
}
if pr := msg.GetProgressRequest(); pr != nil {
w.Progress(ws.Context())
}
}
}

type watcher struct {
sync.Mutex
sync.RWMutex

wg sync.WaitGroup
backend Backend
Expand Down Expand Up @@ -226,28 +226,49 @@ func (w *watcher) Close() {
w.wg.Wait()
}

func (w *watcher) DoProgress(ctx context.Context, ticker *time.Ticker) {
for {
select {
case <-ctx.Done():
// Progress sends a progress report if all watchers are synced.
// Ref: https://github.com/etcd-io/etcd/blob/v3.5.11/server/mvcc/watchable_store.go#L500-L504
func (w *watcher) Progress(ctx context.Context) {
w.RLock()
defer w.RUnlock()

// Are any watchers unsynced?
for _, r := range w.progress {
if r != w.progressRev {
return
case <-ticker.C:
rev, err := w.backend.CurrentRevision(ctx)
if err != nil {
logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err)
continue
}
}
}

w.Lock()
for id, r := range w.progress {
if r == w.progressRev {
logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev)
go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id})
}
w.progress[id] = rev
}
w.progressRev = rev
w.Unlock()
// If all watchers are synced, send a progress notification with the latest revision.
id := int64(clientv3.InvalidWatchID)
rev, err := w.backend.CurrentRevision(ctx)
if err != nil {
logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err)
return
}

logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev)
go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id})
}

// ProgressIfSynced sends a progress report on any channels that are synced.
func (w *watcher) ProgressIfSynced(ctx context.Context) {
rev, err := w.backend.CurrentRevision(ctx)
if err != nil {
logrus.Errorf("Failed to get current revision for ProgressNotify: %v", err)
return
}

w.Lock()
defer w.Unlock()

for id, r := range w.progress {
if r == w.progressRev {
logrus.Tracef("WATCH SEND PROGRESS id=%d, revision=%d", id, rev)
go w.server.Send(&etcdserverpb.WatchResponse{Header: txnHeader(rev), WatchId: id})
}
w.progress[id] = rev
}
w.progressRev = rev
return
}

0 comments on commit 5cab6d2

Please sign in to comment.