From 8caa6ec7cdb92ddd008aa3da961d7675b7da66e3 Mon Sep 17 00:00:00 2001 From: Hongliang Liu Date: Thu, 11 May 2023 10:36:42 +0800 Subject: [PATCH] Add support for short-circuiting in AntreaProxy Short-circuiting is used to ensure that the traffic from Pod/Node clients to external addresses behaves the same way as the traffic from external clients to external addresses. External clients do not need to consider which Nodes have local Endpoints, as the load balancer handles this for them. However, for Pod/Node clients, when the externalTrafficPolicy of the Service is set to "Local", it will not work on Nodes without an Endpoint. With this PR, even when the externalTrafficPolicy is set to "Local", Pod/Node clients without local Endpoints can still work by selecting Endpoints from the cluster. Signed-off-by: Hongliang Liu --- pkg/agent/openflow/client.go | 11 +- pkg/agent/openflow/client_test.go | 47 ++- pkg/agent/openflow/pipeline.go | 21 +- pkg/agent/openflow/service.go | 9 + pkg/agent/openflow/testing/mock_openflow.go | 8 +- pkg/agent/proxy/proxier.go | 91 ++++-- pkg/agent/proxy/proxier_test.go | 324 +++++++++++--------- pkg/agent/proxy/topology_test.go | 16 +- test/e2e/proxy_test.go | 45 +-- test/integration/agent/openflow_test.go | 2 +- third_party/proxy/service.go | 8 +- 11 files changed, 342 insertions(+), 240 deletions(-) diff --git a/pkg/agent/openflow/client.go b/pkg/agent/openflow/client.go index 4ac5b819608..e2f6deaa8ce 100644 --- a/pkg/agent/openflow/client.go +++ b/pkg/agent/openflow/client.go @@ -100,9 +100,11 @@ type Client interface { // installs the flow that uses the group/bucket to do Service LB. If the affinityTimeout is not zero, it also // installs the flow which has a learn action to maintain the LB decision. The group with the groupID must be // installed before, otherwise the installation will fail. + // When externalAddress is set and groupID != clusterGroupID, it also installs the flow to implement short-circuiting + // for external Service IPs. // externalAddress indicates that whether the Service is externally accessible, like NodePort, LoadBalancer and ExternalIP. // nested, when setting to true, indicates the Service's Endpoints are ClusterIPs of other Services. - InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, externalAddress, nested bool) error + InstallServiceFlows(groupID, clusterGroupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, externalAddress, nested bool) error // UninstallServiceFlows removes flows installed by InstallServiceFlows. UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error @@ -752,18 +754,21 @@ func (c *client) UninstallEndpointFlows(protocol binding.Protocol, endpoints []p return c.deleteFlowsWithMultipleKeys(c.featureService.cachedFlows, flowCacheKeys) } -func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, externalAddress, nested bool) error { +func (c *client) InstallServiceFlows(groupID, clusterGroupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, externalAddress, nested bool) error { c.replayMutex.RLock() defer c.replayMutex.RUnlock() var flows []binding.Flow nodePortAddress := svcIP.Equal(config.VirtualNodePortDNATIPv4) || svcIP.Equal(config.VirtualNodePortDNATIPv6) - flows = append(flows, c.featureService.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0, externalAddress, nodePortAddress, nested)) + flows = append(flows, c.featureService.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0, externalAddress, nodePortAddress, nested, false)) if affinityTimeout != 0 { flows = append(flows, c.featureService.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout, externalAddress, nodePortAddress)) } if !externalAddress && !nested { flows = append(flows, c.featureService.endpointRedirectFlowForServiceIP(svcIP, svcPort, protocol, groupID)) } + if externalAddress && groupID != clusterGroupID { + flows = append(flows, c.featureService.serviceLBFlow(clusterGroupID, svcIP, svcPort, protocol, affinityTimeout != 0, true, nodePortAddress, false, true)) + } cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol) return c.addFlows(c.featureService.cachedFlows, cacheKey, flows) } diff --git a/pkg/agent/openflow/client_test.go b/pkg/agent/openflow/client_test.go index 01f5f34c776..6b495a9ebbb 100644 --- a/pkg/agent/openflow/client_test.go +++ b/pkg/agent/openflow/client_test.go @@ -1059,12 +1059,15 @@ func Test_client_InstallEndpointFlows(t *testing.T) { func Test_client_InstallServiceFlows(t *testing.T) { groupID := binding.GroupIDType(100) + clusterGroupID := binding.GroupIDType(101) svcIPv4 := net.ParseIP("10.96.0.100") svcIPv6 := net.ParseIP("fec0:10:96::100") port := uint16(80) testCases := []struct { name string + groupID binding.GroupIDType + clusterGroupID binding.GroupIDType protocol binding.Protocol svcIP net.IP affinityTimeout uint16 @@ -1074,6 +1077,7 @@ func Test_client_InstallServiceFlows(t *testing.T) { }{ { name: "Service ClusterIP", + groupID: groupID, protocol: binding.ProtocolTCP, svcIP: svcIPv4, expectedFlows: []string{ @@ -1083,6 +1087,7 @@ func Test_client_InstallServiceFlows(t *testing.T) { }, { name: "Service ClusterIP, nested", + groupID: groupID, protocol: binding.ProtocolTCP, svcIP: svcIPv4, expectedFlows: []string{ @@ -1092,6 +1097,7 @@ func Test_client_InstallServiceFlows(t *testing.T) { }, { name: "Service ClusterIP,SessionAffinity", + groupID: groupID, protocol: binding.ProtocolTCP, svcIP: svcIPv4, affinityTimeout: uint16(100), @@ -1103,6 +1109,7 @@ func Test_client_InstallServiceFlows(t *testing.T) { }, { name: "Service ClusterIP,IPv6,SessionAffinity", + groupID: groupID, protocol: binding.ProtocolTCPv6, svcIP: svcIPv6, affinityTimeout: uint16(100), @@ -1113,6 +1120,8 @@ func Test_client_InstallServiceFlows(t *testing.T) { }, { name: "Service NodePort,SessionAffinity", + groupID: groupID, + clusterGroupID: groupID, protocol: binding.ProtocolUDP, svcIP: config.VirtualNodePortDNATIPv4, affinityTimeout: uint16(100), @@ -1124,6 +1133,8 @@ func Test_client_InstallServiceFlows(t *testing.T) { }, { name: "Service NodePort,IPv6,SessionAffinity", + groupID: groupID, + clusterGroupID: groupID, protocol: binding.ProtocolUDPv6, svcIP: config.VirtualNodePortDNATIPv6, affinityTimeout: uint16(100), @@ -1133,8 +1144,24 @@ func Test_client_InstallServiceFlows(t *testing.T) { "cookie=0x1030000000064, table=ServiceLB, priority=190,udp6,reg4=0xb0000/0xf0000,tp_dst=80 actions=learn(table=SessionAffinity,hard_timeout=100,priority=200,delete_learned,cookie=0x1030000000064,eth_type=0x86dd,nw_proto=0x11,OXM_OF_UDP_DST[],NXM_NX_IPV6_DST[],NXM_NX_IPV6_SRC[],load:NXM_NX_XXREG3[]->NXM_NX_XXREG3[],load:NXM_NX_REG4[0..15]->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[9],load:0x1->NXM_NX_REG4[21]),set_field:0x20000/0x70000->reg4,goto_table:EndpointDNAT", }, }, + { + name: "Service NodePort,SessionAffinity,Short-circuiting", + groupID: groupID, + clusterGroupID: clusterGroupID, + protocol: binding.ProtocolUDP, + svcIP: config.VirtualNodePortDNATIPv4, + affinityTimeout: uint16(100), + toExternalAddress: true, + expectedFlows: []string{ + "cookie=0x1030000000000, table=ServiceLB, priority=210,udp,reg4=0x90000/0xf0000,nw_src=10.10.0.0/24,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x65->reg7,group:101", + "cookie=0x1030000000000, table=ServiceLB, priority=200,udp,reg4=0x90000/0xf0000,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x64->reg7,group:100", + "cookie=0x1030000000064, table=ServiceLB, priority=190,udp,reg4=0xb0000/0xf0000,tp_dst=80 actions=learn(table=SessionAffinity,hard_timeout=100,priority=200,delete_learned,cookie=0x1030000000064,eth_type=0x800,nw_proto=0x11,OXM_OF_UDP_DST[],NXM_OF_IP_DST[],NXM_OF_IP_SRC[],load:NXM_NX_REG3[]->NXM_NX_REG3[],load:NXM_NX_REG4[0..15]->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[9],load:0x1->NXM_NX_REG4[21]),set_field:0x20000/0x70000->reg4,goto_table:EndpointDNAT", + }, + }, { name: "Service LoadBalancer,SessionAffinity", + groupID: groupID, + clusterGroupID: groupID, protocol: binding.ProtocolSCTP, svcIP: svcIPv4, affinityTimeout: uint16(100), @@ -1146,6 +1173,8 @@ func Test_client_InstallServiceFlows(t *testing.T) { }, { name: "Service LoadBalancer,IPv6,SessionAffinity", + groupID: groupID, + clusterGroupID: groupID, protocol: binding.ProtocolSCTPv6, svcIP: svcIPv6, affinityTimeout: uint16(100), @@ -1155,6 +1184,20 @@ func Test_client_InstallServiceFlows(t *testing.T) { "cookie=0x1030000000064, table=ServiceLB, priority=190,sctp6,reg4=0x30000/0x70000,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=learn(table=SessionAffinity,hard_timeout=100,priority=200,delete_learned,cookie=0x1030000000064,eth_type=0x86dd,nw_proto=0x84,OXM_OF_SCTP_DST[],NXM_NX_IPV6_DST[],NXM_NX_IPV6_SRC[],load:NXM_NX_XXREG3[]->NXM_NX_XXREG3[],load:NXM_NX_REG4[0..15]->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[9],load:0x1->NXM_NX_REG4[21]),set_field:0x20000/0x70000->reg4,goto_table:EndpointDNAT", }, }, + { + name: "Service LoadBalancer,SessionAffinity,Short-circuiting", + groupID: groupID, + clusterGroupID: clusterGroupID, + protocol: binding.ProtocolSCTP, + svcIP: svcIPv4, + affinityTimeout: uint16(100), + toExternalAddress: true, + expectedFlows: []string{ + "cookie=0x1030000000000, table=ServiceLB, priority=210,sctp,reg4=0x10000/0x70000,nw_src=10.10.0.0/24,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x65->reg7,group:101", + "cookie=0x1030000000000, table=ServiceLB, priority=200,sctp,reg4=0x10000/0x70000,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x64->reg7,group:100", + "cookie=0x1030000000064, table=ServiceLB, priority=190,sctp,reg4=0x30000/0x70000,nw_dst=10.96.0.100,tp_dst=80 actions=learn(table=SessionAffinity,hard_timeout=100,priority=200,delete_learned,cookie=0x1030000000064,eth_type=0x800,nw_proto=0x84,OXM_OF_SCTP_DST[],NXM_OF_IP_DST[],NXM_OF_IP_SRC[],load:NXM_NX_REG3[]->NXM_NX_REG3[],load:NXM_NX_REG4[0..15]->NXM_NX_REG4[0..15],load:0x2->NXM_NX_REG4[16..18],load:0x1->NXM_NX_REG0[9],load:0x1->NXM_NX_REG4[21]),set_field:0x20000/0x70000->reg4,goto_table:EndpointDNAT", + }, + }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { @@ -1169,7 +1212,7 @@ func Test_client_InstallServiceFlows(t *testing.T) { cacheKey := generateServicePortFlowCacheKey(tc.svcIP, port, tc.protocol) - assert.NoError(t, fc.InstallServiceFlows(groupID, tc.svcIP, port, tc.protocol, tc.affinityTimeout, tc.toExternalAddress, tc.nested)) + assert.NoError(t, fc.InstallServiceFlows(tc.groupID, tc.clusterGroupID, tc.svcIP, port, tc.protocol, tc.affinityTimeout, tc.toExternalAddress, tc.nested)) fCacheI, ok := fc.featureService.cachedFlows.Load(cacheKey) require.True(t, ok) assert.ElementsMatch(t, tc.expectedFlows, getFlowStrings(fCacheI)) @@ -1198,7 +1241,7 @@ func Test_client_GetServiceFlowKeys(t *testing.T) { proxy.NewBaseEndpointInfo("10.10.0.12", "", "", 80, true, true, false, false, nil), } - assert.NoError(t, fc.InstallServiceFlows(groupID, svcIP, svcPort, bindingProtocol, 100, true, false)) + assert.NoError(t, fc.InstallServiceFlows(groupID, groupID, svcIP, svcPort, bindingProtocol, 100, true, false)) assert.NoError(t, fc.InstallEndpointFlows(bindingProtocol, endpoints)) flowKeys := fc.GetServiceFlowKeys(svcIP, svcPort, bindingProtocol, endpoints) expectedFlowKeys := []string{ diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index c20bc0d5788..971bb94221a 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -2377,11 +2377,22 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType, withSessionAffinity bool, externalAddress bool, nodePortAddress bool, - nested bool) binding.Flow { - flowBuilder := ServiceLBTable.ofTable.BuildFlow(priorityNormal). - Cookie(f.cookieAllocator.Request(f.category).Raw()). - MatchProtocol(protocol). - MatchDstPort(svcPort, nil) + nested bool, + isShortCircuiting bool) binding.Flow { + var flowBuilder binding.FlowBuilder + if isShortCircuiting { + // For short-circuiting flow, an extra match condition matching packet from local Pod CIDR is added. + flowBuilder = ServiceLBTable.ofTable.BuildFlow(priorityHigh). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(protocol). + MatchDstPort(svcPort, nil). + MatchSrcIPNet(f.localCIDRs[getIPProtocol(svcIP)]) + } else { + flowBuilder = ServiceLBTable.ofTable.BuildFlow(priorityNormal). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchProtocol(protocol). + MatchDstPort(svcPort, nil) + } // EpToSelectRegMark is required to match the packets that haven't undergone Endpoint selection yet. regMarksToMatch := []*binding.RegMark{EpToSelectRegMark} diff --git a/pkg/agent/openflow/service.go b/pkg/agent/openflow/service.go index 2284672a8c0..97a3a2460e1 100644 --- a/pkg/agent/openflow/service.go +++ b/pkg/agent/openflow/service.go @@ -41,6 +41,7 @@ type featureService struct { gatewayMAC net.HardwareAddr nodePortAddresses map[binding.Protocol][]net.IP serviceCIDRs map[binding.Protocol]net.IPNet + localCIDRs map[binding.Protocol]net.IPNet networkConfig *config.NetworkConfig gatewayPort uint32 @@ -75,6 +76,7 @@ func newFeatureService( snatCtZones := make(map[binding.Protocol]int) nodePortAddresses := make(map[binding.Protocol][]net.IP) serviceCIDRs := make(map[binding.Protocol]net.IPNet) + localCIDRs := make(map[binding.Protocol]net.IPNet) for _, ipProtocol := range ipProtocols { if ipProtocol == binding.ProtocolIP { gatewayIPs[ipProtocol] = nodeConfig.GatewayConfig.IPv4 @@ -86,6 +88,9 @@ func newFeatureService( if serviceConfig.ServiceCIDR != nil { serviceCIDRs[ipProtocol] = *serviceConfig.ServiceCIDR } + if nodeConfig.PodIPv4CIDR != nil { + localCIDRs[ipProtocol] = *nodeConfig.PodIPv4CIDR + } } else if ipProtocol == binding.ProtocolIPv6 { gatewayIPs[ipProtocol] = nodeConfig.GatewayConfig.IPv6 virtualIPs[ipProtocol] = config.VirtualServiceIPv6 @@ -96,6 +101,9 @@ func newFeatureService( if serviceConfig.ServiceCIDRv6 != nil { serviceCIDRs[ipProtocol] = *serviceConfig.ServiceCIDRv6 } + if nodeConfig.PodIPv6CIDR != nil { + localCIDRs[ipProtocol] = *nodeConfig.PodIPv6CIDR + } } } @@ -112,6 +120,7 @@ func newFeatureService( snatCtZones: snatCtZones, nodePortAddresses: nodePortAddresses, serviceCIDRs: serviceCIDRs, + localCIDRs: localCIDRs, gatewayMAC: nodeConfig.GatewayConfig.MAC, gatewayPort: nodeConfig.GatewayConfig.OFPort, networkConfig: networkConfig, diff --git a/pkg/agent/openflow/testing/mock_openflow.go b/pkg/agent/openflow/testing/mock_openflow.go index f4b6d8bdd78..526ee2e4b4f 100644 --- a/pkg/agent/openflow/testing/mock_openflow.go +++ b/pkg/agent/openflow/testing/mock_openflow.go @@ -466,17 +466,17 @@ func (mr *MockClientMockRecorder) InstallSNATMarkFlows(arg0, arg1 interface{}) * } // InstallServiceFlows mocks base method -func (m *MockClient) InstallServiceFlows(arg0 openflow.GroupIDType, arg1 net.IP, arg2 uint16, arg3 openflow.Protocol, arg4 uint16, arg5, arg6 bool) error { +func (m *MockClient) InstallServiceFlows(arg0, arg1 openflow.GroupIDType, arg2 net.IP, arg3 uint16, arg4 openflow.Protocol, arg5 uint16, arg6, arg7 bool) error { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "InstallServiceFlows", arg0, arg1, arg2, arg3, arg4, arg5, arg6) + ret := m.ctrl.Call(m, "InstallServiceFlows", arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) ret0, _ := ret[0].(error) return ret0 } // InstallServiceFlows indicates an expected call of InstallServiceFlows -func (mr *MockClientMockRecorder) InstallServiceFlows(arg0, arg1, arg2, arg3, arg4, arg5, arg6 interface{}) *gomock.Call { +func (mr *MockClientMockRecorder) InstallServiceFlows(arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallServiceFlows", reflect.TypeOf((*MockClient)(nil).InstallServiceFlows), arg0, arg1, arg2, arg3, arg4, arg5, arg6) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "InstallServiceFlows", reflect.TypeOf((*MockClient)(nil).InstallServiceFlows), arg0, arg1, arg2, arg3, arg4, arg5, arg6, arg7) } // InstallServiceGroup mocks base method diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 2db792677cb..0c22a0025ae 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -184,27 +184,29 @@ func (p *proxier) removeStaleServices() { func (p *proxier) removeServiceFlows(svcInfo *types.ServiceInfo) bool { svcInfoStr := svcInfo.String() + svcPort := uint16(svcInfo.Port()) + svcProto := svcInfo.OFProtocol // Remove ClusterIP flows. - if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { + if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), svcPort, svcProto); err != nil { klog.ErrorS(err, "Error when uninstalling ClusterIP flows for Service", "ServiceInfo", svcInfoStr) return false } if p.proxyAll { // Remove NodePort flows and configurations. - if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcInfo.OFProtocol); err != nil { + if err := p.uninstallNodePortService(uint16(svcInfo.NodePort()), svcProto); err != nil { klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServiceInfo", svcInfoStr) return false } // Remove ExternalIP flows and configurations. - if err := p.uninstallExternalIPService(svcInfoStr, svcInfo.ExternalIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { + if err := p.uninstallExternalIPService(svcInfoStr, svcInfo.ExternalIPStrings(), svcPort, svcProto); err != nil { klog.ErrorS(err, "Error when uninstalling ExternalIP flows and configurations for Service", "ServiceInfo", svcInfoStr) return false } } // Remove LoadBalancer flows and configurations. if p.proxyLoadBalancerIPs { - if err := p.uninstallLoadBalancerService(svcInfoStr, svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol); err != nil { + if err := p.uninstallLoadBalancerService(svcInfoStr, svcInfo.LoadBalancerIPStrings(), svcPort, svcProto); err != nil { klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServiceInfo", svcInfoStr) return false } @@ -357,7 +359,7 @@ func smallSliceDifference(s1, s2 []string) []string { return diff } -func (p *proxier) installNodePortService(groupID binding.GroupIDType, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error { +func (p *proxier) installNodePortService(externalGroupID, clusterGroupID binding.GroupIDType, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error { if svcPort == 0 { return nil } @@ -365,7 +367,7 @@ func (p *proxier) installNodePortService(groupID binding.GroupIDType, svcPort ui if p.isIPv6 { svcIP = agentconfig.VirtualNodePortDNATIPv6 } - if err := p.ofClient.InstallServiceFlows(groupID, svcIP, svcPort, protocol, affinityTimeout, true, false); err != nil { + if err := p.ofClient.InstallServiceFlows(externalGroupID, clusterGroupID, svcIP, svcPort, protocol, affinityTimeout, true, false); err != nil { return fmt.Errorf("failed to install NodePort load balancing flows: %w", err) } if err := p.routeClient.AddNodePort(p.nodePortAddresses, svcPort, protocol); err != nil { @@ -391,10 +393,10 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot return nil } -func (p *proxier) installExternalIPService(svcInfoStr string, groupID binding.GroupIDType, externalIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error { +func (p *proxier) installExternalIPService(svcInfoStr string, externalGroupID, clusterGroupID binding.GroupIDType, externalIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error { for _, externalIP := range externalIPStrings { ip := net.ParseIP(externalIP) - if err := p.ofClient.InstallServiceFlows(groupID, ip, svcPort, protocol, affinityTimeout, true, false); err != nil { + if err := p.ofClient.InstallServiceFlows(externalGroupID, clusterGroupID, ip, svcPort, protocol, affinityTimeout, true, false); err != nil { return fmt.Errorf("failed to install ExternalIP load balancing flows: %w", err) } if err := p.addRouteForServiceIP(svcInfoStr, ip, p.routeClient.AddExternalIPRoute); err != nil { @@ -417,11 +419,11 @@ func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPString return nil } -func (p *proxier) installLoadBalancerService(svcInfoStr string, groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error { +func (p *proxier) installLoadBalancerService(svcInfoStr string, externalGroupID, clusterGroupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error { for _, ingress := range loadBalancerIPStrings { if ingress != "" { ip := net.ParseIP(ingress) - if err := p.ofClient.InstallServiceFlows(groupID, ip, svcPort, protocol, affinityTimeout, true, false); err != nil { + if err := p.ofClient.InstallServiceFlows(externalGroupID, clusterGroupID, ip, svcPort, protocol, affinityTimeout, true, false); err != nil { return fmt.Errorf("failed to install LoadBalancer load balancing flows: %w", err) } if p.proxyAll { @@ -539,27 +541,41 @@ func (p *proxier) installServices() { } withSessionAffinity := svcInfo.SessionAffinityType() == corev1.ServiceAffinityClientIP - var internalGroupID, externalGroupID binding.GroupIDType + internalPolicyLocal := svcInfo.InternalPolicyLocal() + externalPolicyLocal := svcInfo.ExternalPolicyLocal() + var internalGroupID, externalGroupID, clusterGroupID binding.GroupIDType // Ensure a group for internal traffic exist. - if internalGroupID, ok = p.installServiceGroup(svcPortName, needUpdateEndpoints, svcInfo.InternalPolicyLocal(), withSessionAffinity, localEndpoints, clusterEndpoints); !ok { + if internalGroupID, ok = p.installServiceGroup(svcPortName, needUpdateEndpoints, internalPolicyLocal, withSessionAffinity, localEndpoints, clusterEndpoints); !ok { continue } // Ensure a group for external traffic exist if it's externally accessible, and remove the unneeded group. if svcInfo.ExternallyAccessible() { - if svcInfo.ExternalPolicyLocal() != svcInfo.InternalPolicyLocal() { - if externalGroupID, ok = p.installServiceGroup(svcPortName, needUpdateEndpoints, svcInfo.ExternalPolicyLocal(), withSessionAffinity, localEndpoints, clusterEndpoints); !ok { + if externalPolicyLocal != internalPolicyLocal { + if externalGroupID, ok = p.installServiceGroup(svcPortName, needUpdateEndpoints, externalPolicyLocal, withSessionAffinity, localEndpoints, clusterEndpoints); !ok { continue } + if externalPolicyLocal { + clusterGroupID = internalGroupID + } else { + clusterGroupID = externalGroupID + } } else { externalGroupID = internalGroupID - // Ensure the other group is removed as ExternalTrafficPolicy is the same as InternalTrafficPolicy. - if !p.removeServiceGroup(svcPortName, !svcInfo.InternalPolicyLocal()) { - continue + if externalPolicyLocal { + if clusterGroupID, ok = p.installServiceGroup(svcPortName, needUpdateEndpoints, false, withSessionAffinity, nil, clusterEndpoints); !ok { + continue + } + } else { + // Ensure the other group is removed as ExternalTrafficPolicy is the same as InternalTrafficPolicy. + if !p.removeServiceGroup(svcPortName, !internalPolicyLocal) { + continue + } + clusterGroupID = externalGroupID } } } else { // Ensure the other group is removed as we only need a group for internal traffic. - if !p.removeServiceGroup(svcPortName, !svcInfo.InternalPolicyLocal()) { + if !p.removeServiceGroup(svcPortName, !internalPolicyLocal) { continue } } @@ -571,11 +587,11 @@ func (p *proxier) installServices() { continue } } - if !p.installServiceFlows(svcInfo, internalGroupID, externalGroupID) { + if !p.installServiceFlows(svcInfo, internalGroupID, externalGroupID, clusterGroupID) { continue } } else if needUpdateServiceExternalAddresses { - if !p.updateServiceExternalAddresses(pSvcInfo, svcInfo, externalGroupID) { + if !p.updateServiceExternalAddresses(pSvcInfo, svcInfo, externalGroupID, clusterGroupID) { continue } } @@ -606,8 +622,10 @@ func getAffinityTimeout(svcInfo *types.ServiceInfo) uint16 { return uint16(affinityTimeout) } -func (p *proxier) installServiceFlows(svcInfo *types.ServiceInfo, internalGroupID, externalGroupID binding.GroupIDType) bool { +func (p *proxier) installServiceFlows(svcInfo *types.ServiceInfo, internalGroupID, externalGroupID, clusterGroupID binding.GroupIDType) bool { svcInfoStr := svcInfo.String() + svcPort := uint16(svcInfo.Port()) + svcProto := svcInfo.OFProtocol affinityTimeout := getAffinityTimeout(svcInfo) var isNestedService bool @@ -618,24 +636,25 @@ func (p *proxier) installServiceFlows(svcInfo *types.ServiceInfo, internalGroupI } // Install ClusterIP flows. - if err := p.ofClient.InstallServiceFlows(internalGroupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, affinityTimeout, false, isNestedService); err != nil { + if err := p.ofClient.InstallServiceFlows(internalGroupID, binding.GroupIDType(0), svcInfo.ClusterIP(), svcPort, svcProto, affinityTimeout, false, isNestedService); err != nil { klog.ErrorS(err, "Error when installing ClusterIP flows for Service", "ServiceInfo", svcInfoStr) return false } if p.proxyAll { - if err := p.installNodePortService(externalGroupID, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, affinityTimeout); err != nil { + // Install NodePort flows and configurations. + if err := p.installNodePortService(externalGroupID, clusterGroupID, uint16(svcInfo.NodePort()), svcProto, affinityTimeout); err != nil { klog.ErrorS(err, "Error when installing NodePort flows and configurations for Service", "ServiceInfo", svcInfoStr) return false } // Install ExternalIP flows and configurations. - if err := p.installExternalIPService(svcInfoStr, externalGroupID, svcInfo.ExternalIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol, affinityTimeout); err != nil { + if err := p.installExternalIPService(svcInfoStr, externalGroupID, clusterGroupID, svcInfo.ExternalIPStrings(), svcPort, svcProto, affinityTimeout); err != nil { klog.ErrorS(err, "Error when installing ExternalIP flows and configurations for Service", "ServiceInfo", svcInfoStr) return false } } // Install LoadBalancer flows and configurations. if p.proxyLoadBalancerIPs { - if err := p.installLoadBalancerService(svcInfoStr, externalGroupID, svcInfo.LoadBalancerIPStrings(), uint16(svcInfo.Port()), svcInfo.OFProtocol, affinityTimeout); err != nil { + if err := p.installLoadBalancerService(svcInfoStr, externalGroupID, clusterGroupID, svcInfo.LoadBalancerIPStrings(), svcPort, svcProto, affinityTimeout); err != nil { klog.ErrorS(err, "Error when installing LoadBalancer flows and configurations for Service", "ServiceInfo", svcInfoStr) return false } @@ -643,29 +662,35 @@ func (p *proxier) installServiceFlows(svcInfo *types.ServiceInfo, internalGroupI return true } -func (p *proxier) updateServiceExternalAddresses(pSvcInfo, svcInfo *types.ServiceInfo, externalGroupID binding.GroupIDType) bool { +func (p *proxier) updateServiceExternalAddresses(pSvcInfo, svcInfo *types.ServiceInfo, externalGroupID, clusterGroupID binding.GroupIDType) bool { pSvcInfoStr := pSvcInfo.String() svcInfoStr := svcInfo.String() + pSvcPort := uint16(pSvcInfo.Port()) + svcPort := uint16(svcInfo.Port()) + pSvcNodePort := uint16(pSvcInfo.NodePort()) + svcNodePort := uint16(svcInfo.NodePort()) + pSvcProto := pSvcInfo.OFProtocol + svcProto := svcInfo.OFProtocol affinityTimeout := getAffinityTimeout(svcInfo) if p.proxyAll { - if pSvcInfo.NodePort() != svcInfo.NodePort() { - if err := p.uninstallNodePortService(uint16(pSvcInfo.NodePort()), pSvcInfo.OFProtocol); err != nil { + if pSvcNodePort != svcNodePort { + if err := p.uninstallNodePortService(pSvcNodePort, pSvcProto); err != nil { klog.ErrorS(err, "Error when uninstalling NodePort flows and configurations for Service", "ServiceInfo", pSvcInfoStr) return false } - if err := p.installNodePortService(externalGroupID, uint16(svcInfo.NodePort()), svcInfo.OFProtocol, affinityTimeout); err != nil { + if err := p.installNodePortService(externalGroupID, clusterGroupID, svcNodePort, svcProto, affinityTimeout); err != nil { klog.ErrorS(err, "Error when installing NodePort flows and configurations for Service", "ServiceInfo", svcInfoStr) return false } } deletedExternalIPs := smallSliceDifference(pSvcInfo.ExternalIPStrings(), svcInfo.ExternalIPStrings()) addedExternalIPs := smallSliceDifference(svcInfo.ExternalIPStrings(), pSvcInfo.ExternalIPStrings()) - if err := p.uninstallExternalIPService(pSvcInfoStr, deletedExternalIPs, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { + if err := p.uninstallExternalIPService(pSvcInfoStr, deletedExternalIPs, pSvcPort, pSvcProto); err != nil { klog.ErrorS(err, "Error when uninstalling ExternalIP flows and configurations for Service", "ServiceInfo", pSvcInfoStr) return false } - if err := p.installExternalIPService(svcInfoStr, externalGroupID, addedExternalIPs, uint16(svcInfo.Port()), svcInfo.OFProtocol, affinityTimeout); err != nil { + if err := p.installExternalIPService(svcInfoStr, externalGroupID, clusterGroupID, addedExternalIPs, svcPort, svcProto, affinityTimeout); err != nil { klog.ErrorS(err, "Error when installing ExternalIP flows and configurations for Service", "ServiceInfo", svcInfoStr) return false } @@ -673,11 +698,11 @@ func (p *proxier) updateServiceExternalAddresses(pSvcInfo, svcInfo *types.Servic if p.proxyLoadBalancerIPs { deletedLoadBalancerIPs := smallSliceDifference(pSvcInfo.LoadBalancerIPStrings(), svcInfo.LoadBalancerIPStrings()) addedLoadBalancerIPs := smallSliceDifference(svcInfo.LoadBalancerIPStrings(), pSvcInfo.LoadBalancerIPStrings()) - if err := p.uninstallLoadBalancerService(pSvcInfoStr, deletedLoadBalancerIPs, uint16(pSvcInfo.Port()), pSvcInfo.OFProtocol); err != nil { + if err := p.uninstallLoadBalancerService(pSvcInfoStr, deletedLoadBalancerIPs, pSvcPort, pSvcProto); err != nil { klog.ErrorS(err, "Error when uninstalling LoadBalancer flows and configurations for Service", "ServiceInfo", pSvcInfoStr) return false } - if err := p.installLoadBalancerService(svcInfoStr, externalGroupID, addedLoadBalancerIPs, uint16(svcInfo.Port()), svcInfo.OFProtocol, affinityTimeout); err != nil { + if err := p.installLoadBalancerService(svcInfoStr, externalGroupID, clusterGroupID, addedLoadBalancerIPs, svcPort, svcProto, affinityTimeout); err != nil { klog.ErrorS(err, "Error when installing LoadBalancer flows and configurations for Service", "ServiceInfo", svcInfoStr) return false } diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 920d5ba1d52..0693dfa0415 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -438,24 +438,26 @@ func testClusterIPAdd(t *testing.T, if isIPv6 { bindingProtocol = binding.ProtocolTCPv6 } - + internalGroupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalInternal) + var externalGroupID, clusterGroupID binding.GroupIDType if nodeLocalInternal == false { - groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalInternal) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedAllEps)).Times(1) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, true).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(internalGroupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(internalGroupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, true).Times(1) if externalIP != nil { - mockOFClient.EXPECT().InstallServiceFlows(groupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + externalGroupID = internalGroupID + clusterGroupID = internalGroupID + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) } } else { - groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalInternal) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedAllEps)).Times(1) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedLocalEps)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, true).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(internalGroupID, false, gomock.InAnyOrder(expectedLocalEps)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(internalGroupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, true).Times(1) if externalIP != nil { - groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, false) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + externalGroupID = fp.groupCounter.AllocateIfNotExist(svcPortName, false) + clusterGroupID = externalGroupID + mockOFClient.EXPECT().InstallServiceGroup(externalGroupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) } } if externalIP != nil { @@ -533,10 +535,7 @@ func testLoadBalancerAdd(t *testing.T, serving = true } expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), nodeName, "", svcPort, true, true, serving, false, nil)} - expectedAllEps := expectedLocalEps - if !(nodeLocalInternal && nodeLocalExternal) { - expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, serving, false, nil)) - } + expectedAllEps := append(expectedLocalEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, serving, false, nil)) bindingProtocol := binding.ProtocolTCP vIP := agentconfig.VirtualNodePortDNATIPv4 @@ -555,32 +554,45 @@ func testLoadBalancerAdd(t *testing.T, clusterIPEps = expectedAllEps nodePortEps = expectedLocalEps } - groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalInternal) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(clusterIPEps)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) - groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalExternal) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(nodePortEps)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + internalGroupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalInternal) + externalGroupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalExternal) + var clusterGroupID binding.GroupIDType + if nodeLocalInternal { + clusterGroupID = externalGroupID + } else { + clusterGroupID = internalGroupID + } + mockOFClient.EXPECT().InstallServiceGroup(internalGroupID, false, gomock.InAnyOrder(clusterIPEps)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(internalGroupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(externalGroupID, false, gomock.InAnyOrder(nodePortEps)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) if proxyLoadBalancerIPs { - mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) } if externalIP != nil { - mockOFClient.EXPECT().InstallServiceFlows(groupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) } } else { nodeLocalVal := nodeLocalInternal && nodeLocalExternal groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalVal) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + var clusterGroupID binding.GroupIDType + if nodeLocalVal { + clusterGroupID = fp.groupCounter.AllocateIfNotExist(svcPortName, false) + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedLocalEps)).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(clusterGroupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) + } else { + clusterGroupID = groupID + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) + mockOFClient.EXPECT().UninstallServiceGroup(fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal)).Times(1) + } + mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, clusterGroupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) if proxyLoadBalancerIPs { - mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, clusterGroupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) } if externalIP != nil { - mockOFClient.EXPECT().InstallServiceFlows(groupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, clusterGroupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) } - groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal) - mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1) } if proxyLoadBalancerIPs { mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) @@ -656,10 +668,7 @@ func testNodePortAdd(t *testing.T, serving = true } expectedLocalEps := []k8sproxy.Endpoint{k8sproxy.NewBaseEndpointInfo(ep2IP.String(), nodeName, "", svcPort, true, true, serving, false, nil)} - expectedAllEps := expectedLocalEps - if !(nodeLocalInternal && nodeLocalExternal) { - expectedAllEps = append(expectedAllEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, serving, false, nil)) - } + expectedAllEps := append(expectedLocalEps, k8sproxy.NewBaseEndpointInfo(ep1IP.String(), "", "", svcPort, false, true, serving, false, nil)) bindingProtocol := binding.ProtocolTCP vIP := agentconfig.VirtualNodePortDNATIPv4 @@ -678,27 +687,40 @@ func testNodePortAdd(t *testing.T, clusterIPEps = expectedAllEps nodePortEps = expectedLocalEps } - groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalInternal) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(clusterIPEps)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + internalGroupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalInternal) + externalGroupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalExternal) + var clusterGroupID binding.GroupIDType + if nodeLocalInternal { + clusterGroupID = externalGroupID + } else { + clusterGroupID = internalGroupID + } - groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalExternal) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(nodePortEps)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(internalGroupID, false, gomock.InAnyOrder(clusterIPEps)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(internalGroupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(externalGroupID, false, gomock.InAnyOrder(nodePortEps)).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) if externalIP != nil { - mockOFClient.EXPECT().InstallServiceFlows(groupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) } } else { nodeLocalVal := nodeLocalInternal && nodeLocalExternal groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalVal) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + var clusterGroupID binding.GroupIDType + if nodeLocalVal { + clusterGroupID = fp.groupCounter.AllocateIfNotExist(svcPortName, false) + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedLocalEps)).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(clusterGroupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) + } else { + clusterGroupID = groupID + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) + mockOFClient.EXPECT().UninstallServiceGroup(fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal)).Times(1) + } + mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, clusterGroupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) if externalIP != nil { - mockOFClient.EXPECT().InstallServiceFlows(groupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, clusterGroupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) } - groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, !nodeLocalVal) - mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1) } mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) if externalIP != nil { @@ -936,18 +958,18 @@ func TestLoadBalancerServiceWithMultiplePorts(t *testing.T) { mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort80, remoteEndpointForPort80})).Times(1) mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, []k8sproxy.Endpoint{localEndpointForPort80}).Times(1) mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort80, remoteEndpointForPort80})).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), svc1IPv4, uint16(port80Int32), binding.ProtocolTCP, uint16(0), false, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), agentconfig.VirtualNodePortDNATIPv4, uint16(port30001Int32), binding.ProtocolTCP, uint16(0), true, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), loadBalancerIPv4, uint16(port80Int32), binding.ProtocolTCP, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), gomock.Any(), svc1IPv4, uint16(port80Int32), binding.ProtocolTCP, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), gomock.Any(), agentconfig.VirtualNodePortDNATIPv4, uint16(port30001Int32), binding.ProtocolTCP, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), gomock.Any(), loadBalancerIPv4, uint16(port80Int32), binding.ProtocolTCP, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30001Int32), binding.ProtocolTCP).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIPv4).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})).Times(1) mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, []k8sproxy.Endpoint{localEndpointForPort443}).Times(1) mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), false, gomock.InAnyOrder([]k8sproxy.Endpoint{localEndpointForPort443, remoteEndpointForPort443})).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), svc1IPv4, uint16(port443Int32), binding.ProtocolTCP, uint16(0), false, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), agentconfig.VirtualNodePortDNATIPv4, uint16(port30002Int32), binding.ProtocolTCP, uint16(0), true, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), loadBalancerIPv4, uint16(port443Int32), binding.ProtocolTCP, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), gomock.Any(), svc1IPv4, uint16(port443Int32), binding.ProtocolTCP, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), gomock.Any(), agentconfig.VirtualNodePortDNATIPv4, uint16(port30002Int32), binding.ProtocolTCP, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), gomock.Any(), loadBalancerIPv4, uint16(port443Int32), binding.ProtocolTCP, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(port30002Int32), binding.ProtocolTCP).Times(1) fp.syncProxyRules() @@ -1101,11 +1123,11 @@ func TestDualStackService(t *testing.T) { mockOFClient.EXPECT().InstallServiceGroup(groupIDv4, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDv4, svc1IPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDv4, binding.GroupIDType(0), svc1IPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0), false, false).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupIDv6, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCPv6, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDv6, svc1IPv6, uint16(svcPort), binding.ProtocolTCPv6, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDv6, binding.GroupIDType(0), svc1IPv6, uint16(svcPort), binding.ProtocolTCPv6, uint16(0), false, false).Times(1) fpv4.syncProxyRules() fpv6.syncProxyRules() @@ -1147,31 +1169,34 @@ func testClusterIPRemove(t *testing.T, svcIP, externalIP, epIP net.IP, isIPv6 bo bindingProtocol = binding.ProtocolTCPv6 } + internalGroupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalInternal) + var externalGroupID, clusterGroupID binding.GroupIDType if nodeLocalInternal == false { - groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalInternal) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, true).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(internalGroupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(internalGroupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, true).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) if externalIP != nil { - mockOFClient.EXPECT().InstallServiceFlows(groupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + externalGroupID = internalGroupID + clusterGroupID = internalGroupID + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(externalIP, uint16(svcPort), bindingProtocol).Times(1) } } else { - groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, nodeLocalInternal) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, true).Times(1) - mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(internalGroupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(internalGroupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, true).Times(1) + mockOFClient.EXPECT().UninstallServiceGroup(internalGroupID).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) if externalIP != nil { - groupID = fp.groupCounter.AllocateIfNotExist(svcPortName, false) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) + externalGroupID = fp.groupCounter.AllocateIfNotExist(svcPortName, false) + clusterGroupID = externalGroupID + mockOFClient.EXPECT().InstallServiceGroup(externalGroupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) - mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1) + mockOFClient.EXPECT().UninstallServiceGroup(externalGroupID).Times(1) mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(externalIP, uint16(svcPort), bindingProtocol).Times(1) } @@ -1240,15 +1265,16 @@ func testNodePortRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, externa } mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - groupIDLocal := fp.groupCounter.AllocateIfNotExist(svcPortName, true) - groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) - mockOFClient.EXPECT().InstallServiceGroup(groupIDLocal, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + externalGroupID := fp.groupCounter.AllocateIfNotExist(svcPortName, true) + internalGroupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) + clusterGroupID := internalGroupID + mockOFClient.EXPECT().InstallServiceGroup(externalGroupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(internalGroupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(internalGroupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) if externalIP != nil { - mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) } @@ -1323,18 +1349,18 @@ func testLoadBalancerRemove(t *testing.T, nodePortAddresses []net.IP, svcIP, ext } mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - groupIDLocal := fp.groupCounter.AllocateIfNotExist(svcPortName, true) - groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceGroup(groupIDLocal, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) - + externalGroupID := fp.groupCounter.AllocateIfNotExist(svcPortName, true) + internalGroupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) + clusterGroupID := internalGroupID + mockOFClient.EXPECT().InstallServiceGroup(internalGroupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(externalGroupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(internalGroupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) if externalIP != nil { - mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, externalIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(externalIP) } @@ -1457,12 +1483,12 @@ func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, []k8sproxy.Endpoint{}).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), gomock.Any(), uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svcIP, uint16(svcPort), gomock.Any(), uint16(0), false, false).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), false, false).Times(1) fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) fp.syncProxyRules() } @@ -1512,16 +1538,16 @@ func testNodePortNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net. groupIDLocal := fp.groupCounter.AllocateIfNotExist(svcPortName, true) mockOFClient.EXPECT().InstallServiceGroup(groupIDCluster, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupIDLocal, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort), gomock.Any(), uint16(0), false, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, binding.GroupIDType(0), svcIP, uint16(svcPort), gomock.Any(), uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, groupIDCluster, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) fp.syncProxyRules() mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), false, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, binding.GroupIDType(0), svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, groupIDCluster, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) fp.syncProxyRules() @@ -1573,13 +1599,14 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP vIP = agentconfig.VirtualNodePortDNATIPv6 } - groupIDCluster := fp.groupCounter.AllocateIfNotExist(svcPortName, false) - groupIDLocal := fp.groupCounter.AllocateIfNotExist(svcPortName, true) - mockOFClient.EXPECT().InstallServiceGroup(groupIDCluster, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceGroup(groupIDLocal, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort), gomock.Any(), uint16(0), false, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, loadBalancerIP, uint16(svcPort), gomock.Any(), uint16(0), true, false).Times(1) + internalGroupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) + externalGroupID := fp.groupCounter.AllocateIfNotExist(svcPortName, true) + clusterGroupID := internalGroupID + mockOFClient.EXPECT().InstallServiceGroup(internalGroupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(externalGroupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(internalGroupID, binding.GroupIDType(0), svcIP, uint16(svcPort), gomock.Any(), uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, loadBalancerIP, uint16(svcPort), gomock.Any(), uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) fp.syncProxyRules() @@ -1589,9 +1616,9 @@ func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), gomock.Any()).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), false, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, loadBalancerIP, uint16(svcPort+1), gomock.Any(), uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(internalGroupID, binding.GroupIDType(0), svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(externalGroupID, clusterGroupID, loadBalancerIP, uint16(svcPort+1), gomock.Any(), uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) @@ -1640,8 +1667,8 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP mockOFClient.EXPECT().InstallServiceGroup(groupIDUDP, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(protocolTCP, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(protocolUDP, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), protocolTCP, uint16(0), false, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDUDP, svcIP, uint16(svcPort), protocolUDP, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svcIP, uint16(svcPort), protocolTCP, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDUDP, binding.GroupIDType(0), svcIP, uint16(svcPort), protocolUDP, uint16(0), false, false).Times(1) fp.syncProxyRules() mockOFClient.EXPECT().InstallServiceGroup(groupIDUDP, false, gomock.Any()).Times(1) @@ -1685,7 +1712,7 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -1753,7 +1780,7 @@ func testSessionAffinity(t *testing.T, svcIP net.IP, epIP net.IP, affinitySecond } else { expectedAffinity = uint16(affinitySeconds) } - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, expectedAffinity, false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, expectedAffinity, false, false).Times(1) fp.syncProxyRules() } @@ -1805,7 +1832,7 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) mockOFClient.EXPECT().InstallServiceGroup(groupID, true, []k8sproxy.Endpoint{}).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), gomock.Any(), uint16(10800), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svcIP, uint16(svcPort), gomock.Any(), uint16(10800), false, false).Times(1) fp.syncProxyRules() } @@ -1861,27 +1888,27 @@ func testServiceClusterIPUpdate(t *testing.T, groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, expectedEps).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) s1 := mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) - s2 := mockOFClient.EXPECT().InstallServiceFlows(groupID, updatedSvcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + s2 := mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), updatedSvcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) s2.After(s1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { - mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) } @@ -1962,27 +1989,27 @@ func testServicePortUpdate(t *testing.T, groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, expectedEps).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) s1 := mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) - s2 := mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort+1), bindingProtocol, uint16(0), false, false).Times(1) + s2 := mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), svcIP, uint16(svcPort+1), bindingProtocol, uint16(0), false, false).Times(1) s2.After(s1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { - mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) s1 = mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol) - s2 = mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort+1), bindingProtocol, uint16(0), true, false).Times(1) + s2 = mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), loadBalancerIP, uint16(svcPort+1), bindingProtocol, uint16(0), true, false).Times(1) s2.After(s1) mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) @@ -2061,20 +2088,20 @@ func testServiceNodePortUpdate(t *testing.T, groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, expectedEps).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) s1 := mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - s2 := mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort+1), bindingProtocol, uint16(0), true, false).Times(1) + s2 := mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), vIP, uint16(svcNodePort+1), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort+1), bindingProtocol).Times(1) s2.After(s1) } if svcType == corev1.ServiceTypeLoadBalancer { - mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) } @@ -2152,14 +2179,14 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { - mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) } fp.syncProxyRules() @@ -2170,14 +2197,13 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, groupIDLocal := fp.groupCounter.AllocateIfNotExist(svcPortName, true) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupIDLocal, false, expectedLocalEps).Times(1) - s1 := mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) - s2 := mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + s2 := mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, gomock.Any(), vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) s2.After(s1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) @@ -2185,7 +2211,7 @@ func testServiceExternalTrafficPolicyUpdate(t *testing.T, } if svcType == corev1.ServiceTypeLoadBalancer { s1 := mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) - s2 := mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + s2 := mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, gomock.Any(), loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) s2.After(s1) mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) @@ -2253,7 +2279,7 @@ func testServiceInternalTrafficPolicyUpdate(t *testing.T, groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedAllEps)).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedAllEps)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) assert.Contains(t, fp.endpointsInstalledMap, svcPortName) @@ -2277,7 +2303,7 @@ func testServiceInternalTrafficPolicyUpdate(t *testing.T, mockOFClient.EXPECT().UninstallServiceGroup(groupID).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupIDLocal, false, expectedLocalEps).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, gomock.Any(), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) @@ -2339,10 +2365,10 @@ func testServiceIngressIPsUpdate(t *testing.T, groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.InAnyOrder(expectedEps)).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.InAnyOrder(expectedEps)).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, binding.GroupIDType(0), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) for _, ip := range loadBalancerIPs { - mockOFClient.EXPECT().InstallServiceFlows(groupID, ip, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), ip, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) } mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) for _, ip := range loadBalancerIPs { @@ -2356,7 +2382,7 @@ func testServiceIngressIPsUpdate(t *testing.T, mockRouteClient.EXPECT().DeleteExternalIPRoute(net.ParseIP(ipStr)).Times(1) } for _, ipStr := range toAddLoadBalancerIPs { - mockOFClient.EXPECT().InstallServiceFlows(groupID, net.ParseIP(ipStr), uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), net.ParseIP(ipStr), uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(net.ParseIP(ipStr)).Times(1) } @@ -2430,24 +2456,24 @@ func testServiceStickyMaxAgeSecondsUpdate(t *testing.T, groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, true, expectedEps).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), svcIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), false, false).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(updatedAffinitySeconds), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), svcIP, uint16(svcPort), bindingProtocol, uint16(updatedAffinitySeconds), false, false).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(affinitySeconds), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), vIP, uint16(svcNodePort), bindingProtocol, uint16(affinitySeconds), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(updatedAffinitySeconds), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), vIP, uint16(svcNodePort), bindingProtocol, uint16(updatedAffinitySeconds), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { - mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), true, false).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(updatedAffinitySeconds), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(updatedAffinitySeconds), true, false).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) } @@ -2528,27 +2554,27 @@ func testServiceSessionAffinityTypeUpdate(t *testing.T, groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, expectedEps).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, expectedEps).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) mockOFClient.EXPECT().InstallServiceGroup(groupID, true, expectedEps).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), bindingProtocol).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), svcIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), false, false).Times(1) if svcType == corev1.ServiceTypeNodePort || svcType == corev1.ServiceTypeLoadBalancer { - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), vIP, uint16(svcNodePort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), bindingProtocol).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID, vIP, uint16(svcNodePort), bindingProtocol, uint16(affinitySeconds), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), vIP, uint16(svcNodePort), bindingProtocol, uint16(affinitySeconds), true, false).Times(1) mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), bindingProtocol).Times(1) } if svcType == corev1.ServiceTypeLoadBalancer { - mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(0), true, false).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), bindingProtocol) - mockOFClient.EXPECT().InstallServiceFlows(groupID, loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, gomock.Any(), loadBalancerIP, uint16(svcPort), bindingProtocol, uint16(affinitySeconds), true, false).Times(1) mockRouteClient.EXPECT().DeleteExternalIPRoute(loadBalancerIP).Times(1) mockRouteClient.EXPECT().AddExternalIPRoute(loadBalancerIP).Times(1) } @@ -2611,8 +2637,8 @@ func TestServicesWithSameEndpoints(t *testing.T) { mockOFClient.EXPECT().InstallServiceGroup(groupID2, false, gomock.Any()).Times(1) bindingProtocol := binding.ProtocolTCP mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID1, svc1IPv4, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(groupID2, svc2IPv4, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID1, binding.GroupIDType(0), svc1IPv4, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID2, binding.GroupIDType(0), svc2IPv4, uint16(svcPort), bindingProtocol, uint16(0), false, false).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svc1IPv4, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceFlows(svc2IPv4, uint16(svcPort), bindingProtocol).Times(1) mockOFClient.EXPECT().UninstallServiceGroup(groupID1).Times(1) @@ -2764,8 +2790,8 @@ func TestGetServiceFlowKeys(t *testing.T) { mockRouteClient.EXPECT().AddNodePort(nodePortAddressesIPv4, uint16(svcNodePort), binding.ProtocolTCP).Times(1) mockOFClient.EXPECT().InstallServiceGroup(gomock.Any(), gomock.Any(), gomock.Any()).Times(2) mockOFClient.EXPECT().InstallEndpointFlows(binding.ProtocolTCP, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), gomock.Any(), uint16(svcNodePort), binding.ProtocolTCP, uint16(0), true, false).Times(1) - mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), svc1IPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0), false, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), gomock.Any(), gomock.Any(), uint16(svcNodePort), binding.ProtocolTCP, uint16(0), true, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(gomock.Any(), gomock.Any(), svc1IPv4, uint16(svcPort), binding.ProtocolTCP, uint16(0), false, false).Times(1) fp.syncProxyRules() } diff --git a/pkg/agent/proxy/topology_test.go b/pkg/agent/proxy/topology_test.go index 7158585cd2d..afa90fb6833 100644 --- a/pkg/agent/proxy/topology_test.go +++ b/pkg/agent/proxy/topology_test.go @@ -412,9 +412,9 @@ func TestCategorizeEndpoints(t *testing.T) { "10.0.0.0:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: true}, "10.0.0.1:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, }, - clusterEndpoints: nil, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), localEndpoints: sets.NewString("10.0.0.0:80"), - allEndpoints: sets.NewString("10.0.0.0:80"), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), }, { name: "iTP: Local, eTP: Local, all endpoints remote", @@ -423,9 +423,9 @@ func TestCategorizeEndpoints(t *testing.T) { "10.0.0.0:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: true, IsLocal: false}, "10.0.0.1:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: true, IsLocal: false}, }, - clusterEndpoints: nil, + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), localEndpoints: sets.NewString(), - allEndpoints: sets.NewString(), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), }, { name: "iTP: Local, eTP: Local, all endpoints remote and terminating", @@ -434,9 +434,9 @@ func TestCategorizeEndpoints(t *testing.T) { "10.0.0.0:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, "10.0.0.1:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, }, - clusterEndpoints: sets.NewString(), + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), localEndpoints: sets.NewString(), - allEndpoints: sets.NewString(), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), }, { name: "iTP: Local, eTP: Local, all endpoints remote and terminating", @@ -445,9 +445,9 @@ func TestCategorizeEndpoints(t *testing.T) { "10.0.0.0:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.0.0.0:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, "10.0.0.1:80": &k8sproxy.BaseEndpointInfo{Endpoint: "10.0.0.1:80", Ready: false, Serving: true, Terminating: true, IsLocal: false}, }, - clusterEndpoints: sets.NewString(), + clusterEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), localEndpoints: sets.NewString(), - allEndpoints: sets.NewString(), + allEndpoints: sets.NewString("10.0.0.0:80", "10.0.0.1:80"), }, { name: "iTP: Cluster, eTP: Local, with terminating endpoints", diff --git a/test/e2e/proxy_test.go b/test/e2e/proxy_test.go index f2adcb1a2e3..d732997e13b 100644 --- a/test/e2e/proxy_test.go +++ b/test/e2e/proxy_test.go @@ -249,10 +249,10 @@ func loadBalancerTestCases(t *testing.T, data *TestData, clusterUrl, localUrl, h testLoadBalancerClusterFromPod(t, data, pods, clusterUrl) }) t.Run("ExternalTrafficPolicy:Local/Client:Node", func(t *testing.T) { - testLoadBalancerLocalFromNode(t, data, nodes, healthUrls, healthExpected, localUrl, hostnames) + testLoadBalancerLocalFromNode(t, data, nodes, healthUrls, healthExpected, localUrl) }) t.Run("ExternalTrafficPolicy:Local/Client:Pod", func(t *testing.T) { - testLoadBalancerLocalFromPod(t, data, pods, localUrl, podIPs, hostnames) + testLoadBalancerLocalFromPod(t, data, pods, localUrl) }) } @@ -269,12 +269,10 @@ func testLoadBalancerClusterFromPod(t *testing.T, data *TestData, pods []string, } } -func testLoadBalancerLocalFromNode(t *testing.T, data *TestData, nodes, healthUrls []string, healthExpected, url string, expectedHostnames []string) { +func testLoadBalancerLocalFromNode(t *testing.T, data *TestData, nodes, healthUrls []string, healthExpected, url string) { skipIfKubeProxyEnabled(t, data) - for idx, node := range nodes { - hostname, err := probeHostnameFromNode(node, url, data) - require.NoError(t, err, "Service LoadBalancer whose externalTrafficPolicy is Local should be able to be connected from Node") - require.Equal(t, hostname, expectedHostnames[idx]) + for _, node := range nodes { + require.NoError(t, probeFromNode(node, url, data), "Service LoadBalancer whose externalTrafficPolicy is Local should be able to be connected from Node") for _, healthUrl := range healthUrls { healthOutput, _, err := probeHealthFromNode(node, healthUrl, data) @@ -284,16 +282,10 @@ func testLoadBalancerLocalFromNode(t *testing.T, data *TestData, nodes, healthUr } } -func testLoadBalancerLocalFromPod(t *testing.T, data *TestData, pods []string, url string, expectedClientIPs, expectedHostnames []string) { +func testLoadBalancerLocalFromPod(t *testing.T, data *TestData, pods []string, url string) { errMsg := "Service NodePort whose externalTrafficPolicy is Local should be able to be connected from Pod" - for idx, pod := range pods { - hostname, err := probeHostnameFromPod(data, pod, busyboxContainerName, url) - require.NoError(t, err, errMsg) - require.Equal(t, hostname, expectedHostnames[idx]) - - clientIP, err := probeClientIPFromPod(data, pod, busyboxContainerName, url) - require.NoError(t, err, errMsg) - require.Equal(t, clientIP, expectedClientIPs[idx]) + for _, pod := range pods { + require.NoError(t, probeFromPod(data, pod, busyboxContainerName, url), errMsg) } } @@ -404,10 +396,10 @@ func nodePortTestCases(t *testing.T, data *TestData, portStrCluster, portStrLoca testNodePortLocalFromRemote(t, data, nodes, reverseStrs(localUrls), nodeIPs, reverseStrs(hostnames)) }) t.Run("ExternalTrafficPolicy:Local/Client:Node", func(t *testing.T) { - testNodePortLocalFromNode(t, data, nodes, localUrls, hostnames) + testNodePortLocalFromNode(t, data, nodes, localUrls) }) t.Run("ExternalTrafficPolicy:Local/Client:Pod", func(t *testing.T) { - testNodePortLocalFromPod(t, data, pods, localUrls, podIPs, hostnames) + testNodePortLocalFromPod(t, data, pods, localUrls) }) } @@ -532,25 +524,16 @@ func testNodePortLocalFromRemote(t *testing.T, data *TestData, nodes, urls, expe } } -func testNodePortLocalFromNode(t *testing.T, data *TestData, nodes, urls, expectedHostnames []string) { +func testNodePortLocalFromNode(t *testing.T, data *TestData, nodes, urls []string) { skipIfKubeProxyEnabled(t, data) for idx, node := range nodes { - hostname, err := probeHostnameFromNode(node, urls[idx], data) - require.NoError(t, err, "Service NodePort whose externalTrafficPolicy is Local should be able to be connected rom Node") - require.Equal(t, expectedHostnames[idx], hostname) + require.NoError(t, probeFromNode(node, urls[idx], data), "Service NodePort whose externalTrafficPolicy is Local should be able to be connected from Node") } } -func testNodePortLocalFromPod(t *testing.T, data *TestData, pods, urls, expectedClientIPs, expectedHostnames []string) { - errMsg := "There should be no errors when accessing to Service NodePort whose externalTrafficPolicy is Local from Pod" +func testNodePortLocalFromPod(t *testing.T, data *TestData, pods, urls []string) { for idx, pod := range pods { - hostname, err := probeHostnameFromPod(data, pod, busyboxContainerName, urls[idx]) - require.NoError(t, err, errMsg) - require.Equal(t, expectedHostnames[idx], hostname) - - clientIP, err := probeClientIPFromPod(data, pod, busyboxContainerName, urls[idx]) - require.NoError(t, err, errMsg) - require.Equal(t, expectedClientIPs[idx], clientIP) + require.NoError(t, probeFromPod(data, pod, busyboxContainerName, urls[idx]), "There should be no errors when accessing to Service NodePort whose externalTrafficPolicy is Local from Pod") } } diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index 7a6860d66b3..19a0931bb20 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -784,7 +784,7 @@ func installServiceFlows(t *testing.T, gid uint32, svc svcConfig, endpointList [ assert.NoError(t, err, "no error should return when installing flows for Endpoints") err = c.InstallServiceGroup(groupID, svc.withSessionAffinity, endpointList) assert.NoError(t, err, "no error should return when installing groups for Service") - err = c.InstallServiceFlows(groupID, svc.ip, svc.port, svc.protocol, stickyMaxAgeSeconds, false, false) + err = c.InstallServiceFlows(groupID, ofconfig.GroupIDType(0), svc.ip, svc.port, svc.protocol, stickyMaxAgeSeconds, false, false) assert.NoError(t, err, "no error should return when installing flows for Service") } diff --git a/third_party/proxy/service.go b/third_party/proxy/service.go index b49c1bd7598..b043be3e0e5 100644 --- a/third_party/proxy/service.go +++ b/third_party/proxy/service.go @@ -164,10 +164,10 @@ func (info *BaseServiceInfo) ExternallyAccessible() bool { // UsesClusterEndpoints is part of ServicePort interface. func (info *BaseServiceInfo) UsesClusterEndpoints() bool { - // TODO(hongliang): support short-circuit. Refer to this link https://github.com/kubernetes/kubernetes/issues/108526 - // for more details. - // The service port uses Cluster endpoints if the internal or external traffic policy is "Cluster". - return !info.internalPolicyLocal || (!info.externalPolicyLocal && info.ExternallyAccessible()) + // The service port uses Cluster endpoints if the internal traffic policy is "Cluster", + // or it is externally accessible (like NodePort, LoadBalancer or ExternalIP, even the + // external traffic policy is "Local", we need Cluster endpoints to implement short-circuiting.) + return !info.internalPolicyLocal || info.ExternallyAccessible() } // UsesLocalEndpoints is part of ServicePort interface.