From 120d9e7699552ac3815edf7dd185908f5e4f602e Mon Sep 17 00:00:00 2001 From: Lan Luo Date: Wed, 23 Nov 2022 14:41:33 +0800 Subject: [PATCH] Multi-cluster support with networkPolicyOnly mode In order to support multi-cluster traffic when the member cluster is deployed with networkPolicyOnly mode, antrea-agent will be responsible for the following things: 1. Create tunnel interface `antrea-tun0` for cross-cluster traffic 2. Watch all Pods on the Gateway and set up one rule per Pod in the L3Forwarding table as long as the Pod is running on a regular Node instead of the Gateway. 3. Update the container interface's MTU to subtract the tunnel overhead. Signed-off-by: Lan Luo --- cmd/antrea-agent/agent.go | 35 +- cmd/antrea-agent/options.go | 6 +- .../multicluster/resourceexport_controller.go | 4 +- pkg/agent/agent.go | 11 +- .../interface_configuration_linux.go | 36 +++ .../interface_configuration_windows.go | 18 ++ pkg/agent/cniserver/pod_configuration.go | 15 + pkg/agent/cniserver/server.go | 20 +- pkg/agent/multicluster/mc_route_controller.go | 62 ++-- .../multicluster/mc_route_controller_test.go | 51 +-- .../policy_only_route_controller.go | 306 ++++++++++++++++++ .../policy_only_route_controller_test.go | 257 +++++++++++++++ pkg/agent/openflow/client.go | 25 +- pkg/agent/openflow/client_test.go | 2 +- pkg/agent/openflow/multicluster.go | 19 ++ pkg/agent/openflow/testing/mock_openflow.go | 14 + test/integration/agent/cniserver_test.go | 4 +- 17 files changed, 789 insertions(+), 96 deletions(-) create mode 100644 pkg/agent/multicluster/policy_only_route_controller.go create mode 100644 pkg/agent/multicluster/policy_only_route_controller_test.go diff --git a/cmd/antrea-agent/agent.go b/cmd/antrea-agent/agent.go index a2439be3cd9..d33fb519952 100644 --- a/cmd/antrea-agent/agent.go +++ b/cmd/antrea-agent/agent.go @@ -111,6 +111,7 @@ func run(o *Options) error { serviceInformer := informerFactory.Core().V1().Services() endpointsInformer := informerFactory.Core().V1().Endpoints() 
namespaceInformer := informerFactory.Core().V1().Namespaces() + podInformer := informerFactory.Core().V1().Pods() // Create Antrea Clientset for the given config. antreaClientProvider := agent.NewAntreaClientProvider(o.config.AntreaClientConnection, k8sClient) @@ -134,6 +135,7 @@ func run(o *Options) error { enableBridgingMode := enableAntreaIPAM && o.config.EnableBridgingMode // Bridging mode will connect the uplink interface to the OVS bridge. connectUplinkToBridge := enableBridgingMode + multiclusterEnabled := features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.Enable ovsDatapathType := ovsconfig.OVSDatapathType(o.config.OVSDatapathType) ovsBridgeClient := ovsconfig.NewOVSBridge(o.config.OVSBridge, ovsDatapathType, ovsdbConnection) @@ -148,7 +150,7 @@ func run(o *Options) error { connectUplinkToBridge, multicastEnabled, features.DefaultFeatureGate.Enabled(features.TrafficControl), - features.DefaultFeatureGate.Enabled(features.Multicluster), + multiclusterEnabled, ) var serviceCIDRNet *net.IPNet @@ -248,7 +250,8 @@ func run(o *Options) error { o.config.ExternalNode.ExternalNodeNamespace, features.DefaultFeatureGate.Enabled(features.AntreaProxy), o.config.AntreaProxy.ProxyAll, - connectUplinkToBridge) + connectUplinkToBridge, + multiclusterEnabled) err = agentInitializer.Initialize() if err != nil { return fmt.Errorf("error initializing agent: %v", err) @@ -281,8 +284,10 @@ func run(o *Options) error { var mcRouteController *mcroute.MCRouteController var mcInformerFactory mcinformers.SharedInformerFactory + var mcPolicyOnlyRouteController *mcroute.MCWithPolicyOnlyNodeRouteController - if features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.Enable { + isNetworkPolicyOnly := networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() + if multiclusterEnabled { mcNamespace := env.GetPodNamespace() if o.config.Multicluster.Namespace != "" { mcNamespace = o.config.Multicluster.Namespace @@ -300,6 +305,19 @@ func 
run(o *Options) error { nodeConfig, mcNamespace, ) + + if isNetworkPolicyOnly { + mcPolicyOnlyRouteController = mcroute.NewMCWithPolicyOnlyNodeRouteController( + mcClient, + podInformer.Informer(), + gwInformer.Informer(), + ofClient, + ovsBridgeClient, + ifaceStore, + nodeConfig, + mcNamespace, + ) + } } var groupCounters []proxytypes.GroupCounter groupIDUpdates := make(chan string, 100) @@ -441,10 +459,7 @@ func run(o *Options) error { var externalNodeController *externalnode.ExternalNodeController var localExternalNodeInformer cache.SharedIndexInformer if o.nodeType == config.K8sNode { - isChaining := false - if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { - isChaining = true - } + isChaining := isNetworkPolicyOnly cniServer = cniserver.New( o.config.CNISocket, o.config.HostProcPathPrefix, @@ -455,6 +470,7 @@ func run(o *Options) error { enableBridgingMode, enableAntreaIPAM, o.config.DisableTXChecksumOffload, + multiclusterEnabled, networkReadyCh) if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) { @@ -724,9 +740,12 @@ func run(o *Options) error { go mcastController.Run(stopCh) } - if features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.Enable { + if multiclusterEnabled { mcInformerFactory.Start(stopCh) go mcRouteController.Run(stopCh) + if isNetworkPolicyOnly { + go mcPolicyOnlyRouteController.Run(stopCh) + } } // statsCollector collects stats and reports to the antrea-controller periodically. For now it's only used for diff --git a/cmd/antrea-agent/options.go b/cmd/antrea-agent/options.go index 5c9cce17e59..0bb61d43903 100644 --- a/cmd/antrea-agent/options.go +++ b/cmd/antrea-agent/options.go @@ -454,9 +454,9 @@ func (o *Options) validateK8sNodeOptions() error { } } if (features.DefaultFeatureGate.Enabled(features.Multicluster) || o.config.Multicluster.Enable) && - encapMode != config.TrafficEncapModeEncap { - // Only Encap mode is supported for Multi-cluster feature. 
- return fmt.Errorf("Multicluster is only applicable to the %s mode", config.TrafficEncapModeEncap) + !(encapMode == config.TrafficEncapModeEncap || encapMode == config.TrafficEncapModeNetworkPolicyOnly) { + // Only Encap or networkPolicyOnly is supported for Multi-cluster feature. + return fmt.Errorf("Multicluster is only applicable to the %s mode or %s mode", config.TrafficEncapModeEncap, config.TrafficEncapModeNetworkPolicyOnly) } if features.DefaultFeatureGate.Enabled(features.NodePortLocal) { startPort, endPort, err := parsePortRange(o.config.NodePortLocal.PortRange) diff --git a/multicluster/controllers/multicluster/resourceexport_controller.go b/multicluster/controllers/multicluster/resourceexport_controller.go index 47f738265ec..39d7bb8df50 100644 --- a/multicluster/controllers/multicluster/resourceexport_controller.go +++ b/multicluster/controllers/multicluster/resourceexport_controller.go @@ -356,11 +356,11 @@ func (r *ResourceExportReconciler) refreshEndpointsResourceImport( if len(svcResExport.Status.Conditions) > 0 { if svcResExport.Status.Conditions[0].Status != corev1.ConditionTrue { return newResImport, false, fmt.Errorf("corresponding Service type of ResourceExport " + svcResExportName.String() + - "has not been converged successfully, retry later") + " has not been converged successfully, retry later") } } else { return newResImport, false, fmt.Errorf("corresponding Service type of ResourceExport " + svcResExportName.String() + - "has not been converged yet, retry later") + " has not been converged yet, retry later") } } diff --git a/pkg/agent/agent.go b/pkg/agent/agent.go index 3eca082724a..3f998702a3a 100644 --- a/pkg/agent/agent.go +++ b/pkg/agent/agent.go @@ -105,6 +105,7 @@ type Initializer struct { // networkReadyCh should be closed once the Node's network is ready. // The CNI server will wait for it before handling any CNI Add requests. 
proxyAll bool + enableMulticluster bool networkReadyCh chan<- struct{} stopCh <-chan struct{} nodeType config.NodeType @@ -132,6 +133,7 @@ func NewInitializer( enableProxy bool, proxyAll bool, connectUplinkToBridge bool, + enableMulticluster bool, ) *Initializer { return &Initializer{ ovsBridgeClient: ovsBridgeClient, @@ -154,6 +156,7 @@ func NewInitializer( enableProxy: enableProxy, proxyAll: proxyAll, connectUplinkToBridge: connectUplinkToBridge, + enableMulticluster: enableMulticluster, } } @@ -755,10 +758,12 @@ func (i *Initializer) setupDefaultTunnelInterface() error { // It's not necessary for new Linux kernel versions with the following patch: // https://github.com/torvalds/linux/commit/89e5c58fc1e2857ccdaae506fb8bc5fed57ee063. shouldEnableCsum := i.networkConfig.TunnelCsum && (i.networkConfig.TunnelType == ovsconfig.GeneveTunnel || i.networkConfig.TunnelType == ovsconfig.VXLANTunnel) + tunnelInterfaceSupported := i.networkConfig.TrafficEncapMode.SupportsEncap() || + (i.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() && i.enableMulticluster) // Check the default tunnel port. if portExists { - if i.networkConfig.TrafficEncapMode.SupportsEncap() && + if tunnelInterfaceSupported && tunnelIface.TunnelInterfaceConfig.Type == i.networkConfig.TunnelType && tunnelIface.TunnelInterfaceConfig.DestinationPort == i.networkConfig.TunnelPort && tunnelIface.TunnelInterfaceConfig.LocalIP.Equal(localIP) { @@ -775,7 +780,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error { } if err := i.ovsBridgeClient.DeletePort(tunnelIface.PortUUID); err != nil { - if i.networkConfig.TrafficEncapMode.SupportsEncap() { + if tunnelInterfaceSupported { return fmt.Errorf("failed to remove tunnel port %s with wrong tunnel type: %s", tunnelPortName, err) } klog.Errorf("Failed to remove tunnel port %s in NoEncapMode: %v", tunnelPortName, err) @@ -786,7 +791,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error { } // Create the default tunnel port and interface. 
- if i.networkConfig.TrafficEncapMode.SupportsEncap() { + if tunnelInterfaceSupported { if tunnelPortName != defaultTunInterfaceName { // Reset the tunnel interface name to the desired name before // recreating the tunnel port and interface. diff --git a/pkg/agent/cniserver/interface_configuration_linux.go b/pkg/agent/cniserver/interface_configuration_linux.go index eced5fe74ae..1517a6f02d5 100644 --- a/pkg/agent/cniserver/interface_configuration_linux.go +++ b/pkg/agent/cniserver/interface_configuration_linux.go @@ -371,6 +371,42 @@ func (ic *ifConfigurator) configureContainerLink( } } +func (ic *ifConfigurator) configureContainerMTU( + podName string, + podNameSpace string, + containerID string, + containerNetNS string, + containerIFDev string, + hostInterfaceName string, + mtu int, +) error { + updateMTU := func(interfaceName string) error { + link, err := netlink.LinkByName(interfaceName) + if err != nil { + return fmt.Errorf("failed to find interface %s: %v", interfaceName, err) + } + err = netlink.LinkSetMTU(link, mtu) + if err != nil { + return fmt.Errorf("failed to set MTU for interface %s: %v", interfaceName, err) + } + return nil + } + + if err := ns.WithNetNSPath(containerNetNS, func(hostNS ns.NetNS) error { + if err := updateMTU(containerIFDev); err != nil { + return fmt.Errorf("error when updating container interface MTU: %v", err) + } + return nil + }); err != nil { + return err + } + + if err := updateMTU(hostInterfaceName); err != nil { + return fmt.Errorf("error when updating host interface MTU: %v", err) + } + return nil +} + func (ic *ifConfigurator) removeContainerLink(containerID, hostInterfaceName string) error { klog.V(2).Infof("Deleting veth devices for container %s", containerID) // Don't return an error if the device is already removed as CniDel can be called multiple times. 
diff --git a/pkg/agent/cniserver/interface_configuration_windows.go b/pkg/agent/cniserver/interface_configuration_windows.go index 40042db1256..eb3b033649a 100644 --- a/pkg/agent/cniserver/interface_configuration_windows.go +++ b/pkg/agent/cniserver/interface_configuration_windows.go @@ -173,6 +173,24 @@ func (ic *ifConfigurator) configureContainerLink( return nil } +func (ic *ifConfigurator) configureContainerMTU( + podName string, + podNameSpace string, + containerID string, + containerNetNS string, + containerIFDev string, + hostInterfaceName string, + mtu int, +) error { + infraContainerID := getInfraContainer(containerID, containerNetNS) + epName := util.GenerateContainerInterfaceName(podName, podNameSpace, infraContainerID) + ifaceName := util.VirtualAdapterName(epName) + if err := util.SetInterfaceMTU(ifaceName, mtu); err != nil { + return fmt.Errorf("failed to configure MTU on container interface '%s': %v", ifaceName, err) + } + return nil +} + // createContainerLink creates HNSEndpoint using the IP configuration in the IPAM result. 
func (ic *ifConfigurator) createContainerLink(endpointName string, result *current.Result, containerID, podName, podNamespace string) (hostLink *hcsshim.HNSEndpoint, err error) { containerIP, err := findContainerIPConfig(result.IPs) diff --git a/pkg/agent/cniserver/pod_configuration.go b/pkg/agent/cniserver/pod_configuration.go index 89b023ab48d..92baeb3cb32 100644 --- a/pkg/agent/cniserver/pod_configuration.go +++ b/pkg/agent/cniserver/pod_configuration.go @@ -196,6 +196,21 @@ func ParseOVSPortInterfaceConfig(portData *ovsconfig.OVSPortData, portConfig *in return interfaceConfig } +func (pc *podConfigurator) configureInterfacesMTU( + podName string, + podNameSpace string, + containerID string, + containerNetNS string, + containerIFDev string, + result *current.Result, + mtu int) error { + hostIface := result.Interfaces[0] + if err := pc.ifConfigurator.configureContainerMTU(podName, podNameSpace, containerID, containerNetNS, containerIFDev, hostIface.Name, mtu); err != nil { + return err + } + return nil +} + func (pc *podConfigurator) configureInterfaces( podName string, podNameSpace string, diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index 9cdfbd89027..fe321e70eb0 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -117,6 +117,7 @@ type CNIServer struct { enableSecondaryNetworkIPAM bool disableTXChecksumOffload bool secondaryNetworkEnabled bool + multiClusterEnabled bool // networkReadyCh notifies that the network is ready so new Pods can be created. Therefore, CmdAdd waits for it. 
networkReadyCh <-chan struct{} } @@ -624,7 +625,7 @@ func New( nodeConfig *config.NodeConfig, kubeClient clientset.Interface, routeClient route.Interface, - isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload bool, + isChaining, enableBridgingMode, enableSecondaryNetworkIPAM, disableTXChecksumOffload, multiclusterEnabled bool, networkReadyCh <-chan struct{}, ) *CNIServer { return &CNIServer{ @@ -640,6 +641,7 @@ func New( enableBridgingMode: enableBridgingMode, disableTXChecksumOffload: disableTXChecksumOffload, enableSecondaryNetworkIPAM: enableSecondaryNetworkIPAM, + multiClusterEnabled: multiclusterEnabled, networkReadyCh: networkReadyCh, } } @@ -716,6 +718,22 @@ func (s *CNIServer) interceptAdd(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, e return &cnipb.CniCmdResponse{CniResult: []byte("")}, fmt.Errorf("failed to connect container %s to ovs: %w", cniConfig.ContainerId, err) } + if s.multiClusterEnabled { + // Antrea multi-cluster supports Geneve tunnel mode only. + mtu := cniConfig.MTU - config.GeneveOverhead + if err := s.podConfigurator.configureInterfacesMTU( + podName, + podNamespace, + cniConfig.ContainerId, + s.hostNetNsPath(cniConfig.Netns), + cniConfig.Ifname, + prevResult, + mtu, + ); err != nil { + return &cnipb.CniCmdResponse{CniResult: []byte("")}, fmt.Errorf("failed to configure container %s's MTU: %w", cniConfig.ContainerId, err) + } + } + // we return prevResult, which should be exactly what we received from // the runtime, potentially converted to the current CNI version used by // Antrea. diff --git a/pkg/agent/multicluster/mc_route_controller.go b/pkg/agent/multicluster/mc_route_controller.go index 6c5966e9bc4..ce36b02692d 100644 --- a/pkg/agent/multicluster/mc_route_controller.go +++ b/pkg/agent/multicluster/mc_route_controller.go @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package noderoute +package multicluster import ( + "errors" "fmt" "net" "time" @@ -138,16 +139,8 @@ func NewMCRouteController( func (c *MCRouteController) enqueueGateway(obj interface{}, isDelete bool) { gw, isGW := obj.(*mcv1alpha1.Gateway) if !isGW { - deletedState, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - klog.ErrorS(nil, "Received unexpected object", "object", obj) - return - } - gw, ok = deletedState.Obj.(*mcv1alpha1.Gateway) - if !ok { - klog.ErrorS(nil, "DeletedFinalStateUnknown contains non-Gateway object", "object", deletedState.Obj) - return - } + klog.ErrorS(errors.New("received unexpected object"), "enqueueGateway can't process event", "obj", obj) + return } if gw.Namespace != c.namespace { @@ -166,16 +159,8 @@ func (c *MCRouteController) enqueueGateway(obj interface{}, isDelete bool) { func (c *MCRouteController) enqueueClusterInfoImport(obj interface{}, isDelete bool) { ciImp, isciImp := obj.(*mcv1alpha1.ClusterInfoImport) if !isciImp { - deletedState, ok := obj.(cache.DeletedFinalStateUnknown) - if !ok { - klog.ErrorS(nil, "Received unexpected object", "object", obj) - return - } - ciImp, ok = deletedState.Obj.(*mcv1alpha1.ClusterInfoImport) - if !ok { - klog.ErrorS(nil, "DeletedFinalStateUnknown contains non-ClusterInfoImport object", "object", deletedState.Obj) - return - } + klog.ErrorS(errors.New("received unexpected object"), "enqueueClusterInfoImport can't process event", "obj", obj) + return } if ciImp.Namespace != c.namespace { @@ -290,7 +275,7 @@ func (c *MCRouteController) syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway return err } - activeGWChanged := c.checkGateWayIPChange(activeGW) + activeGWChanged := c.checkGatewayIPChange(activeGW) installedCIImportNames := sets.StringKeySet(c.installedCIImports) for _, ciImp := range desiredCIImports { if err = c.addMCFlowsForSingleCIImp(activeGW, ciImp, c.installedCIImports[ciImp.Name], activeGWChanged); err != nil { @@ -307,7 +292,7 @@ func (c *MCRouteController) 
syncMCFlowsForAllCIImps(activeGW *mcv1alpha1.Gateway return nil } -func (c *MCRouteController) checkGateWayIPChange(activeGW *mcv1alpha1.Gateway) bool { +func (c *MCRouteController) checkGatewayIPChange(activeGW *mcv1alpha1.Gateway) bool { var activeGWChanged bool if activeGW.Name == c.nodeConfig.Name { // On a Gateway Node, the GatewayIP of the active Gateway will impact the Openflow rules. @@ -391,7 +376,7 @@ func (c *MCRouteController) addMCFlowsForSingleCIImp(activeGW *mcv1alpha1.Gatewa } func (c *MCRouteController) deleteMCFlowsForSingleCIImp(ciImpName string) error { - if err := c.ofClient.UninstallMulticlusterFlows(ciImpName); err != nil { + if err := c.ofClient.UninstallMulticlusterFlows(fmt.Sprintf("cluster_%s", ciImpName)); err != nil { return fmt.Errorf("failed to uninstall multi-cluster flows to remote Gateway Node %s: %v", ciImpName, err) } delete(c.installedCIImports, ciImpName) @@ -405,27 +390,30 @@ func (c *MCRouteController) deleteMCFlowsForAllCIImps() error { return nil } -// getActiveGateway compares Gateway's CreationTimestamp to get the active Gateway, -// The last created Gateway will be the active Gateway. func (c *MCRouteController) getActiveGateway() (*mcv1alpha1.Gateway, error) { - gws, err := c.gwLister.Gateways(c.namespace).List(labels.Everything()) + activeGW, err := getActiveGateway(c.gwLister, c.namespace) if err != nil { return nil, err } - if len(gws) == 0 { + if activeGW == nil { return nil, nil } - // Comparing Gateway's CreationTimestamp to get the last created Gateway. 
- lastCreatedGW := gws[0] - for _, gw := range gws { - if lastCreatedGW.CreationTimestamp.Before(&gw.CreationTimestamp) { - lastCreatedGW = gw - } + if net.ParseIP(activeGW.GatewayIP) == nil || net.ParseIP(activeGW.InternalIP) == nil { + return nil, fmt.Errorf("the active Gateway %s has no valid GatewayIP or InternalIP", activeGW.Name) } - if net.ParseIP(lastCreatedGW.GatewayIP) == nil || net.ParseIP(lastCreatedGW.InternalIP) == nil { - return nil, fmt.Errorf("the last created Gateway %s has no valid GatewayIP or InternalIP", lastCreatedGW.Name) + return activeGW, nil +} + +func getActiveGateway(gwLister mclisters.GatewayLister, namespace string) (*mcv1alpha1.Gateway, error) { + gws, err := gwLister.Gateways(namespace).List(labels.Everything()) + if err != nil { + return nil, err + } + if len(gws) == 0 { + return nil, nil } - return lastCreatedGW, nil + // The Gateway webhook guarantees there will be at most one Gateway in a cluster. + return gws[0], nil } func generatePeerConfigs(subnets []string, gatewayIP net.IP) (map[*net.IPNet]net.IP, error) { diff --git a/pkg/agent/multicluster/mc_route_controller_test.go b/pkg/agent/multicluster/mc_route_controller_test.go index 0f582fbf6a6..6ca4147f8cf 100644 --- a/pkg/agent/multicluster/mc_route_controller_test.go +++ b/pkg/agent/multicluster/mc_route_controller_test.go @@ -12,10 +12,11 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-package noderoute +package multicluster import ( "context" + "fmt" "net" "testing" "time" @@ -174,7 +175,7 @@ func TestMCRouteControllerAsGateway(t *testing.T) { // Delete a ClusterInfoImport c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport2.GetNamespace()).Delete(context.TODO(), clusterInfoImport2.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name).Times(1) + c.ofClient.EXPECT().UninstallMulticlusterFlows(fmt.Sprintf("cluster_%s", clusterInfoImport2.Name)).Times(1) c.processNextWorkItem() // Update Gateway1's GatewayIP @@ -194,28 +195,18 @@ func TestMCRouteControllerAsGateway(t *testing.T) { updatedGateway1b, metav1.UpdateOptions{}) c.processNextWorkItem() + // Delete Gateway1 + c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), + gateway1.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterFlows(fmt.Sprintf("cluster_%s", clusterInfoImport1.Name)).Times(1) + c.processNextWorkItem() + // Create Gateway2 as active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(), &gateway2, metav1.CreateOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), gw2InternalIP).Times(1) c.processNextWorkItem() - - // Delete Gateway2, then Gateway1 become active Gateway - c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Delete(context.TODO(), - gateway2.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), true).Times(1) - c.ofClient.EXPECT().InstallMulticlusterGatewayFlows(clusterInfoImport1.Name, - gomock.Any(), peerNodeIP1, updatedGateway1aIP).Times(1) - 
c.processNextWorkItem() - - // Delete last Gateway - c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), - gateway1.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.processNextWorkItem() }() select { case <-time.After(5 * time.Second): @@ -270,7 +261,7 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { // Delete a ClusterInfoImport c.mcClient.MulticlusterV1alpha1().ClusterInfoImports(clusterInfoImport2.GetNamespace()).Delete(context.TODO(), clusterInfoImport2.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport2.Name).Times(1) + c.ofClient.EXPECT().UninstallMulticlusterFlows(fmt.Sprintf("cluster_%s", clusterInfoImport2.Name)).Times(1) c.processNextWorkItem() // Update Gateway1's GatewayIP @@ -290,27 +281,17 @@ func TestMCRouteControllerAsRegularNode(t *testing.T) { gomock.Any(), updatedGateway1bIP).Times(1) c.processNextWorkItem() + // Delete Gateway1 + c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), + gateway1.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterFlows(fmt.Sprintf("cluster_%s", clusterInfoImport1.Name)).Times(1) + c.processNextWorkItem() + // Create Gateway2 as the active Gateway c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Create(context.TODO(), &gateway2, metav1.CreateOptions{}) c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, gomock.Any(), peerNodeIP2).Times(1) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.processNextWorkItem() - - // Delete Gateway2, then Gateway1 become active Gateway - c.mcClient.MulticlusterV1alpha1().Gateways(gateway2.GetNamespace()).Delete(context.TODO(), - gateway2.Name, metav1.DeleteOptions{}) - 
c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) - c.ofClient.EXPECT().InstallMulticlusterClassifierFlows(uint32(1), false).Times(1) - c.ofClient.EXPECT().InstallMulticlusterNodeFlows(clusterInfoImport1.Name, - gomock.Any(), updatedGateway1bIP).Times(1) - c.processNextWorkItem() - - // Delete last Gateway - c.mcClient.MulticlusterV1alpha1().Gateways(gateway1.GetNamespace()).Delete(context.TODO(), - gateway1.Name, metav1.DeleteOptions{}) - c.ofClient.EXPECT().UninstallMulticlusterFlows(clusterInfoImport1.Name).Times(1) c.processNextWorkItem() }() select { diff --git a/pkg/agent/multicluster/policy_only_route_controller.go b/pkg/agent/multicluster/policy_only_route_controller.go new file mode 100644 index 00000000000..a72cc6d1d3c --- /dev/null +++ b/pkg/agent/multicluster/policy_only_route_controller.go @@ -0,0 +1,306 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package multicluster + +import ( + "errors" + "fmt" + "net" + "sync" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/util/wait" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/klog/v2" + + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcclientset "antrea.io/antrea/multicluster/pkg/client/clientset/versioned" + mclisters "antrea.io/antrea/multicluster/pkg/client/listers/multicluster/v1alpha1" + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/interfacestore" + "antrea.io/antrea/pkg/agent/openflow" + "antrea.io/antrea/pkg/ovs/ovsconfig" +) + +// MCWithPolicyOnlyNodeRouteController generates necessary L3 forwarding rules +// for any Pod to forward cross-cluster traffic from Gateway to general Node +// inside a member cluster when agent is enabled with networkPolicyOnly mode. +type MCWithPolicyOnlyNodeRouteController struct { + mcClient mcclientset.Interface + ovsBridgeClient ovsconfig.OVSBridgeClient + ofClient openflow.Client + interfaceStore interfacestore.InterfaceStore + nodeConfig *config.NodeConfig + queue workqueue.RateLimitingInterface + podInformer cache.SharedIndexInformer + podLister corelisters.PodLister + gwInformer cache.SharedIndexInformer + gwLister mclisters.GatewayLister + activeGWName string + namespace string + mutex sync.Mutex + installedPods map[string]string +} + +type endpointChangeInfo struct { + isDelete bool + podIP string + podNodeIP string +} + +func NewMCWithPolicyOnlyNodeRouteController( + mcClient mcclientset.Interface, + podInformer cache.SharedIndexInformer, + gwInformer cache.SharedIndexInformer, + client openflow.Client, + ovsBridgeClient ovsconfig.OVSBridgeClient, + interfaceStore interfacestore.InterfaceStore, + nodeConfig *config.NodeConfig, + namespace string, +) *MCWithPolicyOnlyNodeRouteController { + controller := 
&MCWithPolicyOnlyNodeRouteController{ + mcClient: mcClient, + ovsBridgeClient: ovsBridgeClient, + ofClient: client, + interfaceStore: interfaceStore, + nodeConfig: nodeConfig, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "MCWithPolicyOnlyNodeRouteController"), + podInformer: podInformer, + podLister: corelisters.NewPodLister(podInformer.GetIndexer()), + gwInformer: gwInformer, + gwLister: mclisters.NewGatewayLister(gwInformer.GetIndexer()), + namespace: namespace, + installedPods: make(map[string]string), + } + + controller.gwInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(cur interface{}) { + controller.enqueueGateway(cur, false) + }, + // Gateway UPDATE event doesn't impact Pod flows, so ignore it. + DeleteFunc: func(old interface{}) { + controller.enqueueGateway(old, true) + }, + }, + resyncPeriod, + ) + + controller.podInformer.AddEventHandlerWithResyncPeriod( + cache.ResourceEventHandlerFuncs{ + AddFunc: func(cur interface{}) { + controller.enqueuePod(cur, false) + }, + UpdateFunc: func(old, cur interface{}) { + controller.enqueuePod(cur, false) + }, + DeleteFunc: func(old interface{}) { + controller.enqueuePod(old, true) + }, + }, + resyncPeriod, + ) + return controller +} + +func (c *MCWithPolicyOnlyNodeRouteController) enqueueGateway(obj interface{}, isDelete bool) { + gw, isGW := obj.(*mcv1alpha1.Gateway) + if !isGW { + klog.ErrorS(errors.New("received unexpected object"), "enqueueGateway can't process event", "obj", obj) + return + } + + if gw.Namespace != c.namespace { + return + } + + c.mutex.Lock() + defer c.mutex.Unlock() + // Gateway webhook will guarantee that at most one Gateway in a cluster. 
+ if isDelete { + c.activeGWName = "" + if c.nodeConfig.Name != gw.Name { + klog.InfoS("This Node is not the deleted Gateway, skip it", "node", c.nodeConfig.Name, "gateway", gw.Name) + return + } + + for podIP := range c.installedPods { + c.queue.Add(&endpointChangeInfo{ + isDelete: true, + podIP: podIP, + }) + delete(c.installedPods, podIP) + } + return + } + + c.activeGWName = gw.Name + if c.nodeConfig.Name != gw.Name { + klog.InfoS("The Node is not the Gateway, skip it", "node", c.nodeConfig.Name, "gateway", gw.Name) + return + } + pods, _ := c.podLister.List(labels.Everything()) + for _, p := range pods { + pod := p + isValidPod := c.validatePod(pod) + if !isValidPod { + continue + } + c.queue.Add(&endpointChangeInfo{ + podIP: pod.Status.PodIP, + podNodeIP: pod.Status.HostIP, + }) + c.installedPods[pod.Status.PodIP] = pod.Status.HostIP + } +} + +// validatePod validates the Pod info to determine if it's a valid Pod which should +// have Openflow flows installed for cross-cluster traffic. 
+func (c *MCWithPolicyOnlyNodeRouteController) validatePod(pod *corev1.Pod) bool { + podIP := pod.Status.PodIP + podNodeIP := pod.Status.HostIP + + if pod.Spec.NodeName == c.activeGWName { + klog.V(2).InfoS("Skip Pod on the Gateway Node") + return false + } + + if podIP == "" || podNodeIP == "" || pod.Spec.HostNetwork { + klog.V(2).InfoS("Pod has no valid IPs or it's HostNetwork, skip it", "name", klog.KObj(pod), + "podIP", podIP, "nodeIP", podNodeIP, "hostNetwork", pod.Spec.HostNetwork) + return false + } + + if c.installedPods[podIP] == podNodeIP { + klog.V(2).InfoS("No Pod change impacts installed Openflow rules", "name", klog.KObj(pod)) + return false + } + return true +} + +func (c *MCWithPolicyOnlyNodeRouteController) enqueuePod(obj interface{}, isDelete bool) { + pod, isPod := obj.(*corev1.Pod) + if !isPod { + klog.ErrorS(errors.New("received unexpected object"), "enqueuePod can't process event", "obj", obj) + return + } + + c.mutex.Lock() + defer c.mutex.Unlock() + if c.activeGWName == "" || c.nodeConfig.Name != c.activeGWName { + klog.InfoS("No active gateway or the Node is not the active Gateway", "node", c.nodeConfig.Name, "gateway", c.activeGWName) + return + } + + podIP := pod.Status.PodIP + podNodeIP := pod.Status.HostIP + _, exists := c.installedPods[podIP] + if isDelete && exists { + c.queue.Add(&endpointChangeInfo{ + isDelete: true, + podIP: podIP, + }) + delete(c.installedPods, podIP) + return + } + + isValidPod := c.validatePod(pod) + if !isValidPod { + return + } + c.queue.Add(&endpointChangeInfo{ + podNodeIP: podNodeIP, + podIP: podIP, + }) + c.installedPods[podIP] = podNodeIP +} + +func (c *MCWithPolicyOnlyNodeRouteController) Run(stopCh <-chan struct{}) { + controllerName := "AntreaAgentMCWithPolicyOnlyNodeRouteController" + defer c.queue.ShutDown() + + cacheSyncs := []cache.InformerSynced{c.podInformer.HasSynced, c.gwInformer.HasSynced} + klog.InfoS("Starting controller", "controller", controllerName) + defer klog.InfoS("Shutting down 
controller", "controller", controllerName) + if !cache.WaitForNamedCacheSync(controllerName, stopCh, cacheSyncs...) { + return + } + + c.initialize() + for i := 0; i < 5; i++ { + go wait.Until(c.worker, time.Second, stopCh) + } + <-stopCh +} + +func (c *MCWithPolicyOnlyNodeRouteController) worker() { + for c.processNextWorkItem() { + } +} + +func (c *MCWithPolicyOnlyNodeRouteController) processNextWorkItem() bool { + obj, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(obj) + if k, ok := obj.(*endpointChangeInfo); !ok { + c.queue.Forget(obj) + klog.InfoS("Expected endpointChangeInfo in work queue but got", "object", obj) + return true + } else if err := c.syncPodFlows(k); err == nil { + c.queue.Forget(k) + } else { + // Put the item back on the workqueue to handle any transient errors. + c.queue.AddRateLimited(k) + klog.ErrorS(err, "Error syncing key, requeuing", "key", k) + } + return true +} + +func (c *MCWithPolicyOnlyNodeRouteController) initialize() { + activeGW, err := getActiveGateway(c.gwLister, c.namespace) + if err != nil { + klog.ErrorS(err, "Failed to get an active Gateway") + } + + if activeGW != nil { + c.mutex.Lock() + defer c.mutex.Unlock() + c.activeGWName = activeGW.Name + klog.InfoS("Found an active Gateway", "gateway", klog.KObj(activeGW)) + } +} + +func (c *MCWithPolicyOnlyNodeRouteController) syncPodFlows(key *endpointChangeInfo) error { + if key.isDelete { + klog.V(2).InfoS("Deleting Multi-cluster flows for Pod", "podIP", key.podIP) + if err := c.ofClient.UninstallMulticlusterFlows(fmt.Sprintf("pod_%s", key.podIP)); err != nil { + klog.ErrorS(err, "Failed to uninstall Multi-cluster flows for Pod", "podIP", key.podIP) + return err + } + } else { + klog.V(2).InfoS("Adding Multi-cluster flows for Pod", "podIP", key.podIP, "nodeIP", key.podNodeIP) + if err := c.ofClient.InstallMulticlusterPodFlows(net.ParseIP(key.podIP), net.ParseIP(key.podNodeIP)); err != nil { + klog.ErrorS(err, "Failed to install Multi-cluster flows 
for Pod", "podIP", key.podIP, "nodeIP", key.podNodeIP) + return err + } + } + return nil +} diff --git a/pkg/agent/multicluster/policy_only_route_controller_test.go b/pkg/agent/multicluster/policy_only_route_controller_test.go new file mode 100644 index 00000000000..8c87f0deca4 --- /dev/null +++ b/pkg/agent/multicluster/policy_only_route_controller_test.go @@ -0,0 +1,257 @@ +// Copyright 2022 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package multicluster + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/assert" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/informers" + k8sfake "k8s.io/client-go/kubernetes/fake" + + mcv1alpha1 "antrea.io/antrea/multicluster/apis/multicluster/v1alpha1" + mcfake "antrea.io/antrea/multicluster/pkg/client/clientset/versioned/fake" + mcinformers "antrea.io/antrea/multicluster/pkg/client/informers/externalversions" + "antrea.io/antrea/pkg/agent/config" + "antrea.io/antrea/pkg/agent/interfacestore" + oftest "antrea.io/antrea/pkg/agent/openflow/testing" + ovsconfigtest "antrea.io/antrea/pkg/ovs/ovsconfig/testing" +) + +var ( + defaultNs = "default" + node1Name = "node-1" + ctx = context.TODO() + + nginx1NoIPs = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: defaultNs, + Name: "nginx1", + }, + 
Spec: corev1.PodSpec{ + NodeName: "node-2", + }, + } + + nginx2WithIPs = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: defaultNs, + Name: "nginx2", + }, + Spec: corev1.PodSpec{ + NodeName: "node-2", + }, + Status: corev1.PodStatus{ + PodIP: "192.168.1.12", + HostIP: "10.170.10.11", + }, + } + + nginx2PodIP = net.ParseIP("192.168.1.12") + nginx2HostIP = net.ParseIP("10.170.10.11") + + nginx3WithIPs = &corev1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: defaultNs, + Name: "nginx3", + }, + Spec: corev1.PodSpec{ + NodeName: node1Name, + }, + Status: corev1.PodStatus{ + PodIP: "192.168.10.13", + HostIP: "10.170.10.13", + }, + } +) + +type fakePolicyOnlyRouteController struct { + *MCWithPolicyOnlyNodeRouteController + mcClient *mcfake.Clientset + k8sClient *k8sfake.Clientset + informerFactory informers.SharedInformerFactory + mcInformerFactory mcinformers.SharedInformerFactory + ofClient *oftest.MockClient + ovsClient *ovsconfigtest.MockOVSBridgeClient + interfaceStore interfacestore.InterfaceStore +} + +func newPolicyOnlyRouteController(t *testing.T, nodeConfig *config.NodeConfig, mcClient *mcfake.Clientset, + k8sClient *k8sfake.Clientset) (*fakePolicyOnlyRouteController, func()) { + mcInformerFactory := mcinformers.NewSharedInformerFactory(mcClient, 0) + gwInformer := mcInformerFactory.Multicluster().V1alpha1().Gateways() + + informerFactory := informers.NewSharedInformerFactory(k8sClient, 0) + podInformer := informerFactory.Core().V1().Pods() + + ctrl := gomock.NewController(t) + ofClient := oftest.NewMockClient(ctrl) + ovsClient := ovsconfigtest.NewMockOVSBridgeClient(ctrl) + interfaceStore := interfacestore.NewInterfaceStore() + c := NewMCWithPolicyOnlyNodeRouteController( + mcClient, + podInformer.Informer(), + gwInformer.Informer(), + ofClient, + ovsClient, + interfaceStore, + nodeConfig, + defaultNs, + ) + return &fakePolicyOnlyRouteController{ + MCWithPolicyOnlyNodeRouteController: c, + mcClient: mcClient, + k8sClient: k8sClient, + 
mcInformerFactory: mcInformerFactory, + informerFactory: informerFactory, + ofClient: ofClient, + ovsClient: ovsClient, + interfaceStore: interfaceStore, + }, ctrl.Finish +} + +func TestInitialize(t *testing.T) { + mcClient := mcfake.NewSimpleClientset() + k8sClient := k8sfake.NewSimpleClientset() + c, closeFn := newPolicyOnlyRouteController(t, &config.NodeConfig{Name: node1Name}, mcClient, k8sClient) + defer closeFn() + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.mcInformerFactory.Start(stopCh) + c.mcInformerFactory.WaitForCacheSync(stopCh) + + mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Create(context.TODO(), &gateway1, metav1.CreateOptions{}) + if err := waitForGatewayRealized(mcClient, &gateway1); err != nil { + t.Errorf("error when waiting for Gateway '%s/%s' to be realized: %v", gateway1.Namespace, gateway1.Name, err) + } + c.initialize() + assert.Equal(t, gateway1.Name, c.activeGWName) +} + +func waitForGatewayRealized(clientset *mcfake.Clientset, gateway *mcv1alpha1.Gateway) error { + return wait.Poll(time.Millisecond, 1*time.Second, func() (bool, error) { + _, err := clientset.MulticlusterV1alpha1().Gateways(gateway.Namespace).Get(context.TODO(), gateway.Name, metav1.GetOptions{}) + if err != nil { + return false, nil + } + return true, err + }) +} + +func TestGatewayEvent(t *testing.T) { + mcClient := mcfake.NewSimpleClientset() + k8sClient := k8sfake.NewSimpleClientset([]runtime.Object{nginx1NoIPs, nginx2WithIPs, nginx3WithIPs}...) 
+ c, closeFn := newPolicyOnlyRouteController(t, &config.NodeConfig{Name: node1Name}, mcClient, k8sClient) + defer closeFn() + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.mcInformerFactory.Start(stopCh) + c.mcInformerFactory.WaitForCacheSync(stopCh) + + finishCh := make(chan struct{}) + go func() { + defer close(finishCh) + + // Create a Gateway node-2 + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Create(ctx, &gateway2, metav1.CreateOptions{}) + // Delete a Gateway node-2 + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Delete(ctx, gateway2.Name, metav1.DeleteOptions{}) + + // Create a Gateway node-1 + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Create(ctx, &gateway1, metav1.CreateOptions{}) + c.ofClient.EXPECT().InstallMulticlusterPodFlows(nginx2PodIP, nginx2HostIP) + c.processNextWorkItem() + + // Delete a Gateway node-1 + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Delete(ctx, gateway1.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterFlows(fmt.Sprintf("pod_%s", nginx2PodIP)) + c.processNextWorkItem() + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("Test didn't finish in time") + case <-finishCh: + } +} + +func TestPodEvent(t *testing.T) { + mcClient := mcfake.NewSimpleClientset() + k8sClient := k8sfake.NewSimpleClientset([]runtime.Object{nginx2WithIPs}...) 
+ c, closeFn := newPolicyOnlyRouteController(t, &config.NodeConfig{Name: node1Name}, mcClient, k8sClient) + defer closeFn() + defer c.queue.ShutDown() + + stopCh := make(chan struct{}) + defer close(stopCh) + c.informerFactory.Start(stopCh) + c.informerFactory.WaitForCacheSync(stopCh) + c.mcInformerFactory.Start(stopCh) + c.mcInformerFactory.WaitForCacheSync(stopCh) + + finishCh := make(chan struct{}) + go func() { + defer close(finishCh) + // Create a Gateway + c.mcClient.MulticlusterV1alpha1().Gateways(defaultNs).Create(ctx, &gateway1, metav1.CreateOptions{}) + c.ofClient.EXPECT().InstallMulticlusterPodFlows(nginx2PodIP, nginx2HostIP) + c.processNextWorkItem() + + // Create a Pod without IPs + c.k8sClient.CoreV1().Pods(defaultNs).Create(ctx, nginx1NoIPs, metav1.CreateOptions{}) + + // Create a Pod in the Gateway Node + c.k8sClient.CoreV1().Pods(defaultNs).Create(ctx, nginx3WithIPs, metav1.CreateOptions{}) + + // Update a Pod with IPs + nginx1Updated := nginx1NoIPs.DeepCopy() + nginx1Updated.Status.PodIP = "192.168.10.11" + nginx1Updated.Status.HostIP = "172.16.10.11" + c.k8sClient.CoreV1().Pods(defaultNs).Update(ctx, nginx1Updated, metav1.UpdateOptions{}) + c.ofClient.EXPECT().InstallMulticlusterPodFlows(net.ParseIP("192.168.10.11"), net.ParseIP("172.16.10.11")) + c.processNextWorkItem() + + // Update a Pod's label + nginx1UpdatedLabel := nginx1Updated.DeepCopy() + nginx1UpdatedLabel.Labels = map[string]string{"env": "test"} + c.k8sClient.CoreV1().Pods(defaultNs).Update(ctx, nginx1UpdatedLabel, metav1.UpdateOptions{}) + + // Delete a Pod + c.k8sClient.CoreV1().Pods(defaultNs).Delete(ctx, nginx1Updated.Name, metav1.DeleteOptions{}) + c.ofClient.EXPECT().UninstallMulticlusterFlows(fmt.Sprintf("pod_%s", net.ParseIP("192.168.10.11"))) + c.processNextWorkItem() + }() + select { + case <-time.After(5 * time.Second): + t.Errorf("Test didn't finish in time") + case <-finishCh: + } +} diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 
6634bce619a..61e52d0ea99 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -341,9 +341,13 @@ type Client interface { // InstallMulticlusterClassifierFlows installs flows to classify cross-cluster packets. InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGateway bool) error - // UninstallMulticlusterFlows removes cross-cluster flows matching the given clusterID on + // InstallMulticlusterPodFlows installs flows to handle cross-cluster packets from a Gateway to + // local general Nodes. + InstallMulticlusterPodFlows(podIP net.IP, tunnelPeerIP net.IP) error + + // UninstallMulticlusterFlows removes cross-cluster flows matching the given cache key on // a regular Node or a Gateway. - UninstallMulticlusterFlows(clusterID string) error + UninstallMulticlusterFlows(cacheKey string) error // InstallVMUplinkFlows installs flows to forward packet between uplinkPort and hostPort. On a VM, the // uplink and host internal port are paired directly, and no layer 2/3 forwarding flow is installed. @@ -1384,6 +1388,7 @@ func (c *client) InstallMulticlusterGatewayFlows(clusterID string, // InstallMulticlusterClassifierFlows adds the following flows: // - One flow in L2ForwardingCalcTable for the global virtual multicluster MAC 'aa:bb:cc:dd:ee:f0' // to set its target output port as 'antrea-tun0'. This flow will be on both Gateway and regular Node. +// - One flow in ClassifierTable for the tunnel traffic if it's networkPolicyOnly mode. // - One flow to match MC virtual MAC 'aa:bb:cc:dd:ee:f0' in ClassifierTable for Gateway only. // - One flow in L2ForwardingOutTable to allow multicluster hairpin traffic for Gateway only. 
func (c *client) InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGateway bool) error { @@ -1394,6 +1399,10 @@ func (c *client) InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGatew c.featurePodConnectivity.l2ForwardCalcFlow(GlobalVirtualMACForMulticluster, tunnelOFPort), } + if c.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() { + flows = append(flows, c.featurePodConnectivity.tunnelClassifierFlow(tunnelOFPort)) + } + if isGateway { flows = append(flows, c.featureMulticluster.tunnelClassifierFlow(tunnelOFPort), @@ -1403,9 +1412,17 @@ func (c *client) InstallMulticlusterClassifierFlows(tunnelOFPort uint32, isGatew return c.modifyFlows(c.featureMulticluster.cachedFlows, "multicluster-classifier", flows) } -func (c *client) UninstallMulticlusterFlows(clusterID string) error { +func (c *client) InstallMulticlusterPodFlows(podIP net.IP, tunnelPeerIP net.IP) error { + c.replayMutex.RLock() + defer c.replayMutex.RUnlock() + cacheKey := fmt.Sprintf("pod_%s", podIP.String()) + localGatewayMAC := c.nodeConfig.GatewayConfig.MAC + flows := []binding.Flow{c.featureMulticluster.l3FwdFlowToLocalViaTun(localGatewayMAC, podIP, tunnelPeerIP)} + return c.modifyFlows(c.featureMulticluster.cachedFlows, cacheKey, flows) +} + +func (c *client) UninstallMulticlusterFlows(cacheKey string) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() - cacheKey := fmt.Sprintf("cluster_%s", clusterID) return c.deleteFlows(c.featureMulticluster.cachedFlows, cacheKey) } diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index f55670ebebe..28bbd5fe1f0 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -585,7 +585,7 @@ func TestMulticlusterFlowsInstallation(t *testing.T) { require.Len(t, fCacheI.(flowCache), 2) m.EXPECT().DeleteAll(gomock.Any()).Return(nil).Times(1) - err = ofClient.UninstallMulticlusterFlows(clusterID) + err = ofClient.UninstallMulticlusterFlows(cacheKey) require.NoError(t, err) _, ok 
= client.featureMulticluster.cachedFlows.Load(cacheKey) require.False(t, ok) diff --git a/pkg/agent/openflow/multicluster.go b/pkg/agent/openflow/multicluster.go index 71a50ee56c8..adc4f562dba 100644 --- a/pkg/agent/openflow/multicluster.go +++ b/pkg/agent/openflow/multicluster.go @@ -161,3 +161,22 @@ func (f *featureMulticluster) snatConntrackFlows(serviceCIDR net.IPNet, localGat ) return flows } + +func (f *featureMulticluster) l3FwdFlowToLocalViaTun( + localGatewayMAC net.HardwareAddr, + podIP net.IP, + tunnelPeer net.IP) binding.Flow { + ipProtocol := getIPProtocol(podIP) + // This generates the flow to forward cross-cluster request packets based + // on Pod IP. + return L3ForwardingTable.ofTable.BuildFlow(priorityNormal). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(ipProtocol). + MatchDstIP(podIP). + MatchDstMAC(GlobalVirtualMACForMulticluster). + Action().SetSrcMAC(localGatewayMAC). // Rewrite src MAC to local gateway MAC. + Action().SetTunnelDst(tunnelPeer). // Flow based tunnel. Set tunnel destination. + Action().LoadRegMark(ToTunnelRegMark). + Action().GotoTable(L3DecTTLTable.GetID()). 
+ Done() +} diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index 393728772f0..3a85bf6320a 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -381,6 +381,20 @@ func (mr *MockClientMockRecorder) InstallMulticlusterNodeFlows(arg0, arg1, arg2 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticlusterNodeFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticlusterNodeFlows), arg0, arg1, arg2) } +// InstallMulticlusterPodFlows mocks base method +func (m *MockClient) InstallMulticlusterPodFlows(arg0, arg1 net.IP) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "InstallMulticlusterPodFlows", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// InstallMulticlusterPodFlows indicates an expected call of InstallMulticlusterPodFlows +func (mr *MockClientMockRecorder) InstallMulticlusterPodFlows(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallMulticlusterPodFlows", reflect.TypeOf((*MockClient)(nil).InstallMulticlusterPodFlows), arg0, arg1) +} + // InstallNodeFlows mocks base method func (m *MockClient) InstallNodeFlows(arg0 string, arg1 map[*net.IPNet]net.IP, arg2 *ip.DualStackIPs, arg3 uint32, arg4 net.HardwareAddr) error { m.ctrl.T.Helper() diff --git a/test/integration/agent/cniserver_test.go b/test/integration/agent/cniserver_test.go index 8761b4830a1..13bd869c57b 100644 --- a/test/integration/agent/cniserver_test.go +++ b/test/integration/agent/cniserver_test.go @@ -573,7 +573,7 @@ func newTester() *cmdAddDelTester { testNodeConfig, k8sFake.NewSimpleClientset(), routeMock, - false, false, false, false, + false, false, false, false, false, tester.networkReadyCh) tester.server.Initialize(ovsServiceMock, ofServiceMock, ifaceStore, channel.NewSubscribableChannel("PodUpdate", 100), nil) ctx := context.Background() @@ -737,7 +737,7 @@ func setupChainTest( 
testNodeConfig, k8sFake.NewSimpleClientset(), routeMock, - true, false, false, false, + true, false, false, false, false, networkReadyCh) } else { server = inServer