From ff78c5bae28a2f50aace40a45da8b67dff5eb730 Mon Sep 17 00:00:00 2001 From: Lan Luo Date: Wed, 8 Mar 2023 11:24:52 +0800 Subject: [PATCH] Multi-cluster support with more modes In order to support multi-cluster traffic when the member cluster is deployed with networkPolicyOnly, noEcap or hybrid mode, antrea-agent will be responsible for the following things: 1. Create tunnel interface `antrea-tun0` for cross-cluster traffic 2. Watch all Pods on the Gateway and set up one rule per Pod in L3Fowarding table as long as the Pod is running on a regular Node instead of the Gateway. 3. Update container interface's MTU with the tunnel header size deducted. Signed-off-by: Lan Luo --- build/yamls/antrea-prometheus.yml | 2 - cmd/antrea-agent/agent.go | 58 ++- cmd/antrea-agent/options.go | 25 +- cmd/antrea-agent/options_linux_test.go | 23 +- .../leader/resourceexport_controller.go | 8 +- pkg/agent/agent.go | 37 +- pkg/agent/agent_test.go | 5 +- .../interface_configuration_linux.go | 38 ++ .../interface_configuration_linux_test.go | 117 ++++- .../interface_configuration_windows.go | 6 + pkg/agent/cniserver/interfaces.go | 1 + .../cniserver/pod_configuration_linux_test.go | 4 + pkg/agent/cniserver/server.go | 17 +- pkg/agent/cniserver/server_linux_test.go | 1 + pkg/agent/cniserver/server_test.go | 1 + pkg/agent/config/node_config.go | 46 +- pkg/agent/config/node_config_test.go | 71 +++ pkg/agent/multicluster/mc_route_controller.go | 112 ++-- .../multicluster/mc_route_controller_test.go | 97 ++-- .../multicluster/pod_route_controller.go | 478 ++++++++++++++++++ .../multicluster/pod_route_controller_test.go | 349 +++++++++++++ ...stretched_networkpolicy_controller_test.go | 23 +- pkg/agent/openflow/client.go | 55 +- pkg/agent/openflow/multicluster.go | 21 + pkg/agent/openflow/testing/mock_openflow.go | 28 + test/integration/agent/cniserver_test.go | 8 +- 26 files changed, 1421 insertions(+), 210 deletions(-) create mode 100644 pkg/agent/multicluster/pod_route_controller.go create mode 100644 pkg/agent/multicluster/pod_route_controller_test.go diff --git a/build/yamls/antrea-prometheus.yml b/build/yamls/antrea-prometheus.yml index 84a5bbbb7b7..8a94b9185e7 100644 --- a/build/yamls/antrea-prometheus.yml +++ b/build/yamls/antrea-prometheus.yml @@ -154,7 +154,6 @@ spec: configMap: defaultMode: 420 name: prometheus-server-conf - - name: prometheus-storage-volume emptyDir: {} --- @@ -166,7 +165,6 @@ metadata: annotations: prometheus.io/scrape: 'true' prometheus.io/port: '9090' - spec: selector: app: prometheus-server diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index 03418e5ccd6..dbfc01e8e5c 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -75,7 +75,6 @@ import ( "antrea.io/antrea/pkg/ovs/ovsctl" "antrea.io/antrea/pkg/signals" "antrea.io/antrea/pkg/util/channel" - "antrea.io/antrea/pkg/util/env" "antrea.io/antrea/pkg/util/k8s" "antrea.io/antrea/pkg/version" ) @@ -177,6 +176,7 @@ func run(o *Options) error { encryptionMode = config.TrafficEncryptionModeIPSec } _, ipsecAuthenticationMode := config.GetIPsecAuthenticationModeFromStr(o.config.IPsec.AuthenticationMode) + networkConfig := &config.NetworkConfig{ TunnelType: ovsconfig.TunnelType(o.config.TunnelType), TunnelPort: o.config.TunnelPort, @@ -188,6 +188,7 @@ func run(o *Options) error { IPsecConfig: config.IPsecConfig{ AuthenticationMode: ipsecAuthenticationMode, }, + EnableMulticlusterGW: enableMulticlusterGW, } wireguardConfig := &config.WireGuardConfig{ @@ -323,31 +324,48 @@ func run(o *Options) error { ) } - var mcRouteController *mcroute.MCRouteController + var mcDefaultRouteController *mcroute.MCDefaultRouteController var mcStrechedNetworkPolicyController *mcroute.StretchedNetworkPolicyController + var mcPodRouteController *mcroute.MCPodRouteController var mcInformerFactory mcinformers.SharedInformerFactory + var mcInformerFactoryWithOption mcinformers.SharedInformerFactory + if enableMulticlusterGW { - mcNamespace := env.GetPodNamespace() - if o.config.Multicluster.Namespace != "" { - mcNamespace = o.config.Multicluster.Namespace + if !networkConfig.IPv4Enabled { + return fmt.Errorf("Antrea Mutli-cluster doesn't not support IPv6 only cluster") } - mcInformerFactory = mcinformers.NewSharedInformerFactory(mcClient, informerDefaultResync) - gwInformer := mcInformerFactory.Multicluster().V1alpha1().Gateways() - ciImportInformer := mcInformerFactory.Multicluster().V1alpha1().ClusterInfoImports() - mcRouteController = mcroute.NewMCRouteController( + mcInformerFactoryWithOption = mcinformers.NewSharedInformerFactoryWithOptions(mcClient, + informerDefaultResync, + mcinformers.WithTweakListOptions(func(lo *metav1.ListOptions) { + lo.FieldSelector = fields.Set{"metadata.namespace": o.config.Multicluster.Namespace}.String() + }), + ) + mcDefaultRouteController = mcroute.NewMCDefaultRouteController( mcClient, - gwInformer, - ciImportInformer, + mcInformerFactoryWithOption, ofClient, ovsBridgeClient, ifaceStore, nodeConfig, - mcNamespace, + o.config.Multicluster.Namespace, o.config.Multicluster.EnableStretchedNetworkPolicy, o.config.Multicluster.EnablePodToPodConnectivity, ) + if networkConfig.TrafficEncapMode != config.TrafficEncapModeEncap { + mcPodRouteController = mcroute.NewMCPodRouteController( + k8sClient, + mcClient, + mcInformerFactoryWithOption.Multicluster().V1alpha1().Gateways().Informer(), + ofClient, + ovsBridgeClient, + ifaceStore, + nodeConfig, + o.config.Multicluster.Namespace, + ) + } } if enableMulticlusterNP { + mcInformerFactory = mcinformers.NewSharedInformerFactory(mcClient, informerDefaultResync) labelIDInformer := mcInformerFactory.Multicluster().V1alpha1().LabelIdentities() mcStrechedNetworkPolicyController = mcroute.NewMCAgentStretchedNetworkPolicyController( ofClient, @@ -490,11 +508,9 @@ func run(o *Options) error { var cniPodInfoStore cnipodcache.CNIPodInfoStore var externalNodeController *externalnode.ExternalNodeController var localExternalNodeInformer cache.SharedIndexInformer + if o.nodeType == config.K8sNode { - isChaining := false - if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { - isChaining = true - } + isChaining := networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() cniServer = cniserver.New( o.config.CNISocket, o.config.HostProcPathPrefix, @@ -505,6 +521,7 @@ func run(o *Options) error { enableBridgingMode, enableAntreaIPAM, o.config.DisableTXChecksumOffload, + networkConfig, networkReadyCh) if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { @@ -751,10 +768,15 @@ func run(o *Options) error { } if enableMulticlusterGW { - mcInformerFactory.Start(stopCh) - go mcRouteController.Run(stopCh) + mcInformerFactoryWithOption.Start(stopCh) + go mcDefaultRouteController.Run(stopCh) + if mcPodRouteController != nil { + go mcPodRouteController.Run(stopCh) + } } + if enableMulticlusterNP { + mcInformerFactory.Start(stopCh) go mcStrechedNetworkPolicyController.Run(stopCh) } diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index b1f09c1552d..cdfe606a480 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -307,7 +307,7 @@ func (o *Options) validateAntreaIPAMConfig() error { return nil } -func (o *Options) validateMulticlusterConfig(encapMode config.TrafficEncapModeType) error { +func (o *Options) validateMulticlusterConfig(encapMode config.TrafficEncapModeType, encryptionMode config.TrafficEncryptionModeType) error { if !o.config.Multicluster.EnableGateway && !o.config.Multicluster.EnableStretchedNetworkPolicy { return nil } @@ -321,9 +321,8 @@ func (o *Options) validateMulticlusterConfig(encapMode config.TrafficEncapModeTy return fmt.Errorf("Multi-cluster Gateway must be enabled to enable StretchedNetworkPolicy") } - if encapMode != config.TrafficEncapModeEncap { - // Only Encap mode is supported for Multi-cluster Gateway. - return fmt.Errorf("Multicluster is only applicable to the %s mode", config.TrafficEncapModeEncap) + if encapMode.SupportsEncap() && encryptionMode == config.TrafficEncryptionModeWireGuard { + return fmt.Errorf("Multi-cluster Gateway doesn't support in-cluster WireGuard encryption") } return nil } @@ -404,11 +403,17 @@ func (o *Options) setK8sNodeDefaultOptions() { } } - if features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.Enable { - // Multicluster.Enable is deprecated but it may be set by an earlier version - // deployment manifest. If it is set to true, pass the value to - // Multicluster.EnableGateway. - o.config.Multicluster.EnableGateway = true + if features.DefaultFeatureGate.Enabled(features.Multicluster) { + if o.config.Multicluster.Enable { + // Multicluster.Enable is deprecated but it may be set by an earlier version + // deployment manifest. If it is set to true, pass the value to + // Multicluster.EnableGateway. + o.config.Multicluster.EnableGateway = true + } + + if o.config.Multicluster.EnableGateway && o.config.Multicluster.Namespace == "" { + o.config.Multicluster.Namespace = env.GetPodNamespace() + } } if features.DefaultFeatureGate.Enabled(features.Egress) { @@ -488,7 +493,7 @@ func (o *Options) validateK8sNodeOptions() error { } } } - if err := o.validateMulticlusterConfig(encapMode); err != nil { + if err := o.validateMulticlusterConfig(encapMode, encryptionMode); err != nil { return err } diff --git a/cmd/antrea-agent/options_linux_test.go b/cmd/antrea-agent/options_linux_test.go index 19f4e1444b1..dfa7e10bb35 100644 --- a/cmd/antrea-agent/options_linux_test.go +++ b/cmd/antrea-agent/options_linux_test.go @@ -28,11 +28,12 @@ import ( func TestMulticlusterOptions(t *testing.T) { tests := []struct { - name string - mcConfig agentconfig.MulticlusterConfig - featureGate bool - encapMode string - expectedErr string + name string + mcConfig agentconfig.MulticlusterConfig + featureGate bool + encapMode string + encryptionMode string + expectedErr string }{ { name: "empty input", @@ -80,13 +81,14 @@ func TestMulticlusterOptions(t *testing.T) { expectedErr: "Multi-cluster Gateway must be enabled to enable StretchedNetworkPolicy", }, { - name: "NoEncap", + name: "Multicluster with in-cluster WireGuard Encryption", mcConfig: agentconfig.MulticlusterConfig{ EnableGateway: true, }, - featureGate: true, - encapMode: "NoEncap", - expectedErr: "Multicluster is only applicable to the encap mode", + featureGate: true, + encapMode: "encap", + encryptionMode: "wireguard", + expectedErr: "Multi-cluster Gateway doesn't support in-cluster WireGuard encryption", }, { name: "NoEncap and feature disabled", @@ -104,6 +106,9 @@ func TestMulticlusterOptions(t *testing.T) { TrafficEncapMode: tt.encapMode, Multicluster: tt.mcConfig, } + if tt.encryptionMode != "" { + config.TrafficEncryptionMode = tt.encryptionMode + } o := &Options{config: config} features.DefaultMutableFeatureGate.SetFromMap(o.config.FeatureGates) o.setDefaults() diff --git a/multicluster/controllers/multicluster/leader/resourceexport_controller.go b/multicluster/controllers/multicluster/leader/resourceexport_controller.go index d57e57c0a7f..d76b51a4adb 100644 --- a/multicluster/controllers/multicluster/leader/resourceexport_controller.go +++ b/multicluster/controllers/multicluster/leader/resourceexport_controller.go @@ -356,12 +356,12 @@ func (r *ResourceExportReconciler) refreshEndpointsResourceImport( } if len(svcResExport.Status.Conditions) > 0 { if svcResExport.Status.Conditions[0].Status != corev1.ConditionTrue { - return newResImport, false, fmt.Errorf("corresponding Service type of ResourceExport " + svcResExportName.String() + - "has not been converged successfully, retry later") + err := fmt.Errorf("the Service type of ResourceExport %s has not been converged successfully, retry later", svcResExportName.String()) + return newResImport, false, err } } else { - return newResImport, false, fmt.Errorf("corresponding Service type of ResourceExport " + svcResExportName.String() + - "has not been converged yet, retry later") + err := fmt.Errorf("the Service type of ResourceExport %s has not been converged yet, retry later", svcResExportName.String()) + return newResImport, false, err } } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index f32c41c0b37..c6fe9a055e1 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -716,12 +716,12 @@ func (i *Initializer) setupGatewayInterface() error { // Idempotent operation to set the gateway's MTU: we perform this operation regardless of // whether the gateway interface already exists, as the desired MTU may change across // restarts. - klog.V(4).Infof("Setting gateway interface %s MTU to %d", i.hostGateway, i.nodeConfig.NodeMTU) + klog.V(4).Infof("Setting gateway interface %s MTU to %d", i.hostGateway, i.networkConfig.InterfaceMTU) if err := i.configureGatewayInterface(gatewayIface); err != nil { return err } - if err := i.setInterfaceMTU(i.hostGateway, i.nodeConfig.NodeMTU); err != nil { + if err := i.setInterfaceMTU(i.hostGateway, i.networkConfig.InterfaceMTU); err != nil { return err } @@ -819,10 +819,11 @@ func (i *Initializer) setupDefaultTunnelInterface() error { // It's not necessary for new Linux kernel versions with the following patch: // https://github.com/torvalds/linux/commit/89e5c58fc1e2857ccdaae506fb8bc5fed57ee063. shouldEnableCsum := i.networkConfig.TunnelCsum && (i.networkConfig.TunnelType == ovsconfig.GeneveTunnel || i.networkConfig.TunnelType == ovsconfig.VXLANTunnel) + createTunnelInterface := i.networkConfig.NeedsTunnelInterface() // Check the default tunnel port. if portExists { - if i.networkConfig.TrafficEncapMode.SupportsEncap() && + if createTunnelInterface && tunnelIface.TunnelInterfaceConfig.Type == i.networkConfig.TunnelType && tunnelIface.TunnelInterfaceConfig.DestinationPort == i.networkConfig.TunnelPort && tunnelIface.TunnelInterfaceConfig.LocalIP.Equal(localIP) { @@ -839,7 +840,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error { } if err := i.ovsBridgeClient.DeletePort(tunnelIface.PortUUID); err != nil { - if i.networkConfig.TrafficEncapMode.SupportsEncap() { + if createTunnelInterface { return fmt.Errorf("failed to remove tunnel port %s with wrong tunnel type: %s", tunnelPortName, err) } klog.Errorf("Failed to remove tunnel port %s in NoEncapMode: %v", tunnelPortName, err) @@ -850,7 +851,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error { } // Create the default tunnel port and interface. - if i.networkConfig.TrafficEncapMode.SupportsEncap() { + if createTunnelInterface { if tunnelPortName != defaultTunInterfaceName { // Reset the tunnel interface name to the desired name before // recreating the tunnel port and interface. @@ -1012,12 +1013,11 @@ func (i *Initializer) initK8sNodeLocalConfig(nodeName string) error { WireGuardConfig: i.wireGuardConfig, } - mtu, err := i.getNodeMTU(transportInterface) + i.networkConfig.InterfaceMTU, err = i.getInterfaceMTU(transportInterface) if err != nil { return err } - i.nodeConfig.NodeMTU = mtu - klog.InfoS("Setting Node MTU", "MTU", mtu) + klog.InfoS("Getting Interface MTU", "MTU", i.networkConfig.InterfaceMTU) if i.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { return nil @@ -1164,27 +1164,20 @@ func getRoundInfo(bridgeClient ovsconfig.OVSBridgeClient) types.RoundInfo { return roundInfo } -func (i *Initializer) getNodeMTU(transportInterface *net.Interface) (int, error) { +func (i *Initializer) getInterfaceMTU(transportInterface *net.Interface) (int, error) { if i.mtu != 0 { return i.mtu, nil } mtu := transportInterface.MTU - // Make sure mtu is set on the interface. + // Make sure MTU is set on the interface. if mtu <= 0 { return 0, fmt.Errorf("Failed to fetch Node MTU : %v", mtu) } - if i.networkConfig.TrafficEncapMode.SupportsEncap() { - if i.networkConfig.TunnelType == ovsconfig.VXLANTunnel { - mtu -= config.VXLANOverhead - } else if i.networkConfig.TunnelType == ovsconfig.GeneveTunnel { - mtu -= config.GeneveOverhead - } else if i.networkConfig.TunnelType == ovsconfig.GRETunnel { - mtu -= config.GREOverhead - } - if i.nodeConfig.NodeIPv6Addr != nil { - mtu -= config.IPv6ExtraOverhead - } - } + + isIPv6 := i.nodeConfig.NodeIPv6Addr != nil + i.networkConfig.CalculateMTUDeduction(isIPv6) + mtu -= i.networkConfig.MTUDeduction + if i.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec { mtu -= config.IPSecESPOverhead } diff --git a/pkg/agent/agent_test.go b/pkg/agent/agent_test.go index 3309a6c897f..e4edb15dca2 100644 --- a/pkg/agent/agent_test.go +++ b/pkg/agent/agent_test.go @@ -385,7 +385,6 @@ func TestInitNodeLocalConfig(t *testing.T) { NodeTransportInterfaceName: ipDevice.Name, NodeTransportIPv4Addr: nodeIPNet, NodeTransportInterfaceMTU: tt.expectedNodeLocalIfaceMTU, - NodeMTU: tt.expectedMTU, UplinkNetConfig: new(config.AdapterNetConfig), } @@ -596,12 +595,12 @@ func TestSetupGatewayInterface(t *testing.T) { Type: config.K8sNode, OVSBridge: "br-int", PodIPv4CIDR: podCIDR, - NodeMTU: 1450, } networkConfig := &config.NetworkConfig{ TrafficEncapMode: config.TrafficEncapModeEncap, TunnelType: ovsconfig.GeneveTunnel, TunnelCsum: false, + InterfaceMTU: 1450, } mockOVSBridgeClient := ovsconfigtest.NewMockOVSBridgeClient(controller) @@ -624,7 +623,7 @@ func TestSetupGatewayInterface(t *testing.T) { mockOVSBridgeClient.EXPECT().CreateInternalPort(initializer.hostGateway, ofport, mock.Any(), mock.Any()).Return(portUUID, nil) mockOVSBridgeClient.EXPECT().SetInterfaceMAC(initializer.hostGateway, fakeMAC).Return(nil) mockOVSBridgeClient.EXPECT().GetOFPort(initializer.hostGateway, false).Return(ofport, nil) - mockOVSBridgeClient.EXPECT().SetInterfaceMTU(initializer.hostGateway, nodeConfig.NodeMTU).Return(nil) + mockOVSBridgeClient.EXPECT().SetInterfaceMTU(initializer.hostGateway, networkConfig.InterfaceMTU).Return(nil) err := initializer.setupGatewayInterface() assert.NoError(t, err) } diff --git a/pkg/agent/cniserver/interface_configuration_linux.go b/pkg/agent/cniserver/interface_configuration_linux.go index 80094723bb0..d20724087fe 100644 --- a/pkg/agent/cniserver/interface_configuration_linux.go +++ b/pkg/agent/cniserver/interface_configuration_linux.go @@ -52,6 +52,7 @@ var ( ethtoolTXHWCsumOff = ethtool.EthtoolTXHWCsumOff renameInterface = util.RenameInterface netInterfaceByName = net.InterfaceByName + netInterfaceByIndex = net.InterfaceByIndex arpingGratuitousARPOverIface = arping.GratuitousARPOverIface ndpGratuitousNDPOverIface = ndp.GratuitousNDPOverIface ipValidateExpectedInterfaceIPs = ip.ValidateExpectedInterfaceIPs @@ -372,6 +373,43 @@ func (ic *ifConfigurator) configureContainerLink( } } +func (ic *ifConfigurator) changeContainerMTU(containerNetNS string, containerIFDev string, mtuDeduction int) error { + var peerIdx int + if err := nsWithNetNSPath(containerNetNS, func(hostNS ns.NetNS) error { + link, err := ic.netlink.LinkByName(containerIFDev) + if err != nil { + return fmt.Errorf("failed to find interface %s in container %s: %v", containerIFDev, containerNetNS, err) + } + _, peerIdx, err = ipGetVethPeerIfindex(containerIFDev) + if err != nil { + return fmt.Errorf("failed to get peer index for dev %s in container %s: %w", containerIFDev, containerNetNS, err) + } + err = ic.netlink.LinkSetMTU(link, link.Attrs().MTU-mtuDeduction) + if err != nil { + return fmt.Errorf("failed to set MTU for interface %s in container %s: %v", containerIFDev, containerNetNS, err) + } + return nil + }); err != nil { + return err + } + + peerIntf, err := netInterfaceByIndex(peerIdx) + if err != nil { + return fmt.Errorf("failed to get host interface for index %d: %w", peerIdx, err) + } + + hostInterfaceName := peerIntf.Name + link, err := ic.netlink.LinkByName(hostInterfaceName) + if err != nil { + return fmt.Errorf("failed to find host interface %s: %v", hostInterfaceName, err) + } + err = ic.netlink.LinkSetMTU(link, link.Attrs().MTU-mtuDeduction) + if err != nil { + return fmt.Errorf("failed to set MTU for host interface %s: %v", hostInterfaceName, err) + } + return nil +} + func (ic *ifConfigurator) removeContainerLink(containerID, hostInterfaceName string) error { klog.V(2).Infof("Deleting veth devices for container %s", containerID) // Don't return an error if the device is already removed as CniDel can be called multiple times. diff --git a/pkg/agent/cniserver/interface_configuration_linux_test.go b/pkg/agent/cniserver/interface_configuration_linux_test.go index 1d5bef45c51..abd0d56cd54 100644 --- a/pkg/agent/cniserver/interface_configuration_linux_test.go +++ b/pkg/agent/cniserver/interface_configuration_linux_test.go @@ -264,6 +264,110 @@ func TestConfigureContainerLink(t *testing.T) { } } +func TestChangeContainerMTU(t *testing.T) { + controller := gomock.NewController(t) + defer controller.Finish() + + fakeNetlink := netlinktest.NewMockInterface(controller) + hostIfaceName := "pair0" + containerIfaceName := "eth0" + + containerInterfaceLink := &netlink.Dummy{LinkAttrs: netlink.LinkAttrs{Index: 2, MTU: mtu, HardwareAddr: containerVethMac, Name: containerIfaceName, Flags: net.FlagUp}} + hostInterfaceLink := &netlink.Dummy{ + LinkAttrs: netlink.LinkAttrs{Index: 2, MTU: mtu, HardwareAddr: hostVethMac, Name: hostIfaceName, Flags: net.FlagUp}, + } + notFoundErr := fmt.Errorf("not found") + + tests := []struct { + name string + containerLink netlink.Link + hostLink netlink.Link + getPeerErr error + getContainerLinkErr error + getHostLinkErr error + getInfByIdxErr error + setContainerMTUErr error + setPairInterfaceMTUErr error + expectedErrStr string + }{ + { + name: "change MTU successfully", + containerLink: containerInterfaceLink, + hostLink: hostInterfaceLink, + }, + { + name: "failed to change MTU due to interface not found", + getContainerLinkErr: notFoundErr, + expectedErrStr: "failed to find interface eth0 in container", + }, + { + name: "failed to change MTU due to peer interface not found", + getPeerErr: notFoundErr, + expectedErrStr: "failed to get peer index for dev", + }, + { + name: "failed to change MTU due to host interface not found by index", + containerLink: containerInterfaceLink, + getInfByIdxErr: notFoundErr, + expectedErrStr: "failed to get host interface for index", + }, + { + name: "failed to change MTU due to host link not found by name", + containerLink: containerInterfaceLink, + getHostLinkErr: notFoundErr, + expectedErrStr: "failed to find host interface pair0", + }, + { + name: "failed to change MTU due to container interface MTU update failure", + containerLink: containerInterfaceLink, + setContainerMTUErr: fmt.Errorf("failed to set MTU"), + expectedErrStr: "failed to set MTU for interface eth0 in container", + }, + { + name: "failed to change MTU due to pair interface MTU update failure", + containerLink: containerInterfaceLink, + hostLink: hostInterfaceLink, + setPairInterfaceMTUErr: fmt.Errorf("failed to set MTU"), + expectedErrStr: "failed to set MTU for host interface", + }, + } + + defer mockWithNetNSPath()() + testIfConfigurator := newTestIfConfigurator(false, fakeNetlink, nil) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + containerNS := createNS(t, false) + defer containerNS.clear() + defer mockGetInterfaceByName(nil, 2)() + defer mockIpGetVethPeerIfindex(2, tc.getPeerErr)() + defer mockGetInterfaceByIndex(tc.getInfByIdxErr, 2, hostIfaceName)() + + if tc.containerLink != nil { + fakeNetlink.EXPECT().LinkByName(containerIfaceName).Return(tc.containerLink, nil).Times(1) + fakeNetlink.EXPECT().LinkSetMTU(tc.containerLink, gomock.Any()).Return(tc.setContainerMTUErr).Times(1) + } else { + fakeNetlink.EXPECT().LinkByName(containerIfaceName).Return(nil, tc.getContainerLinkErr).Times(1) + } + + if tc.hostLink != nil { + fakeNetlink.EXPECT().LinkByName(hostIfaceName).Return(tc.hostLink, nil).Times(1) + fakeNetlink.EXPECT().LinkSetMTU(tc.hostLink, gomock.Any()).Return(tc.setPairInterfaceMTUErr).Times(1) + } + if tc.getHostLinkErr != nil { + fakeNetlink.EXPECT().LinkByName(hostIfaceName).Return(nil, tc.getHostLinkErr).Times(1) + } + + err := testIfConfigurator.changeContainerMTU(containerNS.Path(), containerIfaceName, 50) + if tc.expectedErrStr != "" { + assert.Error(t, err) + assert.Contains(t, err.Error(), tc.expectedErrStr) + } else { + assert.NoError(t, err) + } + }) + } +} + func TestAdvertiseContainerAddr(t *testing.T) { controller := gomock.NewController(t) defer controller.Finish() @@ -400,7 +504,6 @@ func TestCheckContainerInterface(t *testing.T) { containerIPs []*current.IPConfig containerIface *current.Interface containerLink netlink.Link - peerIndex int getPeerErr error getNetDevice bool getDeviceErr error @@ -487,7 +590,7 @@ func TestCheckContainerInterface(t *testing.T) { t.Run(tc.name, func(t *testing.T) { defer mockIpValidateExpectedInterfaceIPs(tc.validateIPErr)() defer mockIpValidateExpectedRoute(tc.validateRouteErr)() - defer mockIpGetVethPeerIfindex(tc.peerIndex, tc.getPeerErr)() + defer mockIpGetVethPeerIfindex(0, tc.getPeerErr)() containerNS := createNS(t, false) defer containerNS.clear() @@ -762,6 +865,16 @@ func mockGetInterfaceByName(netInterfaceError error, ifaceIndex int) func() { } } +func mockGetInterfaceByIndex(netInterfaceError error, ifaceIndex int, name string) func() { + originalNetInterfaceByIndex := netInterfaceByIndex + netInterfaceByIndex = func(idx int) (*net.Interface, error) { + return &net.Interface{Index: ifaceIndex, MTU: mtu, HardwareAddr: containerVethMac, Name: name, Flags: net.FlagUp}, netInterfaceError + } + return func() { + netInterfaceByIndex = originalNetInterfaceByIndex + } +} + func mockIpValidateExpectedInterfaceIPs(validateIPErr error) func() { originalIpValidateExpectedInterfaceIPs := ipValidateExpectedInterfaceIPs ipValidateExpectedInterfaceIPs = func(ifName string, resultIPs []*current.IPConfig) error { diff --git a/pkg/agent/cniserver/interface_configuration_windows.go b/pkg/agent/cniserver/interface_configuration_windows.go index 5f7a10e8095..8579f5a8469 100644 --- a/pkg/agent/cniserver/interface_configuration_windows.go +++ b/pkg/agent/cniserver/interface_configuration_windows.go @@ -189,6 +189,12 @@ func (ic *ifConfigurator) configureContainerLink( return nil } +// changeContainerMTU is only used for Antrea Multi-cluster with networkPolicyOnly +// mode, and this mode doesn't support Windows platform yet. +func (ic *ifConfigurator) changeContainerMTU(containerNetNS string, containerIFDev string, mtuDeduction int) error { + return nil +} + // createContainerLink creates HNSEndpoint using the IP configuration in the IPAM result. func (ic *ifConfigurator) createContainerLink(endpointName string, result *current.Result, containerID, podName, podNamespace string) (hostLink *hcsshim.HNSEndpoint, err error) { containerIP, err := findContainerIPConfig(result.IPs) diff --git a/pkg/agent/cniserver/interfaces.go b/pkg/agent/cniserver/interfaces.go index 204c2ad03e8..181aa4ac5eb 100644 --- a/pkg/agent/cniserver/interfaces.go +++ b/pkg/agent/cniserver/interfaces.go @@ -31,6 +31,7 @@ type podInterfaceConfigurator interface { getInterceptedInterfaces(sandbox string, containerNetNS string, containerIFDev string) (*current.Interface, *current.Interface, error) checkContainerInterface(containerNetns, containerID string, containerIface *current.Interface, containerIPs []*current.IPConfig, containerRoutes []*cnitypes.Route, sriovVFDeviceID string) (interface{}, error) addPostInterfaceCreateHook(containerID, endpointName string, containerAccess *containerAccessArbitrator, hook postInterfaceCreateHook) error + changeContainerMTU(containerNetNS string, containerIFDev string, mtuDeduction int) error } type SriovNet interface { diff --git a/pkg/agent/cniserver/pod_configuration_linux_test.go b/pkg/agent/cniserver/pod_configuration_linux_test.go index 917ae301784..a4e0b998aad 100644 --- a/pkg/agent/cniserver/pod_configuration_linux_test.go +++ b/pkg/agent/cniserver/pod_configuration_linux_test.go @@ -120,6 +120,10 @@ func (c *fakeInterfaceConfigurator) checkContainerInterface(containerNetns, cont return c.containerVethPair, nil } +func (c *fakeInterfaceConfigurator) changeContainerMTU(containerNetNS string, containerIFDev string, mtuDeduction int) error { + return nil +} + func newTestInterfaceConfigurator() *fakeInterfaceConfigurator { return &fakeInterfaceConfigurator{ containerMAC: "01:02:03:04:05:06", diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index 0748bbc6910..72081b79e91 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -117,6 +117,7 @@ type CNIServer struct { enableSecondaryNetworkIPAM bool disableTXChecksumOffload bool secondaryNetworkEnabled bool + networkConfig *config.NetworkConfig // networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. networkReadyCh <-chan struct{} } @@ -192,7 +193,7 @@ func (s *CNIServer) loadNetworkConfig(request *cnipb.CniCmdRequest) (*CNIConfig, return nil, err } if cniConfig.MTU == 0 { - cniConfig.MTU = s.nodeConfig.NodeMTU + cniConfig.MTU = s.networkConfig.InterfaceMTU } cniConfig.CniCmdArgs = request.CniArgs klog.V(3).Infof("Load network configurations: %v", cniConfig) @@ -629,6 +630,7 @@ func New( kubeClient clientset.Interface, routeClient route.Interface, isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, + networkConfig *config.NetworkConfig, networkReadyCh <-chan struct{}, ) *CNIServer { return &CNIServer{ @@ -644,6 +646,7 @@ func New( enableBridgingMode: enableBridgingMode, disableTXChecksumOffload: disableTXChecksumOffload, enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM, + networkConfig: networkConfig, networkReadyCh: networkReadyCh, } } @@ -720,6 +723,18 @@ func (s *CNIServer) interceptAdd(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, e return &cnipb.CniCmdResponse{CniResult: []byte("")}, fmt.Errorf("failed to connect container %s to ovs: %w", cniConfig.ContainerId, err) } + // Packets for multi-cluster traffic will always be encapsulated and sent through + // tunnels. So here we need to reduce interface MTU for different tunnel types. + if s.networkConfig.MTUDeduction != 0 { + if err := s.podConfigurator.ifConfigurator.changeContainerMTU( + s.hostNetNsPath(cniConfig.Netns), + cniConfig.Ifname, + s.networkConfig.MTUDeduction, + ); err != nil { + return &cnipb.CniCmdResponse{CniResult: []byte("")}, fmt.Errorf("failed to change container %s's MTU: %w", cniConfig.ContainerId, err) + } + } + // we return prevResult, which should be exactly what we received from // the runtime, potentially converted to the current CNI version used by // Antrea. diff --git a/pkg/agent/cniserver/server_linux_test.go b/pkg/agent/cniserver/server_linux_test.go index 7181e37dc4f..96deabdc481 100644 --- a/pkg/agent/cniserver/server_linux_test.go +++ b/pkg/agent/cniserver/server_linux_test.go @@ -211,6 +211,7 @@ func newMockCNIServer(t *testing.T, controller *gomock.Controller, ipamDriver ip if secondaryNetworkEnabled { cniServer.podConfigurator.podInfoStore = cnipodcache.NewCNIPodInfoStore() } + cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} return cniServer } diff --git a/pkg/agent/cniserver/server_test.go b/pkg/agent/cniserver/server_test.go index 40f1ca56eae..0e171f16e84 100644 --- a/pkg/agent/cniserver/server_test.go +++ b/pkg/agent/cniserver/server_test.go @@ -628,6 +628,7 @@ func newCNIServer(t *testing.T) *CNIServer { } close(networkReadyCh) cniServer.supportedCNIVersions = buildVersionSet() + cniServer.networkConfig = &config.NetworkConfig{InterfaceMTU: 1450} return cniServer } diff --git a/pkg/agent/config/node_config.go b/pkg/agent/config/node_config.go index 044ac60d706..eae2635aa5e 100644 --- a/pkg/agent/config/node_config.go +++ b/pkg/agent/config/node_config.go @@ -34,14 +34,15 @@ const ( ) const ( - VXLANOverhead = 50 - GeneveOverhead = 50 - GREOverhead = 38 + vxlanOverhead = 50 + geneveOverhead = 50 + greOverhead = 38 + ipv6ExtraOverhead = 20 + WireGuardOverhead = 80 // IPsec ESP can add a maximum of 38 bytes to the packet including the ESP // header and trailer. - IPSecESPOverhead = 38 - IPv6ExtraOverhead = 20 + IPSecESPOverhead = 38 ) const ( @@ -161,10 +162,6 @@ type NodeConfig struct { NodeTransportIPv6Addr *net.IPNet // The original MTU of the Node's transport interface. NodeTransportInterfaceMTU int - // Set either via defaultMTU config in antrea.yaml or auto discovered. - // Auto discovery will use MTU value of the Node's primary interface. - // For Encap and Hybrid mode, Node MTU will be adjusted to account for encap header. - NodeMTU int // TunnelOFPort is the OpenFlow port number of tunnel interface allocated by OVS. With noEncap mode, the value is 0. TunnelOFPort uint32 // HostInterfaceOFPort is the OpenFlow port number of the host interface allocated by OVS. The host interface is the @@ -204,6 +201,14 @@ type NetworkConfig struct { TransportIfaceCIDRs []string IPv4Enabled bool IPv6Enabled bool + // MTUDeduction only counts IPv4 tunnel overhead, no IPsec and WireGuard overhead. + MTUDeduction int + // Set by the defaultMTU config option or auto discovered. + // Auto discovery will use MTU value of the Node's transport interface. + // For Encap and Hybrid mode, InterfaceMTU will be adjusted to account for + // encap header. + InterfaceMTU int + EnableMulticlusterGW bool } // IsIPv4Enabled returns true if the cluster network supports IPv4. Legal cases are: @@ -250,11 +255,34 @@ func (nc *NetworkConfig) NeedsTunnelToPeer(peerIP net.IP, localIP *net.IPNet) bo return nc.TrafficEncapMode == TrafficEncapModeEncap || (nc.TrafficEncapMode == TrafficEncapModeHybrid && !localIP.Contains(peerIP)) } +func (nc *NetworkConfig) NeedsTunnelInterface() bool { + return nc.TrafficEncapMode.SupportsEncap() || nc.EnableMulticlusterGW +} + // NeedsDirectRoutingToPeer returns true if Pod traffic to peer Node needs a direct route installed to the routing table. func (nc *NetworkConfig) NeedsDirectRoutingToPeer(peerIP net.IP, localIP *net.IPNet) bool { return (nc.TrafficEncapMode == TrafficEncapModeNoEncap || nc.TrafficEncapMode == TrafficEncapModeHybrid) && localIP.Contains(peerIP) } +func (nc *NetworkConfig) CalculateMTUDeduction(isIPv6 bool) { + var mtuDeduction int + // When Multi-cluster Gateway is enabled, we need to reduce MTU for potential cross-cluster traffic. + if nc.TrafficEncapMode.SupportsEncap() || nc.EnableMulticlusterGW { + if nc.TunnelType == ovsconfig.VXLANTunnel { + mtuDeduction = vxlanOverhead + } else if nc.TunnelType == ovsconfig.GeneveTunnel { + mtuDeduction = geneveOverhead + } else if nc.TunnelType == ovsconfig.GRETunnel { + mtuDeduction = greOverhead + } + } + + if nc.TrafficEncapMode.SupportsEncap() && isIPv6 { + mtuDeduction += ipv6ExtraOverhead + } + nc.MTUDeduction = mtuDeduction +} + // ServiceConfig includes K8s Service CIDR and available IP addresses for NodePort. type ServiceConfig struct { ServiceCIDR *net.IPNet // K8s Service ClusterIP CIDR diff --git a/pkg/agent/config/node_config_test.go b/pkg/agent/config/node_config_test.go index a290be143da..081ce053c7f 100644 --- a/pkg/agent/config/node_config_test.go +++ b/pkg/agent/config/node_config_test.go @@ -19,6 +19,8 @@ import ( "testing" "github.com/stretchr/testify/assert" + + "antrea.io/antrea/pkg/ovs/ovsconfig" ) func TestNetworkConfig_NeedsTunnelToPeer(t *testing.T) { @@ -275,3 +277,72 @@ func TestIsIPv6Enabled(t *testing.T) { }) } } + +func TestCalculateMTUDeduction(t *testing.T) { + tests := []struct { + name string + nc *NetworkConfig + isIPv6 bool + expectedMTUDeduction int + }{ + { + name: "VXLan encap without IPv6", + nc: &NetworkConfig{TunnelType: ovsconfig.VXLANTunnel}, + expectedMTUDeduction: 50, + }, + { + name: "Geneve encap without IPv6", + nc: &NetworkConfig{TunnelType: ovsconfig.GeneveTunnel}, + expectedMTUDeduction: 50, + }, + { + name: "GRE encap without IPv6", + nc: &NetworkConfig{TunnelType: ovsconfig.GRETunnel}, + expectedMTUDeduction: 38, + }, + { + name: "Default encap with IPv6", + nc: &NetworkConfig{TunnelType: ovsconfig.GeneveTunnel}, + isIPv6: true, + expectedMTUDeduction: 70, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + tt.nc.CalculateMTUDeduction(tt.isIPv6) + assert.Equal(t, tt.expectedMTUDeduction, tt.nc.MTUDeduction) + }) + } +} + +func TestNeedsTunnelInterface(t *testing.T) { + tests := []struct { + name string + nc *NetworkConfig + expected bool + }{ + { + name: "Default encap mode", + nc: &NetworkConfig{TunnelType: ovsconfig.GeneveTunnel}, + expected: true, + }, + { + name: "networkPolicyOnly with Multicluster enabled", + nc: &NetworkConfig{TrafficEncapMode: TrafficEncapModeNetworkPolicyOnly, EnableMulticlusterGW: true}, + expected: true, + }, + { + name: "networkPolicyOnly without Multicluster enabled", + nc: &NetworkConfig{TrafficEncapMode: TrafficEncapModeNetworkPolicyOnly, EnableMulticlusterGW: false}, + expected: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + actual := tt.nc.NeedsTunnelInterface() + assert.Equal(t, tt.expected, actual) + }) + } +} diff --git a/pkg/agent/multicluster/mc_route_controller.go b/pkg/agent/multicluster/mc_route_controller.go index 0cc39a9e740..a3f5bd8ae5f 100644 --- a/pkg/agent/multicluster/mc_route_controller.go +++ b/pkg/agent/multicluster/mc_route_controller.go @@ -15,6 +15,7 @@ package multicluster import ( + "errors" "fmt" "net" "time" @@ -28,7 +29,8 @@ import ( mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" mcclientset "antrea.io/antrea/multicluster/pkg/client/clientset/versioned" - mcinformers "antrea.io/antrea/multicluster/pkg/client/informers/externalversions/multicluster/v1alpha1" + mcinformers "antrea.io/antrea/multicluster/pkg/client/informers/externalversions" + mcinformersv1alpha1 "antrea.io/antrea/multicluster/pkg/client/informers/externalversions/multicluster/v1alpha1" mclisters "antrea.io/antrea/multicluster/pkg/client/listers/multicluster/v1alpha1" "antrea.io/antrea/pkg/agent/config" "antrea.io/antrea/pkg/agent/interfacestore" @@ -37,7 +39,7 @@ import ( ) const ( - controllerName = "AntreaAgentMCRouteController" + controllerName = "AntreaAgentMCDefaultRouteController" // Set resyncPeriod to 0 to disable resyncing resyncPeriod = 0 * time.Second @@ -50,24 +52,24 @@ const ( workerItemKey = "key" ) -// MCRouteController watches Gateway and ClusterInfoImport events. +// MCDefaultRouteController watches Gateway and ClusterInfoImport events. // It is responsible for setting up necessary Openflow entries for multi-cluster // traffic on a Gateway or a regular Node. -type MCRouteController struct { +type MCDefaultRouteController struct { mcClient mcclientset.Interface ovsBridgeClient ovsconfig.OVSBridgeClient ofClient openflow.Client interfaceStore interfacestore.InterfaceStore nodeConfig *config.NodeConfig - gwInformer mcinformers.GatewayInformer + gwInformer mcinformersv1alpha1.GatewayInformer gwLister mclisters.GatewayLister gwListerSynced cache.InformerSynced - ciImportInformer mcinformers.ClusterInfoImportInformer + ciImportInformer mcinformersv1alpha1.ClusterInfoImportInformer ciImportLister mclisters.ClusterInfoImportLister ciImportListerSynced cache.InformerSynced queue workqueue.RateLimitingInterface // installedCIImports is for saving ClusterInfos which have been processed - // in MCRouteController. Need to use mutex to protect 'installedCIImports' if + // in MCDefaultRouteController. Need to use mutex to protect 'installedCIImports' if // we change the number of 'defaultWorkers'. installedCIImports map[string]*mcv1alpha1.ClusterInfoImport // Need to use mutex to protect 'installedActiveGW' if we change @@ -80,10 +82,9 @@ type MCRouteController struct { enablePodToPodConnectivity bool } -func NewMCRouteController( +func NewMCDefaultRouteController( mcClient mcclientset.Interface, - gwInformer mcinformers.GatewayInformer, - ciImportInformer mcinformers.ClusterInfoImportInformer, + mcInformerFactoryWithOption mcinformers.SharedInformerFactory, client openflow.Client, ovsBridgeClient ovsconfig.OVSBridgeClient, interfaceStore interfacestore.InterfaceStore, @@ -91,8 +92,10 @@ func NewMCRouteController( namespace string, enableStretchedNetworkPolicy bool, enablePodToPodConnectivity bool, -) *MCRouteController { - controller := &MCRouteController{ +) *MCDefaultRouteController { + gwInformer := mcInformerFactoryWithOption.Multicluster().V1alpha1().Gateways() + ciImportInformer := mcInformerFactoryWithOption.Multicluster().V1alpha1().ClusterInfoImports() + controller := &MCDefaultRouteController{ mcClient: mcClient, ovsBridgeClient: ovsBridgeClient, ofClient: client, @@ -141,22 +144,10 @@ func NewMCRouteController( return controller } -func (c *MCRouteController) enqueueGateway(obj interface{}, isDelete bool) { +func (c *MCDefaultRouteController) enqueueGateway(obj interface{}, isDelete bool) { gw, isGW := obj.(*mcv1alpha1.Gateway) if !isGW { - deletedState, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - klog.ErrorS(nil, "Received unexpected object", "object", obj) - return - } - gw, ok = deletedState.Obj.(*mcv1alpha1.Gateway) - if !ok { - klog.ErrorS(nil, "DeletedFinalStateUnknown contains non-Gateway object", "object", deletedState.Obj) - return - } - } - - if gw.Namespace != c.namespace { + klog.ErrorS(errors.New("received unexpected object"), "enqueueGateway can't process event", "obj", obj) return } @@ -169,22 +160,10 @@ func (c *MCRouteController) enqueueGateway(obj interface{}, isDelete bool) { c.queue.Add(workerItemKey) } -func (c *MCRouteController) enqueueClusterInfoImport(obj interface{}, isDelete bool) { +func (c *MCDefaultRouteController) enqueueClusterInfoImport(obj interface{}, isDelete bool) { ciImp, isciImp := obj.(*mcv1alpha1.ClusterInfoImport) if !isciImp { - deletedState, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - klog.ErrorS(nil, "Received unexpected object", "object", obj) - return - } - ciImp, ok = deletedState.Obj.(*mcv1alpha1.ClusterInfoImport) - if !ok { - klog.ErrorS(nil, "DeletedFinalStateUnknown contains non-ClusterInfoImport object", "object", deletedState.Obj) - return - } - } - - if ciImp.Namespace != c.namespace { + klog.ErrorS(errors.New("received unexpected object"), "enqueueClusterInfoImport can't process event", "obj", obj) return } @@ -204,7 +183,7 @@ func (c *MCRouteController) enqueueClusterInfoImport(obj interface{}, isDelete b // Run will create defaultWorkers workers (go routines) which will process // the Gateway events from the workqueue. -func (c *MCRouteController) Run(stopCh <-chan struct{}) { +func (c *MCDefaultRouteController) Run(stopCh <-chan struct{}) { defer c.queue.ShutDown() cacheSyncs := []cache.InformerSynced{c.gwListerSynced, c.ciImportListerSynced} klog.InfoS("Starting controller", "controller", controllerName) @@ -221,12 +200,12 @@ func (c *MCRouteController) Run(stopCh <-chan struct{}) { // worker is a long-running function that will continually call the processNextWorkItem // function in order to read and process a message on the workqueue. -func (c *MCRouteController) worker() { +func (c *MCDefaultRouteController) worker() { for c.processNextWorkItem() { } } -func (c *MCRouteController) processNextWorkItem() bool { +func (c *MCDefaultRouteController) processNextWorkItem() bool { obj, quit := c.queue.Get() if quit { return false @@ -247,7 +226,7 @@ func (c *MCRouteController) processNextWorkItem() bool { return true } -func (c *MCRouteController) syncMCFlows() error { +func (c *MCDefaultRouteController) syncMCFlows() error { startTime := time.Now() defer func() { klog.V(4).InfoS("Finished syncing flows for Multi-cluster", "time", time.Since(startTime)) @@ -290,13 +269,13 @@ func (c *MCRouteController) syncMCFlows() error { return nil } -func (c *MCRouteController) syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) error { +func (c *MCDefaultRouteController) syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) error { desiredCIImports, err := c.ciImportLister.ClusterInfoImports(c.namespace).List(labels.Everything()) if err != nil { return err } - activeGWChanged := c.checkGateWayIPChange(activeGW) + activeGWChanged := c.checkGatewayIPChange(activeGW) installedCIImportNames := sets.StringKeySet(c.installedCIImports) for _, ciImp := range desiredCIImports { if err = c.addMCFlowsForSingleCIImp(activeGW, ciImp, c.installedCIImports[ciImp.Name], activeGWChanged); err != nil { @@ -313,7 +292,7 @@ func (c *MCRouteController) syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway return nil } -func (c *MCRouteController) checkGateWayIPChange(activeGW *mcv1alpha1.Gateway) bool { +func (c *MCDefaultRouteController) checkGatewayIPChange(activeGW *mcv1alpha1.Gateway) bool { var activeGWChanged bool if activeGW.Name == c.nodeConfig.Name { // On a Gateway Node, the GatewayIP of the active Gateway will impact the Openflow rules. @@ -325,7 +304,7 @@ func (c *MCRouteController) checkGateWayIPChange(activeGW *mcv1alpha1.Gateway) b return activeGWChanged } -func (c *MCRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) error { +func (c *MCDefaultRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) error { allCIImports, err := c.ciImportLister.ClusterInfoImports(c.namespace).List(labels.Everything()) if err != nil { return err @@ -343,7 +322,7 @@ func (c *MCRouteController) addMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway) return nil } -func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gateway, ciImport *mcv1alpha1.ClusterInfoImport, +func (c *MCDefaultRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gateway, ciImport *mcv1alpha1.ClusterInfoImport, installedCIImp *mcv1alpha1.ClusterInfoImport, activeGWChanged bool) error { tunnelPeerIPToRemoteGW := getPeerGatewayIP(ciImport.Spec) if tunnelPeerIPToRemoteGW == nil { @@ -403,7 +382,7 @@ func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gatewa return nil } -func (c *MCRouteController) deleteMCFlowsForSingleCIImp(ciImpName string) error { +func (c *MCDefaultRouteController) deleteMCFlowsForSingleCIImp(ciImpName string) error { if err := c.ofClient.UninstallMulticlusterFlows(ciImpName); err != nil { return fmt.Errorf("failed to uninstall multi-cluster flows to remote Gateway Node %s: %v", ciImpName, err) } @@ -411,34 +390,37 @@ func (c *MCRouteController) deleteMCFlowsForSingleCIImp(ciImpName string) error return nil } -func (c *MCRouteController) deleteMCFlowsForAllCIImps() error { +func (c *MCDefaultRouteController) deleteMCFlowsForAllCIImps() error { for _, ciImp := range c.installedCIImports { c.deleteMCFlowsForSingleCIImp(ciImp.Name) } return nil } -// getActiveGateway compares Gateway's CreationTimestamp to get the active Gateway, -// The last created Gateway will be the active Gateway. -func (c *MCRouteController) getActiveGateway() (*mcv1alpha1.Gateway, error) { - gws, err := c.gwLister.Gateways(c.namespace).List(labels.Everything()) +func (c *MCDefaultRouteController) getActiveGateway() (*mcv1alpha1.Gateway, error) { + activeGW, err := getActiveGateway(c.gwLister, c.namespace) if err != nil { return nil, err } - if len(gws) == 0 { + if activeGW == nil { return nil, nil } - // Comparing Gateway's CreationTimestamp to get the last created Gateway. - lastCreatedGW := gws[0] - for _, gw := range gws { - if lastCreatedGW.CreationTimestamp.Before(&gw.CreationTimestamp) { - lastCreatedGW = gw - } + if net.ParseIP(activeGW.GatewayIP) == nil || net.ParseIP(activeGW.InternalIP) == nil { + return nil, fmt.Errorf("the active Gateway %s has no valid GatewayIP or InternalIP", activeGW.Name) + } + return activeGW, nil +} + +func getActiveGateway(gwLister mclisters.GatewayLister, namespace string) (*mcv1alpha1.Gateway, error) { + gws, err := gwLister.Gateways(namespace).List(labels.Everything()) + if err != nil { + return nil, err } - if net.ParseIP(lastCreatedGW.GatewayIP) == nil || net.ParseIP(lastCreatedGW.InternalIP) == nil { - return nil, fmt.Errorf("the last created Gateway %s has no valid GatewayIP or InternalIP", lastCreatedGW.Name) + if len(gws) == 0 { + return nil, nil } - return lastCreatedGW, nil + // The Gateway webhook guarantees there will be at most one Gateway in a cluster. + return gws[0], nil } func generatePeerConfigs(subnets []string, gatewayIP net.IP) (map[*net.IPNet]net.IP, error) { diff --git a/pkg/agent/multicluster/mc_route_controller_test.go b/pkg/agent/multicluster/mc_route_controller_test.go index c6ff4fab5ad..ecb38c5c279 100644 --- a/pkg/agent/multicluster/mc_route_controller_test.go +++ b/pkg/agent/multicluster/mc_route_controller_test.go @@ -22,6 +22,7 @@ import ( "github.com/golang/mock/gomock" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" mcfake "antrea.io/antrea/multicluster/pkg/client/clientset/versioned/fake" @@ -33,7 +34,7 @@ import ( ) type fakeRouteController struct { - *MCRouteController + *MCDefaultRouteController mcClient *mcfake.Clientset informerFactory mcinformers.SharedInformerFactory ofClient *oftest.MockClient @@ -41,20 +42,22 @@ type fakeRouteController struct { interfaceStore interfacestore.InterfaceStore } -func newMCRouteController(t *testing.T, nodeConfig *config.NodeConfig) (*fakeRouteController, func()) { +func newMCDefaultRouteController(t *testing.T, nodeConfig *config.NodeConfig) (*fakeRouteController, func()) { mcClient := mcfake.NewSimpleClientset() - mcInformerFactory := mcinformers.NewSharedInformerFactory(mcClient, 60*time.Second) - gwInformer := mcInformerFactory.Multicluster().V1alpha1().Gateways() - ciImpInformer := mcInformerFactory.Multicluster().V1alpha1().ClusterInfoImports() + mcInformerFactory := mcinformers.NewSharedInformerFactoryWithOptions(mcClient, + 60*time.Second, + mcinformers.WithTweakListOptions(func(lo *metav1.ListOptions) { + lo.FieldSelector = fields.Set{"metadata.namespace": defaultNs}.String() + }), + ) ctrl := gomock.NewController(t) ofClient := oftest.NewMockClient(ctrl) ovsClient := ovsconfigtest.NewMockOVSBridgeClient(ctrl) interfaceStore := interfacestore.NewInterfaceStore() - c := NewMCRouteController( + c := NewMCDefaultRouteController( mcClient, - gwInformer, - ciImpInformer, + mcInformerFactory, ofClient, ovsClient, interfaceStore, @@ -64,12 +67,12 @@ func newMCRouteController(t *testing.T, nodeConfig *config.NodeConfig) (*fakeRou true, ) return &fakeRouteController{ - MCRouteController: c, - mcClient: mcClient, - informerFactory: mcInformerFactory, - ofClient: ofClient, - ovsClient: ovsClient, - interfaceStore: interfaceStore, + MCDefaultRouteController: c, + mcClient: mcClient, + informerFactory: mcInformerFactory, + ofClient: ofClient, + ovsClient: ovsClient, + interfaceStore: interfaceStore, }, ctrl.Finish } @@ -97,6 +100,16 @@ var ( gw1GatewayIP = net.ParseIP(gateway1.GatewayIP) gw2InternalIP = net.ParseIP(gateway2.InternalIP) + gateway3 = mcv1alpha1.Gateway{ + ObjectMeta: metav1.ObjectMeta{ + Name: "node-3", + Namespace: "kube-system", + CreationTimestamp: gw2CreationTime, + }, + GatewayIP: "172.17.0.13", + InternalIP: "192.17.0.13", + } + clusterInfoImport1 = mcv1alpha1.ClusterInfoImport{ ObjectMeta: metav1.ObjectMeta{ Name: "cluster-b-default-clusterinfo", @@ -131,7 +144,7 @@ var ( ) func TestMCRouteControllerAsGateway(t *testing.T) { - c, closeFn := newMCRouteController(t, &config.NodeConfig{Name: "node-1"}) + c, closeFn := newMCDefaultRouteController(t, &config.NodeConfig{Name: "node-1"}) defer closeFn() defer c.queue.ShutDown() @@ -196,28 +209,22 @@ func TestMCRouteControllerAsGateway(t *testing.T) { updatedGateway1b, metav1.UpdateOptions{}) c.processNextWorkItem() + // Delete Gateway1 + c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), + gateway1.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) + c.processNextWorkItem() + + // Create Gateway3 which is not in the default Namespace. + c.mcClient.MulticlusterV1alpha1().Gateways(gateway3.GetNamespace()).Create(context.TODO(), + &gateway3, metav1.CreateOptions{}) + // Create Gateway2 as active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(), &gateway2, metav1.CreateOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), gw2InternalIP, true).Times(1) c.processNextWorkItem() - - // Delete Gateway2, then Gateway1 become active Gateway - c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Delete(context.TODO(), - gateway2.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), true).Times(1) - c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, - gomock.Any(), peerNodeIP1, updatedGateway1aIP, true).Times(1) - c.processNextWorkItem() - - // Delete last Gateway - c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), - gateway1.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.processNextWorkItem() }() select { case <-time.After(5 * time.Second): @@ -227,7 +234,7 @@ func TestMCRouteControllerAsGateway(t *testing.T) { } func TestMCRouteControllerAsRegularNode(t *testing.T) { - c, closeFn := newMCRouteController(t, &config.NodeConfig{Name: "node-3"}) + c, closeFn := newMCDefaultRouteController(t, &config.NodeConfig{Name: "node-3"}) defer closeFn() defer c.queue.ShutDown() @@ -292,27 +299,21 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { gomock.Any(), updatedGateway1bIP, true).Times(1) c.processNextWorkItem() + // Delete Gateway1 + c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), + gateway1.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) + c.processNextWorkItem() + + // Create Gateway3 which is not in the default Namespace. + c.mcClient.MulticlusterV1alpha1().Gateways(gateway3.GetNamespace()).Create(context.TODO(), + &gateway3, metav1.CreateOptions{}) + // Create Gateway2 as the active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(), &gateway2, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), peerNodeIP2, true).Times(1) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.processNextWorkItem() - - // Delete Gateway2, then Gateway1 become active Gateway - c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Delete(context.TODO(), - gateway2.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) - c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, - gomock.Any(), updatedGateway1bIP, true).Times(1) - c.processNextWorkItem() - - // Delete last Gateway - c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), - gateway1.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.processNextWorkItem() }() select { diff --git a/pkg/agent/multicluster/pod_route_controller.go b/pkg/agent/multicluster/pod_route_controller.go new file mode 100644 index 00000000000..dcf0cbf330a --- /dev/null +++ b/pkg/agent/multicluster/pod_route_controller.go @@ -0,0 +1,478 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package multicluster + +import ( + "errors" + "net" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/kubernetes" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcclientset "antrea.io/antrea/multicluster/pkg/client/clientset/versioned" + mclisters "antrea.io/antrea/multicluster/pkg/client/listers/multicluster/v1alpha1" + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" + "antrea.io/antrea/pkg/ovs/ovsconfig" +) + +const ( + // The number of workers processing a Pod change + podWorkerNum = 5 + dummyKey = "key" + + podRouteControllerName = "AntreaAgentMulticlusterPodRouteController" +) + +// MCPodRouteController generates L3 forwarding flows to forward cross-cluster +// traffic from MC Gateway to Pods on other Nodes inside a member cluster. It is +// required when networkPolicyOnly, noEncap or hybrid mode are configured, to forward +// the traffic through tunnels between Gateway and other Nodes, as otherwise the +// traffic will not go through tunnels in those modes. +type MCPodRouteController struct { + k8sClient kubernetes.Interface + mcClient mcclientset.Interface + ovsBridgeClient ovsconfig.OVSBridgeClient + ofClient openflow.Client + interfaceStore interfacestore.InterfaceStore + nodeConfig *config.NodeConfig + podQueue workqueue.RateLimitingInterface + gwQueue workqueue.RateLimitingInterface + podInformer cache.SharedIndexInformer + podLister corelisters.PodLister + gwInformer cache.SharedIndexInformer + gwLister mclisters.GatewayLister + namespace string + // podIPsCache stores Pods' latest IP and Node IP. + podIPsCache map[string]string + podIPsCacheMutex sync.RWMutex + // podNameIPCache stores Pod's NamespacedName and IP. + podNameIPCache map[string]string + podNameIPCacheMutex sync.RWMutex + // ipToPods stores NamespacedNames of all Pods with the same IP. + ipToPods map[string]sets.String + ipToPodsMutex sync.RWMutex + // podWorkersStarted is a boolean which tracks if the Pod flow controller has been started. + podWorkersStarted bool + podWorkersStartedMutex sync.RWMutex + podWorkerStopCh chan struct{} +} + +func NewMCPodRouteController( + k8sClient kubernetes.Interface, + mcClient mcclientset.Interface, + gwInformer cache.SharedIndexInformer, + client openflow.Client, + ovsBridgeClient ovsconfig.OVSBridgeClient, + interfaceStore interfacestore.InterfaceStore, + nodeConfig *config.NodeConfig, + namespace string, +) *MCPodRouteController { + controller := &MCPodRouteController{ + k8sClient: k8sClient, + mcClient: mcClient, + ovsBridgeClient: ovsBridgeClient, + ofClient: client, + interfaceStore: interfaceStore, + nodeConfig: nodeConfig, + podQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "MCPodRouteControllerForPod"), + gwQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "MCPodRouteControllerForGateway"), + gwInformer: gwInformer, + gwLister: mclisters.NewGatewayLister(gwInformer.GetIndexer()), + namespace: namespace, + podIPsCache: make(map[string]string), + podNameIPCache: make(map[string]string), + ipToPods: make(map[string]sets.String), + podWorkerStopCh: make(chan struct{}), + } + + controller.gwInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(cur interface{}) { + controller.enqueueGateway(cur) + }, + // Gateway UPDATE event doesn't impact Pod flows, so ignore it. + DeleteFunc: func(old interface{}) { + controller.enqueueGateway(old) + }, + }, + resyncPeriod, + ) + return controller +} + +func (c *MCPodRouteController) createPodInformer() { + listOptions := func(options *metav1.ListOptions) { + options.FieldSelector = fields.OneTermNotEqualSelector("spec.nodeName", c.nodeConfig.Name).String() + } + c.podInformer = coreinformers.NewFilteredPodInformer( + c.k8sClient, + metav1.NamespaceAll, + 0, + nil, + listOptions, + ) + c.podInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(cur interface{}) { + c.enqueuePod(cur, false) + }, + UpdateFunc: func(old, cur interface{}) { + c.enqueuePod(cur, false) + }, + DeleteFunc: func(old interface{}) { + c.enqueuePod(old, true) + }, + }, + resyncPeriod, + ) + c.podLister = corelisters.NewPodLister(c.podInformer.GetIndexer()) +} + +func (c *MCPodRouteController) enqueueGateway(obj interface{}) { + _, isGW := obj.(*mcv1alpha1.Gateway) + if !isGW { + klog.ErrorS(errors.New("received unexpected object"), "enqueueGateway can't process event", "obj", obj) + return + } + c.gwQueue.Add(dummyKey) +} + +// checkPod checks the Pod info to determine if it's a valid Pod which should +// have Openflow flows installed or updated for cross-cluster traffic. +func (c *MCPodRouteController) checkPod(pod *corev1.Pod) bool { + // pod.Status.PodIP and HostIP should be an IPv4 address for IPv4 and dual stack clusters. + // IPv6 only cluster is not supported by Antrea Multi-cluster yet. + podIP := pod.Status.PodIP + podNodeIP := pod.Status.HostIP + + cachedPodNodeIP, exists := c.getPodNodeIPs(podIP) + if exists && cachedPodNodeIP == podNodeIP { + klog.V(2).InfoS("No Pod change impacts installed Openflow rules", "name", klog.KObj(pod)) + podName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} + if _, ok := c.getPodNameIPCache(podName); !ok { + c.addPodNameIPCache(podName, podIP) + c.addPodIPsCache(podIP, podName) + } + return false + } + + if !exists && (podNodeIP == "" || pod.Spec.HostNetwork) { + klog.V(2).InfoS("Pod has no valid Node IP or it is in HostNetwork, skip it", "name", klog.KObj(pod), + "nodeIP", podNodeIP, "hostNetwork", pod.Spec.HostNetwork) + return false + } + + return true +} + +func (c *MCPodRouteController) enqueuePod(obj interface{}, isDelete bool) { + pod, isPod := obj.(*corev1.Pod) + if !isPod { + klog.ErrorS(errors.New("received unexpected object"), "enqueuePod can't process event", "obj", obj) + return + } + + createOrUpdatePodFlow := func(podName types.NamespacedName, podIP string) { + podNodeIP := pod.Status.HostIP + c.addPodNameIPCache(podName, podIP) + c.addPodIPsCache(podIP, podName) + c.addOrUpdatePodNodeIPs(podIP, podNodeIP) + c.podQueue.Add(podIP) + } + + deletePodFlow := func(podName types.NamespacedName, podIP string) { + podSet := c.getPodIPsCache(podIP) + if podSet == nil { + return + } + + if podSet.Has(podName.String()) { + if podSet.Len() == 1 { + c.deletePodNodeIP(podIP) + } + c.deletePodIPsCache(podIP, podName) + c.deletePodNameIPCache(podName) + c.podQueue.Add(podIP) + } + } + + podName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name} + podIP := pod.Status.PodIP + + if isDelete { + deletePodFlow(podName, podIP) + return + } + + cachedPodIP, exists := c.getPodNameIPCache(podName) + if !exists && podIP == "" { + return + } + + if exists { + if podIP == "" { + klog.InfoS("cacheIP", "ip", cachedPodIP, "pod", podName.String()) + deletePodFlow(podName, cachedPodIP) + return + } else if cachedPodIP != podIP { + deletePodFlow(podName, cachedPodIP) + createOrUpdatePodFlow(podName, podIP) + return + } + } + + needCreateOrUpdate := c.checkPod(pod) + if !needCreateOrUpdate { + return + } + + createOrUpdatePodFlow(podName, podIP) +} + +func (c *MCPodRouteController) Run(stopCh <-chan struct{}) { + defer c.gwQueue.ShutDown() + defer c.podQueue.ShutDown() + + klog.InfoS("Starting controller", "controller", podRouteControllerName) + defer klog.InfoS("Shutting down controller", "controller", podRouteControllerName) + if !cache.WaitForNamedCacheSync(podRouteControllerName, stopCh, c.gwInformer.HasSynced) { + return + } + // Run a single routine to handle Gateway events. + go wait.Until(c.gatewayWorker, time.Second, stopCh) + <-stopCh +} + +func (c *MCPodRouteController) gatewayWorker() { + for c.processGatewayNextWorkItem() { + } +} + +func (c *MCPodRouteController) processGatewayNextWorkItem() bool { + key, quit := c.gwQueue.Get() + if quit { + return false + } + defer c.gwQueue.Done(key) + + if k, ok := key.(string); !ok { + c.gwQueue.Forget(k) + klog.InfoS("Expected string in work queue but got", "object", k) + return true + } else if err := c.syncGateway(); err == nil { + c.gwQueue.Forget(key) + } else { + c.gwQueue.AddRateLimited(key) + klog.ErrorS(err, "Error syncing Gateway, requeuing", "key", key) + } + return true +} + +func (c *MCPodRouteController) syncGateway() error { + activeGW, err := getActiveGateway(c.gwLister, c.namespace) + if err != nil { + klog.ErrorS(err, "Failed to get an active Gateway") + return err + } + + c.podWorkersStartedMutex.Lock() + defer c.podWorkersStartedMutex.Unlock() + + isMyselfGateway := activeGW != nil && c.nodeConfig.Name == activeGW.Name + // Stop Pod flow controller and clean up all installed Multi-cluster Pod flows, + // if the Node was a Gateway before. + if !isMyselfGateway { + if c.podWorkersStarted { + klog.InfoS("Shutting down Multi-cluster PodFlowController") + close(c.podWorkerStopCh) + c.podWorkerStopCh = nil + c.podInformer = nil + c.podLister = nil + c.podWorkersStarted = false + } + } + + if len(c.podIPsCache) > 0 && (!isMyselfGateway || !c.podWorkersStarted) { + err := c.ofClient.UninstallMulticlusterPodFlows("") + if err != nil { + return err + } + c.podIPsCache = map[string]string{} + return nil + } + + if isMyselfGateway { + if !c.podWorkersStarted { + klog.InfoS("Starting Multi-cluster PodFlowController") + c.podWorkerStopCh = make(chan struct{}) + c.createPodInformer() + go c.podInformer.Run(c.podWorkerStopCh) + if !cache.WaitForNamedCacheSync(podRouteControllerName, c.podWorkerStopCh, c.podInformer.HasSynced) { + close(c.podWorkerStopCh) + c.podWorkerStopCh = nil + c.podInformer = nil + c.podLister = nil + return errors.New("failed to sync Pod cache") + } + + for i := 0; i < podWorkerNum; i++ { + go wait.Until(c.podWorker, time.Second, c.podWorkerStopCh) + } + c.podWorkersStarted = true + return nil + } + // Do nothing when the Pod flow controller is already started since + // Pod flow controller will be responsible for handling Pod events to install flows. + } + return nil +} + +func (c *MCPodRouteController) podWorker() { + for c.processPodNextWorkItem() { + } +} + +func (c *MCPodRouteController) processPodNextWorkItem() bool { + obj, quit := c.podQueue.Get() + if quit { + return false + } + defer c.podQueue.Done(obj) + + if k, ok := obj.(string); !ok { + c.podQueue.Forget(obj) + klog.InfoS("Expected string in work queue but got", "object", obj) + return true + } else if err := c.syncPod(k); err == nil { + c.podQueue.Forget(k) + } else { + c.podQueue.AddRateLimited(k) + klog.ErrorS(err, "Error syncing key, requeuing", "key", k) + } + return true +} + +func (c *MCPodRouteController) syncPod(podIP string) error { + c.podWorkersStartedMutex.RLock() + defer c.podWorkersStartedMutex.RUnlock() + if !c.podWorkersStarted { + return nil + } + + podSet := c.getPodIPsCache(podIP) + if podSet.Len() == 0 { + klog.V(2).InfoS("Deleting Multi-cluster flows for Pod", "podIP", podIP) + if err := c.ofClient.UninstallMulticlusterPodFlows(podIP); err != nil { + klog.ErrorS(err, "Failed to uninstall Multi-cluster flows for Pod", "podIP", podIP) + return err + } + return nil + } + + podNodeIP, exists := c.getPodNodeIPs(podIP) + if !exists { + return nil + } + + klog.V(2).InfoS("Adding Multi-cluster flows for Pod", "podIP", podIP, "nodeIP", podNodeIP) + if err := c.ofClient.InstallMulticlusterPodFlows(net.ParseIP(podIP), net.ParseIP(podNodeIP)); err != nil { + klog.ErrorS(err, "Failed to install Multi-cluster flows for Pod", "podIP", podIP, "nodeIP", podNodeIP) + return err + } + return nil +} + +func (c *MCPodRouteController) getPodIPsCache(ip string) sets.String { + c.ipToPodsMutex.RLock() + defer c.ipToPodsMutex.RUnlock() + podSet, ok := c.ipToPods[ip] + if ok { + return podSet + } + return nil +} + +func (c *MCPodRouteController) addPodIPsCache(ip string, podName types.NamespacedName) { + c.ipToPodsMutex.Lock() + defer c.ipToPodsMutex.Unlock() + if pods, ok := c.ipToPods[ip]; !ok { + c.ipToPods[ip] = sets.NewString(podName.String()) + } else { + pods.Insert(podName.String()) + } +} + +func (c *MCPodRouteController) deletePodIPsCache(ip string, podName types.NamespacedName) { + c.ipToPodsMutex.Lock() + defer c.ipToPodsMutex.Unlock() + podSet := c.ipToPods[ip] + podSet.Delete(podName.String()) + if podSet.Len() == 0 { + delete(c.ipToPods, ip) + } +} + +func (c *MCPodRouteController) getPodNodeIPs(podIP string) (string, bool) { + c.podIPsCacheMutex.RLock() + defer c.podIPsCacheMutex.RUnlock() + podNodeIP, exists := c.podIPsCache[podIP] + return podNodeIP, exists +} + +func (c *MCPodRouteController) addOrUpdatePodNodeIPs(podIP, podNodeIP string) { + c.podIPsCacheMutex.Lock() + defer c.podIPsCacheMutex.Unlock() + c.podIPsCache[podIP] = podNodeIP +} + +func (c *MCPodRouteController) deletePodNodeIP(podIP string) { + c.ipToPodsMutex.Lock() + defer c.ipToPodsMutex.Unlock() + delete(c.podIPsCache, podIP) +} + +func (c *MCPodRouteController) addPodNameIPCache(podName types.NamespacedName, podIP string) { + c.podNameIPCacheMutex.Lock() + defer c.podNameIPCacheMutex.Unlock() + c.podNameIPCache[podName.String()] = podIP +} + +func (c *MCPodRouteController) getPodNameIPCache(podName types.NamespacedName) (string, bool) { + c.podNameIPCacheMutex.Lock() + defer c.podNameIPCacheMutex.Unlock() + podIP, exists := c.podNameIPCache[podName.String()] + return podIP, exists +} + +func (c *MCPodRouteController) deletePodNameIPCache(podName types.NamespacedName) { + c.podNameIPCacheMutex.Lock() + defer c.podNameIPCacheMutex.Unlock() + delete(c.podNameIPCache, podName.String()) +} diff --git a/pkg/agent/multicluster/pod_route_controller_test.go b/pkg/agent/multicluster/pod_route_controller_test.go new file mode 100644 index 00000000000..c354de839a0 --- /dev/null +++ b/pkg/agent/multicluster/pod_route_controller_test.go @@ -0,0 +1,349 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package multicluster + +import ( + "context" + "net" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + k8sfake "k8s.io/client-go/kubernetes/fake" + v1 "k8s.io/client-go/listers/core/v1" + + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcfake "antrea.io/antrea/multicluster/pkg/client/clientset/versioned/fake" + mcinformers "antrea.io/antrea/multicluster/pkg/client/informers/externalversions" + mclisters "antrea.io/antrea/multicluster/pkg/client/listers/multicluster/v1alpha1" + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/interfacestore" + oftest "antrea.io/antrea/pkg/agent/openflow/testing" + ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" +) + +var ( + defaultNs = "default" + node1Name = "node-1" + ctx = context.TODO() + + nginx1NoIPs = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: defaultNs, + Name: "nginx1", + }, + Spec: corev1.PodSpec{ + NodeName: "node-2", + }, + } + + nginx2WithIPs = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: defaultNs, + Name: "nginx2", + }, + Spec: corev1.PodSpec{ + NodeName: "node-2", + }, + Status: corev1.PodStatus{ + PodIP: "192.168.1.12", + HostIP: "10.170.10.11", + }, + } + + nginx2PodIP = net.ParseIP("192.168.1.12") + nginx2HostIP = net.ParseIP("10.170.10.11") + + nginxWithHostNetwork = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: defaultNs, + Name: "nginx-hostnetwork", + }, + Spec: corev1.PodSpec{ + NodeName: "node-2", + HostNetwork: true, + }, + Status: corev1.PodStatus{ + PodIP: "10.170.10.15", + HostIP: "10.170.10.15", + }, + } +) + +type fakeMCPodRouteController struct { + *MCPodRouteController + mcClient *mcfake.Clientset + k8sClient *k8sfake.Clientset + informerFactory informers.SharedInformerFactory + mcInformerFactory mcinformers.SharedInformerFactory + ofClient *oftest.MockClient + ovsClient *ovsconfigtest.MockOVSBridgeClient + interfaceStore interfacestore.InterfaceStore +} + +func newMCPodRouteController(t *testing.T, nodeConfig *config.NodeConfig, mcClient *mcfake.Clientset, + k8sClient *k8sfake.Clientset) (*fakeMCPodRouteController, func()) { + mcInformerFactory := mcinformers.NewSharedInformerFactoryWithOptions(mcClient, + 0, + mcinformers.WithTweakListOptions(func(lo *metav1.ListOptions) { + lo.FieldSelector = fields.Set{"metadata.namespace": defaultNs}.String() + }), + ) + gwInformer := mcInformerFactory.Multicluster().V1alpha1().Gateways() + + informerFactory := informers.NewSharedInformerFactory(k8sClient, 0) + + ctrl := gomock.NewController(t) + ofClient := oftest.NewMockClient(ctrl) + ovsClient := ovsconfigtest.NewMockOVSBridgeClient(ctrl) + interfaceStore := interfacestore.NewInterfaceStore() + c := NewMCPodRouteController( + k8sClient, + mcClient, + gwInformer.Informer(), + ofClient, + ovsClient, + interfaceStore, + nodeConfig, + defaultNs, + ) + return &fakeMCPodRouteController{ + MCPodRouteController: c, + mcClient: mcClient, + k8sClient: k8sClient, + mcInformerFactory: mcInformerFactory, + informerFactory: informerFactory, + ofClient: ofClient, + ovsClient: ovsClient, + interfaceStore: interfaceStore, + }, ctrl.Finish +} + +func TestGatewayEvent(t *testing.T) { + mcClient := mcfake.NewSimpleClientset() + k8sClient := k8sfake.NewSimpleClientset([]runtime.Object{nginx1NoIPs, nginx2WithIPs}...) + c, closeFn := newMCPodRouteController(t, &config.NodeConfig{Name: node1Name}, mcClient, k8sClient) + defer closeFn() + defer c.podQueue.ShutDown() + defer c.gwQueue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.mcInformerFactory.Start(stopCh) + c.mcInformerFactory.WaitForCacheSync(stopCh) + c.createPodInformer() + go c.podInformer.Run(stopCh) + c.podWorkersStarted = true + + for _, pod := range []*corev1.Pod{nginx1NoIPs, nginx2WithIPs} { + if err := waitForPodRealized(c.podLister, pod); err != nil { + t.Errorf("Error when waiting for Pod '%s/%s' to be realized, err: %v", pod.Namespace, pod.Name, err) + } + } + + finishCh := make(chan struct{}) + go func() { + defer close(finishCh) + + // Create a Gateway node-2 in the default Namespace + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Create(ctx, &gateway2, metav1.CreateOptions{}) + // Delete a Gateway node-2 in the default Namespace + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Delete(ctx, gateway2.Name, metav1.DeleteOptions{}) + + // Create a Gateway node-3 in the kube-system Namespace + c.mcClient.MulticlusterV1alpha1().Gateways(gateway3.Namespace).Create(ctx, &gateway3, metav1.CreateOptions{}) + // Delete a Gateway node-3 in the kube-system Namespace + c.mcClient.MulticlusterV1alpha1().Gateways(gateway3.Namespace).Delete(ctx, gateway3.Name, metav1.DeleteOptions{}) + + // Create a Gateway node-1 + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Create(ctx, &gateway1, metav1.CreateOptions{}) + if err := waitForGatewayRealized(c.gwLister, &gateway1); err != nil { + t.Errorf("Error when waiting for Gateway '%s/%s' to be realized, err: %v", gateway1.Namespace, gateway1.Name, err) + } + c.processGatewayNextWorkItem() + + c.ofClient.EXPECT().InstallMulticlusterPodFlows(nginx2PodIP, nginx2HostIP) + c.processPodNextWorkItem() + + // Delete a Gateway node-1 + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Delete(ctx, gateway1.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterPodFlows("") + c.processGatewayNextWorkItem() + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("Test didn't finish in time") + case <-finishCh: + } +} + +func TestPodEvent(t *testing.T) { + mcClient := mcfake.NewSimpleClientset() + k8sClient := k8sfake.NewSimpleClientset([]runtime.Object{nginx2WithIPs}...) + c, closeFn := newMCPodRouteController(t, &config.NodeConfig{Name: node1Name}, mcClient, k8sClient) + defer closeFn() + defer c.podQueue.ShutDown() + defer c.gwQueue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.mcInformerFactory.Start(stopCh) + c.mcInformerFactory.WaitForCacheSync(stopCh) + c.createPodInformer() + go c.podInformer.Run(stopCh) + c.podWorkersStarted = true + + if err := waitForPodRealized(c.podLister, nginx2WithIPs); err != nil { + t.Errorf("Error when waiting for Pod '%s/%s' to be realized, err: %v", nginx2WithIPs.Namespace, nginx2WithIPs.Name, err) + } + + finishCh := make(chan struct{}) + go func() { + defer close(finishCh) + // Create a Gateway in kube-system Namespace + c.mcClient.MulticlusterV1alpha1().Gateways(gateway3.Namespace).Create(ctx, &gateway3, metav1.CreateOptions{}) + + // Create a Gateway + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Create(ctx, &gateway1, metav1.CreateOptions{}) + c.ofClient.EXPECT().InstallMulticlusterPodFlows(nginx2PodIP, nginx2HostIP) + c.processPodNextWorkItem() + + // Create a Pod without IPs + c.k8sClient.CoreV1().Pods(defaultNs).Create(ctx, nginx1NoIPs, metav1.CreateOptions{}) + if err := waitForPodRealized(c.podLister, nginx1NoIPs); err != nil { + t.Errorf("Error when waiting for Pod '%s/%s' to be realized, err: %v", nginx1NoIPs.Namespace, nginx1NoIPs.Name, err) + } + + // Create a Pod with hostNetwork + c.k8sClient.CoreV1().Pods(defaultNs).Create(ctx, nginxWithHostNetwork, metav1.CreateOptions{}) + if err := waitForPodRealized(c.podLister, nginxWithHostNetwork); err != nil { + t.Errorf("Error when waiting for Pod '%s/%s' to be realized, err: %v", nginxWithHostNetwork.Namespace, nginxWithHostNetwork.Name, err) + } + + // Update a Pod with IPs + nginx1Updated := nginx1NoIPs.DeepCopy() + nginx1Updated.Status.PodIP = "192.168.10.11" + nginx1Updated.Status.HostIP = "172.16.10.11" + c.k8sClient.CoreV1().Pods(defaultNs).Update(ctx, nginx1Updated, metav1.UpdateOptions{}) + if err := waitForPodIPUpdate(c.podLister, nginx1Updated); err != nil { + t.Errorf("Error when waiting for Pod '%s/%s' to be updated, err: %v", nginx1Updated.Namespace, nginx1Updated.Name, err) + } + c.ofClient.EXPECT().InstallMulticlusterPodFlows(net.ParseIP("192.168.10.11"), net.ParseIP("172.16.10.11")) + c.processPodNextWorkItem() + + // Update a Pod's label + nginx1UpdatedLabel := nginx1Updated.DeepCopy() + nginx1UpdatedLabel.Labels = map[string]string{"env": "test"} + c.k8sClient.CoreV1().Pods(defaultNs).Update(ctx, nginx1UpdatedLabel, metav1.UpdateOptions{}) + if err := waitForPodLabelUpdate(c.podLister, nginx1UpdatedLabel); err != nil { + t.Errorf("Error when waiting for Pod '%s/%s' to be updated, err: %v", nginx1UpdatedLabel.Namespace, nginx1UpdatedLabel.Name, err) + } + + // Update the old Pod with a new IP + nginx1UpdatedWithNewIP := nginx1NoIPs.DeepCopy() + nginx1UpdatedWithNewIP.Status.PodIP = "192.168.110.10" + nginx1UpdatedWithNewIP.Status.HostIP = "172.16.10.11" + c.k8sClient.CoreV1().Pods(defaultNs).Update(ctx, nginx1UpdatedWithNewIP, metav1.UpdateOptions{}) + if err := waitForPodIPUpdate(c.podLister, nginx1UpdatedWithNewIP); err != nil { + t.Errorf("Error when waiting for Pod '%s/%s' to be updated, err: %v", nginx1UpdatedWithNewIP.Namespace, nginx1UpdatedWithNewIP.Name, err) + } + c.ofClient.EXPECT().UninstallMulticlusterPodFlows("192.168.10.11") + c.ofClient.EXPECT().InstallMulticlusterPodFlows(net.ParseIP("192.168.110.10"), net.ParseIP("172.16.10.11")) + c.processPodNextWorkItem() + c.processPodNextWorkItem() + assert.Equal(t, 2, len(c.ipToPods)) + assert.Equal(t, 1, c.ipToPods["192.168.110.10"].Len()) + _, ok := c.ipToPods["192.168.10.11"] + assert.Equal(t, false, ok) + assert.Equal(t, 2, len(c.podNameIPCache)) + + // Create a Pod with the same Pod IP + nginx1DupIP := nginx1UpdatedWithNewIP.DeepCopy() + nginx1DupIP.Name = "nginx-1-dup-ip" + nginx1DupIP.Status.HostIP = "172.16.10.11" + c.k8sClient.CoreV1().Pods(defaultNs).Create(ctx, nginx1DupIP, metav1.CreateOptions{}) + if err := waitForPodRealized(c.podLister, nginx1DupIP); err != nil { + t.Errorf("Error when waiting for Pod '%s/%s' to be realized, err: %v", nginx1DupIP.Namespace, nginx1DupIP.Name, err) + } + assert.Equal(t, 2, len(c.ipToPods)) + assert.Equal(t, 2, c.ipToPods["192.168.110.10"].Len()) + assert.Equal(t, 3, len(c.podNameIPCache)) + + // Update the old Pod with an empty IP + nginx1UpdatedWithEmptyIP := nginx1NoIPs.DeepCopy() + nginx1UpdatedWithEmptyIP.Status.HostIP = "172.16.10.11" + c.k8sClient.CoreV1().Pods(defaultNs).Update(ctx, nginx1UpdatedWithEmptyIP, metav1.UpdateOptions{}) + if err := waitForPodIPUpdate(c.podLister, nginx1UpdatedWithEmptyIP); err != nil { + t.Errorf("Error when waiting for Pod '%s/%s' to be updated, err: %v", nginx1UpdatedWithEmptyIP.Namespace, nginx1UpdatedWithEmptyIP.Name, err) + } + c.ofClient.EXPECT().InstallMulticlusterPodFlows(net.ParseIP("192.168.110.10"), net.ParseIP("172.16.10.11")) + c.processPodNextWorkItem() + assert.Equal(t, 2, len(c.ipToPods)) + assert.Equal(t, 1, c.ipToPods["192.168.110.10"].Len()) + assert.Equal(t, 2, len(c.podNameIPCache)) + + // Delete the old Pod + c.k8sClient.CoreV1().Pods(defaultNs).Delete(ctx, nginx1UpdatedWithEmptyIP.Name, metav1.DeleteOptions{}) + assert.Equal(t, 2, len(c.ipToPods)) + assert.Equal(t, 1, c.ipToPods["192.168.110.10"].Len()) + assert.Equal(t, 2, len(c.podNameIPCache)) + + // Delete the new Pod + c.k8sClient.CoreV1().Pods(defaultNs).Delete(ctx, nginx1DupIP.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterPodFlows("192.168.110.10") + c.processPodNextWorkItem() + assert.Equal(t, 1, len(c.ipToPods)) + _, ok = c.ipToPods["192.168.110.10"] + assert.Equal(t, false, ok) + assert.Equal(t, 1, len(c.podNameIPCache)) + + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("Test didn't finish in time") + case <-finishCh: + } +} + +func waitForGatewayRealized(gwLister mclisters.GatewayLister, gateway *mcv1alpha1.Gateway) error { + return wait.Poll(interval, timeout, func() (bool, error) { + _, err := gwLister.Gateways(gateway.Namespace).Get(gateway.Name) + if err != nil { + return false, nil + } + return true, err + }) +} + +func waitForPodIPUpdate(podLister v1.PodLister, pod *corev1.Pod) error { + return wait.Poll(interval, timeout, func() (bool, error) { + getPod, err := podLister.Pods(pod.Namespace).Get(pod.Name) + if err != nil || pod.Status.PodIP != getPod.Status.PodIP || pod.Status.HostIP != getPod.Status.HostIP { + return false, nil + } + return true, err + }) +} diff --git a/pkg/agent/multicluster/stretched_networkpolicy_controller_test.go b/pkg/agent/multicluster/stretched_networkpolicy_controller_test.go index eb91d016864..156cb3167bd 100644 --- a/pkg/agent/multicluster/stretched_networkpolicy_controller_test.go +++ b/pkg/agent/multicluster/stretched_networkpolicy_controller_test.go @@ -31,6 +31,7 @@ import ( "k8s.io/client-go/informers" coreinformers "k8s.io/client-go/informers/core/v1" "k8s.io/client-go/kubernetes/fake" + v1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" @@ -159,7 +160,7 @@ func TestEnqueueAllPods(t *testing.T) { c.mcInformerFactory.Start(stopCh) c.mcInformerFactory.WaitForCacheSync(stopCh) go c.podInformer.Run(stopCh) - if err := waitForPodRealized(c, pod); err != nil { + if err := waitForPodRealized(c.podLister, pod); err != nil { t.Errorf("Error when waiting for Pod '%s/%s' to be realized, err: %v", pod.Namespace, pod.Name, err) } if err := waitForLabelIdentityRealized(c, labelIdentity); err != nil { @@ -248,7 +249,7 @@ func TestStretchedNetworkPolicyControllerPodEvent(t *testing.T) { // Create a Pod whose LabelIdentity doesn't exist. c.clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) - if err := waitForPodRealized(c, pod); err != nil { + if err := waitForPodRealized(c.podLister, pod); err != nil { t.Errorf("Error when waiting for Pod '%s/%s' to be realized, err: %v", pod.Namespace, pod.Name, err) } c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod.Name, pod.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) @@ -268,7 +269,7 @@ func TestStretchedNetworkPolicyControllerPodEvent(t *testing.T) { } c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod.Name, pod.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) c.clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) - if err := waitForPodRealized(c, pod); err != nil { + if err := waitForPodRealized(c.podLister, pod); err != nil { t.Errorf("Error when waiting for Pod '%s/%s' to be realized, err: %v", pod.Namespace, pod.Name, err) } c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&labelIdentity1.Spec.ID)).Times(1) @@ -285,7 +286,7 @@ func TestStretchedNetworkPolicyControllerPodEvent(t *testing.T) { c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod.Name, pod.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) pod.Labels["foo"] = "bar2" c.clientset.CoreV1().Pods(pod.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{}) - if err := waitForPodLabelUpdate(c, pod); err != nil { + if err := waitForPodLabelUpdate(c.podLister, pod); err != nil { t.Errorf("Error when waiting for Pod '%s/%s' to be updated, err: %v", pod.Namespace, pod.Name, err) } c.ofClient.EXPECT().InstallPodFlows(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Eq(&labelIdentity2.Spec.ID)).Times(1) @@ -412,11 +413,11 @@ func TestStretchedNetworkPolicyControllerNSEvent(t *testing.T) { } c.clientset.CoreV1().Pods(pod1.Namespace).Create(context.TODO(), pod1, metav1.CreateOptions{}) - if err := waitForPodRealized(c, pod1); err != nil { + if err := waitForPodRealized(c.podLister, pod1); err != nil { t.Errorf("Error when waiting for Pod '%s/%s' to be realized, err: %v", pod1.Namespace, pod1.Name, err) } c.clientset.CoreV1().Pods(pod2.Namespace).Create(context.TODO(), pod2, metav1.CreateOptions{}) - if err := waitForPodRealized(c, pod2); err != nil { + if err := waitForPodRealized(c.podLister, pod2); err != nil { t.Errorf("Error when waiting for Pod '%s/%s' to be realized, err: %v", pod2.Namespace, pod2.Name, err) } c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod1.Name, pod1.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) @@ -507,7 +508,7 @@ func TestStretchedNetworkPolicyControllerLabelIdentityEvent(t *testing.T) { } c.clientset.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) - if err := waitForPodRealized(c, pod); err != nil { + if err := waitForPodRealized(c.podLister, pod); err != nil { t.Errorf("Error when waiting for Pod '%s/%s' to be realized, err: %v", pod.Namespace, pod.Name, err) } c.interfaceStore.EXPECT().GetContainerInterfacesByPod(pod.Name, pod.Namespace).Return([]*interfacestore.InterfaceConfig{&interfaceConfig}).Times(1) @@ -560,9 +561,9 @@ func toPodAddEvent(pod *corev1.Pod) antreatypes.PodUpdate { } } -func waitForPodRealized(c *fakeStretchedNetworkPolicyController, pod *corev1.Pod) error { +func waitForPodRealized(podLister v1.PodLister, pod *corev1.Pod) error { return wait.Poll(interval, timeout, func() (bool, error) { - _, err := c.podLister.Pods(pod.Namespace).Get(pod.Name) + _, err := podLister.Pods(pod.Namespace).Get(pod.Name) if err != nil { return false, nil } @@ -570,9 +571,9 @@ func waitForPodRealized(c *fakeStretchedNetworkPolicyController, pod *corev1.Pod }) } -func waitForPodLabelUpdate(c *fakeStretchedNetworkPolicyController, pod *corev1.Pod) error { +func waitForPodLabelUpdate(podLister v1.PodLister, pod *corev1.Pod) error { return wait.Poll(interval, timeout, func() (bool, error) { - getPod, err := c.podLister.Pods(pod.Namespace).Get(pod.Name) + getPod, err := podLister.Pods(pod.Namespace).Get(pod.Name) if err != nil || !reflect.DeepEqual(pod.Labels, getPod.Labels) { return false, nil } diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 3f6f8178be1..efc5f539ffd 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -342,10 +342,18 @@ type Client interface { // InstallMulticlusterClassifierFlows installs flows to classify cross-cluster packets. InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGateway bool) error - // UninstallMulticlusterFlows removes cross-cluster flows matching the given clusterID on + // InstallMulticlusterPodFlows installs flows to handle cross-cluster packets from Multi-cluster Gateway to + // regular Nodes. + InstallMulticlusterPodFlows(podIP net.IP, tunnelPeerIP net.IP) error + + // UninstallMulticlusterFlows removes cross-cluster flows matching the given cache key on // a regular Node or a Gateway. UninstallMulticlusterFlows(clusterID string) error + // UninstallMulticlusterPodFlows removes Pod flows matching the given cache key on + // a Gateway. When the podIP is empty, all Pod flows will be removed. + UninstallMulticlusterPodFlows(podIP string) error + // InstallVMUplinkFlows installs flows to forward packet between uplinkPort and hostPort. On a VM, the // uplink and host internal port are paired directly, and no layer 2/3 forwarding flow is installed. InstallVMUplinkFlows(hostInterfaceName string, hostPort int32, uplinkPort int32) error @@ -470,6 +478,23 @@ func (c *client) deleteFlows(cache *flowCategoryCache, flowCacheKey string) erro return nil } +func (c *client) deleteAllFlows(cache *flowCategoryCache) error { + var delAllFlows []binding.Flow + cache.Range(func(key, value any) bool { + fCache := value.(flowCache) + delFlows := make([]binding.Flow, 0, len(fCache)) + for _, flow := range fCache { + delFlows = append(delFlows, flow) + } + delAllFlows = append(delAllFlows, delFlows...) + return true + }) + if err := c.ofEntryOperations.DeleteAll(delAllFlows); err != nil { + return err + } + return nil +} + // InstallNodeFlows installs flows for peer Nodes. Parameter remoteGatewayMAC is only for Windows. func (c *client) InstallNodeFlows(hostname string, peerConfigs map[*net.IPNet]net.IP, @@ -1393,6 +1418,7 @@ func (c *client) InstallMulticlusterGatewayFlows(clusterID string, // InstallMulticlusterClassifierFlows adds the following flows: // - One flow in L2ForwardingCalcTable for the global virtual multicluster MAC 'aa:bb:cc:dd:ee:f0' // to set its target output port as 'antrea-tun0'. This flow will be on both Gateway and regular Node. +// - One flow in ClassifierTable for the tunnel traffic if it's networkPolicyOnly mode. // - One flow to match MC virtual MAC 'aa:bb:cc:dd:ee:f0' in ClassifierTable for Gateway only. // - One flow in L2ForwardingOutTable to allow multicluster hairpin traffic for Gateway only. func (c *client) InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGateway bool) error { @@ -1403,6 +1429,10 @@ func (c *client) InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGatew c.featurePodConnectivity.l2ForwardCalcFlow(GlobalVirtualMACForMulticluster, tunnelOFPort), } + if c.networkConfig.TrafficEncapMode != config.TrafficEncapModeEncap { + flows = append(flows, c.featurePodConnectivity.tunnelClassifierFlow(tunnelOFPort)) + } + if isGateway { flows = append(flows, c.featureMulticluster.tunnelClassifierFlow(tunnelOFPort), @@ -1412,9 +1442,32 @@ func (c *client) InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGatew return c.modifyFlows(c.featureMulticluster.cachedFlows, "multicluster-classifier", flows) } +func (c *client) InstallMulticlusterPodFlows(podIP net.IP, tunnelPeerIP net.IP) error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + localGatewayMAC := c.nodeConfig.GatewayConfig.MAC + flows := []binding.Flow{c.featureMulticluster.l3FwdFlowToPodViaTun(localGatewayMAC, podIP, tunnelPeerIP)} + return c.modifyFlows(c.featureMulticluster.cachedPodFlows, podIP.String(), flows) +} + func (c *client) UninstallMulticlusterFlows(clusterID string) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() cacheKey := fmt.Sprintf("cluster_%s", clusterID) return c.deleteFlows(c.featureMulticluster.cachedFlows, cacheKey) } + +func (c *client) UninstallMulticlusterPodFlows(podIP string) error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + if podIP == "" { + // Clean up all flows. + err := c.deleteAllFlows(c.featureMulticluster.cachedPodFlows) + if err != nil { + return err + } + c.featureMulticluster.cachedPodFlows = newFlowCategoryCache() + return nil + } + return c.deleteFlows(c.featureMulticluster.cachedPodFlows, podIP) +} diff --git a/pkg/agent/openflow/multicluster.go b/pkg/agent/openflow/multicluster.go index 7761836ef43..985f7343633 100644 --- a/pkg/agent/openflow/multicluster.go +++ b/pkg/agent/openflow/multicluster.go @@ -33,6 +33,7 @@ const UnknownLabelIdentity = uint32(0xffffff) type featureMulticluster struct { cookieAllocator cookie.Allocator cachedFlows *flowCategoryCache + cachedPodFlows *flowCategoryCache category cookie.Category ipProtocols []binding.Protocol dnatCtZones map[binding.Protocol]int @@ -51,6 +52,7 @@ func newFeatureMulticluster(cookieAllocator cookie.Allocator, ipProtocols []bind return &featureMulticluster{ cookieAllocator: cookieAllocator, cachedFlows: newFlowCategoryCache(), + cachedPodFlows: newFlowCategoryCache(), category: cookie.Multicluster, ipProtocols: ipProtocols, snatCtZones: snatCtZones, @@ -184,3 +186,22 @@ func (f *featureMulticluster) snatConntrackFlows(serviceCIDR net.IPNet, localGat ) return flows } + +func (f *featureMulticluster) l3FwdFlowToPodViaTun( + localGatewayMAC net.HardwareAddr, + podIP net.IP, + tunnelPeer net.IP) binding.Flow { + ipProtocol := getIPProtocol(podIP) + // This generates the flow to forward cross-cluster request packets based + // on Pod IP. + return L3ForwardingTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(ipProtocol). + MatchDstIP(podIP). + MatchDstMAC(GlobalVirtualMACForMulticluster). + Action().SetSrcMAC(localGatewayMAC). // Rewrite src MAC to local gateway MAC. + Action().SetTunnelDst(tunnelPeer). // Flow based tunnel. Set tunnel destination. + Action().LoadRegMark(ToTunnelRegMark). + Action().GotoTable(L3DecTTLTable.GetID()). + Done() +} diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index fbf9b8d468f..04388c5895e 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -368,6 +368,20 @@ func (mr *MockClientMockRecorder) InstallMulticlusterNodeFlows(arg0, arg1, arg2, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticlusterNodeFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticlusterNodeFlows), arg0, arg1, arg2, arg3) } +// InstallMulticlusterPodFlows mocks base method +func (m *MockClient) InstallMulticlusterPodFlows(arg0, arg1 net.IP) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallMulticlusterPodFlows", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallMulticlusterPodFlows indicates an expected call of InstallMulticlusterPodFlows +func (mr *MockClientMockRecorder) InstallMulticlusterPodFlows(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticlusterPodFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticlusterPodFlows), arg0, arg1) +} + // InstallNodeFlows mocks base method func (m *MockClient) InstallNodeFlows(arg0 string, arg1 map[*net.IPNet]net.IP, arg2 *ip.DualStackIPs, arg3 uint32, arg4 net.HardwareAddr) error { m.ctrl.T.Helper() @@ -838,6 +852,20 @@ func (mr *MockClientMockRecorder) UninstallMulticlusterFlows(arg0 interface{}) * return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallMulticlusterFlows", reflect.TypeOf((*MockClient)(nil).UninstallMulticlusterFlows), arg0) } +// UninstallMulticlusterPodFlows mocks base method +func (m *MockClient) UninstallMulticlusterPodFlows(arg0 string) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "UninstallMulticlusterPodFlows", arg0) + ret0, _ := ret[0].(error) + return ret0 +} + +// UninstallMulticlusterPodFlows indicates an expected call of UninstallMulticlusterPodFlows +func (mr *MockClientMockRecorder) UninstallMulticlusterPodFlows(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UninstallMulticlusterPodFlows", reflect.TypeOf((*MockClient)(nil).UninstallMulticlusterPodFlows), arg0) +} + // UninstallNodeFlows mocks base method func (m *MockClient) UninstallNodeFlows(arg0 string) error { m.ctrl.T.Helper() diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index e140b07e79a..40de77a5eba 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -564,14 +564,13 @@ func (tester *cmdAddDelTester) cmdDelTest(tc testCase, dataDir string) { func newTester() *cmdAddDelTester { tester := &cmdAddDelTester{} ifaceStore := interfacestore.NewInterfaceStore() - testNodeConfig.NodeMTU = 1450 tester.networkReadyCh = make(chan struct{}) tester.server = cniserver.New(testSock, "", testNodeConfig, k8sFake.NewSimpleClientset(), routeMock, - false, false, false, false, + false, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, tester.networkReadyCh) tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100), nil) ctx := context.Background() @@ -735,7 +734,7 @@ func setupChainTest( testNodeConfig, k8sFake.NewSimpleClientset(), routeMock, - true, false, false, false, + true, false, false, false, &config.NetworkConfig{InterfaceMTU: 1450}, networkReadyCh) } else { server = inServer @@ -859,7 +858,6 @@ func init() { gwMAC, _ = net.ParseMAC("11:11:11:11:11:11") nodeGateway := &config.GatewayConfig{IPv4: gwIP, MAC: gwMAC, Name: ""} _, nodePodCIDR, _ := net.ParseCIDR("192.168.1.0/24") - nodeMTU := 1500 - testNodeConfig = &config.NodeConfig{Name: nodeName, PodIPv4CIDR: nodePodCIDR, NodeMTU: nodeMTU, GatewayConfig: nodeGateway} + testNodeConfig = &config.NodeConfig{Name: nodeName, PodIPv4CIDR: nodePodCIDR, GatewayConfig: nodeGateway} }