Fix Egress IP scheduling
PR #4593 introduced maxEgressIPsPerNode to limit the number of Egress IPs
that can be assigned to a Node. However, it used the EgressInformer cache
to check whether a Node could accommodate new Egress IPs and performed the
calculation for different Egresses concurrently, which could produce
inconsistent scheduling results among agents. For instance:

When each Node's capacity is 1 and two Egresses, e1 and e2, are created
concurrently, different agents may process them in different orders and
with different contexts:

- agent a1 may process Egress e1 first and assign it to Node n1; it then
  processes Egress e2 and concludes that it should be assigned to Node n2
  by agent a2 because n1 is out of space.
- agent a2 may process Egresses e1 and e2 faster, before either of their
  statuses is updated in the Egress API, and would conclude that both
  Egresses should be assigned to Node n1 by agent a1.

As a result, Egress e2 will be left unassigned.

To fix the problem, Egress IP scheduling should be deterministic across
agents and over time. This patch adds an egressIPScheduler, which takes
the specs of Egresses and ExternalIPPools and the state of the memberlist
cluster as inputs, and generates scheduling results deterministically.
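
A minimal sketch of the idea follows, assuming the scheduler sorts Egresses
into a stable order and relies on a memberlist-backed node selector with a
capacity filter; the type, field, and helper names below are illustrative,
not the actual implementation added by this commit:

```go
package egress

import (
	"sort"

	crdv1a2 "antrea.io/antrea/pkg/apis/crd/v1alpha2"
)

// egressIPSchedulerSketch is a simplified, hypothetical stand-in for the
// scheduler added by this commit; its fields and helpers are illustrative.
type egressIPSchedulerSketch struct {
	maxEgressIPsPerNode int
	// listEgresses returns all Egresses that reference an ExternalIPPool.
	listEgresses func() []*crdv1a2.Egress
	// selectNode deterministically picks a Node for an Egress IP from the
	// alive Nodes backing the pool (e.g. via memberlist consistent hashing),
	// skipping Nodes rejected by the filter.
	selectNode func(ip, pool string, filter func(node string) bool) (string, bool)
	// onResult is called with each Egress name and its scheduled Node.
	onResult func(egressName, node string)
}

// schedule computes assignments from the same inputs on every agent, so all
// agents reach identical results regardless of the order events arrive in.
func (s *egressIPSchedulerSketch) schedule() {
	egresses := s.listEgresses()
	// Deterministic order: oldest Egress first, ties broken by name.
	sort.Slice(egresses, func(i, j int) bool {
		if !egresses[i].CreationTimestamp.Equal(&egresses[j].CreationTimestamp) {
			return egresses[i].CreationTimestamp.Before(&egresses[j].CreationTimestamp)
		}
		return egresses[i].Name < egresses[j].Name
	})

	ipsPerNode := map[string]int{} // Node name -> Egress IPs assigned so far
	for _, eg := range egresses {
		node, ok := s.selectNode(eg.Spec.EgressIP, eg.Spec.ExternalIPPool, func(node string) bool {
			// The capacity check uses only the scheduler's own bookkeeping,
			// not the (potentially stale) Egress status in the informer cache.
			return ipsPerNode[node] < s.maxEgressIPsPerNode
		})
		if !ok {
			continue // no Node can accommodate this Egress IP right now
		}
		ipsPerNode[node]++
		s.onResult(eg.Name, node)
	}
}
```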

According to the benchmark test, scheduling 1,000 Egresses among 1,000
Nodes once takes less than 3ms.
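
For reference, a benchmark of roughly the following shape could back that
number; the helper and scale below are assumptions for illustration, not the
actual test code:

```go
package egress

import "testing"

// BenchmarkScheduleSketch measures one full scheduling pass at the scale
// mentioned above. newSchedulerSketchWithScale is a hypothetical helper that
// builds an egressIPSchedulerSketch with 1,000 Nodes and 1,000 Egresses.
func BenchmarkScheduleSketch(b *testing.B) {
	s := newSchedulerSketchWithScale(1000, 1000)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		s.schedule()
	}
}
```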

Signed-off-by: Quan Tian <qtian@vmware.com>
tnqn committed Feb 15, 2023
1 parent 97e790b commit a7597c4
Showing 6 changed files with 626 additions and 88 deletions.
119 changes: 40 additions & 79 deletions pkg/agent/controller/egress/egress_controller.go
@@ -66,9 +66,7 @@ const (
// maxEgressMark is the maximum mark of Egress IPs can be configured on a Node.
maxEgressMark = 255

egressIPIndex = "egressIP"
externalIPPoolIndex = "externalIPPool"
egressNodeIndex = "egressNode"
egressIPIndex = "egressIP"

// egressDummyDevice is the dummy device that holds the Egress IPs configured to the system by antrea-agent.
egressDummyDevice = "antrea-egress0"
@@ -147,7 +145,7 @@ type EgressController struct {
cluster memberlist.Interface
ipAssigner ipassigner.IPAssigner

maxEgressIPsPerNode int
egressIPScheduler *egressIPScheduler
}

func NewEgressController(
@@ -181,14 +179,15 @@ func NewEgressController(
localIPDetector: ipassigner.NewLocalIPDetector(),
idAllocator: newIDAllocator(minEgressMark, maxEgressMark),
cluster: cluster,
maxEgressIPsPerNode: maxEgressIPsPerNode,
}
ipAssigner, err := newIPAssigner(nodeTransportInterface, egressDummyDevice)
if err != nil {
return nil, fmt.Errorf("initializing egressIP assigner failed: %v", err)
}
c.ipAssigner = ipAssigner

c.egressIPScheduler = NewEgressIPScheduler(cluster, egressInformer, maxEgressIPsPerNode)

c.egressInformer.AddIndexers(
cache.Indexers{
// egressIPIndex will be used to get all Egresses sharing the same Egress IP.
@@ -199,28 +198,6 @@
}
return []string{egress.Spec.EgressIP}, nil
},
// externalIPPoolIndex will be used to get all Egresses associated with a given ExternalIPPool.
externalIPPoolIndex: func(obj interface{}) (strings []string, e error) {
egress, ok := obj.(*crdv1a2.Egress)
if !ok {
return nil, fmt.Errorf("obj is not Egress: %+v", obj)
}
if egress.Spec.ExternalIPPool == "" {
return nil, nil
}
return []string{egress.Spec.ExternalIPPool}, nil
},
// egressNodeIndex will be used to get all Egresses assigned to a given Node.
egressNodeIndex: func(obj interface{}) ([]string, error) {
egress, ok := obj.(*crdv1a2.Egress)
if !ok {
return nil, fmt.Errorf("obj is not Egress: %+v", obj)
}
if egress.Status.EgressNode == "" {
return nil, nil
}
return []string{egress.Status.EgressNode}, nil
},
})
c.egressInformer.AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
@@ -234,10 +211,15 @@
// reported to kube-apiserver and processed by antrea-controller.
podUpdateSubscriber.Subscribe(c.processPodUpdate)
c.localIPDetector.AddEventHandler(c.onLocalIPUpdate)
c.cluster.AddClusterEventHandler(c.enqueueEgressesByExternalIPPool)
c.egressIPScheduler.AddEventHandler(c.onEgressIPSchedule)
return c, nil
}

// onEgressIPSchedule will be called when EgressIPScheduler reschedules an Egress's IP.
func (c *EgressController) onEgressIPSchedule(egress string) {
c.queue.Add(egress)
}

// processPodUpdate will be called when CNIServer publishes a Pod update event.
// It triggers reconciling the effective Egress of the Pod.
func (c *EgressController) processPodUpdate(e interface{}) {
@@ -263,11 +245,13 @@ func (c *EgressController) addEgress(obj interface{}) {
}

// updateEgress processes Egress UPDATE events.
func (c *EgressController) updateEgress(_, cur interface{}) {
func (c *EgressController) updateEgress(old, cur interface{}) {
oldEgress := old.(*crdv1a2.Egress)
curEgress := cur.(*crdv1a2.Egress)
// We need to sync the Egress once even if its spec doesn't change as a Node's EgressIP capacity may be exceeded
// when multiple Egresses were processed in parallel and were assigned to the same Node.
// Re-sync after status change could correct the assignment eventually.
// Ignore handling the Egress Status change if Egress IP already has been assigned on current node.
if curEgress.Status.EgressNode == c.nodeName && oldEgress.GetGeneration() == curEgress.GetGeneration() {
return
}
c.queue.Add(curEgress.Name)
klog.V(2).InfoS("Processed Egress UPDATE event", "egress", klog.KObj(curEgress))
}
@@ -307,18 +291,6 @@ func (c *EgressController) onLocalIPUpdate(ip string, added bool) {
}
}

// enqueueEgressesByExternalIPPool enqueues all Egresses that refer to the provided ExternalIPPool,
// the ExternalIPPool is affected by a Node update/create/delete event or
// Node leaves/join cluster event or ExternalIPPool changed.
func (c *EgressController) enqueueEgressesByExternalIPPool(eipName string) {
objects, _ := c.egressInformer.GetIndexer().ByIndex(externalIPPoolIndex, eipName)
for _, object := range objects {
egress := object.(*crdv1a2.Egress)
c.queue.Add(egress.Name)
}
klog.InfoS("Detected ExternalIPPool event", "ExternalIPPool", eipName, "enqueueEgressNum", len(objects))
}

// Run will create defaultWorkers workers (go routines) which will process the Egress events from the
// workqueue.
func (c *EgressController) Run(stopCh <-chan struct{}) {
@@ -328,9 +300,9 @@ func (c *EgressController) Run(stopCh <-chan struct{}) {
defer klog.Infof("Shutting down %s", controllerName)

go c.localIPDetector.Run(stopCh)

go c.egressIPScheduler.Run(stopCh)
go c.ipAssigner.Run(stopCh)
if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.egressListerSynced, c.localIPDetector.HasSynced) {
if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.egressListerSynced, c.localIPDetector.HasSynced, c.egressIPScheduler.HasScheduled) {
return
}

@@ -632,61 +604,50 @@ func (c *EgressController) syncEgress(egressName string) error {
return err
}

var desiredEgressIP string
var desiredNode string
// Only check whether the Egress IP should be assigned to this Node when ExternalIPPool is set.
// If ExternalIPPool is empty, users are responsible for assigning the Egress IPs to Nodes.
if egress.Spec.ExternalIPPool != "" {
egressIP, egressNode, scheduled := c.egressIPScheduler.GetEgressIPAndNode(egressName)
if scheduled {
desiredEgressIP = egressIP
desiredNode = egressNode
}
} else {
desiredEgressIP = egress.Spec.EgressIP
}

eState, exist := c.getEgressState(egressName)
// If the EgressIP changes, uninstalls this Egress first.
if exist && eState.egressIP != egress.Spec.EgressIP {
if exist && eState.egressIP != desiredEgressIP {
if err := c.uninstallEgress(egressName, eState); err != nil {
return err
}
exist = false
}
// Do not proceed if EgressIP is empty.
if egress.Spec.EgressIP == "" {
if desiredEgressIP == "" {
return nil
}
if !exist {
eState = c.newEgressState(egressName, egress.Spec.EgressIP)
eState = c.newEgressState(egressName, desiredEgressIP)
}

localNodeSelected := false
// Only check whether the Egress IP should be assigned to this Node when ExternalIPPool is set.
// If ExternalIPPool is empty, users are responsible for assigning the Egress IPs to Nodes.
if egress.Spec.ExternalIPPool != "" {
maxEgressIPsFilter := func(node string) bool {
// Assuming this Egress IP is assigned to this Node.
egressIPsOnNode := sets.NewString(egress.Spec.EgressIP)
// Add the Egress IPs that are already assigned to this Node.
egressesOnNode, _ := c.egressInformer.GetIndexer().ByIndex(egressNodeIndex, node)
for _, obj := range egressesOnNode {
egressOnNode := obj.(*crdv1a2.Egress)
// We don't count manually managed Egress IPs.
if egressOnNode.Spec.ExternalIPPool == "" {
continue
}
egressIPsOnNode.Insert(egressOnNode.Spec.EgressIP)
}
// Check if this Node can accommodate all Egress IPs.
return egressIPsOnNode.Len() <= c.maxEgressIPsPerNode
}
localNodeSelected, err = c.cluster.ShouldSelectIP(egress.Spec.EgressIP, egress.Spec.ExternalIPPool, maxEgressIPsFilter)
if err != nil {
return err
}
}
if localNodeSelected {
if desiredNode == c.nodeName {
// Ensure the Egress IP is assigned to the system.
if err := c.ipAssigner.AssignIP(egress.Spec.EgressIP); err != nil {
if err := c.ipAssigner.AssignIP(desiredEgressIP); err != nil {
return err
}
} else {
// Unassign the Egress IP from the local Node if it was assigned by the agent.
if err := c.ipAssigner.UnassignIP(egress.Spec.EgressIP); err != nil {
if err := c.ipAssigner.UnassignIP(desiredEgressIP); err != nil {
return err
}
}

// Realize the latest EgressIP and get the desired mark.
mark, err := c.realizeEgressIP(egressName, egress.Spec.EgressIP)
mark, err := c.realizeEgressIP(egressName, desiredEgressIP)
if err != nil {
return err
}
Expand All @@ -701,7 +662,7 @@ func (c *EgressController) syncEgress(egressName string) error {
eState.mark = mark
}

if err := c.updateEgressStatus(egress, c.localIPDetector.IsLocalIP(egress.Spec.EgressIP)); err != nil {
if err := c.updateEgressStatus(egress, c.localIPDetector.IsLocalIP(desiredEgressIP)); err != nil {
return fmt.Errorf("update Egress %s status error: %v", egressName, err)
}

7 changes: 3 additions & 4 deletions pkg/agent/controller/egress/egress_controller_test.go
@@ -639,7 +639,7 @@ func TestSyncEgress(t *testing.T) {
defer c.mockController.Finish()

if tt.maxEgressIPsPerNode > 0 {
c.maxEgressIPsPerNode = tt.maxEgressIPsPerNode
c.egressIPScheduler.maxEgressIPsPerNode = tt.maxEgressIPsPerNode
}

stopCh := make(chan struct{})
@@ -649,6 +649,7 @@
c.addEgressGroup(tt.existingEgressGroup)

tt.expectedCalls(c.mockOFClient, c.mockRouteClient, c.mockIPAssigner)
c.egressIPScheduler.schedule()
err := c.syncEgress(tt.existingEgress.Name)
assert.NoError(t, err)

@@ -666,6 +667,7 @@
egress, _ := c.egressLister.Get(tt.newEgress.Name)
return reflect.DeepEqual(egress, tt.newEgress), nil
}))
c.egressIPScheduler.schedule()
err = c.syncEgress(tt.newEgress.Name)
assert.NoError(t, err)
// Call it one more time to ensure it's idempotent, no extra datapath calls are supposed to be made.
Expand Down Expand Up @@ -796,9 +798,6 @@ func TestSyncOverlappingEgress(t *testing.T) {
err = c.syncEgress(egress3.Name)
assert.NoError(t, err)

// egress1 and egress3 are expected to be triggered for resync because their status is updated.
checkQueueItemExistence(t, c.queue, egress1.Name, egress3.Name)

// After deleting egress1, pod1 and pod2 no longer enforces egress1. The Egress IP shouldn't be released as egress3
// is still referring to it.
// egress2 and egress3 are expected to be triggered for resync.