diff --git a/config/reference_config.go b/config/reference_config.go index 895ab9df26..595a6717c0 100644 --- a/config/reference_config.go +++ b/config/reference_config.go @@ -258,6 +258,11 @@ func (c *ReferenceConfig) GenericLoad(id string) { c.Implement(genericService) } +// GetInvoker get invoker from ReferenceConfig +func (c *ReferenceConfig) GetInvoker() protocol.Invoker { + return c.invoker +} + func publishConsumerDefinition(url *common.URL) { if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil { remoteMetadataService.PublishServiceDefinition(url) diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go index 3232ee624d..55b0df7d56 100644 --- a/registry/zookeeper/registry.go +++ b/registry/zookeeper/registry.go @@ -223,23 +223,22 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error { // try to register the node zkPath, err = r.client.RegisterTemp(root, node) - if err != nil { - logger.Errorf("Register temp node(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err)) - if perrors.Cause(err) == zk.ErrNodeExists { - // should delete the old node - logger.Info("Register temp node failed, try to delete the old and recreate (root{%s}, node{%s}) , ignore!", root, node) - if err = r.client.Delete(zkPath); err == nil { - _, err = r.client.RegisterTemp(root, node) - } - if err != nil { - logger.Errorf("Recreate the temp node failed, (root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err)) - } + if err == nil { + return nil + } + + if perrors.Cause(err) == zk.ErrNodeExists { + if err = r.client.Delete(zkPath); err == nil { + _, err = r.client.RegisterTemp(root, node) + } + + if err == nil { + return nil } - return perrors.WithMessagef(err, "RegisterTempNode(root{%s}, node{%s})", root, node) } - logger.Debugf("Create a zookeeper node:%s", zkPath) - return nil + logger.Errorf("Register temp node(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err)) + return perrors.WithMessagef(err, "RegisterTempNode(root{%s}, node{%s})", root, node) } func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) { diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go index 852a5564f9..54f2992693 100644 --- a/remoting/zookeeper/client.go +++ b/remoting/zookeeper/client.go @@ -38,7 +38,7 @@ const ( // ConnDelay connection delay interval ConnDelay = 3 // MaxFailTimes max fail times - MaxFailTimes = 15 + MaxFailTimes = 3 ) var ( @@ -259,7 +259,6 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) { switch (int)(event.State) { case (int)(zk.StateDisconnected): logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name) - z.stop() z.Lock() conn := z.Conn z.Conn = nil @@ -267,6 +266,7 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) { if conn != nil { conn.Close() } + z.stop() return case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged): logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path) @@ -555,7 +555,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, if err == zk.ErrNoNode { return nil, nil, errNilNode } - logger.Errorf("zk.ChildrenW(path{%s}) = error(%v)", path, err) + logger.Warnf("zk.ChildrenW(path{%s}) = error(%v)", path, err) return nil, nil, perrors.WithMessagef(err, "zk.ChildrenW(path:%s)", path) } if stat == nil { @@ -637,6 +637,9 @@ func (z *ZookeeperClient) SetContent(zkPath string, content []byte, version int3 // getConn gets zookeeper connection safely func (z *ZookeeperClient) getConn() *zk.Conn { + if z == nil { + return nil + } z.RLock() defer z.RUnlock() return z.Conn diff --git a/remoting/zookeeper/facade.go b/remoting/zookeeper/facade.go index 4dc0a549f2..3440d832e0 100644 --- a/remoting/zookeeper/facade.go +++ b/remoting/zookeeper/facade.go @@ -82,6 +82,7 @@ LOOP: break } failTimes++ + logger.Warnf("ZK reconnect failed %d times", failTimes) if MaxFailTimes <= failTimes { failTimes = MaxFailTimes } diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go index 3eda6d9698..f1ac954dc0 100644 --- a/remoting/zookeeper/listener.go +++ b/remoting/zookeeper/listener.go @@ -69,8 +69,11 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin go func(zkPath string, listener remoting.DataListener) { if l.listenServiceNodeEvent(zkPath, listener) { listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) + l.pathMapLock.Lock() + delete(l.pathMap, zkPath) + l.pathMapLock.Unlock() } - logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) + logger.Warnf("ListenServiceNodeEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath) }(zkPath, listener) } @@ -87,7 +90,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo select { case zkEvent = <-keyEventCh: - logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", + logger.Warnf("get a zookeeper keyEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) switch zkEvent.Type { case zk.EventNodeDataChanged: @@ -146,6 +149,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li } else { logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err)) } + return } // a node was added -- listen the new node @@ -165,19 +169,21 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li newNode, perrors.WithStack(connErr)) } - if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeAdd, Content: string(content)}) { + if !listener.DataChange(remoting.Event{Path: newNode, Action: remoting.EventTypeAdd, Content: string(content)}) { continue } // listen l service node l.wg.Add(1) - go func(node string, zkPath string, listener remoting.DataListener) { - logger.Infof("delete zkNode{%s}", node) + go func(node string, listener remoting.DataListener) { if l.listenServiceNodeEvent(node, listener) { - logger.Infof("delete content{%s}", node) - listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) + logger.Warnf("delete zkNode{%s}", node) + listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel}) + l.pathMapLock.Lock() + delete(l.pathMap, zkPath) + l.pathMapLock.Unlock() } - logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) - }(newNode, zkPath, listener) + logger.Warnf("handleZkNodeEvent->listenSelf(zk path{%s}) goroutine exit now", node) + }(newNode, listener) } // old node was deleted @@ -188,12 +194,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li } oldNode = path.Join(zkPath, n) - logger.Warnf("delete zkPath{%s}", oldNode) - - if err != nil { - logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err)) - continue - } + logger.Warnf("delete oldNode{%s}", oldNode) listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel}) } } @@ -304,10 +305,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen if l.listenServiceNodeEvent(zkPath) { listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel}) l.pathMapLock.Lock() - defer l.pathMapLock.Unlock() delete(l.pathMap, zkPath) + l.pathMapLock.Unlock() } - logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath) + logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath) }(dubboPath, listener) // listen sub path recursive @@ -329,7 +330,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen case <-ticker.C: l.handleZkNodeEvent(zkPath, children, listener) case zkEvent = <-childEventCh: - logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", + logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}", zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err) ticker.Stop() if zkEvent.Type != zk.EventNodeChildrenChanged { @@ -338,7 +339,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen l.handleZkNodeEvent(zkEvent.Path, children, listener) break WATCH case <-l.client.Done(): - logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath) + logger.Warnf("watch client.done(), listen(path{%s}) goroutine exit now...", zkPath) ticker.Stop() return } @@ -360,7 +361,7 @@ func (l *ZkEventListener) ListenServiceEvent(conf *common.URL, zkPath string, li l.wg.Add(1) go func(zkPath string, listener remoting.DataListener) { l.listenDirEvent(conf, zkPath, listener) - logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) + logger.Warnf("ListenServiceEvent->listenDirEvent(zkPath{%s}) goroutine exit now", zkPath) }(zkPath, listener) }