Skip to content

Commit

Permalink
Add support for short-circuiting in AntreaProxy
Browse files Browse the repository at this point in the history
Short-circuiting ensures that access from a Pod/Node to a local NodePort, LoadBalancer,
or ExternalIP works, regardless of the externalTrafficPolicy of the Service.

Previously, externalTrafficPolicy determined the selected Endpoint for Pod/Node clients
and external clients. With "Cluster", Endpoints were selected from all Endpoints across
the cluster, while "Local" only selected from local Endpoints. With this PR, even when
externalTrafficPolicy is set to "Local", Pod/Node clients without local Endpoints can
still work, because Endpoints from the cluster can be selected.

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed May 11, 2023
1 parent f2e17bb commit 194f5db
Show file tree
Hide file tree
Showing 9 changed files with 333 additions and 54 deletions.
32 changes: 31 additions & 1 deletion pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,14 @@ type Client interface {
// UninstallServiceFlows removes flows installed by InstallServiceFlows.
UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error

// InstallServiceShortCircuitingFlows installs flows for accessing Service NodePort, LoadBalancer and ExternalIP from
// local Node or Pod when their externalTrafficPolicy is Local. It also installs the flow that uses the group/bucket
// to do Service LB, similar to InstallServiceFlows. Note that the group includes all Endpoints, while the flow
// selectively matches traffic originating from the local Pod CIDR.
InstallServiceShortCircuitingFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error
// UninstallServiceShortCircuitingFlows removes flows installed by InstallServiceShortCircuitingFlows.
UninstallServiceShortCircuitingFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error

// GetFlowTableStatus should return an array of flow table status, all existing flow tables should be included in the list.
GetFlowTableStatus() []binding.TableStatus

Expand Down Expand Up @@ -712,6 +720,10 @@ func generateServicePortFlowCacheKey(svcIP net.IP, svcPort uint16, protocol bind
return fmt.Sprintf("S%s%s%x", svcIP, protocol, svcPort)
}

// generateServicePortShortCircuitingFlowCacheKey builds the flow cache key under which the short-circuiting flows of a
// Service port are stored. The key is the letter "S" followed by the Service IP, the protocol, and the port in
// lowercase hexadecimal, terminated by the "_short_circuiting" suffix.
func generateServicePortShortCircuitingFlowCacheKey(svcIP net.IP, svcPort uint16, protocol binding.Protocol) string {
	key := fmt.Sprintf("S%s%s%x_short_circuiting", svcIP, protocol, svcPort)
	return key
}

func (c *client) InstallEndpointFlows(protocol binding.Protocol, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
Expand Down Expand Up @@ -757,7 +769,7 @@ func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP,
defer c.replayMutex.RUnlock()
var flows []binding.Flow
nodePortAddress := svcIP.Equal(config.VirtualNodePortDNATIPv4) || svcIP.Equal(config.VirtualNodePortDNATIPv6)
flows = append(flows, c.featureService.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0, externalAddress, nodePortAddress, nested))
flows = append(flows, c.featureService.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0, externalAddress, nodePortAddress, nested, false))
if affinityTimeout != 0 {
flows = append(flows, c.featureService.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout, externalAddress, nodePortAddress))
}
Expand All @@ -775,9 +787,27 @@ func (c *client) UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol bi
return c.deleteFlows(c.featureService.cachedFlows, cacheKey)
}

// InstallServiceShortCircuitingFlows builds the short-circuiting variant of the Service LB flow for the given Service
// IP, port and protocol, and adds it to the Service feature's flow cache under the short-circuiting cache key.
// Session affinity is requested on the flow whenever affinityTimeout is non-zero. It returns the error reported by
// addFlows.
func (c *client) InstallServiceShortCircuitingFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error {
	c.replayMutex.RLock()
	defer c.replayMutex.RUnlock()

	withSessionAffinity := affinityTimeout != 0
	// The Service IP is treated as a NodePort address when it equals one of the virtual NodePort DNAT IPs.
	nodePortAddress := svcIP.Equal(config.VirtualNodePortDNATIPv4) || svcIP.Equal(config.VirtualNodePortDNATIPv6)
	// The flow is always built with externalAddress=true, nested=false and isShortCircuiting=true.
	lbFlow := c.featureService.serviceLBFlow(groupID, svcIP, svcPort, protocol, withSessionAffinity, true, nodePortAddress, false, true)

	key := generateServicePortShortCircuitingFlowCacheKey(svcIP, svcPort, protocol)
	return c.addFlows(c.featureService.cachedFlows, key, []binding.Flow{lbFlow})
}

