Skip to content

Commit

Permalink
Refine Endpoint selection for MC Service
Browse files Browse the repository at this point in the history
Add a new flow with a group action for the Service's ClusterIP in the
EndpointDNAT table. When the Endpoint selected for a Multi-cluster Service is a
local Service's ClusterIP, the traffic goes to the corresponding exported Service's
group to select the final Endpoint. This avoids the traffic leaving the
OVS bridge via antrea-gw0 (to be handled by kube-proxy when it is running) and
then coming back again.

The proposal details can be found in the comment:
#4508 (comment)

Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone committed Mar 9, 2023
1 parent c440d81 commit 49ede2d
Show file tree
Hide file tree
Showing 12 changed files with 194 additions and 116 deletions.
6 changes: 3 additions & 3 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,13 @@ func run(o *Options) error {

switch {
case v4Enabled && v6Enabled:
proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, v6GroupCounter)
proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, v6GroupCounter, enableMulticlusterGW)
groupCounters = append(groupCounters, v4GroupCounter, v6GroupCounter)
case v4Enabled:
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter)
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, enableMulticlusterGW)
groupCounters = append(groupCounters, v4GroupCounter)
case v6Enabled:
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v6GroupCounter)
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v6GroupCounter, enableMulticlusterGW)
groupCounters = append(groupCounters, v6GroupCounter)
default:
return fmt.Errorf("at least one of IPv4 or IPv6 should be enabled")
Expand Down
10 changes: 7 additions & 3 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ type Client interface {
// otherwise the installation will fail.
// nodeLocalExternal represents if the externalTrafficPolicy is Local or not. This field is meaningful only when
// the svcType is NodePort or LoadBalancer.
InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType) error
// nested indicates whether the Service has Endpoints that are another Service's ClusterIP.
InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType, nested bool) error
// UninstallServiceFlows removes flows installed by InstallServiceFlows.
UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error

Expand Down Expand Up @@ -692,14 +693,17 @@ func (c *client) UninstallEndpointFlows(protocol binding.Protocol, endpoint prox
return c.deleteFlows(c.featureService.cachedFlows, cacheKey)
}

