Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Etcdv3: Watch API Improvements #66

Merged
merged 1 commit into from
Nov 26, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 24 additions & 7 deletions etcd/v3/kv_etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,11 @@ func (w *watchQ) start() {
if err != nil {
w.done = true
logrus.Infof("Watch cb for key %v returned err: %v", key, err)
// Indicate the caller that watch has been canceled
_ = w.cb(key, w.opaque, nil, kvdb.ErrWatchStopped)
if err != kvdb.ErrWatchStopped {
// The caller returned an error. Indicate the caller
// that the watch has been stopped
_ = w.cb(key, w.opaque, nil, kvdb.ErrWatchStopped)
} // else we stopped the watch and the caller has been notified
// Indicate that watch is returning.
close(w.watchRet)
break
Expand Down Expand Up @@ -905,8 +908,10 @@ func (et *etcdKV) watchStart(
}
sessionChan := make(chan int, 1)
var (
session *concurrency.Session
err error
session *concurrency.Session
err error
watchStopLock sync.Mutex
watchStopped bool
)
go func() {
session, err = concurrency.NewSession(
Expand All @@ -928,7 +933,7 @@ func (et *etcdKV) watchStart(
_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
return
}
ctx, watchCancel := context.WithCancel(context.Background())
ctx, watchCancel := context.WithCancel(getContextWithLeaderRequirement())
watchRet := make(chan error)
watchChan := et.kvClient.Watch(ctx, key, opts...)
watchQ := newWatchQ(opaque, cb, watchRet)
Expand Down Expand Up @@ -964,14 +969,26 @@ func (et *etcdKV) watchStart(
}
}
logrus.Errorf("Watch on key %v closed without a Cancel response.", key)
watchQ.enqueue(key, nil, kvdb.ErrWatchStopped)
watchStopLock.Lock()
// Stop the watch only if it has not been stopped already
if !watchStopped {
watchQ.enqueue(key, nil, kvdb.ErrWatchStopped)
watchStopped = true
}
watchStopLock.Unlock()
}()

select {
case <-session.Done(): // closed by etcd
// Indicate the caller that watch has been canceled
logrus.Errorf("Watch closing session for key: %v", key)
watchQ.enqueue(key, nil, kvdb.ErrWatchStopped)
watchStopLock.Lock()
// Stop the watch only if it has not been stopped already
if !watchStopped {
watchQ.enqueue(key, nil, kvdb.ErrWatchStopped)
watchStopped = true
}
watchStopLock.Unlock()
watchCancel()
case <-watchRet: // error in watcher
// Close the context
Expand Down