diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 3c911bedc29..c618d293420 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -178,7 +178,7 @@ func (p *proxier) removeStaleServices() { } // Remove associated Endpoints flows. if endpoints, ok := p.endpointsInstalledMap[svcPortName]; ok { - if !p.removeStaleEndpoints(svcPortName, svcInfo.OFProtocol, endpoints) { + if !p.removeStaleEndpoints(svcPortName, svcInfo, endpoints) { continue } delete(p.endpointsInstalledMap, svcPortName) @@ -193,8 +193,8 @@ func (p *proxier) removeServiceFlows(svcInfo *types.ServiceInfo) bool { svcInfoStr := svcInfo.String() svcPort := uint16(svcInfo.Port()) svcProto := svcInfo.OFProtocol - // Remove ClusterIP flows. - if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), svcPort, svcProto); err != nil { + // Remove ClusterIP flows and configurations. + if err := p.uninstallClusterIPService(svcInfo.ClusterIP(), svcPort, svcProto); err != nil { klog.ErrorS(err, "Error when uninstalling ClusterIP flows for Service", "ServiceInfo", svcInfoStr) return false } @@ -265,7 +265,8 @@ func (p *proxier) removeServiceGroup(svcPortName k8sproxy.ServicePortName, local // given Service. If the Endpoints are still referenced by any other Services, no flow will be removed. // The method only returns an error if a data path operation fails. If the flows are successfully // removed from the data path, the method returns nil. -func (p *proxier) removeStaleEndpoints(svcPortName k8sproxy.ServicePortName, protocol binding.Protocol, staleEndpoints map[string]k8sproxy.Endpoint) bool { +func (p *proxier) removeStaleEndpoints(svcPortName k8sproxy.ServicePortName, svcInfo *types.ServiceInfo, staleEndpoints map[string]k8sproxy.Endpoint) bool { + protocol := svcInfo.OFProtocol var endpointsToRemove []k8sproxy.Endpoint // Get all Endpoints whose reference counter is 1, and these Endpoints should be removed. 
@@ -303,6 +304,37 @@ func (p *proxier) removeStaleEndpoints(svcPortName k8sproxy.ServicePortName, pro return true } +func (p *proxier) removeStaleEndpointConntrackEntries(svcPortName k8sproxy.ServicePortName, svcInfo *types.ServiceInfo, staleEndpoints map[string]k8sproxy.Endpoint) bool { + svcPort := uint16(svcInfo.Port()) + nodePort := uint16(svcInfo.NodePort()) + svcProto := svcInfo.OFProtocol + var svcIPs []net.IP + svcIPs = append(svcIPs, svcInfo.ClusterIP()) + for _, externalIP := range svcInfo.ExternalIPStrings() { + svcIPs = append(svcIPs, net.ParseIP(externalIP)) + } + for _, ingressIP := range svcInfo.LoadBalancerIPStrings() { + if ingressIP != "" { + svcIPs = append(svcIPs, net.ParseIP(ingressIP)) + } + } + for _, endpoint := range staleEndpoints { + for _, svcIP := range svcIPs { + if err := p.routeClient.ClearConntrackEntryForService(svcIP, svcPort, net.ParseIP(endpoint.IP()), svcProto); err != nil { + klog.ErrorS(err, "Error when removing conntrack of stale Endpoints for Service", "ServicePortName", svcPortName, "ServiceIP", svcIP, "ServicePort", svcPort, "endpoint", endpoint) + return false + } + } + if nodePort > 0 { + if err := p.routeClient.ClearConntrackEntryForService(nil, nodePort, net.ParseIP(endpoint.IP()), svcProto); err != nil { + klog.ErrorS(err, "Error when removing conntrack of stale Endpoints for Service", "ServicePortName", svcPortName, "NodePort", nodePort, "endpoint", endpoint) + return false + } + } + } + return true +} + func (p *proxier) addNewEndpoints(svcPortName k8sproxy.ServicePortName, protocol binding.Protocol, newEndpoints map[string]k8sproxy.Endpoint) bool { var endpointsToAdd []k8sproxy.Endpoint @@ -366,6 +398,19 @@ func smallSliceDifference(s1, s2 []string) []string { return diff } +func (p *proxier) uninstallClusterIPService(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error { + if err := p.ofClient.UninstallServiceFlows(svcIP, svcPort, protocol); err != nil { + return fmt.Errorf("failed to remove ClusterIP 
flows: %w", err) + } + if needClearConntrackEntries(protocol) { + // Remove ClusterIP conntrack entries when protocol is UDP. + if err := p.routeClient.ClearConntrackEntryForService(svcIP, svcPort, nil, protocol); err != nil { + return fmt.Errorf("failed to clear ClusterIP UDP conntrack entries: %w", err) + } + } + return nil +} + func (p *proxier) installNodePortService(externalGroupID, clusterGroupID binding.GroupIDType, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error { if svcPort == 0 { return nil @@ -397,6 +442,12 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot if err := p.routeClient.DeleteNodePort(p.nodePortAddresses, svcPort, protocol); err != nil { return fmt.Errorf("failed to remove NodePort traffic redirecting rules: %w", err) } + if needClearConntrackEntries(protocol) { + // Remove NodePort conntrack entries when protocol is UDP. + if err := p.routeClient.ClearConntrackEntryForService(nil, svcPort, nil, protocol); err != nil { + return fmt.Errorf("failed to clear NodePort UDP conntrack entries: %w", err) + } + } return nil } @@ -414,6 +465,7 @@ func (p *proxier) installExternalIPService(svcInfoStr string, externalGroupID, c } func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPStrings []string, svcPort uint16, protocol binding.Protocol) error { + needClearConntrack := needClearConntrackEntries(protocol) for _, externalIP := range externalIPStrings { ip := net.ParseIP(externalIP) if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil { @@ -422,6 +474,12 @@ func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPString if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteExternalIPRoute); err != nil { return fmt.Errorf("failed to remove ExternalIP traffic redirecting routes: %w", err) } + if needClearConntrack { + // Remove ExternalIP conntrack entries when protocol is UDP. 
+ if err := p.routeClient.ClearConntrackEntryForService(ip, svcPort, nil, protocol); err != nil { + return fmt.Errorf("failed to clear ExternalIP UDP conntrack entries: %w", err) + } + } } return nil } @@ -461,6 +519,7 @@ func (p *proxier) addRouteForServiceIP(svcInfoStr string, ip net.IP, addRouteFn } func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error { + needClearConntrack := needClearConntrackEntries(protocol) for _, ingress := range loadBalancerIPStrings { if ingress != "" { ip := net.ParseIP(ingress) @@ -472,6 +531,12 @@ func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIP return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err) } } + if needClearConntrack { + // Remove LoadBalancer conntrack entries when protocol is UDP. + if err := p.routeClient.ClearConntrackEntryForService(ip, svcPort, nil, protocol); err != nil { + return fmt.Errorf("failed to clear LoadBalancer UDP conntrack entries: %w", err) + } + } } } return nil @@ -542,9 +607,14 @@ func (p *proxier) installServices() { if !p.addNewEndpoints(svcPortName, svcInfo.OFProtocol, newEndpoints) { continue } - if !p.removeStaleEndpoints(svcPortName, svcInfo.OFProtocol, staleEndpoints) { + if !p.removeStaleEndpoints(svcPortName, svcInfo, staleEndpoints) { continue } + if needClearConntrackEntries(svcInfo.OFProtocol) { + if !p.removeStaleEndpointConntrackEntries(svcPortName, svcInfo, staleEndpoints) { + continue + } + } } withSessionAffinity := svcInfo.SessionAffinityType() == corev1.ServiceAffinityClientIP @@ -1370,3 +1440,7 @@ func NewProxier(hostname string, return proxier, nil } + +func needClearConntrackEntries(protocol binding.Protocol) bool { + return protocol == binding.ProtocolUDP || protocol == binding.ProtocolUDPv6 +} diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index c9acb328267..caf583208cf 100644 --- 
a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -1151,10 +1151,26 @@ func TestDualStackService(t *testing.T) { assert.Contains(t, fpv6.serviceInstalledMap, svcPortName) } -func testClusterIPRemove(t *testing.T, svcIP, externalIP, epIP net.IP, isIPv6 bool, nodeLocalInternal, endpointSliceEnabled bool) { +func getAPIProtocol(bindingProtocol binding.Protocol) corev1.Protocol { + switch bindingProtocol { + case binding.ProtocolUDP, binding.ProtocolUDPv6: + return corev1.ProtocolUDP + case binding.ProtocolTCP, binding.ProtocolTCPv6: + return corev1.ProtocolTCP + case binding.ProtocolSCTP, binding.ProtocolSCTPv6: + return corev1.ProtocolSCTP + default: + return "" + } +} + +func testClusterIPRemove(t *testing.T, svcIP, externalIP, epIP net.IP, bindingProtocol binding.Protocol, isIPv6 bool, nodeLocalInternal, endpointSliceEnabled bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator(isIPv6) + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. 
+ svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) options := []proxyOptionsFn{withProxyAll, withSupportNestedService} if !endpointSliceEnabled { options = append(options, withoutEndpointSlice) @@ -1165,26 +1181,21 @@ func testClusterIPRemove(t *testing.T, svcIP, externalIP, epIP net.IP, isIPv6 bo if nodeLocalInternal { internalTrafficPolicy = corev1.ServiceInternalTrafficPolicyLocal } - svc := makeTestClusterIPService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), corev1.ProtocolTCP, nil, &internalTrafficPolicy, true, nil) + svc := makeTestClusterIPService(&svcPortName, svcIP, []net.IP{externalIP}, int32(svcPort), apiProtocol, nil, &internalTrafficPolicy, true, nil) makeServiceMap(fp, svc) var ep *corev1.Endpoints var eps *discovery.EndpointSlice if !endpointSliceEnabled { - epSubset := makeTestEndpointSubset(&svcPortName, epIP, int32(svcPort), corev1.ProtocolTCP, false) + epSubset := makeTestEndpointSubset(&svcPortName, epIP, int32(svcPort), apiProtocol, false) ep = makeTestEndpoints(&svcPortName, []corev1.EndpointSubset{*epSubset}) makeEndpointsMap(fp, ep) } else { - epSubset, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), corev1.ProtocolTCP, false) + epSubset, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps = makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*epSubset}, []discovery.EndpointPort{*epPort}, isIPv6) makeEndpointSliceMap(fp, eps) } - bindingProtocol := binding.ProtocolTCP - if isIPv6 { - bindingProtocol = binding.ProtocolTCPv6 - } - internalGroupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalInternal) var externalGroupID, clusterGroupID binding.GroupIDType if nodeLocalInternal == false { @@ -1221,6 +1232,12 @@ func testClusterIPRemove(t *testing.T, svcIP, externalIP, epIP net.IP, isIPv6 bo mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) 
mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) } + if needClearConntrackEntries(bindingProtocol) { + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol) + if externalIP != nil { + mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), nil, bindingProtocol) + } + } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1240,10 +1257,13 @@ func testClusterIPRemove(t *testing.T, svcIP, externalIP, epIP net.IP, isIPv6 bo assert.False(t, exists) } -func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, externalIP, epIP net.IP, isIPv6 bool, endpointSliceEnabled bool) { +func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, externalIP, epIP net.IP, bindingProtocol binding.Protocol, isIPv6 bool, endpointSliceEnabled bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator(isIPv6) + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. 
+ svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) options := []proxyOptionsFn{withProxyAll} if !endpointSliceEnabled { options = append(options, withoutEndpointSlice) @@ -1255,7 +1275,7 @@ func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, externa []net.IP{externalIP}, int32(svcPort), int32(svcNodePort), - corev1.ProtocolTCP, + apiProtocol, nil, corev1.ServiceInternalTrafficPolicyCluster, corev1.ServiceExternalTrafficPolicyTypeLocal) @@ -1264,19 +1284,17 @@ func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, externa var ep *corev1.Endpoints var eps *discovery.EndpointSlice if !endpointSliceEnabled { - epSubset := makeTestEndpointSubset(&svcPortName, epIP, int32(svcPort), corev1.ProtocolTCP, false) + epSubset := makeTestEndpointSubset(&svcPortName, epIP, int32(svcPort), apiProtocol, false) ep = makeTestEndpoints(&svcPortName, []corev1.EndpointSubset{*epSubset}) makeEndpointsMap(fp, ep) } else { - epSubset, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), corev1.ProtocolTCP, false) + epSubset, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps = makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*epSubset}, []discovery.EndpointPort{*epPort}, isIPv6) makeEndpointSliceMap(fp, eps) } - bindingProtocol := binding.ProtocolTCP vIP := agentconfig.VirtualNodePortDNATIPv4 if isIPv6 { - bindingProtocol = binding.ProtocolTCPv6 vIP = agentconfig.VirtualNodePortDNATIPv6 } @@ -1303,6 +1321,13 @@ func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, externa mockOFClient.EXPECT().UninstallServiceFlows(externalIP, uint16(svcPort), bindingProtocol).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) } + if needClearConntrackEntries(bindingProtocol) { + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol) + 
mockRouteClient.EXPECT().ClearConntrackEntryForService(nil, uint16(svcNodePort), nil, bindingProtocol) + if externalIP != nil { + mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), nil, bindingProtocol) + } + } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1320,10 +1345,13 @@ func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, externa assert.NotContains(t, fp.endpointsInstalledMap, svcPortName) } -func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, externalIP, epIP, loadBalancerIP net.IP, isIPv6 bool, endpointSliceEnabled bool) { +func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, externalIP, epIP, loadBalancerIP net.IP, bindingProtocol binding.Protocol, isIPv6 bool, endpointSliceEnabled bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator(isIPv6) + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. 
+ svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) options := []proxyOptionsFn{withProxyAll} if !endpointSliceEnabled { options = append(options, withoutEndpointSlice) @@ -1339,7 +1367,7 @@ func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, ext []net.IP{loadBalancerIP}, int32(svcPort), int32(svcNodePort), - corev1.ProtocolTCP, + apiProtocol, nil, &internalTrafficPolicy, externalTrafficPolicy) @@ -1348,19 +1376,17 @@ func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, ext var ep *corev1.Endpoints var eps *discovery.EndpointSlice if !endpointSliceEnabled { - epSubset := makeTestEndpointSubset(&svcPortName, epIP, int32(svcPort), corev1.ProtocolTCP, true) + epSubset := makeTestEndpointSubset(&svcPortName, epIP, int32(svcPort), apiProtocol, true) ep = makeTestEndpoints(&svcPortName, []corev1.EndpointSubset{*epSubset}) makeEndpointsMap(fp, ep) } else { - epSubset, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), corev1.ProtocolTCP, true) + epSubset, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, true) eps = makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*epSubset}, []discovery.EndpointPort{*epPort}, isIPv6) makeEndpointSliceMap(fp, eps) } - bindingProtocol := binding.ProtocolTCP vIP := agentconfig.VirtualNodePortDNATIPv4 if isIPv6 { - bindingProtocol = binding.ProtocolTCPv6 vIP = agentconfig.VirtualNodePortDNATIPv6 } @@ -1391,6 +1417,14 @@ func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, ext mockOFClient.EXPECT().UninstallServiceFlows(externalIP, uint16(svcPort), bindingProtocol).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(externalIP) } + if needClearConntrackEntries(bindingProtocol) { + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), nil, bindingProtocol) + 
mockRouteClient.EXPECT().ClearConntrackEntryForService(nil, uint16(svcNodePort), nil, bindingProtocol) + mockRouteClient.EXPECT().ClearConntrackEntryForService(loadBalancerIP, uint16(svcPort), nil, bindingProtocol) + if externalIP != nil { + mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), nil, bindingProtocol) + } + } fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1409,81 +1443,148 @@ func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, ext } func TestClusterIPRemove(t *testing.T) { - t.Run("IPv4", func(t *testing.T) { + t.Run("IPv4 TCP", func(t *testing.T) { t.Run("Endpoints", func(t *testing.T) { t.Run("InternalTrafficPolicy Cluster", func(t *testing.T) { - testClusterIPRemove(t, svc1IPv4, externalIPv4, ep1IPv4, false, false, false) + testClusterIPRemove(t, svc1IPv4, externalIPv4, ep1IPv4, binding.ProtocolTCP, false, false, false) }) t.Run("InternalTrafficPolicy Local", func(t *testing.T) { - testClusterIPRemove(t, svc1IPv4, externalIPv4, ep1IPv4, false, true, false) + testClusterIPRemove(t, svc1IPv4, externalIPv4, ep1IPv4, binding.ProtocolTCP, false, true, false) }) }) t.Run("EndpointSlice", func(t *testing.T) { t.Run("InternalTrafficPolicy Cluster", func(t *testing.T) { - testClusterIPRemove(t, svc1IPv4, externalIPv4, ep1IPv4, false, false, true) + testClusterIPRemove(t, svc1IPv4, externalIPv4, ep1IPv4, binding.ProtocolTCP, false, false, true) }) t.Run("InternalTrafficPolicy Local", func(t *testing.T) { - testClusterIPRemove(t, svc1IPv4, externalIPv4, ep1IPv4, false, true, true) + testClusterIPRemove(t, svc1IPv4, externalIPv4, ep1IPv4, binding.ProtocolTCP, false, true, true) }) }) }) - t.Run("IPv6", func(t *testing.T) { + t.Run("IPv4 UDP", func(t *testing.T) { + t.Run("Endpoints", func(t *testing.T) { + t.Run("InternalTrafficPolicy Cluster", func(t *testing.T) { + testClusterIPRemove(t, svc1IPv4, externalIPv4, ep1IPv4, binding.ProtocolUDP, false, false, false) + }) + 
t.Run("InternalTrafficPolicy Local", func(t *testing.T) { + testClusterIPRemove(t, svc1IPv4, externalIPv4, ep1IPv4, binding.ProtocolUDP, false, true, false) + }) + }) + t.Run("EndpointSlice", func(t *testing.T) { + t.Run("InternalTrafficPolicy Cluster", func(t *testing.T) { + testClusterIPRemove(t, svc1IPv4, externalIPv4, ep1IPv4, binding.ProtocolUDP, false, false, true) + }) + t.Run("InternalTrafficPolicy Local", func(t *testing.T) { + testClusterIPRemove(t, svc1IPv4, externalIPv4, ep1IPv4, binding.ProtocolUDP, false, true, true) + }) + }) + }) + t.Run("IPv6 TCP", func(t *testing.T) { + t.Run("Endpoints", func(t *testing.T) { + t.Run("InternalTrafficPolicy Cluster", func(t *testing.T) { + testClusterIPRemove(t, svc1IPv6, externalIPv6, ep1IPv6, binding.ProtocolTCPv6, true, false, false) + }) + t.Run("InternalTrafficPolicy Local", func(t *testing.T) { + testClusterIPRemove(t, svc1IPv6, externalIPv6, ep1IPv6, binding.ProtocolTCPv6, true, true, false) + }) + }) + t.Run("EndpointSlice", func(t *testing.T) { + t.Run("InternalTrafficPolicy Cluster", func(t *testing.T) { + testClusterIPRemove(t, svc1IPv6, externalIPv6, ep1IPv6, binding.ProtocolTCPv6, true, false, true) + }) + t.Run("InternalTrafficPolicy Local", func(t *testing.T) { + testClusterIPRemove(t, svc1IPv6, externalIPv6, ep1IPv6, binding.ProtocolTCPv6, true, true, true) + }) + }) + }) + t.Run("IPv6 UDP", func(t *testing.T) { t.Run("Endpoints", func(t *testing.T) { t.Run("InternalTrafficPolicy Cluster", func(t *testing.T) { - testClusterIPRemove(t, svc1IPv6, externalIPv6, ep1IPv6, true, false, false) + testClusterIPRemove(t, svc1IPv6, externalIPv6, ep1IPv6, binding.ProtocolUDPv6, true, false, false) }) t.Run("InternalTrafficPolicy Local", func(t *testing.T) { - testClusterIPRemove(t, svc1IPv6, externalIPv6, ep1IPv6, true, true, false) + testClusterIPRemove(t, svc1IPv6, externalIPv6, ep1IPv6, binding.ProtocolUDPv6, true, true, false) }) }) t.Run("EndpointSlice", func(t *testing.T) { t.Run("InternalTrafficPolicy 
Cluster", func(t *testing.T) { - testClusterIPRemove(t, svc1IPv6, externalIPv6, ep1IPv6, true, false, true) + testClusterIPRemove(t, svc1IPv6, externalIPv6, ep1IPv6, binding.ProtocolUDPv6, true, false, true) }) t.Run("InternalTrafficPolicy Local", func(t *testing.T) { - testClusterIPRemove(t, svc1IPv6, externalIPv6, ep1IPv6, true, true, true) + testClusterIPRemove(t, svc1IPv6, externalIPv6, ep1IPv6, binding.ProtocolUDPv6, true, true, true) }) }) }) } func TestNodePortRemove(t *testing.T) { - t.Run("IPv4", func(t *testing.T) { + t.Run("IPv4 TCP", func(t *testing.T) { t.Run("Endpoints", func(t *testing.T) { - testNodePortRemove(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, false, false) + testNodePortRemove(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, binding.ProtocolTCP, false, false) }) t.Run("EndpointSlice", func(t *testing.T) { - testNodePortRemove(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, false, true) + testNodePortRemove(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, binding.ProtocolTCP, false, true) }) }) - t.Run("IPv6", func(t *testing.T) { + t.Run("IPv4 UDP", func(t *testing.T) { t.Run("Endpoints", func(t *testing.T) { - testNodePortRemove(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, true, false) + testNodePortRemove(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, binding.ProtocolUDP, false, false) }) t.Run("EndpointSlice", func(t *testing.T) { - testNodePortRemove(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, true, true) + testNodePortRemove(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, binding.ProtocolUDP, false, true) + }) + }) + t.Run("IPv6 TCP", func(t *testing.T) { + t.Run("Endpoints", func(t *testing.T) { + testNodePortRemove(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, binding.ProtocolTCPv6, true, false) + }) + t.Run("EndpointSlice", func(t *testing.T) { + testNodePortRemove(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, 
ep1IPv6, binding.ProtocolTCPv6, true, true) + }) + }) + t.Run("IPv6 UDP", func(t *testing.T) { + t.Run("Endpoints", func(t *testing.T) { + testNodePortRemove(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, binding.ProtocolUDPv6, true, false) + }) + t.Run("EndpointSlice", func(t *testing.T) { + testNodePortRemove(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, binding.ProtocolUDPv6, true, true) }) }) } func TestLoadBalancerRemove(t *testing.T) { - t.Run("IPv4", func(t *testing.T) { + t.Run("IPv4 TCP", func(t *testing.T) { t.Run("Endpoints", func(t *testing.T) { - testLoadBalancerRemove(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, loadBalancerIPv4, false, false) + testLoadBalancerRemove(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, loadBalancerIPv4, binding.ProtocolTCP, false, false) }) t.Run("EndpointSlice", func(t *testing.T) { - testLoadBalancerRemove(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, loadBalancerIPv4, false, true) + testLoadBalancerRemove(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, loadBalancerIPv4, binding.ProtocolTCP, false, true) }) }) - t.Run("IPv6", func(t *testing.T) { + t.Run("IPv4 UDP", func(t *testing.T) { t.Run("Endpoints", func(t *testing.T) { - testLoadBalancerRemove(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, loadBalancerIPv6, true, false) + testLoadBalancerRemove(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, loadBalancerIPv4, binding.ProtocolUDP, false, false) }) t.Run("EndpointSlice", func(t *testing.T) { - testLoadBalancerRemove(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, loadBalancerIPv6, true, true) + testLoadBalancerRemove(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, loadBalancerIPv4, binding.ProtocolUDP, false, true) + }) + }) + t.Run("IPv6 TCP", func(t *testing.T) { + t.Run("Endpoints", func(t *testing.T) { + testLoadBalancerRemove(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, 
loadBalancerIPv6, binding.ProtocolTCPv6, true, false) + }) + t.Run("EndpointSlice", func(t *testing.T) { + testLoadBalancerRemove(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, loadBalancerIPv6, binding.ProtocolTCPv6, true, true) + }) + }) + t.Run("IPv6 UDP", func(t *testing.T) { + t.Run("Endpoints", func(t *testing.T) { + testLoadBalancerRemove(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, loadBalancerIPv6, binding.ProtocolUDPv6, true, false) + }) + t.Run("EndpointSlice", func(t *testing.T) { + testLoadBalancerRemove(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, loadBalancerIPv6, binding.ProtocolUDPv6, true, true) }) }) - } func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { @@ -1689,6 +1790,7 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP mockOFClient.EXPECT().InstallServiceGroup(groupIDUDP, false, gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(protocolUDP, gomock.Any()).Times(1) + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), epIP, protocolUDP) fp.endpointsChanges.OnEndpointSliceUpdate(epsUDP, true) fp.syncProxyRules() @@ -1707,28 +1809,49 @@ func TestClusterIPRemoveSamePortEndpoint(t *testing.T) { }) } -func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { +func testLoadBalancerRemoveEndpoints(t *testing.T, nodePortAddresses []net.IP, svcIP, externalIP, epIP, loadBalancerIP net.IP, bindingProtocol binding.Protocol, isIPv6 bool) { ctrl := gomock.NewController(t) mockOFClient, mockRouteClient := getMockClients(ctrl) groupAllocator := openflow.NewGroupAllocator(isIPv6) - fp := newFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6) + apiProtocol := getAPIProtocol(bindingProtocol) + // Create a ServicePort with a specific protocol, avoiding using the global variable 'svcPortName' which is set to TCP protocol. 
+ svcPortName := makeSvcPortName("ns", "svc", strconv.Itoa(svcPort), apiProtocol) + fp := newFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll) - svc := makeTestClusterIPService(&svcPortName, svcIP, nil, int32(svcPort), corev1.ProtocolTCP, nil, nil, false, nil) + externalTrafficPolicy := corev1.ServiceExternalTrafficPolicyTypeCluster + internalTrafficPolicy := corev1.ServiceInternalTrafficPolicyCluster + + svc := makeTestLoadBalancerService(&svcPortName, + svcIP, + []net.IP{externalIP}, + []net.IP{loadBalancerIP}, + int32(svcPort), + int32(svcNodePort), + apiProtocol, + nil, + &internalTrafficPolicy, + externalTrafficPolicy) makeServiceMap(fp, svc) - ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), corev1.ProtocolTCP, false) + ep, epPort := makeTestEndpointSliceEndpointAndPort(&svcPortName, epIP, int32(svcPort), apiProtocol, false) eps := makeTestEndpointSlice(svcPortName.Namespace, svcPortName.Name, []discovery.Endpoint{*ep}, []discovery.EndpointPort{*epPort}, isIPv6) makeEndpointSliceMap(fp, eps) - bindingProtocol := binding.ProtocolTCP + vIP := agentconfig.VirtualNodePortDNATIPv4 if isIPv6 { - bindingProtocol = binding.ProtocolTCPv6 + vIP = agentconfig.VirtualNodePortDNATIPv6 } groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + 
mockOFClient.EXPECT().InstallServiceFlows(groupID, groupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) + mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) + mockRouteClient.EXPECT().AddExternalIPRoute(externalIP).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1736,6 +1859,12 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) + if needClearConntrackEntries(bindingProtocol) { + mockRouteClient.EXPECT().ClearConntrackEntryForService(svcIP, uint16(svcPort), epIP, bindingProtocol) + mockRouteClient.EXPECT().ClearConntrackEntryForService(nil, uint16(svcNodePort), epIP, bindingProtocol) + mockRouteClient.EXPECT().ClearConntrackEntryForService(loadBalancerIP, uint16(svcPort), epIP, bindingProtocol) + mockRouteClient.EXPECT().ClearConntrackEntryForService(externalIP, uint16(svcPort), epIP, bindingProtocol) + } fp.endpointsChanges.OnEndpointSliceUpdate(eps, true) fp.syncProxyRules() @@ -1746,12 +1875,18 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv fp.syncProxyRules() } -func TestClusterIPRemoveEndpoints(t *testing.T) { - t.Run("IPv4", func(t *testing.T) { - testClusterIPRemoveEndpoints(t, svc1IPv4, ep1IPv4, false) +func TestLoadBalancerRemoveEndpoints(t *testing.T) { + t.Run("IPv4 TCP", func(t *testing.T) { + testLoadBalancerRemoveEndpoints(t, nodePortAddressesIPv4, svc1IPv4, externalIPv4, ep1IPv4, loadBalancerIPv4, binding.ProtocolTCP, false) }) - t.Run("IPv6", func(t *testing.T) { - testClusterIPRemoveEndpoints(t, svc1IPv6, ep1IPv6, true) + t.Run("IPv4 UDP", func(t *testing.T) { + testLoadBalancerRemoveEndpoints(t, nodePortAddressesIPv4, 
svc1IPv4, externalIPv4, ep1IPv4, loadBalancerIPv4, binding.ProtocolUDP, false) + }) + t.Run("IPv6 TCP", func(t *testing.T) { + testLoadBalancerRemoveEndpoints(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, loadBalancerIPv6, binding.ProtocolTCPv6, true) + }) + t.Run("IPv6 UDP", func(t *testing.T) { + testLoadBalancerRemoveEndpoints(t, nodePortAddressesIPv6, svc1IPv6, externalIPv6, ep1IPv6, loadBalancerIPv6, binding.ProtocolUDPv6, true) }) } @@ -2689,9 +2824,10 @@ func TestMetrics(t *testing.T) { name string svcIP, ep1IP, ep2IP net.IP isIPv6 bool + bindingProtocol binding.Protocol }{ - {"IPv4", svc1IPv4, ep1IPv4, ep2IPv4, false}, - {"IPv6", svc1IPv6, ep1IPv6, ep2IPv6, true}, + {"IPv4", svc1IPv4, ep1IPv4, ep2IPv4, false, binding.ProtocolTCP}, + {"IPv6", svc1IPv6, ep1IPv6, ep2IPv6, true, binding.ProtocolTCPv6}, } { t.Run(tc.name, func(t *testing.T) { endpointsUpdateTotalMetric := metrics.EndpointsUpdatesTotal.CounterMetric @@ -2719,7 +2855,7 @@ func TestMetrics(t *testing.T) { assert.Equal(t, 2, int(v)) assert.NoError(t, err) - testClusterIPRemove(t, tc.svcIP, nil, tc.ep1IP, tc.isIPv6, false, false) + testClusterIPRemove(t, tc.svcIP, nil, tc.ep1IP, tc.bindingProtocol, tc.isIPv6, false, false) v, err = testutil.GetCounterMetricValue(endpointsUpdateTotalMetric) assert.NoError(t, err) diff --git a/pkg/agent/route/interfaces.go b/pkg/agent/route/interfaces.go index 4fcb7b5d5ec..30b86097bc3 100644 --- a/pkg/agent/route/interfaces.go +++ b/pkg/agent/route/interfaces.go @@ -87,4 +87,7 @@ type Interface interface { // DeleteRouteForLink deletes a route entry for a specific link. DeleteRouteForLink(dstCIDR *net.IPNet, linkIndex int) error + + // ClearConntrackEntryForService deletes a conntrack entry for a Service connection. 
+ ClearConntrackEntryForService(svcIP net.IP, svcPort uint16, endpointIP net.IP, protocol binding.Protocol) error } diff --git a/pkg/agent/route/route_linux.go b/pkg/agent/route/route_linux.go index 1e18ba6dbdb..f72ccf3bbe9 100644 --- a/pkg/agent/route/route_linux.go +++ b/pkg/agent/route/route_linux.go @@ -1591,6 +1591,44 @@ func (c *Client) DeleteRouteForLink(cidr *net.IPNet, linkIndex int) error { return nil } +func (c *Client) ClearConntrackEntryForService(svcIP net.IP, svcPort uint16, endpointIP net.IP, protocol binding.Protocol) error { + var protoVar uint8 + var ipFamily netlink.InetFamily + switch protocol { + case binding.ProtocolTCP: + ipFamily = unix.AF_INET + protoVar = unix.IPPROTO_TCP + case binding.ProtocolTCPv6: + ipFamily = unix.AF_INET6 + protoVar = unix.IPPROTO_TCP + case binding.ProtocolUDP: + ipFamily = unix.AF_INET + protoVar = unix.IPPROTO_UDP + case binding.ProtocolUDPv6: + ipFamily = unix.AF_INET6 + protoVar = unix.IPPROTO_UDP + case binding.ProtocolSCTP: + ipFamily = unix.AF_INET + protoVar = unix.IPPROTO_SCTP + case binding.ProtocolSCTPv6: + ipFamily = unix.AF_INET6 + protoVar = unix.IPPROTO_SCTP + } + filter := &netlink.ConntrackFilter{} + filter.AddProtocol(protoVar) + if svcIP != nil { + filter.AddIP(netlink.ConntrackOrigDstIP, svcIP) + } + if svcPort != 0 { + filter.AddPort(netlink.ConntrackOrigDstPort, svcPort) + } + if endpointIP != nil { + filter.AddIP(netlink.ConntrackReplySrcIP, endpointIP) + } + _, err := c.netlink.ConntrackDeleteFilter(netlink.ConntrackTable, ipFamily, filter) + return err +} + func getTransProtocolStr(protocol binding.Protocol) string { if protocol == binding.ProtocolTCP || protocol == binding.ProtocolTCPv6 { return "tcp" diff --git a/pkg/agent/route/route_windows.go b/pkg/agent/route/route_windows.go index b3d4f037318..1ce4490e424 100644 --- a/pkg/agent/route/route_windows.go +++ b/pkg/agent/route/route_windows.go @@ -591,3 +591,7 @@ func (c *Client) AddRouteForLink(dstCIDR *net.IPNet, linkIndex int) error { 
func (c *Client) DeleteRouteForLink(dstCIDR *net.IPNet, linkIndex int) error {
 	return errors.New("DeleteRouteForLink is not implemented on Windows")
 }
+
+func (c *Client) ClearConntrackEntryForService(svcIP net.IP, svcPort uint16, endpointIP net.IP, protocol binding.Protocol) error {
+	return nil // Windows stub: every argument is ignored and success is always reported.
+}
diff --git a/pkg/agent/route/testing/mock_route.go b/pkg/agent/route/testing/mock_route.go
index e7dc180aee4..177ee437347 100644
--- a/pkg/agent/route/testing/mock_route.go
+++ b/pkg/agent/route/testing/mock_route.go
@@ -134,6 +134,20 @@ func (mr *MockInterfaceMockRecorder) AddSNATRule(arg0, arg1 interface{}) *gomock
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddSNATRule", reflect.TypeOf((*MockInterface)(nil).AddSNATRule), arg0, arg1)
 }
 
+// ClearConntrackEntryForService mocks base method
+func (m *MockInterface) ClearConntrackEntryForService(arg0 net.IP, arg1 uint16, arg2 net.IP, arg3 openflow.Protocol) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "ClearConntrackEntryForService", arg0, arg1, arg2, arg3)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// ClearConntrackEntryForService indicates an expected call of ClearConntrackEntryForService
+func (mr *MockInterfaceMockRecorder) ClearConntrackEntryForService(arg0, arg1, arg2, arg3 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearConntrackEntryForService", reflect.TypeOf((*MockInterface)(nil).ClearConntrackEntryForService), arg0, arg1, arg2, arg3)
+}
+
 // DeleteExternalIPRoute mocks base method
 func (m *MockInterface) DeleteExternalIPRoute(arg0 net.IP) error {
 	m.ctrl.T.Helper()
diff --git a/pkg/agent/util/netlink/netlink_linux.go b/pkg/agent/util/netlink/netlink_linux.go
index 6938ba543ad..ac053fbedee 100644
--- a/pkg/agent/util/netlink/netlink_linux.go
+++ b/pkg/agent/util/netlink/netlink_linux.go
@@ -59,4 +59,6 @@ type Interface interface {
 	LinkSetName(link netlink.Link, name string) error
 
 	LinkSetUp(link netlink.Link) error
+
+
ConntrackDeleteFilter(table netlink.ConntrackTableType, family netlink.InetFamily, filter netlink.CustomConntrackFilter) (uint, error) // NOTE(review): presumably forwards to the netlink library function of the same name; the uint is presumably the number of deleted entries — confirm.
 }
diff --git a/pkg/agent/util/netlink/testing/mock_netlink_linux.go b/pkg/agent/util/netlink/testing/mock_netlink_linux.go
index fde505fa945..f0aad5292f1 100644
--- a/pkg/agent/util/netlink/testing/mock_netlink_linux.go
+++ b/pkg/agent/util/netlink/testing/mock_netlink_linux.go
@@ -106,6 +106,21 @@ func (mr *MockInterfaceMockRecorder) AddrReplace(arg0, arg1 interface{}) *gomock
 	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddrReplace", reflect.TypeOf((*MockInterface)(nil).AddrReplace), arg0, arg1)
 }
 
+// ConntrackDeleteFilter mocks base method
+func (m *MockInterface) ConntrackDeleteFilter(arg0 netlink.ConntrackTableType, arg1 netlink.InetFamily, arg2 netlink.CustomConntrackFilter) (uint, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "ConntrackDeleteFilter", arg0, arg1, arg2)
+	ret0, _ := ret[0].(uint)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// ConntrackDeleteFilter indicates an expected call of ConntrackDeleteFilter
+func (mr *MockInterfaceMockRecorder) ConntrackDeleteFilter(arg0, arg1, arg2 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ConntrackDeleteFilter", reflect.TypeOf((*MockInterface)(nil).ConntrackDeleteFilter), arg0, arg1, arg2)
+}
+
 // LinkByIndex mocks base method
 func (m *MockInterface) LinkByIndex(arg0 int) (netlink.Link, error) {
 	m.ctrl.T.Helper()