Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refine Endpoint selection for Multi-cluster Service #4693

Merged
merged 2 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,13 +376,13 @@ func run(o *Options) error {

switch {
case v4Enabled && v6Enabled:
proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, v6GroupCounter, enableMulticlusterGW, serviceCIDRProvider)
proxier = proxy.NewDualStackProxier(nodeConfig.Name, informerFactory, ofClient, routeClient, nodePortAddressesIPv4, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, v6GroupCounter, enableMulticlusterGW)
groupCounters = append(groupCounters, v4GroupCounter, v6GroupCounter)
case v4Enabled:
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, enableMulticlusterGW, serviceCIDRProvider)
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, false, routeClient, nodePortAddressesIPv4, proxyAll, skipServices, proxyLoadBalancerIPs, v4GroupCounter, enableMulticlusterGW)
groupCounters = append(groupCounters, v4GroupCounter)
case v6Enabled:
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v6GroupCounter, enableMulticlusterGW, serviceCIDRProvider)
proxier = proxy.NewProxier(nodeConfig.Name, informerFactory, ofClient, true, routeClient, nodePortAddressesIPv6, proxyAll, skipServices, proxyLoadBalancerIPs, v6GroupCounter, enableMulticlusterGW)
groupCounters = append(groupCounters, v6GroupCounter)
default:
return fmt.Errorf("at least one of IPv4 or IPv6 should be enabled")
Expand Down
4 changes: 4 additions & 0 deletions multicluster/controllers/multicluster/common/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,7 @@ func HashLabelIdentity(l string) string {
hashValue := hex.EncodeToString(hash.Sum(nil))
return hashValue[:labelIdentityHashLength]
}

func IsMulticlusterService(service *corev1.Service) bool {
return service.Annotations[AntreaMCServiceAnnotation] == "true"
jianjuns marked this conversation as resolved.
Show resolved Hide resolved
}
17 changes: 10 additions & 7 deletions pkg/agent/openflow/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
openflowtypes "antrea.io/antrea/pkg/agent/openflow/types"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/apis/crd/v1alpha2"
Expand Down Expand Up @@ -85,7 +84,7 @@ type Client interface {

// InstallServiceGroup installs a group for Service LB. Each endpoint
// is a bucket of the group. For now, each bucket has the same weight.
InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, mcsLocalService *openflowtypes.ServiceGroupInfo, endpoints []proxy.Endpoint) error
InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error
// UninstallServiceGroup removes the group and its buckets that are
// installed by InstallServiceGroup.
UninstallServiceGroup(groupID binding.GroupIDType) error
Expand All @@ -104,7 +103,8 @@ type Client interface {
// otherwise the installation will fail.
// nodeLocalExternal represents if the externalTrafficPolicy is Local or not. This field is meaningful only when
// the svcType is NodePort or LoadBalancer.
InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType) error
// nested represents if the Service has the Endpoints which is other Service's ClusterIP.
InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType, nested bool) error
// UninstallServiceFlows removes flows installed by InstallServiceFlows.
UninstallServiceFlows(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error

Expand Down Expand Up @@ -619,11 +619,11 @@ func (c *client) GetPodFlowKeys(interfaceName string) []string {
return c.getFlowKeysFromCache(c.featurePodConnectivity.podCachedFlows, interfaceName)
}

func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, mcsLocalService *openflowtypes.ServiceGroupInfo, endpoints []proxy.Endpoint) error {
func (c *client) InstallServiceGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints []proxy.Endpoint) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()