// UninstallServiceShortCircuitingFlows deletes the flows cached under the short-circuiting cache key of the given
// Service IP, port and protocol, i.e. the key used by InstallServiceShortCircuitingFlows. It returns the error
// reported by deleteFlows.
func (c *client) UninstallServiceShortCircuitingFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
	c.replayMutex.RLock()
	defer c.replayMutex.RUnlock()

	return c.deleteFlows(
		c.featureService.cachedFlows,
		generateServicePortShortCircuitingFlowCacheKey(svcIP, svcPort, protocol),
	)
}

func (c *client) GetServiceFlowKeys(svcIP net.IP, svcPort uint16, protocol binding.Protocol, endpoints []proxy.Endpoint) []string {
cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol)
flowKeys := c.getFlowKeysFromCache(c.featureService.cachedFlows, cacheKey)
cacheKey = generateServicePortShortCircuitingFlowCacheKey(svcIP, svcPort, protocol)
flowKeys = append(flowKeys, c.getFlowKeysFromCache(c.featureService.cachedFlows, cacheKey)...)
for _, ep := range endpoints {
epPort, _ := ep.Port()
cacheKey = generateEndpointFlowCacheKey(ep.IP(), epPort, protocol)
Expand Down
76 changes: 76 additions & 0 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,6 +1191,82 @@ func Test_client_InstallServiceFlows(t *testing.T) {
}
}

// Test_client_InstallServiceShortCircuitingFlows checks, for NodePort and LoadBalancer style Service IPs over IPv4 and
// IPv6, that InstallServiceShortCircuitingFlows stores flows matching the expected flow strings under the
// short-circuiting cache key, and that UninstallServiceShortCircuitingFlows removes that cache entry again.
func Test_client_InstallServiceShortCircuitingFlows(t *testing.T) {
	const svcPort = uint16(80)
	groupID := binding.GroupIDType(100)
	lbIPv4 := net.ParseIP("10.96.0.100")
	lbIPv6 := net.ParseIP("fec0:10:96::100")

	tests := []struct {
		name            string
		protocol        binding.Protocol
		svcIP           net.IP
		affinityTimeout uint16
		expectedFlows   []string
	}{
		{
			name:            "Service NodePort,SessionAffinity",
			protocol:        binding.ProtocolUDP,
			svcIP:           config.VirtualNodePortDNATIPv4,
			affinityTimeout: 100,
			expectedFlows: []string{
				"cookie=0x1030000000000, table=ServiceLB, priority=210,udp,reg4=0x90000/0xf0000,nw_src=10.10.0.0/24,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x64->reg7,group:100",
			},
		},
		{
			name:            "Service NodePort,SessionAffinity,IPv6",
			protocol:        binding.ProtocolUDPv6,
			svcIP:           config.VirtualNodePortDNATIPv6,
			affinityTimeout: 100,
			expectedFlows: []string{
				"cookie=0x1030000000000, table=ServiceLB, priority=210,udp6,reg4=0x90000/0xf0000,ipv6_src=fec0:10:10::/80,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x64->reg7,group:100",
			},
		},
		{
			name:            "Service LoadBalancer,SessionAffinity",
			protocol:        binding.ProtocolSCTP,
			svcIP:           lbIPv4,
			affinityTimeout: 100,
			expectedFlows: []string{
				"cookie=0x1030000000000, table=ServiceLB, priority=210,sctp,reg4=0x10000/0x70000,nw_src=10.10.0.0/24,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x64->reg7,group:100",
			},
		},
		{
			name:            "Service LoadBalancer,SessionAffinity,IPv6",
			protocol:        binding.ProtocolSCTPv6,
			svcIP:           lbIPv6,
			affinityTimeout: 100,
			expectedFlows: []string{
				"cookie=0x1030000000000, table=ServiceLB, priority=210,sctp6,reg4=0x10000/0x70000,ipv6_src=fec0:10:10::/80,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=set_field:0x200/0x200->reg0,set_field:0x30000/0x70000->reg4,set_field:0x200000/0x200000->reg4,set_field:0x64->reg7,group:100",
			},
		},
	}

	for i := range tests {
		tc := tests[i]
		t.Run(tc.name, func(t *testing.T) {
			ctrl := gomock.NewController(t)
			defer ctrl.Finish()
			mockOps := oftest.NewMockOFEntryOperations(ctrl)

			// NOTE(review): the two boolean arguments presumably enable IPv4 and IPv6 — confirm against newFakeClient.
			fakeClient := newFakeClient(mockOps, true, true, config.K8sNode, config.TrafficEncapModeEncap)
			defer resetPipelines()

			// One AddAll is expected for the install and one DeleteAll for the uninstall.
			mockOps.EXPECT().AddAll(gomock.Any()).Return(nil).Times(1)
			mockOps.EXPECT().DeleteAll(gomock.Any()).Return(nil).Times(1)

			key := generateServicePortShortCircuitingFlowCacheKey(tc.svcIP, svcPort, tc.protocol)

			// After installation the cache must hold the expected flows under the short-circuiting key.
			installErr := fakeClient.InstallServiceShortCircuitingFlows(groupID, tc.svcIP, svcPort, tc.protocol, tc.affinityTimeout)
			assert.NoError(t, installErr)
			cached, found := fakeClient.featureService.cachedFlows.Load(key)
			require.True(t, found)
			assert.ElementsMatch(t, tc.expectedFlows, getFlowStrings(cached))

			// After uninstallation the cache entry must be gone.
			uninstallErr := fakeClient.UninstallServiceShortCircuitingFlows(tc.svcIP, svcPort, tc.protocol)
			assert.NoError(t, uninstallErr)
			_, found = fakeClient.featureService.cachedFlows.Load(key)
			require.False(t, found)
		})
	}
}

