Skip to content

Commit

Permalink
Make memberlist cluster rejoin dead nodes periodically (antrea-io#4491)
Browse files Browse the repository at this point in the history
The patch periodically rejoins Nodes that were removed from the member
list by memberlist because they were unreachable for more than 15
seconds (the GossipToTheDeadTime we are using). Without it, once there
is a network downtime lasting more than 15 seconds, the agent wouldn't
try to reach any other Node and would think it's the only alive Node
until it's restarted.

Signed-off-by: Quan Tian <qtian@vmware.com>
  • Loading branch information
tnqn committed Dec 18, 2022
1 parent c9c1b2c commit 18011b8
Show file tree
Hide file tree
Showing 4 changed files with 298 additions and 68 deletions.
1 change: 1 addition & 0 deletions hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ function generate_mocks {
"pkg/agent/cniserver/ipam IPAMDriver testing"
"pkg/agent/flowexporter/connections ConnTrackDumper,NetFilterConnTrack testing"
"pkg/agent/interfacestore InterfaceStore testing"
"pkg/agent/memberlist Memberlist testing"
"pkg/agent/multicast RouteInterface testing"
"pkg/agent/types McastNetworkPolicyController testing"
"pkg/agent/nodeportlocal/portcache LocalPortOpener testing"
Expand Down
138 changes: 97 additions & 41 deletions pkg/agent/memberlist/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,20 @@ type Interface interface {
AddClusterEventHandler(handler ClusterNodeEventHandler)
}

// Memberlist is the set of memberlist operations that Cluster invokes through its mList field.
// It is declared as an interface so that an alternative implementation can be passed to NewCluster
// for testing; production code may pass nil and let NewCluster create the real memberlist.
type Memberlist interface {
// Join attempts to join the given members and returns how many succeeded.
// NOTE(review): callers in this file assume the error is non-nil only when none could be reached — confirm against the memberlist library.
Join(existing []string) (int, error)
// Members returns the nodes currently tracked by the member list.
// NOTE(review): presumably only live members — confirm against the memberlist library.
Members() []*memberlist.Node
// Leave presumably announces departure from the cluster, bounded by timeout — TODO confirm.
Leave(timeout time.Duration) error
// Shutdown presumably stops the member list's background activity — TODO confirm.
Shutdown() error
}

// Cluster implements ClusterInterface.
type Cluster struct {
bindPort int
// Name of local Node. Node name must be unique in the cluster.
nodeName string

mList *memberlist.Memberlist
mList Memberlist
// consistentHashMap holds the consistent hash maps. When a Node joins the cluster, use method Add() to add a key to the hash.
// When a Node leaves the cluster, the consistentHashMap should be updated.
consistentHashMap map[string]*consistenthash.Map
Expand Down Expand Up @@ -129,14 +136,15 @@ func NewCluster(
nodeName string,
nodeInformer coreinformers.NodeInformer,
externalIPPoolInformer crdinformers.ExternalIPPoolInformer,
transport memberlist.Transport, // Parameterized for testing, could be left nil for production code.
ml Memberlist, // Parameterized for testing, could be left nil for production code.
) (*Cluster, error) {
// The Node join/leave events will be notified via it.
nodeEventCh := make(chan memberlist.NodeEvent, 1024)
c := &Cluster{
bindPort: clusterBindPort,
nodeName: nodeName,
consistentHashMap: make(map[string]*consistenthash.Map),
mList: ml,
nodeEventsCh: nodeEventCh,
nodeInformer: nodeInformer,
nodeLister: nodeInformer.Lister(),
Expand All @@ -147,21 +155,24 @@ func NewCluster(
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "externalIPPool"),
}

conf := memberlist.DefaultLocalConfig()
conf.Name = c.nodeName
conf.Transport = transport
conf.BindPort = c.bindPort
conf.AdvertisePort = c.bindPort
conf.AdvertiseAddr = nodeIP.String()
conf.Events = &memberlist.ChannelEventDelegate{Ch: nodeEventCh}
conf.LogOutput = io.Discard
klog.V(1).InfoS("New memberlist cluster", "config", conf)

mList, err := memberlist.Create(conf)
if err != nil {
return nil, fmt.Errorf("failed to create memberlist cluster: %v", err)
if ml == nil {
conf := memberlist.DefaultLocalConfig()
conf.Name = c.nodeName
conf.BindPort = c.bindPort
conf.AdvertisePort = c.bindPort
conf.AdvertiseAddr = nodeIP.String()
// Setting it to a non-zero value to allow reclaiming Nodes with different addresses for Node IP update case.
conf.DeadNodeReclaimTime = 10 * time.Millisecond
conf.Events = &memberlist.ChannelEventDelegate{Ch: nodeEventCh}
conf.LogOutput = io.Discard
klog.V(1).InfoS("New memberlist cluster", "config", conf)

mList, err := memberlist.Create(conf)
if err != nil {
return nil, fmt.Errorf("failed to create memberlist cluster: %v", err)
}
c.mList = mList
}
c.mList = mList

nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -190,13 +201,16 @@ func NewCluster(

func (c *Cluster) handleCreateNode(obj interface{}) {
node := obj.(*corev1.Node)
if member, err := c.newClusterMember(node); err == nil {
_, err := c.mList.Join([]string{member})
if err != nil {
klog.ErrorS(err, "Processing Node CREATE event error, join cluster failed", "member", member)
// Ignore the Node itself.
if node.Name != c.nodeName {
if member, err := c.newClusterMember(node); err == nil {
_, err := c.mList.Join([]string{member})
if err != nil {
klog.ErrorS(err, "Processing Node CREATE event error, join cluster failed", "member", member)
}
} else {
klog.ErrorS(err, "Processing Node CREATE event error", "nodeName", node.Name)
}
} else {
klog.ErrorS(err, "Processing Node CREATE event error", "nodeName", node.Name)
}

affectedEIPs := c.filterEIPsFromNodeLabels(node)
Expand Down Expand Up @@ -263,8 +277,7 @@ func (c *Cluster) enqueueExternalIPPool(obj interface{}) {
c.queue.Add(eip.Name)
}

// newClusterMember gets the Node's IP and returns a cluster member "<IP>:<clusterMemberlistPort>"
// representing that Node in the memberlist cluster.
// newClusterMember gets the Node's IP and returns it as a cluster member for memberlist cluster to join.
func (c *Cluster) newClusterMember(node *corev1.Node) (string, error) {
nodeAddrs, err := k8s.GetNodeAddrs(node)
if err != nil {
Expand All @@ -279,11 +292,7 @@ func (c *Cluster) newClusterMember(node *corev1.Node) (string, error) {

func (c *Cluster) filterEIPsFromNodeLabels(node *corev1.Node) sets.String {
pools := sets.NewString()
eips, err := c.externalIPPoolLister.List(labels.Everything())
if err != nil {
klog.ErrorS(err, "Filter ExternalIPPools from nodeLabels failed")
return pools
}
eips, _ := c.externalIPPoolLister.List(labels.Everything())
for _, eip := range eips {
nodeSelector, _ := metav1.LabelSelectorAsSelector(&eip.Spec.NodeSelector)
if nodeSelector.Matches(labels.Set(node.GetLabels())) {
Expand Down Expand Up @@ -314,14 +323,64 @@ func (c *Cluster) Run(stopCh <-chan struct{}) {
go wait.Until(c.worker, time.Second, stopCh)
}

for {
select {
case <-stopCh:
return
case nodeEvent := <-c.nodeEventsCh:
c.handleClusterNodeEvents(&nodeEvent)
go func() {
for {
select {
case <-stopCh:
return
case nodeEvent := <-c.nodeEventsCh:
c.handleClusterNodeEvents(&nodeEvent)
}
}
}()

// Rejoin Nodes periodically in case some Nodes are removed from the member list because of long downtime.
go func() {
ticker := time.NewTicker(1 * time.Minute)
for {
select {
case <-stopCh:
return
case <-ticker.C:
c.RejoinNodes()
}
}
}()

<-stopCh
}

// RejoinNodes rejoins Nodes that were removed from the member list by memberlist because they were unreachable for more
// than 15 seconds (the GossipToTheDeadTime we are using). Without it, once there is a network downtime lasting more
// than 15 seconds, the agent wouldn't try to reach any other Node and would think it's the only alive Node until it's
// restarted.
//
// It lists every Node known to the Node lister, skips the ones already reported by AliveNodes, and passes the
// remaining ones to the member list's Join method in a single call. It returns nothing; all failures are logged.
func (c *Cluster) RejoinNodes() {
	nodes, err := c.nodeLister.List(labels.Everything())
	if err != nil {
		// Without a Node list there is nothing to compare the alive members against.
		klog.ErrorS(err, "Failed to list Nodes to rejoin")
		return
	}
	aliveNodes := c.AliveNodes()
	var membersToJoin []string
	for _, node := range nodes {
		if aliveNodes.Has(node.Name) {
			continue
		}
		member, err := c.newClusterMember(node)
		if err != nil {
			// Skip this Node only; the remaining Nodes can still be rejoined.
			klog.ErrorS(err, "Failed to generate cluster member to join", "Node", node.Name)
			continue
		}
		membersToJoin = append(membersToJoin, member)
	}
	// Every known Node is alive, do nothing.
	if len(membersToJoin) == 0 {
		return
	}
	// The Join method returns an error only when none could be reached.
	numSuccess, err := c.mList.Join(membersToJoin)
	switch {
	case err != nil:
		klog.ErrorS(err, "Failed to rejoin any members", "members", membersToJoin)
	case numSuccess != len(membersToJoin):
		// err is nil on partial success, so pass nil explicitly rather than a variable that looks like a real error.
		klog.ErrorS(nil, "Failed to rejoin some members", "members", membersToJoin, "numSuccess", numSuccess)
	default:
		klog.InfoS("Rejoined all members", "members", membersToJoin)
	}
}

func (c *Cluster) worker() {
Expand Down Expand Up @@ -421,12 +480,9 @@ func (c *Cluster) handleClusterNodeEvents(nodeEvent *memberlist.NodeEvent) {
// If the Node has failed, the consistentHash of ExternalIPPools may have changed, and the affected ExternalIPPools should be enqueued.
coreNode, err := c.nodeLister.Get(node.Name)
if err != nil {
if apierrors.IsNotFound(err) {
// Node has been deleted, and deleteNode handler has been executed.
klog.ErrorS(err, "Processing Node event, not found", "eventType", event)
return
}
klog.ErrorS(err, "Processing Node event, get Node failed", "eventType", event)
// It means the Node has been deleted, no further processing is needed as handleDeleteNode has enqueued
// related ExternalIPPools.
klog.InfoS("Received a Node event but did not find the Node object", "eventType", mapNodeEventType[event], "nodeName", node.Name)
return
}
affectedEIPs := c.filterEIPsFromNodeLabels(coreNode)
Expand Down
Loading

0 comments on commit 18011b8

Please sign in to comment.