diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go index 4bc387cafa..f900495f97 100644 --- a/registry/etcdv3/listener.go +++ b/registry/etcdv3/listener.go @@ -19,6 +19,7 @@ package etcdv3 import ( "strings" + "sync" ) import ( @@ -79,8 +80,9 @@ func (l *dataListener) DataChange(eventType remoting.Event) bool { } type configurationListener struct { - registry *etcdV3Registry - events chan *config_center.ConfigChangeEvent + registry *etcdV3Registry + events chan *config_center.ConfigChangeEvent + closeOnce sync.Once } // NewConfigurationListener for listening the event of etcdv3. @@ -120,5 +122,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) { // Close etcd registry center func (l *configurationListener) Close() { - l.registry.WaitGroup().Done() + l.closeOnce.Do(func() { + l.registry.WaitGroup().Done() + }) } diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go index f3cc379bd8..7ccf32661c 100644 --- a/registry/etcdv3/registry.go +++ b/registry/etcdv3/registry.go @@ -51,7 +51,7 @@ type etcdV3Registry struct { registry.BaseRegistry cltLock sync.Mutex client *etcdv3.Client - listenerLock sync.Mutex + listenerLock sync.RWMutex listener *etcdv3.EventListener dataListener *dataListener configListener *configurationListener @@ -150,14 +150,9 @@ func (r *etcdV3Registry) CreatePath(k string) error { // DoSubscribe actually subscribe the provider URL func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) { - - var ( - configListener *configurationListener - ) - - r.listenerLock.Lock() - configListener = r.configListener - r.listenerLock.Unlock() + r.listenerLock.RLock() + configListener := r.configListener + r.listenerLock.RUnlock() if r.listener == nil { r.cltLock.Lock() client := r.client @@ -165,12 +160,8 @@ func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) if client == nil { return nil, perrors.New("etcd client broken") } - - // new client & listener - listener := etcdv3.NewEventListener(r.client) - r.listenerLock.Lock() - r.listener = listener + r.listener = etcdv3.NewEventListener(r.client) // new client & listener r.listenerLock.Unlock() } diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go index c66928a636..fd6f958597 100644 --- a/remoting/etcdv3/listener.go +++ b/remoting/etcdv3/listener.go @@ -36,7 +36,7 @@ import ( // nolint type EventListener struct { client *Client - keyMapLock sync.Mutex + keyMapLock sync.RWMutex keyMap map[string]struct{} wg sync.WaitGroup } @@ -181,9 +181,9 @@ func timeSecondDuration(sec int) time.Duration { // --------> listenServiceNodeEvent func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataListener) { - l.keyMapLock.Lock() + l.keyMapLock.RLock() _, ok := l.keyMap[key] - l.keyMapLock.Unlock() + l.keyMapLock.RUnlock() if ok { logger.Warnf("etcdv3 key %s has already been listened.", key) return