func Test_client_GetServiceFlowKeys(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
14 changes: 12 additions & 2 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2377,11 +2377,21 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType,
withSessionAffinity bool,
externalAddress bool,
nodePortAddress bool,
nested bool) binding.Flow {
flowBuilder := ServiceLBTable.ofTable.BuildFlow(priorityNormal).
nested bool,
isShortCircuiting bool) binding.Flow {
priority := priorityNormal
// For short-circuiting flow, priority should be higher since an extra match condition is added.
if isShortCircuiting {
priority = priorityHigh
}
flowBuilder := ServiceLBTable.ofTable.BuildFlow(priority).
Cookie(f.cookieAllocator.Request(f.category).Raw()).
MatchProtocol(protocol).
MatchDstPort(svcPort, nil)
// For short-circuiting flow, an extra match condition matching packet from local Pod CIDR is added.
if isShortCircuiting {
flowBuilder = flowBuilder.MatchSrcIPNet(f.localCIDRs[getIPProtocol(svcIP)])
}

// EpToSelectRegMark is required to match the packets that haven't undergone Endpoint selection yet.
regMarksToMatch := []*binding.RegMark{EpToSelectRegMark}
Expand Down
9 changes: 9 additions & 0 deletions pkg/agent/openflow/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type featureService struct {
gatewayMAC net.HardwareAddr
nodePortAddresses map[binding.Protocol][]net.IP
serviceCIDRs map[binding.Protocol]net.IPNet
localCIDRs map[binding.Protocol]net.IPNet
networkConfig *config.NetworkConfig
gatewayPort uint32

Expand Down Expand Up @@ -75,6 +76,7 @@ func newFeatureService(
snatCtZones := make(map[binding.Protocol]int)
nodePortAddresses := make(map[binding.Protocol][]net.IP)
serviceCIDRs := make(map[binding.Protocol]net.IPNet)
localCIDRs := make(map[binding.Protocol]net.IPNet)
for _, ipProtocol := range ipProtocols {
if ipProtocol == binding.ProtocolIP {
gatewayIPs[ipProtocol] = nodeConfig.GatewayConfig.IPv4
Expand All @@ -86,6 +88,9 @@ func newFeatureService(
if serviceConfig.ServiceCIDR != nil {
serviceCIDRs[ipProtocol] = *serviceConfig.ServiceCIDR
}
if nodeConfig.PodIPv4CIDR != nil {
localCIDRs[ipProtocol] = *nodeConfig.PodIPv4CIDR
}
} else if ipProtocol == binding.ProtocolIPv6 {
gatewayIPs[ipProtocol] = nodeConfig.GatewayConfig.IPv6
virtualIPs[ipProtocol] = config.VirtualServiceIPv6
Expand All @@ -96,6 +101,9 @@ func newFeatureService(
if serviceConfig.ServiceCIDRv6 != nil {
serviceCIDRs[ipProtocol] = *serviceConfig.ServiceCIDRv6
}
if nodeConfig.PodIPv6CIDR != nil {
localCIDRs[ipProtocol] = *nodeConfig.PodIPv6CIDR
}
}
}

Expand All @@ -112,6 +120,7 @@ func newFeatureService(
snatCtZones: snatCtZones,
nodePortAddresses: nodePortAddresses,
serviceCIDRs: serviceCIDRs,
localCIDRs: localCIDRs,
gatewayMAC: nodeConfig.GatewayConfig.MAC,
gatewayPort: nodeConfig.GatewayConfig.OFPort,
networkConfig: networkConfig,
Expand Down
28 changes: 28 additions & 0 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 194f5db

Please sign in to comment.