From 1b0ac1bef66410f4209625a47ae79e26f68ea9bd Mon Sep 17 00:00:00 2001 From: Aditya Dani Date: Fri, 23 Nov 2018 13:16:32 -0800 Subject: [PATCH] Etcdv3: Watch API Improvements 1. Send an ErrWatchStopped to the caller only once. - Currently, ErrWatchStopped is sent to the caller multiple times, which causes a resubscribing watch to fail as well. 2. Use a context with the leader requirement for the Watch API. - By default, etcd watchers hang during a network partition if they are connected to the minority side. - As mentioned in https://github.com/etcd-io/etcd/issues/7247#issuecomment-276154894, setting the leader requirement for watchers allows them to switch to the majority partition. --- etcd/v3/kv_etcd.go | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/etcd/v3/kv_etcd.go b/etcd/v3/kv_etcd.go index 43723ef5..367bb7c9 100644 --- a/etcd/v3/kv_etcd.go +++ b/etcd/v3/kv_etcd.go @@ -84,8 +84,11 @@ func (w *watchQ) start() { if err != nil { w.done = true logrus.Infof("Watch cb for key %v returned err: %v", key, err) - // Indicate the caller that watch has been canceled - _ = w.cb(key, w.opaque, nil, kvdb.ErrWatchStopped) + if err != kvdb.ErrWatchStopped { + // The caller returned an error. Indicate the caller + // that the watch has been stopped + _ = w.cb(key, w.opaque, nil, kvdb.ErrWatchStopped) + } // else we stopped the watch and the caller has been notified // Indicate that watch is returning. 
close(w.watchRet) break @@ -905,8 +908,10 @@ func (et *etcdKV) watchStart( } sessionChan := make(chan int, 1) var ( - session *concurrency.Session - err error + session *concurrency.Session + err error + watchStopLock sync.Mutex + watchStopped bool ) go func() { session, err = concurrency.NewSession( @@ -928,7 +933,7 @@ func (et *etcdKV) watchStart( _ = cb(key, opaque, nil, kvdb.ErrWatchStopped) return } - ctx, watchCancel := context.WithCancel(context.Background()) + ctx, watchCancel := context.WithCancel(getContextWithLeaderRequirement()) watchRet := make(chan error) watchChan := et.kvClient.Watch(ctx, key, opts...) watchQ := newWatchQ(opaque, cb, watchRet) @@ -964,14 +969,26 @@ func (et *etcdKV) watchStart( } } logrus.Errorf("Watch on key %v closed without a Cancel response.", key) - watchQ.enqueue(key, nil, kvdb.ErrWatchStopped) + watchStopLock.Lock() + // Stop the watch only if it has not been stopped already + if !watchStopped { + watchQ.enqueue(key, nil, kvdb.ErrWatchStopped) + watchStopped = true + } + watchStopLock.Unlock() }() select { case <-session.Done(): // closed by etcd // Indicate the caller that watch has been canceled logrus.Errorf("Watch closing session for key: %v", key) - watchQ.enqueue(key, nil, kvdb.ErrWatchStopped) + watchStopLock.Lock() + // Stop the watch only if it has not been stopped already + if !watchStopped { + watchQ.enqueue(key, nil, kvdb.ErrWatchStopped) + watchStopped = true + } + watchStopLock.Unlock() watchCancel() case <-watchRet: // error in watcher // Close the context