func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType) error {
func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType, nested bool) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
var flows []binding.Flow
flows = append(flows, c.featureService.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0, nodeLocalExternal, svcType))
flows = append(flows, c.featureService.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0, nodeLocalExternal, svcType, nested))
if affinityTimeout != 0 {
flows = append(flows, c.featureService.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal, svcType))
}
if svcType == v1.ServiceTypeClusterIP && !nested {
flows = append(flows, c.featureService.endpointRedirectFlowForServiceIP(svcIP, svcPort, protocol, groupID))
}
cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol)
return c.addFlows(c.featureService.cachedFlows, cacheKey, flows)
}
Expand Down
18 changes: 16 additions & 2 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1058,23 +1058,36 @@ func Test_client_InstallServiceFlows(t *testing.T) {
nodeLocalExternal bool
svcType corev1.ServiceType
expectedFlows []string
nested bool
}{
{
name: "Service ClusterIP",
protocol: binding.ProtocolTCP,
svcIP: svcIPv4,
svcType: corev1.ServiceTypeClusterIP,
expectedFlows: []string{
"cookie=0x1030000000000, table=EndpointDNAT, priority=210,tcp,reg3=0xa600064,reg4=0x1020050/0x107ffff actions=group:100",
"cookie=0x1030000000000, table=ServiceLB, priority=200,tcp,reg4=0x10000/0x70000,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x20000/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x64->reg7,group:100",
},
},
{
name: "Service ClusterIP, nested",
protocol: binding.ProtocolTCP,
svcIP: svcIPv4,
svcType: corev1.ServiceTypeClusterIP,
expectedFlows: []string{
"cookie=0x1030000000000, table=ServiceLB, priority=200,tcp,reg4=0x10000/0x70000,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x20000/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x64->reg7,set_field:0x1000000/0x1000000->reg4,group:100",
},
nested: true,
},
{
name: "Service ClusterIP,SessionAffinity",
protocol: binding.ProtocolTCP,
svcIP: svcIPv4,
affinityTimeout: uint16(100),
svcType: corev1.ServiceTypeClusterIP,
expectedFlows: []string{
"cookie=0x1030000000000, table=EndpointDNAT, priority=210,tcp,reg3=0xa600064,reg4=0x1020050/0x107ffff actions=group:100",
"cookie=0x1030000000000, table=ServiceLB, priority=200,tcp,reg4=0x10000/0x70000,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x30000/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x64->reg7,group:100",
"cookie=0x1030000000064, table=ServiceLB, priority=190,tcp,reg4=0x30000/0x70000,nw_dst=10.96.0.100,tp_dst=80 actions=learn(table=SessionAffinity,hard_timeout=100,priority=200,delete_learned,cookie=0x1030000000064,load:0x800->NXM_OF_ETH_TYPE[],load:0x6->NXM_OF_IP_PROTO[],load:OXM_OF_TCP_DST[]->OXM_OF_TCP_DST[],load:NXM_OF_IP_DST[]->NXM_OF_IP_DST[],load:NXM_OF_IP_SRC[]->NXM_OF_IP_SRC[],NXM_NX_REG3[],NXM_NX_REG4[0..15],reg4=0x2,reg0=0x1),set_field:0x20000/0x70000->reg4,goto_table:EndpointDNAT",
},
Expand All @@ -1086,6 +1099,7 @@ func Test_client_InstallServiceFlows(t *testing.T) {
affinityTimeout: uint16(100),
svcType: corev1.ServiceTypeClusterIP,
expectedFlows: []string{
"cookie=0x1030000000000, table=EndpointDNAT, priority=210,tcp6,reg4=0x1020050/0x107ffff actions=group:100",
"cookie=0x1030000000000, table=ServiceLB, priority=200,tcp6,reg4=0x10000/0x70000,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=set_field:0x30000/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x64->reg7,group:100",
"cookie=0x1030000000064, table=ServiceLB, priority=190,tcp6,reg4=0x30000/0x70000,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=learn(table=SessionAffinity,hard_timeout=100,priority=200,delete_learned,cookie=0x1030000000064,load:0x86dd->NXM_OF_ETH_TYPE[],load:0x6->NXM_OF_IP_PROTO[],load:OXM_OF_TCP_DST[]->OXM_OF_TCP_DST[],load:NXM_NX_IPV6_DST[]->NXM_NX_IPV6_DST[],load:NXM_NX_IPV6_SRC[]->NXM_NX_IPV6_SRC[],NXM_NX_XXREG3[],NXM_NX_REG4[0..15],reg4=0x2,reg0=0x1),set_field:0x20000/0x70000->reg4,goto_table:EndpointDNAT",
},
Expand Down Expand Up @@ -1149,7 +1163,7 @@ func Test_client_InstallServiceFlows(t *testing.T) {

cacheKey := generateServicePortFlowCacheKey(tc.svcIP, port, tc.protocol)

assert.NoError(t, fc.InstallServiceFlows(groupID, tc.svcIP, port, tc.protocol, tc.affinityTimeout, tc.nodeLocalExternal, tc.svcType))
assert.NoError(t, fc.InstallServiceFlows(groupID, tc.svcIP, port, tc.protocol, tc.affinityTimeout, tc.nodeLocalExternal, tc.svcType, tc.nested))
fCacheI, ok := fc.featureService.cachedFlows.Load(cacheKey)
require.True(t, ok)
assert.ElementsMatch(t, tc.expectedFlows, getFlowStrings(fCacheI))
Expand Down Expand Up @@ -1179,7 +1193,7 @@ func Test_client_GetServiceFlowKeys(t *testing.T) {
proxy.NewBaseEndpointInfo("10.10.0.12", "", "", 80, true, true, false, false, nil),
}

assert.NoError(t, fc.InstallServiceFlows(groupID, svcIP, svcPort, bindingProtocol, 100, true, corev1.ServiceTypeLoadBalancer))
assert.NoError(t, fc.InstallServiceFlows(groupID, svcIP, svcPort, bindingProtocol, 100, true, corev1.ServiceTypeLoadBalancer, false))
assert.NoError(t, fc.InstallEndpointFlows(bindingProtocol, endpoints))
flowKeys := fc.GetServiceFlowKeys(svcIP, svcPort, bindingProtocol, endpoints)
expectedFlowKeys := []string{
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/openflow/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ var (
// externalTrafficPolicy is Cluster.
ToClusterServiceRegMark = binding.NewOneBitRegMark(4, 21)
// reg4[22..23]: Field to store the action of a traffic control rule. Marks in this field include:
TrafficControlActionField = binding.NewRegField(4, 22, 23)
TrafficControlActionField = binding.NewRegField(4, 22, 23)
// reg4[24]: Mark to indicate whether the Endpoints of a Service include another Service's ClusterIP.
NestedServiceRegMark = binding.NewOneBitRegMark(4, 24)
TrafficControlMirrorRegMark = binding.NewRegMark(TrafficControlActionField, 0b01)
TrafficControlRedirectRegMark = binding.NewRegMark(TrafficControlActionField, 0b10)

Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/openflow/multicluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (f *featureMulticluster) snatConntrackFlows(serviceCIDR net.IPNet, localGat
ipProtocol := getIPProtocol(localGatewayIP)
cookieID := f.cookieAllocator.Request(f.category).Raw()
flows = append(flows,
// This generates the flow to match the first packet of multicluster Service connection, and commit them into
// This generates the flow to match the first packet of multi-cluster Service connection, and commit them into
// DNAT zone to make sure DNAT is performed before SNAT for any remote cluster traffic.
SNATMarkTable.ofTable.BuildFlow(priorityHigh).
Cookie(cookieID).
Expand Down
30 changes: 29 additions & 1 deletion pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2364,7 +2364,8 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType,
protocol binding.Protocol,
withSessionAffinity,
nodeLocalExternal bool,
serviceType v1.ServiceType) binding.Flow {
serviceType v1.ServiceType,
nested bool) binding.Flow {
cookieID := f.cookieAllocator.Request(f.category).Raw()
var lbResultMark *binding.RegMark
if withSessionAffinity {
Expand Down Expand Up @@ -2406,9 +2407,36 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType,
if f.enableAntreaPolicy {
flowBuilder = flowBuilder.Action().LoadToRegField(ServiceGroupIDField, uint32(groupID))
}
if nested {
// `nested` is true only when a Service is a Multi-cluster Service for now.
flowBuilder = flowBuilder.Action().LoadRegMark(NestedServiceRegMark)
}
return flowBuilder.Action().Group(groupID).Done()
}

// endpointRedirectFlowForServiceIP builds the EndpointDNAT flow that hands a packet over to the given group
// when the Endpoint recorded in the registers is this Service's ClusterIP and port, so that the group performs
// the final Endpoint selection. The flow only matches packets that carry NestedServiceRegMark.
func (f *featureService) endpointRedirectFlowForServiceIP(clusterIP net.IP, svcPort uint16, protocol binding.Protocol, groupID binding.GroupIDType) binding.Flow {
	// Value for EpUnionField: the EpSelectedRegMark value shifted above the Endpoint port bits, with the
	// Service port occupying the port bits.
	selectedState := EpSelectedRegMark.GetValue() << EndpointPortField.GetRange().Length()
	epUnion := selectedState + uint32(svcPort)

	fb := EndpointDNATTable.ofTable.BuildFlow(priorityHigh).
		MatchProtocol(protocol).
		Cookie(f.cookieAllocator.Request(f.category).Raw()).
		MatchRegFieldWithValue(EpUnionField, epUnion).
		MatchRegMark(NestedServiceRegMark)

	// Match the ClusterIP against the Endpoint IP register: a 32-bit field for IPv4, an xxreg for IPv6.
	switch getIPProtocol(clusterIP) {
	case binding.ProtocolIP:
		fb = fb.MatchRegFieldWithValue(EndpointIPField, binary.BigEndian.Uint32(clusterIP.To4()))
	default:
		fb = fb.MatchXXReg(EndpointIP6Field.GetRegID(), []byte(clusterIP))
	}
	return fb.Action().Group(groupID).Done()
}

// endpointDNATFlow generates the flow which transforms the Service Cluster IP to the Endpoint IP according to the Endpoint
// selection decision which is stored in regs.
func (f *featureService) endpointDNATFlow(endpointIP net.IP, endpointPort uint16, protocol binding.Protocol) binding.Flow {
Expand Down
10 changes: 5 additions & 5 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 18 additions & 8 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const (
resyncPeriod = time.Minute
componentName = "antrea-agent-proxy"
// SessionAffinity timeout is implemented using a hard_timeout in OVS. hard_timeout is
// represented by a uint16 in the OpenFlow protocol,
// represented by a uint16 in the OpenFlow protocol.
maxSupportedAffinityTimeout = math.MaxUint16
)

Expand Down Expand Up @@ -119,6 +119,7 @@ type proxier struct {
endpointSliceEnabled bool
proxyLoadBalancerIPs bool
topologyAwareHintsEnabled bool
supportNestedService bool
}

func (p *proxier) SyncedOnce() bool {
Expand Down Expand Up @@ -288,7 +289,7 @@ func (p *proxier) installNodePortService(groupID binding.GroupIDType, svcPort ui
if p.isIPv6 {
svcIP = agentconfig.VirtualNodePortDNATIPv6
}
if err := p.ofClient.InstallServiceFlows(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeNodePort); err != nil {
if err := p.ofClient.InstallServiceFlows(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeNodePort, false); err != nil {
return fmt.Errorf("failed to install Service NodePort load balancing flows: %w", err)
}
if err := p.routeClient.AddNodePort(p.nodePortAddresses, svcPort, protocol); err != nil {
Expand All @@ -314,7 +315,7 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot
func (p *proxier) installLoadBalancerService(groupID binding.GroupIDType, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool) error {
for _, ingress := range loadBalancerIPStrings {
if ingress != "" {
if err := p.ofClient.InstallServiceFlows(groupID, net.ParseIP(ingress), svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer); err != nil {
if err := p.ofClient.InstallServiceFlows(groupID, net.ParseIP(ingress), svcPort, protocol, affinityTimeout, nodeLocalExternal, corev1.ServiceTypeLoadBalancer, false); err != nil {
return fmt.Errorf("failed to install Service LoadBalancer load balancing flows: %w", err)
}
}
Expand Down Expand Up @@ -533,9 +534,15 @@ func (p *proxier) installServices() {
}
}

var isNestedService bool
if p.supportNestedService {
// Check the `IsNested` field only when Proxy is enabled with `supportNestedService`.
isNestedService = svcInfo.IsNested
}

// Install ClusterIP flows for the Service.
groupID := p.groupCounter.AllocateIfNotExist(svcPortName, internalPolicyLocal)
if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), externalPolicyLocal, corev1.ServiceTypeClusterIP); err != nil {
if err := p.ofClient.InstallServiceFlows(groupID, svcInfo.ClusterIP(), uint16(svcInfo.Port()), svcInfo.OFProtocol, uint16(affinityTimeout), externalPolicyLocal, corev1.ServiceTypeClusterIP, isNestedService); err != nil {
klog.Errorf("Error when installing Service flows: %v", err)
continue
}
Expand Down Expand Up @@ -898,7 +905,8 @@ func NewProxier(
proxyAllEnabled bool,
skipServices []string,
proxyLoadBalancerIPs bool,
groupCounter types.GroupCounter) *proxier {
groupCounter types.GroupCounter,
supportNestedService bool) *proxier {
recorder := record.NewBroadcaster().NewRecorder(
runtime.NewScheme(),
corev1.EventSource{Component: componentName, Host: hostname},
Expand Down Expand Up @@ -946,6 +954,7 @@ func NewProxier(
hostname: hostname,
serviceHealthServer: serviceHealthServer,
numLocalEndpoints: map[apimachinerytypes.NamespacedName]int{},
supportNestedService: supportNestedService,
}

p.serviceConfig.RegisterEventHandler(p)
Expand Down Expand Up @@ -1005,13 +1014,14 @@ func NewDualStackProxier(
skipServices []string,
proxyLoadBalancerIPs bool,
v4groupCounter types.GroupCounter,
v6groupCounter types.GroupCounter) *metaProxierWrapper {
v6groupCounter types.GroupCounter,
nestedServiceSupport bool) *metaProxierWrapper {

// Create an IPv4 instance of the single-stack proxier.
ipv4Proxier := NewProxier(hostname, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAllEnabled, skipServices, proxyLoadBalancerIPs, v4groupCounter)
ipv4Proxier := NewProxier(hostname, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAllEnabled, skipServices, proxyLoadBalancerIPs, v4groupCounter, nestedServiceSupport)

// Create an IPv6 instance of the single-stack proxier.
ipv6Proxier := NewProxier(hostname, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAllEnabled, skipServices, proxyLoadBalancerIPs, v6groupCounter)
ipv6Proxier := NewProxier(hostname, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAllEnabled, skipServices, proxyLoadBalancerIPs, v6groupCounter, nestedServiceSupport)

// Create a meta-proxier that dispatches calls between the two
// single-stack proxier instances.
Expand Down
Loading

0 comments on commit 49ede2d

Please sign in to comment.