diff --git a/etcd/v3/kv_etcd.go b/etcd/v3/kv_etcd.go
index a90d87af..ed7ac2d5 100644
--- a/etcd/v3/kv_etcd.go
+++ b/etcd/v3/kv_etcd.go
@@ -15,6 +15,7 @@ import (
 	"golang.org/x/net/context"
 
 	e "github.com/coreos/etcd/clientv3"
+	"github.com/coreos/etcd/clientv3/concurrency"
 	"github.com/coreos/etcd/etcdserver"
 	"github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
 	"github.com/coreos/etcd/mvcc/mvccpb"
@@ -29,6 +30,9 @@ const (
 	// Name is the name of this kvdb implementation.
 	Name                  = "etcdv3-kv"
 	defaultRequestTimeout = 10 * time.Second
+	// defaultSessionTimeout is the TTL, in seconds, of the etcd session that
+	// a watch keeps alive in order to detect connectivity issues
+	defaultSessionTimeout = 120
 	urlPrefix             = "http://"
 )
 
@@ -129,7 +133,7 @@ func (et *etcdKV) Get(key string) (*kvdb.KVPair, error) {
 
 		switch err {
 		case context.DeadlineExceeded:
-			logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i)
+			logrus.Errorf("[get %v]: kvdb deadline exceeded error: %v, retry count: %v\n", key, err, i)
 			time.Sleep(ec.DefaultIntervalBetweenRetries)
 		case etcdserver.ErrTimeout:
 			logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i)
@@ -282,7 +286,7 @@ func (et *etcdKV) Enumerate(prefix string) (kvdb.KVPairs, error) {
 
 		switch err {
 		case context.DeadlineExceeded:
-			logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i)
+			logrus.Errorf("[enumerate %v]: kvdb deadline exceeded error: %v, retry count: %v\n", prefix, err, i)
 			time.Sleep(ec.DefaultIntervalBetweenRetries)
 		case etcdserver.ErrTimeout:
 			logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i)
@@ -390,7 +394,7 @@ func (et *etcdKV) Keys(prefix, sep string) ([]string, error) {
 		cancel()
 		switch err {
 		case context.DeadlineExceeded:
-			logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i)
+			logrus.Errorf("[keys %v.%v]: kvdb deadline exceeded error: %v, retry count: %v\n", prefix, sep, err, i)
 			time.Sleep(ec.DefaultIntervalBetweenRetries)
 		case etcdserver.ErrTimeout:
 			logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i)
@@ -715,7 +719,7 @@ func (et *etcdKV) setWithRetry(key, value string, ttl uint64) (*kvdb.KVPair, err
 	handle_error:
 		switch err {
 		case context.DeadlineExceeded:
-			logrus.Errorf("kvdb deadline exceeded error: %v, retry count: %v\n", err, i)
+			logrus.Errorf("[set %v]: kvdb deadline exceeded error: %v, retry count: %v\n", key, err, i)
 			time.Sleep(ec.DefaultIntervalBetweenRetries)
 		case etcdserver.ErrTimeout:
 			logrus.Errorf("kvdb error: %v, retry count: %v \n", err, i)
@@ -803,43 +807,88 @@ func (et *etcdKV) watchStart(
 	if waitIndex != 0 {
 		opts = append(opts, e.WithRev(int64(waitIndex+1)))
 	}
+	// The session keeps an etcd lease alive. session.Done() is closed once
+	// the lease can no longer be refreshed, which is how a watch that lost
+	// connectivity to etcd is detected.
+	session, err := concurrency.NewSession(et.kvClient, concurrency.WithTTL(defaultSessionTimeout))
+	if err != nil {
+		// A nil session cannot be waited on below, so give up and tell the
+		// caller that the watch is not running.
+		logrus.Errorf("Failed to establish session for etcd client watch: %v", err)
+		_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
+		return
+	}
 	watcher := e.NewWatcher(et.kvClient)
-	watchChan := watcher.Watch(context.Background(), key, opts...)
-	for wresp := range watchChan {
-		if wresp.Created == true {
-			continue
-		}
-		if wresp.Canceled == true {
-			// Watch is canceled. Notify the watcher
-			logrus.Errorf("Watch on key %v cancelled. Error: %v", key, wresp.Err())
-			_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
-		} else {
-			for _, ev := range wresp.Events {
-				var action string
-				if ev.Type == mvccpb.PUT {
-					if ev.Kv.Version == 1 {
-						action = "create"
-					} else {
-						action = "set"
-					}
-				} else if ev.Type == mvccpb.DELETE {
-					action = "delete"
-				} else {
-					action = "unknown"
-				}
-				err := cb(key, opaque, et.resultToKv(ev.Kv, action), nil)
-				if err != nil {
-					closeErr := watcher.Close()
-					// etcd server might close the context before us.
-					if closeErr != context.Canceled && closeErr != nil {
-						logrus.Errorf("Unable to close the watcher channel for key %v : %v", key, closeErr)
-					}
-					// Indicate the caller that watch has been canceled
-					_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
-					break
-				}
-			}
-		}
+	ctx, watchCancel := context.WithCancel(context.Background())
+
+	// Buffered so that the consumer goroutine can always report its result
+	// and exit, even if nobody is receiving any more.
+	watchRet := make(chan error, 1)
+	watchChan := watcher.Watch(ctx, key, opts...)
+	go func() {
+		var watchErr error
+		// Report exactly once, whichever way the loop ends.
+		defer func() { watchRet <- watchErr }()
+		for wresp := range watchChan {
+			if wresp.Created {
+				continue
+			}
+			if wresp.Canceled {
+				// Watch is canceled by etcd. Stop consuming.
+				logrus.Errorf("Watch on key %v cancelled. Error: %v", key, wresp.Err())
+				watchErr = wresp.Err()
+				return
+			}
+			for _, ev := range wresp.Events {
+				action := "unknown"
+				switch ev.Type {
+				case mvccpb.PUT:
+					action = "set"
+					if ev.Kv.Version == 1 {
+						action = "create"
+					}
+				case mvccpb.DELETE:
+					action = "delete"
+				}
+				watchErr = cb(key, opaque, et.resultToKv(ev.Kv, action), nil)
+				if watchErr != nil {
+					// The callback asked to stop. Return instead of break so
+					// that later responses are not delivered either.
+					return
+				}
+			}
+		}
+	}()
+
+	sessionLost := false
+	select {
+	case <-session.Done(): // closed by etcd
+		sessionLost = true
+		logrus.Errorf("Watch for %v stopped: etcd session closed", key)
+	case err := <-watchRet: // watch canceled or callback error
+		logrus.Errorf("Watch for %v stopped: %v", key, err)
+	}
+
+	// Tear down the watch on every exit path.
+	watchCancel()
+	closeErr := watcher.Close()
+	// etcd server might close the context before us.
+	if closeErr != context.Canceled && closeErr != nil {
+		logrus.Errorf("Unable to close the watcher channel for key %v : %v", key, closeErr)
+	}
+	if sessionLost {
+		// Wait for the consumer to drain so that no event is delivered
+		// after the stop notification below.
+		<-watchRet
+	}
+	// Indicate to the caller that the watch has been canceled
+	_ = cb(key, opaque, nil, kvdb.ErrWatchStopped)
+
+	if sessionLost {
+		// The lease is already gone or unreachable; just stop refreshing it.
+		session.Orphan()
+	} else if err := session.Close(); err != nil {
+		logrus.Warnf("Unable to close etcd session for watch on key %v: %v", key, err)
 	}
 }
 
@@ -1138,7 +1187,7 @@ func (e *etcdKV) ListMembers() (map[string]*kvdb.MemberUrls, error) {
 	resp := make(map[string]*kvdb.MemberUrls)
 	for _, member := range memberListResponse.Members {
 		resp[member.Name] = &kvdb.MemberUrls{
-			PeerUrls: member.PeerURLs,
+			PeerUrls:   member.PeerURLs,
 			ClientUrls: member.ClientURLs,
 		}
 	}