Skip to content

Commit

Permalink
Avoid ServiceCIDR flapping on agent start (antrea-io#5017)
Browse files Browse the repository at this point in the history
The previous implementation always generated intermediate values for
ServiceCIDR on agent start, which may interrupt the Service traffic and
causes difficulty for cleaning up stale routes as the value calculated
at one point may not be reliable to identify all stale routes.

This commit waits for the Service Informer to be synced first,
and calculates the ServiceCIDR based on all Services. Ideally the
Service route won't change in most cases, and hence avoid the above
issues.

Besides, it fixes an issue that stale routes on Linux were not cleaned
up correctly due to incorrect check.

Signed-off-by: Quan Tian <qtian@vmware.com>
  • Loading branch information
tnqn authored and ceclinux committed May 30, 2023
1 parent eb37291 commit ea9b09c
Show file tree
Hide file tree
Showing 7 changed files with 275 additions and 36 deletions.
3 changes: 3 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,9 @@ func run(o *Options) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Must start after registering all event handlers.
go serviceCIDRProvider.Run(stopCh)

// Get all available NodePort addresses.
var nodePortAddressesIPv4, nodePortAddressesIPv6 []net.IP
if o.config.AntreaProxy.ProxyAll {
Expand Down
18 changes: 17 additions & 1 deletion pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
binding "antrea.io/antrea/pkg/ovs/openflow"
"antrea.io/antrea/pkg/ovs/ovsconfig"
"antrea.io/antrea/pkg/util/env"
utilip "antrea.io/antrea/pkg/util/ip"
)

const (
Expand Down Expand Up @@ -1363,7 +1364,22 @@ func (c *Client) addServiceCIDRRoute(serviceCIDR *net.IPNet) error {
return fmt.Errorf("error listing ip routes: %w", err)
}
for i := 0; i < len(routes); i++ {
if routes[i].Gw.Equal(gw) && !routes[i].Dst.IP.Equal(serviceCIDR.IP) && routes[i].Dst.Contains(serviceCIDR.IP) {
// Not the routes we are interested in.
if !routes[i].Gw.Equal(gw) {
continue
}
// It's the latest route we just installed.
if utilip.IPNetEqual(routes[i].Dst, serviceCIDR) {
continue
}
// The route covers the desired route. It was installed when the calculated ServiceCIDR was larger than the
// current one, which could happen after some Services are deleted.
if utilip.IPNetContains(routes[i].Dst, serviceCIDR) {
staleRoutes = append(staleRoutes, &routes[i])
}
// The desired route covers the route. It was installed when the calculated ServiceCIDR was smaller than the
// current one, which could happen after some Services are added.
if utilip.IPNetContains(serviceCIDR, routes[i].Dst) {
staleRoutes = append(staleRoutes, &routes[i])
}
}
Expand Down
24 changes: 24 additions & 0 deletions pkg/agent/route/route_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1332,6 +1332,30 @@ func TestAddServiceCIDRRoute(t *testing.T) {
})
},
},
{
name: "Add route for Service IPv4 CIDR and clean up stale routes",
curServiceIPv4CIDR: nil,
newServiceIPv4CIDR: ip.MustParseCIDR("10.96.0.0/28"),
expectedCalls: func(mockNetlink *netlinktest.MockInterfaceMockRecorder) {
mockNetlink.RouteReplace(&netlink.Route{
Dst: &net.IPNet{IP: net.ParseIP("10.96.0.0").To4(), Mask: net.CIDRMask(28, 32)},
Gw: config.VirtualServiceIPv4,
Scope: netlink.SCOPE_UNIVERSE,
LinkIndex: 10,
})
mockNetlink.RouteListFiltered(netlink.FAMILY_V4, &netlink.Route{LinkIndex: 10}, netlink.RT_FILTER_OIF).Return([]netlink.Route{
{Dst: ip.MustParseCIDR("10.96.0.0/24"), Gw: config.VirtualServiceIPv4},
{Dst: ip.MustParseCIDR("10.96.0.0/30"), Gw: config.VirtualServiceIPv4},
}, nil)
mockNetlink.RouteListFiltered(netlink.FAMILY_V6, &netlink.Route{LinkIndex: 10}, netlink.RT_FILTER_OIF).Return([]netlink.Route{}, nil)
mockNetlink.RouteDel(&netlink.Route{
Dst: ip.MustParseCIDR("10.96.0.0/24"), Gw: config.VirtualServiceIPv4,
})
mockNetlink.RouteDel(&netlink.Route{
Dst: ip.MustParseCIDR("10.96.0.0/30"), Gw: config.VirtualServiceIPv4,
})
},
},
{
name: "Update route for Service IPv4 CIDR",
curServiceIPv4CIDR: serviceIPv4CIDR1,
Expand Down
118 changes: 89 additions & 29 deletions pkg/agent/servicecidr/discoverer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,15 @@ import (
"time"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
coreinformers "k8s.io/client-go/informers/core/v1"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog/v2"
utilnet "k8s.io/utils/net"
"k8s.io/utils/strings/slices"

"antrea.io/antrea/pkg/agent/util"
)
Expand All @@ -42,17 +47,22 @@ type Interface interface {
AddEventHandler(handler EventHandler)
}

type discoverer struct {
type Discoverer struct {
serviceInformer cache.SharedIndexInformer
serviceLister corelisters.ServiceLister
sync.RWMutex
serviceIPv4CIDR *net.IPNet
serviceIPv6CIDR *net.IPNet
eventHandlers []EventHandler
// queue maintains the Service objects that need to be synced.
queue workqueue.Interface
}

func NewServiceCIDRDiscoverer(serviceInformer coreinformers.ServiceInformer) Interface {
d := &discoverer{
func NewServiceCIDRDiscoverer(serviceInformer coreinformers.ServiceInformer) *Discoverer {
d := &Discoverer{
serviceInformer: serviceInformer.Informer(),
serviceLister: serviceInformer.Lister(),
queue: workqueue.New(),
}
d.serviceInformer.AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
Expand All @@ -64,7 +74,37 @@ func NewServiceCIDRDiscoverer(serviceInformer coreinformers.ServiceInformer) Int
return d
}

func (d *discoverer) GetServiceCIDR(isIPv6 bool) (*net.IPNet, error) {
func (d *Discoverer) Run(stopCh <-chan struct{}) {
defer d.queue.ShutDown()

klog.Info("Starting ServiceCIDRDiscoverer")
defer klog.Info("Stopping ServiceCIDRDiscoverer")
if !cache.WaitForCacheSync(stopCh, d.serviceInformer.HasSynced) {
return
}
svcs, _ := d.serviceLister.List(labels.Everything())
d.updateServiceCIDR(svcs...)

go func() {
for {
obj, quit := d.queue.Get()
if quit {
return
}
nn := obj.(types.NamespacedName)

svc, _ := d.serviceLister.Services(nn.Namespace).Get(nn.Name)
// Ignore it if not found.
if svc != nil {
d.updateServiceCIDR(svc)
}
d.queue.Done(obj)
}
}()
<-stopCh
}

func (d *Discoverer) GetServiceCIDR(isIPv6 bool) (*net.IPNet, error) {
d.RLock()
defer d.RUnlock()
if isIPv6 {
Expand All @@ -79,43 +119,48 @@ func (d *discoverer) GetServiceCIDR(isIPv6 bool) (*net.IPNet, error) {
return d.serviceIPv4CIDR, nil
}

func (d *discoverer) AddEventHandler(handler EventHandler) {
func (d *Discoverer) AddEventHandler(handler EventHandler) {
d.eventHandlers = append(d.eventHandlers, handler)
}

func (d *discoverer) addService(obj interface{}) {
svc := obj.(*corev1.Service)
d.updateServiceCIDR(svc)
}

func (d *discoverer) updateService(_, obj interface{}) {
func (d *Discoverer) addService(obj interface{}) {
svc := obj.(*corev1.Service)
d.updateServiceCIDR(svc)
klog.V(2).InfoS("Processing Service ADD event", "Service", klog.KObj(svc))
d.queue.Add(types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name})
}

func (d *discoverer) updateServiceCIDR(svc *corev1.Service) {
clusterIPs := svc.Spec.ClusterIPs
if len(clusterIPs) == 0 {
return
func (d *Discoverer) updateService(old, obj interface{}) {
oldSvc := old.(*corev1.Service)
curSvc := obj.(*corev1.Service)
klog.V(2).InfoS("Processing Service UPDATE event", "Service", klog.KObj(curSvc))
if !slices.Equal(oldSvc.Spec.ClusterIPs, curSvc.Spec.ClusterIPs) {
d.queue.Add(types.NamespacedName{Namespace: curSvc.Namespace, Name: curSvc.Name})
}
}

func (d *Discoverer) updateServiceCIDR(svcs ...*corev1.Service) {
var newServiceCIDRs []*net.IPNet
klog.V(2).InfoS("Processing Service ADD or UPDATE event", "Service", klog.KObj(svc))
func() {
d.Lock()
defer d.Unlock()
for _, clusterIPStr := range clusterIPs {

curServiceIPv4CIDR, curServiceIPv6CIDR := func() (*net.IPNet, *net.IPNet) {
d.RLock()
defer d.RUnlock()
return d.serviceIPv4CIDR, d.serviceIPv6CIDR
}()

updated := false
for _, svc := range svcs {
for _, clusterIPStr := range svc.Spec.ClusterIPs {
clusterIP := net.ParseIP(clusterIPStr)
if clusterIP == nil {
klog.V(2).InfoS("Skip invalid ClusterIP", "ClusterIP", clusterIPStr)
continue
}
isIPv6 := utilnet.IsIPv6(clusterIP)

curServiceCIDR := d.serviceIPv4CIDR
curServiceCIDR := curServiceIPv4CIDR
mask := net.IPv4len * 8
if isIPv6 {
curServiceCIDR = d.serviceIPv6CIDR
curServiceCIDR = curServiceIPv6CIDR
mask = net.IPv6len * 8
}

Expand All @@ -138,16 +183,31 @@ func (d *discoverer) updateServiceCIDR(svc *corev1.Service) {
}

if isIPv6 {
d.serviceIPv6CIDR = newServiceCIDR
klog.V(4).InfoS("Service IPv6 CIDR was updated", "ServiceCIDR", newServiceCIDR)
curServiceIPv6CIDR = newServiceCIDR
} else {
d.serviceIPv4CIDR = newServiceCIDR
klog.V(4).InfoS("Service IPv4 CIDR was updated", "ServiceCIDR", newServiceCIDR)
curServiceIPv4CIDR = newServiceCIDR
}
newServiceCIDRs = append(newServiceCIDRs, newServiceCIDR)
updated = true
}
}()
}

if !updated {
return
}
func() {
d.Lock()
defer d.Unlock()
if d.serviceIPv4CIDR != curServiceIPv4CIDR {
d.serviceIPv4CIDR = curServiceIPv4CIDR
klog.InfoS("Service IPv4 CIDR was updated", "ServiceCIDR", curServiceIPv4CIDR)
newServiceCIDRs = append(newServiceCIDRs, curServiceIPv4CIDR)
}
if d.serviceIPv6CIDR != curServiceIPv6CIDR {
d.serviceIPv6CIDR = curServiceIPv6CIDR
klog.InfoS("Service IPv6 CIDR was updated", "ServiceCIDR", curServiceIPv6CIDR)
newServiceCIDRs = append(newServiceCIDRs, curServiceIPv6CIDR)
}
}()
for _, handler := range d.eventHandlers {
handler(newServiceCIDRs)
}
Expand Down
15 changes: 9 additions & 6 deletions pkg/agent/servicecidr/discoverer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ func TestServiceCIDRProvider(t *testing.T) {
defer close(stopCh)
informerFactory.Start(stopCh)
informerFactory.WaitForCacheSync(stopCh)
go serviceCIDRProvider.Run(stopCh)

check := func(expectedServiceCIDR string, isServiceCIDRUpdated, isIPv6 bool) {
if isServiceCIDRUpdated {
Expand All @@ -84,15 +85,18 @@ func TestServiceCIDRProvider(t *testing.T) {
}
}
serviceCIDR, err := serviceCIDRProvider.GetServiceCIDR(isIPv6)
assert.NoError(t, err)
assert.Equal(t, expectedServiceCIDR, serviceCIDR.String())
if expectedServiceCIDR != "" {
assert.NoError(t, err)
assert.Equal(t, expectedServiceCIDR, serviceCIDR.String())
} else {
assert.ErrorContains(t, err, "CIDR is not available yet")
}
}

svc := makeService("ns1", "svc0", "None", corev1.ProtocolTCP)
_, err := client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{})
assert.NoError(t, err)
_, err = serviceCIDRProvider.GetServiceCIDR(false)
assert.ErrorContains(t, err, "Service IPv4 CIDR is not available yet")
check("", false, false)

svc = makeService("ns1", "svc1", "10.10.0.1", corev1.ProtocolTCP)
_, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{})
Expand Down Expand Up @@ -121,8 +125,7 @@ func TestServiceCIDRProvider(t *testing.T) {
svc = makeService("ns1", "svc60", "None", corev1.ProtocolTCP)
_, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{})
assert.NoError(t, err)
_, err = serviceCIDRProvider.GetServiceCIDR(true)
assert.ErrorContains(t, err, "Service IPv6 CIDR is not available yet")
check("", false, true)

svc = makeService("ns1", "svc61", "10::1", corev1.ProtocolTCP)
_, err = client.CoreV1().Services("ns1").Create(context.TODO(), svc, metav1.CreateOptions{})
Expand Down
43 changes: 43 additions & 0 deletions pkg/util/ip/ip.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,49 @@ func MustParseCIDR(cidr string) *net.IPNet {
return ipNet
}

// IPNetEqual returns if the provided IPNets are the same subnet.
func IPNetEqual(ipNet1, ipNet2 *net.IPNet) bool {
if ipNet1 == nil && ipNet2 == nil {
return true
}
if ipNet1 == nil || ipNet2 == nil {
return false
}
if !bytes.Equal(ipNet1.Mask, ipNet2.Mask) {
return false
}
if !ipNet1.IP.Equal(ipNet2.IP) {
return false
}
return true
}

// IPNetContains returns if the first IPNet contains the second IPNet.
// For example:
//
// 10.0.0.0/24 contains 10.0.0.0/24.
// 10.0.0.0/24 contains 10.0.0.0/25.
// 10.0.0.0/24 contains 10.0.0.128/25.
// 10.0.0.0/24 does not contain 10.0.0.0/23.
// 10.0.0.0/24 does not contain 10.0.1.0/25.
func IPNetContains(ipNet1, ipNet2 *net.IPNet) bool {
if ipNet1 == nil || ipNet2 == nil {
return false
}
ones1, bits1 := ipNet1.Mask.Size()
ones2, bits2 := ipNet2.Mask.Size()
if bits1 != bits2 {
return false
}
if ones1 > ones2 {
return false
}
if !ipNet1.Contains(ipNet2.IP) {
return false
}
return true
}

func MustIPv6(s string) net.IP {
ip := net.ParseIP(s)
if !utilnet.IsIPv6(ip) {
Expand Down
Loading

0 comments on commit ea9b09c

Please sign in to comment.