Skip to content

Commit

Permalink
Remove the associated stale conntrack entries when UDP Endpoints are …
Browse files Browse the repository at this point in the history
…removed

Previously, when a client accessed a UDP Service through AntreaProxy, if the
selected backend Pod was deleted and no new available backend Pod was automatically
selected by AntreaProxy, the connection would always remain unavailable. This issue
occurred because a conntrack entry was generated through OVS ct action in AntreaProxy
when handling the first packet of the UDP connection. Even if the selected Endpoint
was removed, the conntrack entry's timeout would be flushed upon receiving a packet
from the client in AntreaProxy OVS pipeline. Consequently, the stale conntrack entry
would persist as long as the client continued sending packets to the UDP service,
causing AntreaProxy's OVS pipeline to direct the packets to the IP of the removed
backend Pod.

This PR addresses the issue by removing the stale conntrack entries when the
associated UDP Endpoints are removed. This ensures that a new available backend Pod
can be selected in AntreaProxy's OVS pipeline.

Please note the following:

- Currently, this implementation is only available on Linux.
- Due to the restriction of the go library 'netlink', there is no API to specify a
  target zone. As a result, when deleting a stale conntrack entry with a destination
  port (such as NodePort), not only will the conntrack entry whose destination port
  is the port added by AntreaProxy be deleted, but also the conntrack entry that is
  not added by AntreaProxy will be deleted. This behavior is unexpected, as only the
  conntrack entries added by AntreaProxy should be deleted.

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Jun 28, 2023
1 parent a84eb0e commit e441042
Show file tree
Hide file tree
Showing 8 changed files with 349 additions and 63 deletions.
84 changes: 79 additions & 5 deletions pkg/agent/proxy/proxier.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (p *proxier) removeStaleServices() {
}
// Remove associated Endpoints flows.
if endpoints, ok := p.endpointsInstalledMap[svcPortName]; ok {
if !p.removeStaleEndpoints(svcPortName, svcInfo.OFProtocol, endpoints) {
if !p.removeStaleEndpoints(svcPortName, svcInfo, endpoints) {
continue
}
delete(p.endpointsInstalledMap, svcPortName)
Expand All @@ -193,8 +193,8 @@ func (p *proxier) removeServiceFlows(svcInfo *types.ServiceInfo) bool {
svcInfoStr := svcInfo.String()
svcPort := uint16(svcInfo.Port())
svcProto := svcInfo.OFProtocol
// Remove ClusterIP flows.
if err := p.ofClient.UninstallServiceFlows(svcInfo.ClusterIP(), svcPort, svcProto); err != nil {
// Remove ClusterIP flows and configurations.
if err := p.uninstallClusterIPService(svcInfo.ClusterIP(), svcPort, svcProto); err != nil {
klog.ErrorS(err, "Error when uninstalling ClusterIP flows for Service", "ServiceInfo", svcInfoStr)
return false
}
Expand Down Expand Up @@ -265,7 +265,8 @@ func (p *proxier) removeServiceGroup(svcPortName k8sproxy.ServicePortName, local
// given Service. If the Endpoints are still referenced by any other Services, no flow will be removed.
// The method only returns an error if a data path operation fails. If the flows are successfully
// removed from the data path, the method returns nil.
func (p *proxier) removeStaleEndpoints(svcPortName k8sproxy.ServicePortName, protocol binding.Protocol, staleEndpoints map[string]k8sproxy.Endpoint) bool {
func (p *proxier) removeStaleEndpoints(svcPortName k8sproxy.ServicePortName, svcInfo *types.ServiceInfo, staleEndpoints map[string]k8sproxy.Endpoint) bool {
protocol := svcInfo.OFProtocol
var endpointsToRemove []k8sproxy.Endpoint

// Get all Endpoints whose reference counter is 1, and these Endpoints should be removed.
Expand Down Expand Up @@ -303,6 +304,37 @@ func (p *proxier) removeStaleEndpoints(svcPortName k8sproxy.ServicePortName, pro
return true
}

func (p *proxier) removeStaleEndpointConntrackEntries(svcPortName k8sproxy.ServicePortName, svcInfo *types.ServiceInfo, staleEndpoints map[string]k8sproxy.Endpoint) bool {
svcPort := uint16(svcInfo.Port())
nodePort := uint16(svcInfo.NodePort())
svcProto := svcInfo.OFProtocol
var svcIPs []net.IP
svcIPs = append(svcIPs, svcInfo.ClusterIP())
for _, externalIP := range svcInfo.ExternalIPStrings() {
svcIPs = append(svcIPs, net.ParseIP(externalIP))
}
for _, ingressIP := range svcInfo.LoadBalancerIPStrings() {
if ingressIP != "" {
svcIPs = append(svcIPs, net.ParseIP(ingressIP))
}
}
for _, endpoint := range staleEndpoints {
for _, svcIP := range svcIPs {
if err := p.routeClient.ClearConntrackEntryForService(svcIP, svcPort, net.ParseIP(endpoint.IP()), svcProto); err != nil {
klog.ErrorS(err, "Error when removing conntrack of stale Endpoints for Service", "ServicePortName", svcPortName, "ServiceIP", svcIP, "ServicePort", svcPort, "endpoint", endpoint)
return false
}
}
if nodePort > 0 {
if err := p.routeClient.ClearConntrackEntryForService(nil, nodePort, net.ParseIP(endpoint.IP()), svcProto); err != nil {
klog.ErrorS(err, "Error when removing conntrack of stale Endpoints for Service", "ServicePortName", svcPortName, "NodePort", nodePort, "endpoint", endpoint)
return false
}
}
}
return true
}

func (p *proxier) addNewEndpoints(svcPortName k8sproxy.ServicePortName, protocol binding.Protocol, newEndpoints map[string]k8sproxy.Endpoint) bool {
var endpointsToAdd []k8sproxy.Endpoint

Expand Down Expand Up @@ -366,6 +398,19 @@ func smallSliceDifference(s1, s2 []string) []string {
return diff
}

func (p *proxier) uninstallClusterIPService(svcIP net.IP, svcPort uint16, protocol binding.Protocol) error {
if err := p.ofClient.UninstallServiceFlows(svcIP, svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove ClusterIP flows: %w", err)
}
if needClearConntrackEntries(protocol) {
// Remove ClusterIP conntrack entries when protocol is UDP.
if err := p.routeClient.ClearConntrackEntryForService(svcIP, svcPort, nil, protocol); err != nil {
return fmt.Errorf("failed to clearn ClusterIP UDP conntrack entries: %w", err)
}
}
return nil
}

func (p *proxier) installNodePortService(externalGroupID, clusterGroupID binding.GroupIDType, svcPort uint16, protocol binding.Protocol, affinityTimeout uint16) error {
if svcPort == 0 {
return nil
Expand Down Expand Up @@ -397,6 +442,12 @@ func (p *proxier) uninstallNodePortService(svcPort uint16, protocol binding.Prot
if err := p.routeClient.DeleteNodePort(p.nodePortAddresses, svcPort, protocol); err != nil {
return fmt.Errorf("failed to remove NodePort traffic redirecting rules: %w", err)
}
if needClearConntrackEntries(protocol) {
// Remove NodePort conntrack entries when protocol is UDP.
if err := p.routeClient.ClearConntrackEntryForService(nil, svcPort, nil, protocol); err != nil {
return fmt.Errorf("failed to clearn NodePort UDP conntrack entries: %w", err)
}
}
return nil
}

Expand All @@ -414,6 +465,7 @@ func (p *proxier) installExternalIPService(svcInfoStr string, externalGroupID, c
}

func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPStrings []string, svcPort uint16, protocol binding.Protocol) error {
needClearConntrack := needClearConntrackEntries(protocol)
for _, externalIP := range externalIPStrings {
ip := net.ParseIP(externalIP)
if err := p.ofClient.UninstallServiceFlows(ip, svcPort, protocol); err != nil {
Expand All @@ -422,6 +474,12 @@ func (p *proxier) uninstallExternalIPService(svcInfoStr string, externalIPString
if err := p.deleteRouteForServiceIP(svcInfoStr, ip, p.routeClient.DeleteExternalIPRoute); err != nil {
return fmt.Errorf("failed to remove ExternalIP traffic redirecting routes: %w", err)
}
if needClearConntrack {
// Remove ExternalIP conntrack entries when protocol is UDP.
if err := p.routeClient.ClearConntrackEntryForService(ip, svcPort, nil, protocol); err != nil {
return fmt.Errorf("failed to clearn ExternalIP UDP conntrack entries: %w", err)
}
}
}
return nil
}
Expand Down Expand Up @@ -461,6 +519,7 @@ func (p *proxier) addRouteForServiceIP(svcInfoStr string, ip net.IP, addRouteFn
}

func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIPStrings []string, svcPort uint16, protocol binding.Protocol) error {
needClearConntrack := needClearConntrackEntries(protocol)
for _, ingress := range loadBalancerIPStrings {
if ingress != "" {
ip := net.ParseIP(ingress)
Expand All @@ -472,6 +531,12 @@ func (p *proxier) uninstallLoadBalancerService(svcInfoStr string, loadBalancerIP
return fmt.Errorf("failed to remove LoadBalancer traffic redirecting routes: %w", err)
}
}
if needClearConntrack {
// Remove LoadBalancer conntrack entries when protocol is UDP.
if err := p.routeClient.ClearConntrackEntryForService(ip, svcPort, nil, protocol); err != nil {
return fmt.Errorf("failed to clearn LoadBalancer UDP conntrack entries: %w", err)
}
}
}
}
return nil
Expand Down Expand Up @@ -542,9 +607,14 @@ func (p *proxier) installServices() {
if !p.addNewEndpoints(svcPortName, svcInfo.OFProtocol, newEndpoints) {
continue
}
if !p.removeStaleEndpoints(svcPortName, svcInfo.OFProtocol, staleEndpoints) {
if !p.removeStaleEndpoints(svcPortName, svcInfo, staleEndpoints) {
continue
}
if needClearConntrackEntries(svcInfo.OFProtocol) {
if !p.removeStaleEndpointConntrackEntries(svcPortName, svcInfo, staleEndpoints) {
continue
}
}
}

withSessionAffinity := svcInfo.SessionAffinityType() == corev1.ServiceAffinityClientIP
Expand Down Expand Up @@ -1370,3 +1440,7 @@ func NewProxier(hostname string,

return proxier, nil
}

func needClearConntrackEntries(protocol binding.Protocol) bool {
return protocol == binding.ProtocolUDP || protocol == binding.ProtocolUDPv6
}
Loading

0 comments on commit e441042

Please sign in to comment.