Skip to content

Commit

Permalink
Refine Endpoint selection for multi-cluster Service (antrea-io#4508)
Browse files Browse the repository at this point in the history
When an Endpoint of a Multi-cluster Service is a local Service ClusterIP,
change the flow action so that the traffic goes to the corresponding exported
Service's group to select the Endpoint. This prevents the traffic from leaving
the OVS bridge through antrea-gw0 (where it is handled by kube-proxy when
kube-proxy is running) and then coming back again.

Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone committed Mar 7, 2023
1 parent 5846f82 commit 6cdbca3
Show file tree
Hide file tree
Showing 15 changed files with 392 additions and 106 deletions.
6 changes: 3 additions & 3 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,13 @@ func run(o *Options) error {

switch {
case v4Enabled && v6Enabled:
proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, v6GroupCounter)
proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, v6GroupCounter, enableMulticlusterGW, serviceCIDRProvider)
groupCounters = append(groupCounters, v4GroupCounter, v6GroupCounter)
case v4Enabled:
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter)
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, enableMulticlusterGW, serviceCIDRProvider)
groupCounters = append(groupCounters, v4GroupCounter)
case v6Enabled:
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v6GroupCounter)
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v6GroupCounter, enableMulticlusterGW, serviceCIDRProvider)
groupCounters = append(groupCounters, v6GroupCounter)
default:
return fmt.Errorf("at least one of IPv4 or IPv6 should be enabled")
Expand Down
7 changes: 4 additions & 3 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
openflowtypes "antrea.io/antrea/pkg/agent/openflow/types"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/apis/crd/v1alpha2"
Expand Down Expand Up @@ -84,7 +85,7 @@ type Client interface {

// InstallServiceGroup installs a group for Service LB. Each endpoint
// is a bucket of the group. For now, each bucket has the same weight.
InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error
InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, mcsLocalService *openflowtypes.ServiceGroupInfo, endpoints []proxy.Endpoint) error
// UninstallServiceGroup removes the group and its buckets that are
// installed by InstallServiceGroup.
UninstallServiceGroup(groupID binding.GroupIDType) error
Expand Down Expand Up @@ -618,11 +619,11 @@ func (c *client) GetPodFlowKeys(interfaceName string) []string {
return c.getFlowKeysFromCache(c.featurePodConnectivity.podCachedFlows, interfaceName)
}