group := c.featureService.serviceEndpointGroup(groupID, withSessionAffinity, mcsLocalService, endpoints...)
group := c.featureService.serviceEndpointGroup(groupID, withSessionAffinity, endpoints...)
_, installed := c.featureService.groupCache.Load(groupID)
if !installed {
if err := c.ofEntryOperations.AddOFEntries([]binding.OFEntry{group}); err != nil {
Expand Down Expand Up @@ -696,14 +696,17 @@ func (c *client) UninstallEndpointFlows(protocol binding.Protocol, endpoint prox
return c.deleteFlows(c.featureService.cachedFlows, cacheKey)
}

func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType) error {
func (c *client) InstallServiceFlows(groupID binding.GroupIDType, svcIP net.IP, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16, nodeLocalExternal bool, svcType v1.ServiceType, nested bool) error {
c.replayMutex.RLock()
defer c.replayMutex.RUnlock()
var flows []binding.Flow
flows = append(flows, c.featureService.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0, nodeLocalExternal, svcType))
flows = append(flows, c.featureService.serviceLBFlow(groupID, svcIP, svcPort, protocol, affinityTimeout != 0, nodeLocalExternal, svcType, nested))
if affinityTimeout != 0 {
flows = append(flows, c.featureService.serviceLearnFlow(groupID, svcIP, svcPort, protocol, affinityTimeout, nodeLocalExternal, svcType))
}
if svcType == v1.ServiceTypeClusterIP && !nested {
flows = append(flows, c.featureService.endpointRedirectFlowForServiceIP(svcIP, svcPort, protocol, groupID))
}
hongliangl marked this conversation as resolved.
Show resolved Hide resolved
Comment on lines +707 to +709
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if svcType == v1.ServiceTypeClusterIP && !nested {
flows = append(flows, c.featureService.endpointRedirectFlowForServiceIP(svcIP, svcPort, protocol, groupID))
}
if svcType == v1.ServiceTypeClusterIP && nested != nil && !*nested {
flows = append(flows, c.featureService.endpointRedirectFlowForServiceIP(svcIP, svcPort, protocol, groupID))
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this way, I think we can skip the ClusterIP flow of NodePort or LoadBalancer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned in previous comment, I feel there is no need to skip it. They will be be supported via ClusterIP.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the only gap here is that we didn't officially announce that we support these two kinds of Service in multi-cluster docs, I probably will update it after double check MC controller codes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we don't officially announce and support that , maybe we shouldn't introduce extra flows which are not used totally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mentioned, there are probably just doc updates involved to support NodePort or LoadBalancer with ClusterIP, so I prefer not to skip it, otherwise we may change this part back soon. If you have further concern, we can sync offline, thanks.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ClusterIP from NodePort or LoadBalancer will be supported eventually, I'm ok with that.

cacheKey := generateServicePortFlowCacheKey(svcIP, svcPort, protocol)
return c.addFlows(c.featureService.cachedFlows, cacheKey, flows)
}
Expand Down
36 changes: 17 additions & 19 deletions pkg/agent/openflow/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/openflow/cookie"
oftest "antrea.io/antrea/pkg/agent/openflow/testing"
"antrea.io/antrea/pkg/agent/openflow/types"
"antrea.io/antrea/pkg/apis/crd/v1alpha2"
binding "antrea.io/antrea/pkg/ovs/openflow"
ovsoftest "antrea.io/antrea/pkg/ovs/openflow/testing"
Expand Down Expand Up @@ -845,17 +844,12 @@ func Test_client_GetPodFlowKeys(t *testing.T) {

func Test_client_InstallServiceGroup(t *testing.T) {
groupID := binding.GroupIDType(100)
mcsLocalService := &types.ServiceGroupInfo{
GroupID: binding.GroupIDType(2),
Endpoint: proxy.NewBaseEndpointInfo("10.10.0.101", "", "", 80, false, true, false, false, nil),
}

testCases := []struct {
name string
withSessionAffinity bool
endpoints []proxy.Endpoint
expectedGroup string
mcsLocalService *types.ServiceGroupInfo
deleteOFEntriesError error
}{
{
Expand All @@ -868,16 +862,6 @@ func Test_client_InstallServiceGroup(t *testing.T) {
"bucket=bucket_id:0,weight:100,actions=set_field:0xa0a0064->reg3,set_field:0x50/0xffff->reg4,resubmit:EndpointDNAT," +
"bucket=bucket_id:1,weight:100,actions=set_field:0xa0a0065->reg3,set_field:0x50/0xffff->reg4,resubmit:EndpointDNAT",
},
{
name: "IPv4 Endpoints with multi-cluster enabled",
endpoints: []proxy.Endpoint{
proxy.NewBaseEndpointInfo("10.10.0.100", "", "", 80, false, true, false, false, nil),
},
mcsLocalService: mcsLocalService,
expectedGroup: "group_id=100,type=select," +
"bucket=bucket_id:0,weight:100,actions=set_field:0xa0a0064->reg3,set_field:0x50/0xffff->reg4,resubmit:EndpointDNAT," +
"bucket=bucket_id:1,weight:100,actions=group:2",
},
{
name: "IPv6 Endpoints",
endpoints: []proxy.Endpoint{
Expand Down Expand Up @@ -934,7 +918,7 @@ func Test_client_InstallServiceGroup(t *testing.T) {

m.EXPECT().AddOFEntries(gomock.Any()).Return(nil).Times(1)
m.EXPECT().DeleteOFEntries(gomock.Any()).Return(tc.deleteOFEntriesError).Times(1)
assert.NoError(t, fc.InstallServiceGroup(groupID, tc.withSessionAffinity, tc.mcsLocalService, tc.endpoints))
assert.NoError(t, fc.InstallServiceGroup(groupID, tc.withSessionAffinity, tc.endpoints))
gCacheI, ok := fc.featureService.groupCache.Load(groupID)
require.True(t, ok)
group := getGroupFromCache(gCacheI.(binding.Group))
Expand Down Expand Up @@ -1091,23 +1075,36 @@ func Test_client_InstallServiceFlows(t *testing.T) {
nodeLocalExternal bool
svcType corev1.ServiceType
expectedFlows []string
nested bool
}{
{
name: "Service ClusterIP",
protocol: binding.ProtocolTCP,
svcIP: svcIPv4,
svcType: corev1.ServiceTypeClusterIP,
expectedFlows: []string{
"cookie=0x1030000000000, table=EndpointDNAT, priority=210,tcp,reg3=0xa600064,reg4=0x1020050/0x107ffff actions=group:100",
"cookie=0x1030000000000, table=ServiceLB, priority=200,tcp,reg4=0x10000/0x70000,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x20000/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x64->reg7,group:100",
},
},
{
name: "Service ClusterIP, nested",
protocol: binding.ProtocolTCP,
svcIP: svcIPv4,
svcType: corev1.ServiceTypeClusterIP,
expectedFlows: []string{
"cookie=0x1030000000000, table=ServiceLB, priority=200,tcp,reg4=0x10000/0x70000,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x20000/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x64->reg7,set_field:0x1000000/0x1000000->reg4,group:100",
},
nested: true,
},
{
name: "Service ClusterIP,SessionAffinity",
protocol: binding.ProtocolTCP,
svcIP: svcIPv4,
affinityTimeout: uint16(100),
svcType: corev1.ServiceTypeClusterIP,
expectedFlows: []string{
"cookie=0x1030000000000, table=EndpointDNAT, priority=210,tcp,reg3=0xa600064,reg4=0x1020050/0x107ffff actions=group:100",
"cookie=0x1030000000000, table=ServiceLB, priority=200,tcp,reg4=0x10000/0x70000,nw_dst=10.96.0.100,tp_dst=80 actions=set_field:0x30000/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x64->reg7,group:100",
"cookie=0x1030000000064, table=ServiceLB, priority=190,tcp,reg4=0x30000/0x70000,nw_dst=10.96.0.100,tp_dst=80 actions=learn(table=SessionAffinity,hard_timeout=100,priority=200,delete_learned,cookie=0x1030000000064,load:0x800->NXM_OF_ETH_TYPE[],load:0x6->NXM_OF_IP_PROTO[],load:OXM_OF_TCP_DST[]->OXM_OF_TCP_DST[],load:NXM_OF_IP_DST[]->NXM_OF_IP_DST[],load:NXM_OF_IP_SRC[]->NXM_OF_IP_SRC[],NXM_NX_REG3[],NXM_NX_REG4[0..15],reg4=0x2,reg0=0x1),set_field:0x20000/0x70000->reg4,goto_table:EndpointDNAT",
},
Expand All @@ -1119,6 +1116,7 @@ func Test_client_InstallServiceFlows(t *testing.T) {
affinityTimeout: uint16(100),
svcType: corev1.ServiceTypeClusterIP,
expectedFlows: []string{
"cookie=0x1030000000000, table=EndpointDNAT, priority=210,tcp6,reg4=0x1020050/0x107ffff actions=group:100",
"cookie=0x1030000000000, table=ServiceLB, priority=200,tcp6,reg4=0x10000/0x70000,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=set_field:0x30000/0x70000->reg4,set_field:0x200/0x200->reg0,set_field:0x64->reg7,group:100",
"cookie=0x1030000000064, table=ServiceLB, priority=190,tcp6,reg4=0x30000/0x70000,ipv6_dst=fec0:10:96::100,tp_dst=80 actions=learn(table=SessionAffinity,hard_timeout=100,priority=200,delete_learned,cookie=0x1030000000064,load:0x86dd->NXM_OF_ETH_TYPE[],load:0x6->NXM_OF_IP_PROTO[],load:OXM_OF_TCP_DST[]->OXM_OF_TCP_DST[],load:NXM_NX_IPV6_DST[]->NXM_NX_IPV6_DST[],load:NXM_NX_IPV6_SRC[]->NXM_NX_IPV6_SRC[],NXM_NX_XXREG3[],NXM_NX_REG4[0..15],reg4=0x2,reg0=0x1),set_field:0x20000/0x70000->reg4,goto_table:EndpointDNAT",
},
Expand Down Expand Up @@ -1182,7 +1180,7 @@ func Test_client_InstallServiceFlows(t *testing.T) {

cacheKey := generateServicePortFlowCacheKey(tc.svcIP, port, tc.protocol)

assert.NoError(t, fc.InstallServiceFlows(groupID, tc.svcIP, port, tc.protocol, tc.affinityTimeout, tc.nodeLocalExternal, tc.svcType))
assert.NoError(t, fc.InstallServiceFlows(groupID, tc.svcIP, port, tc.protocol, tc.affinityTimeout, tc.nodeLocalExternal, tc.svcType, tc.nested))
fCacheI, ok := fc.featureService.cachedFlows.Load(cacheKey)
require.True(t, ok)
assert.ElementsMatch(t, tc.expectedFlows, getFlowStrings(fCacheI))
Expand Down Expand Up @@ -1212,7 +1210,7 @@ func Test_client_GetServiceFlowKeys(t *testing.T) {
proxy.NewBaseEndpointInfo("10.10.0.12", "", "", 80, true, true, false, false, nil),
}

assert.NoError(t, fc.InstallServiceFlows(groupID, svcIP, svcPort, bindingProtocol, 100, true, corev1.ServiceTypeLoadBalancer))
assert.NoError(t, fc.InstallServiceFlows(groupID, svcIP, svcPort, bindingProtocol, 100, true, corev1.ServiceTypeLoadBalancer, false))
assert.NoError(t, fc.InstallEndpointFlows(bindingProtocol, endpoints))
flowKeys := fc.GetServiceFlowKeys(svcIP, svcPort, bindingProtocol, endpoints)
expectedFlowKeys := []string{
Expand Down
4 changes: 3 additions & 1 deletion pkg/agent/openflow/fields.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,9 @@ var (
// externalTrafficPolicy is Cluster.
ToClusterServiceRegMark = binding.NewOneBitRegMark(4, 21)
// reg4[22..23]: Field to store the action of a traffic control rule. Marks in this field include:
TrafficControlActionField = binding.NewRegField(4, 22, 23)
TrafficControlActionField = binding.NewRegField(4, 22, 23)
// reg4[24]: Mark to indicate whether the Endpoints of a Service includes another Service's ClusterIP.
NestedServiceRegMark = binding.NewOneBitRegMark(4, 24)
TrafficControlMirrorRegMark = binding.NewRegMark(TrafficControlActionField, 0b01)
TrafficControlRedirectRegMark = binding.NewRegMark(TrafficControlActionField, 0b10)

Expand Down
2 changes: 1 addition & 1 deletion pkg/agent/openflow/multicluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func (f *featureMulticluster) snatConntrackFlows(serviceCIDR net.IPNet, localGat
ipProtocol := getIPProtocol(localGatewayIP)
cookieID := f.cookieAllocator.Request(f.category).Raw()
flows = append(flows,
// This generates the flow to match the first packet of multicluster Service connection, and commit them into
// This generates the flow to match the first packet of multi-cluster Service connection, and commit them into
// DNAT zone to make sure DNAT is performed before SNAT for any remote cluster traffic.
SNATMarkTable.ofTable.BuildFlow(priorityHigh).
Cookie(cookieID).
Expand Down
46 changes: 31 additions & 15 deletions pkg/agent/openflow/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (
"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/metrics"
"antrea.io/antrea/pkg/agent/openflow/cookie"
openflowtypes "antrea.io/antrea/pkg/agent/openflow/types"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
binding "antrea.io/antrea/pkg/ovs/openflow"
Expand Down Expand Up @@ -200,9 +199,6 @@ var (
// between the uplink and its pair port directly.
NonIPTable = newTable("NonIP", stageClassifier, pipelineNonIP, defaultDrop)

// Default bucket weight in an Openflow Group
defaultGroupBucketWeight = uint16(100)

// Flow priority level
priorityHigh = uint16(210)
priorityNormal = uint16(200)
Expand Down Expand Up @@ -2368,7 +2364,8 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType,
protocol binding.Protocol,
withSessionAffinity,
nodeLocalExternal bool,
serviceType v1.ServiceType) binding.Flow {
serviceType v1.ServiceType,
nested bool) binding.Flow {
cookieID := f.cookieAllocator.Request(f.category).Raw()
var lbResultMark *binding.RegMark
if withSessionAffinity {
Expand Down Expand Up @@ -2410,9 +2407,35 @@ func (f *featureService) serviceLBFlow(groupID binding.GroupIDType,
if f.enableAntreaPolicy {
flowBuilder = flowBuilder.Action().LoadToRegField(ServiceGroupIDField, uint32(groupID))
}
if nested {
luolanzone marked this conversation as resolved.
Show resolved Hide resolved
flowBuilder = flowBuilder.Action().LoadRegMark(NestedServiceRegMark)
}
hongliangl marked this conversation as resolved.
Show resolved Hide resolved
return flowBuilder.Action().Group(groupID).Done()
}

// endpointRedirectFlowForServiceIP generates the flow which uses the specific group for a Service's ClusterIP
// to do final Endpoint selection.
func (f *featureService) endpointRedirectFlowForServiceIP(clusterIP net.IP, svcPort uint16, protocol binding.Protocol, groupID binding.GroupIDType) binding.Flow {
hongliangl marked this conversation as resolved.
Show resolved Hide resolved
unionVal := (EpSelectedRegMark.GetValue() << EndpointPortField.GetRange().Length()) + uint32(svcPort)
flowBuilder := EndpointDNATTable.ofTable.BuildFlow(priorityHigh).
MatchProtocol(protocol).
Cookie(f.cookieAllocator.Request(f.category).Raw()).
MatchRegFieldWithValue(EpUnionField, unionVal).
MatchRegMark(NestedServiceRegMark)
ipProtocol := getIPProtocol(clusterIP)

if ipProtocol == binding.ProtocolIP {
ipVal := binary.BigEndian.Uint32(clusterIP.To4())
flowBuilder = flowBuilder.MatchRegFieldWithValue(EndpointIPField, ipVal)
} else {
ipVal := []byte(clusterIP)
flowBuilder = flowBuilder.MatchXXReg(EndpointIP6Field.GetRegID(), ipVal)
}
return flowBuilder.Action().
Group(groupID).
Done()
}

// endpointDNATFlow generates the flow which transforms the Service Cluster IP to the Endpoint IP according to the Endpoint
// selection decision which is stored in regs.
func (f *featureService) endpointDNATFlow(endpointIP net.IP, endpointPort uint16, protocol binding.Protocol) binding.Flow {
Expand Down Expand Up @@ -2446,9 +2469,7 @@ func (f *featureService) endpointDNATFlow(endpointIP net.IP, endpointPort uint16
// serviceEndpointGroup creates/modifies the group/buckets of Endpoints. If the withSessionAffinity is true, then buckets
// will resubmit packets back to ServiceLBTable to trigger the learn flow, the learn flow will then send packets to
// EndpointDNATTable. Otherwise, buckets will resubmit packets to EndpointDNATTable directly.
// When mcsLocalService is not nil, the Service is a Multi-cluster Service with a member Service in the local cluster,
// in which case the packets should go to the local Service's group, the action will go to the group of the exported Service.
func (f *featureService) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, mcsLocalService *openflowtypes.ServiceGroupInfo, endpoints ...proxy.Endpoint) binding.Group {
func (f *featureService) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) binding.Group {
group := f.bridge.CreateGroup(groupID).ResetBuckets()
var resubmitTableID uint8
if withSessionAffinity {
Expand All @@ -2464,25 +2485,20 @@ func (f *featureService) serviceEndpointGroup(groupID binding.GroupIDType, withS

if ipProtocol == binding.ProtocolIP {
ipVal := binary.BigEndian.Uint32(endpointIP.To4())
group = group.Bucket().Weight(defaultGroupBucketWeight).
group = group.Bucket().Weight(100).
LoadToRegField(EndpointIPField, ipVal).
LoadToRegField(EndpointPortField, uint32(portVal)).
ResubmitToTable(resubmitTableID).
Done()
} else if ipProtocol == binding.ProtocolIPv6 {
ipVal := []byte(endpointIP)
group = group.Bucket().Weight(defaultGroupBucketWeight).
group = group.Bucket().Weight(100).
LoadXXReg(EndpointIP6Field.GetRegID(), ipVal).
LoadToRegField(EndpointPortField, uint32(portVal)).
ResubmitToTable(resubmitTableID).
Done()
}
}
if mcsLocalService != nil {
group = group.Bucket().Weight(defaultGroupBucketWeight).
Group(uint32(mcsLocalService.GroupID)).
Done()
}
return group
}

Expand Down
Loading