Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: consumer invoker cache set nil after the ZK connection is lost #985

Merged
merged 20 commits into from
Feb 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions config/reference_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,11 @@ func (c *ReferenceConfig) GenericLoad(id string) {
c.Implement(genericService)
}

// GetInvoker get invoker from ReferenceConfig
func (c *ReferenceConfig) GetInvoker() protocol.Invoker {
return c.invoker
}

jack15083 marked this conversation as resolved.
Show resolved Hide resolved
func publishConsumerDefinition(url *common.URL) {
if remoteMetadataService, err := extension.GetRemoteMetadataService(); err == nil && remoteMetadataService != nil {
remoteMetadataService.PublishServiceDefinition(url)
Expand Down
27 changes: 13 additions & 14 deletions registry/zookeeper/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,23 +223,22 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {

// try to register the node
zkPath, err = r.client.RegisterTemp(root, node)
if err != nil {
logger.Errorf("Register temp node(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
if perrors.Cause(err) == zk.ErrNodeExists {
// should delete the old node
logger.Info("Register temp node failed, try to delete the old and recreate (root{%s}, node{%s}) , ignore!", root, node)
if err = r.client.Delete(zkPath); err == nil {
_, err = r.client.RegisterTemp(root, node)
}
if err != nil {
logger.Errorf("Recreate the temp node failed, (root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
}
if err == nil {
return nil
}

if perrors.Cause(err) == zk.ErrNodeExists {
if err = r.client.Delete(zkPath); err == nil {
_, err = r.client.RegisterTemp(root, node)
}

if err == nil {
return nil
}
return perrors.WithMessagef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
}
logger.Debugf("Create a zookeeper node:%s", zkPath)

return nil
logger.Errorf("Register temp node(root{%s}, node{%s}) = error{%v}", root, node, perrors.WithStack(err))
return perrors.WithMessagef(err, "RegisterTempNode(root{%s}, node{%s})", root, node)
}

func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) {
Expand Down
9 changes: 6 additions & 3 deletions remoting/zookeeper/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ const (
// ConnDelay connection delay interval
ConnDelay = 3
// MaxFailTimes max fail times
MaxFailTimes = 15
MaxFailTimes = 3
)

var (
Expand Down Expand Up @@ -259,14 +259,14 @@ func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
switch (int)(event.State) {
case (int)(zk.StateDisconnected):
logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name)
z.stop()
z.Lock()
conn := z.Conn
z.Conn = nil
z.Unlock()
if conn != nil {
conn.Close()
}
z.stop()
jack15083 marked this conversation as resolved.
Show resolved Hide resolved
return
case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
Expand Down Expand Up @@ -555,7 +555,7 @@ func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event,
if err == zk.ErrNoNode {
return nil, nil, errNilNode
}
logger.Errorf("zk.ChildrenW(path{%s}) = error(%v)", path, err)
logger.Warnf("zk.ChildrenW(path{%s}) = error(%v)", path, err)
return nil, nil, perrors.WithMessagef(err, "zk.ChildrenW(path:%s)", path)
}
if stat == nil {
Expand Down Expand Up @@ -637,6 +637,9 @@ func (z *ZookeeperClient) SetContent(zkPath string, content []byte, version int3

// getConn gets zookeeper connection safely
func (z *ZookeeperClient) getConn() *zk.Conn {
if z == nil {
return nil
}
z.RLock()
defer z.RUnlock()
return z.Conn
Expand Down
1 change: 1 addition & 0 deletions remoting/zookeeper/facade.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ LOOP:
break
}
failTimes++
logger.Warnf("ZK reconnect failed %d times", failTimes)
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
Expand Down
41 changes: 21 additions & 20 deletions remoting/zookeeper/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,11 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
go func(zkPath string, listener remoting.DataListener) {
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
delete(l.pathMap, zkPath)
l.pathMapLock.Unlock()
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
logger.Warnf("ListenServiceNodeEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}

Expand All @@ -87,7 +90,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo

select {
case zkEvent = <-keyEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
logger.Warnf("get a zookeeper keyEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
switch zkEvent.Type {
case zk.EventNodeDataChanged:
Expand Down Expand Up @@ -146,6 +149,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
} else {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
}
return
}

// a node was added -- listen the new node
Expand All @@ -165,19 +169,21 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newNode, perrors.WithStack(connErr))
}

if !listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
if !listener.DataChange(remoting.Event{Path: newNode, Action: remoting.EventTypeAdd, Content: string(content)}) {
continue
}
// listen l service node
l.wg.Add(1)
go func(node string, zkPath string, listener remoting.DataListener) {
logger.Infof("delete zkNode{%s}", node)
go func(node string, listener remoting.DataListener) {
if l.listenServiceNodeEvent(node, listener) {
logger.Infof("delete content{%s}", node)
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
logger.Warnf("delete zkNode{%s}", node)
listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
delete(l.pathMap, zkPath)
l.pathMapLock.Unlock()
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(newNode, zkPath, listener)
logger.Warnf("handleZkNodeEvent->listenSelf(zk path{%s}) goroutine exit now", node)
}(newNode, listener)
}

// old node was deleted
Expand All @@ -188,12 +194,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
}

oldNode = path.Join(zkPath, n)
logger.Warnf("delete zkPath{%s}", oldNode)

if err != nil {
logger.Errorf("NewURL(i{%s}) = error{%v}", n, perrors.WithStack(err))
continue
}
logger.Warnf("delete oldNode{%s}", oldNode)
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
Expand Down Expand Up @@ -304,10 +305,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
if l.listenServiceNodeEvent(zkPath) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()
defer l.pathMapLock.Unlock()
jack15083 marked this conversation as resolved.
Show resolved Hide resolved
delete(l.pathMap, zkPath)
l.pathMapLock.Unlock()
}
logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(dubboPath, listener)

// listen sub path recursive
Expand All @@ -329,7 +330,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
case <-ticker.C:
l.handleZkNodeEvent(zkPath, children, listener)
case zkEvent = <-childEventCh:
logger.Warnf("get a zookeeper zkEvent{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
logger.Warnf("get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, StateToString(zkEvent.State), zkEvent.Err)
ticker.Stop()
if zkEvent.Type != zk.EventNodeChildrenChanged {
Expand All @@ -338,7 +339,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
l.handleZkNodeEvent(zkEvent.Path, children, listener)
break WATCH
case <-l.client.Done():
logger.Warnf("client.done(), listen(path{%s}) goroutine exit now...", zkPath)
logger.Warnf("watch client.done(), listen(path{%s}) goroutine exit now...", zkPath)
ticker.Stop()
return
}
Expand All @@ -360,7 +361,7 @@ func (l *ZkEventListener) ListenServiceEvent(conf *common.URL, zkPath string, li
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(conf, zkPath, listener)
logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
logger.Warnf("ListenServiceEvent->listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}

Expand Down