func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error {
func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, mcsLocalService *openflowtypes.ServiceGroupInfo, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

group := c.featureService.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)
group := c.featureService.serviceEndpointGroup(groupID, withSessionAffinity, mcsLocalService, endpoints...)
_, installed := c.featureService.groupCache.Load(groupID)
if !installed {
if err := c.ofEntryOperations.AddOFEntries([]binding.OFEntry{group}); err != nil {
Expand Down
19 changes: 17 additions & 2 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
oftest "antrea.io/antrea/pkg/agent/openflow/testing"
"antrea.io/antrea/pkg/agent/openflow/types"
"antrea.io/antrea/pkg/apis/crd/v1alpha2"
binding "antrea.io/antrea/pkg/ovs/openflow"
ovsoftest "antrea.io/antrea/pkg/ovs/openflow/testing"
Expand Down Expand Up @@ -844,12 +845,17 @@ func Test_client_GetPodFlowKeys(t *testing.T) {

func Test_client_InstallServiceGroup(t *testing.T) {
groupID := binding.GroupIDType(100)
mcsLocalService := &types.ServiceGroupInfo{
GroupID: binding.GroupIDType(2),
Endpoint: proxy.NewBaseEndpointInfo("10.10.0.101", "", "", 80, false, true, false, false, nil),
}

testCases := []struct {
name string
withSessionAffinity bool
endpoints []proxy.Endpoint
expectedGroup string
mcsLocalService *types.ServiceGroupInfo
}{
{
name: "IPv4 Endpoints",
Expand All @@ -861,6 +867,16 @@ func Test_client_InstallServiceGroup(t *testing.T) {
"bucket=bucket_id:0,weight:100,actions=set_field:0xa0a0064->reg3,set_field:0x50/0xffff->reg4,resubmit:EndpointDNAT," +
"bucket=bucket_id:1,weight:100,actions=set_field:0xa0a0065->reg3,set_field:0x50/0xffff->reg4,resubmit:EndpointDNAT",
},
{
name: "IPv4 Endpoints with multi-cluster enabled",
endpoints: []proxy.Endpoint{
proxy.NewBaseEndpointInfo("10.10.0.100", "", "", 80, false, true, false, false, nil),
},
mcsLocalService: mcsLocalService,
expectedGroup: "group_id=100,type=select," +
"bucket=bucket_id:0,weight:100,actions=set_field:0xa0a0064->reg3,set_field:0x50/0xffff->reg4,resubmit:EndpointDNAT," +
"bucket=bucket_id:1,weight:100,actions=group:2",
},
{
name: "IPv6 Endpoints",
endpoints: []proxy.Endpoint{
Expand Down Expand Up @@ -906,8 +922,7 @@ func Test_client_InstallServiceGroup(t *testing.T) {

m.EXPECT().AddOFEntries(gomock.Any()).Return(nil).Times(1)
m.EXPECT().DeleteOFEntries(gomock.Any()).Return(nil).Times(1)

assert.NoError(t, fc.InstallServiceGroup(groupID, tc.withSessionAffinity, tc.endpoints))
assert.NoError(t, fc.InstallServiceGroup(groupID, tc.withSessionAffinity, tc.mcsLocalService, tc.endpoints))
gCacheI, ok := fc.featureService.groupCache.Load(groupID)
require.True(t, ok)
group := getGroupFromCache(gCacheI.(binding.Group))
Expand Down
17 changes: 14 additions & 3 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow/cookie"
openflowtypes "antrea.io/antrea/pkg/agent/openflow/types"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
binding "antrea.io/antrea/pkg/ovs/openflow"
Expand Down Expand Up @@ -199,6 +200,9 @@ var (
// between the uplink and its pair port directly.
NonIPTable = newTable("NonIP", stageClassifier, pipelineNonIP, defaultDrop)

// Default bucket weight in an OpenFlow group.
defaultGroupBucketWeight = uint16(100)

// Flow priority level
priorityHigh = uint16(210)
priorityNormal = uint16(200)
Expand Down Expand Up @@ -2442,7 +2446,9 @@ func (f *featureService) endpointDNATFlow(endpointIP net.IP, endpointPort uint16
// serviceEndpointGroup creates/modifies the group/buckets of Endpoints. If the withSessionAffinity is true, then buckets
// will resubmit packets back to ServiceLBTable to trigger the learn flow, the learn flow will then send packets to
// EndpointDNATTable. Otherwise, buckets will resubmit packets to EndpointDNATTable directly.
func (f *featureService) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) binding.Group {
// When mcsLocalService is not nil, the Service is a Multi-cluster Service with a member Service in the local cluster.
// In that case an extra bucket is added to the group, and its action sends packets to the group of the local exported Service.
func (f *featureService) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, mcsLocalService *openflowtypes.ServiceGroupInfo, endpoints ...proxy.Endpoint) binding.Group {
group := f.bridge.CreateGroup(groupID).ResetBuckets()
var resubmitTableID uint8
if withSessionAffinity {
Expand All @@ -2458,20 +2464,25 @@ func (f *featureService) serviceEndpointGroup(groupID binding.GroupIDType, withS

if ipProtocol == binding.ProtocolIP {
ipVal := binary.BigEndian.Uint32(endpointIP.To4())
group = group.Bucket().Weight(100).
group = group.Bucket().Weight(defaultGroupBucketWeight).
LoadToRegField(EndpointIPField, ipVal).
LoadToRegField(EndpointPortField, uint32(portVal)).
ResubmitToTable(resubmitTableID).
Done()
} else if ipProtocol == binding.ProtocolIPv6 {
ipVal := []byte(endpointIP)
group = group.Bucket().Weight(100).
group = group.Bucket().Weight(defaultGroupBucketWeight).
LoadXXReg(EndpointIP6Field.GetRegID(), ipVal).
LoadToRegField(EndpointPortField, uint32(portVal)).
ResubmitToTable(resubmitTableID).
Done()
}
}
if mcsLocalService != nil {
group = group.Bucket().Weight(defaultGroupBucketWeight).
Group(uint32(mcsLocalService.GroupID)).
Done()
}
return group
}

Expand Down
47 changes: 24 additions & 23 deletions pkg/agent/openflow/testing/mock_openflow.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

29 changes: 29 additions & 0 deletions pkg/agent/openflow/types/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2023 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package types

import (
binding "antrea.io/antrea/pkg/ovs/openflow"
k8sproxy "antrea.io/antrea/third_party/proxy"
)

// ServiceGroupInfo is used in AntreaProxy for Multi-cluster Service load-balancing.
// It stores a local exported Service's GroupID and its ClusterIP.
//
// A nil *ServiceGroupInfo means the Service has no local exported member Service.
// When a non-nil value is passed to the OpenFlow client's InstallServiceGroup,
// the installed Service group gets one additional bucket whose action jumps to
// the OpenFlow group identified by GroupID.
type ServiceGroupInfo struct {
// GroupID of an exported Service. It identifies the OpenFlow group that
// traffic is redirected to in order to select one of the local Endpoints.
GroupID binding.GroupIDType
// ClusterIP info of an exported Service, expressed as an Endpoint.
// NOTE(review): presumably the caller uses this to recognize which Endpoint of
// the Multi-cluster Service is the local ClusterIP — confirm against AntreaProxy.
Endpoint k8sproxy.Endpoint
}
Loading

0 comments on commit 6cdbca3

Please sign in to comment.