Skip to content

Commit

Permalink
Add Service CIDR provider to AntreaAgent to discover Service CIDR (#4570
Browse files Browse the repository at this point in the history
)

Signed-off-by: Hongliang Liu <lhongliang@vmware.com>
  • Loading branch information
hongliangl committed Feb 17, 2023
1 parent cf90cfa commit be49b4a
Show file tree
Hide file tree
Showing 10 changed files with 488 additions and 94 deletions.
4 changes: 3 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import (
"antrea.io/antrea/pkg/agent/route"
"antrea.io/antrea/pkg/agent/secondarynetwork/cnipodcache"
"antrea.io/antrea/pkg/agent/secondarynetwork/podwatch"
"antrea.io/antrea/pkg/agent/servicecidr"
"antrea.io/antrea/pkg/agent/stats"
support "antrea.io/antrea/pkg/agent/supportbundlecollection"
agenttypes "antrea.io/antrea/pkg/agent/types"
Expand Down Expand Up @@ -167,6 +168,7 @@ func run(o *Options) error {
if o.config.ServiceCIDRv6 != "" {
_, serviceCIDRNetv6, _ = net.ParseCIDR(o.config.ServiceCIDRv6)
}
serviceCIDRProvider := servicecidr.NewServiceCIDRDiscoverer(serviceInformer)

_, encapMode := config.GetTrafficEncapModeFromStr(o.config.TrafficEncapMode)
_, encryptionMode := config.GetTrafficEncryptionModeFromStr(o.config.TrafficEncryptionMode)
Expand Down Expand Up @@ -199,7 +201,7 @@ func run(o *Options) error {
egressConfig := &config.EgressConfig{
ExceptCIDRs: exceptCIDRs,
}
routeClient, err := route.NewClient(networkConfig, o.config.NoSNAT, o.config.AntreaProxy.ProxyAll, connectUplinkToBridge, multicastEnabled)
routeClient, err := route.NewClient(networkConfig, o.config.NoSNAT, o.config.AntreaProxy.ProxyAll, connectUplinkToBridge, multicastEnabled, serviceCIDRProvider)
if err != nil {
return fmt.Errorf("error creating route client: %v", err)
}
Expand Down
13 changes: 7 additions & 6 deletions hack/update-codegen-dockerized.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ MOCKGEN_TARGETS=(
"pkg/agent/ipassigner IPAssigner testing"
"pkg/agent/secondarynetwork/podwatch InterfaceConfigurator testing"
"pkg/agent/secondarynetwork/ipam IPAMDelegator testing"
"pkg/agent/servicecidr Interface testing"
"pkg/agent/util/ipset Interface testing"
"pkg/agent/util/iptables Interface testing mock_iptables_linux.go" # Must specify linux.go suffix, otherwise compilation would fail on windows platform as source file has linux build tag.
"pkg/agent/util/netlink Interface testing mock_netlink_linux.go"
Expand Down Expand Up @@ -80,7 +81,7 @@ fi
function generate_antrea_client_code {
# Generate protobuf code for CNI gRPC service with protoc.
protoc --go_out=plugins=grpc:. pkg/apis/cni/v1beta1/cni.proto

# Generate clientset and apis code with K8s codegen tools.
$GOPATH/bin/client-gen \
--clientset-name versioned \
Expand All @@ -98,7 +99,7 @@ function generate_antrea_client_code {
--plural-exceptions "AntreaClusterNetworkPolicyStats:AntreaClusterNetworkPolicyStats" \
--plural-exceptions "ClusterGroupMembers:ClusterGroupMembers" \
--go-header-file hack/boilerplate/license_header.go.txt

# Generate listers with K8s codegen tools.
$GOPATH/bin/lister-gen \
--input-dirs "${ANTREA_PKG}/pkg/apis/crd/v1alpha1" \
Expand All @@ -107,7 +108,7 @@ function generate_antrea_client_code {
--input-dirs "${ANTREA_PKG}/pkg/apis/crd/v1beta1" \
--output-package "${ANTREA_PKG}/pkg/client/listers" \
--go-header-file hack/boilerplate/license_header.go.txt

# Generate informers with K8s codegen tools.
$GOPATH/bin/informer-gen \
--input-dirs "${ANTREA_PKG}/pkg/apis/crd/v1alpha1" \
Expand All @@ -118,7 +119,7 @@ function generate_antrea_client_code {
--listers-package "${ANTREA_PKG}/pkg/client/listers" \
--output-package "${ANTREA_PKG}/pkg/client/informers" \
--go-header-file hack/boilerplate/license_header.go.txt

$GOPATH/bin/deepcopy-gen \
--input-dirs "${ANTREA_PKG}/pkg/apis/controlplane" \
--input-dirs "${ANTREA_PKG}/pkg/apis/controlplane/v1beta2" \
Expand All @@ -131,13 +132,13 @@ function generate_antrea_client_code {
--input-dirs "${ANTREA_PKG}/pkg/apis/stats/v1alpha1" \
-O zz_generated.deepcopy \
--go-header-file hack/boilerplate/license_header.go.txt

$GOPATH/bin/conversion-gen \
--input-dirs "${ANTREA_PKG}/pkg/apis/controlplane/v1beta2,${ANTREA_PKG}/pkg/apis/controlplane/" \
--input-dirs "${ANTREA_PKG}/pkg/apis/stats/v1alpha1,${ANTREA_PKG}/pkg/apis/stats/" \
-O zz_generated.conversion \
--go-header-file hack/boilerplate/license_header.go.txt

$GOPATH/bin/openapi-gen \
--input-dirs "${ANTREA_PKG}/pkg/apis/controlplane/v1beta2" \
--input-dirs "${ANTREA_PKG}/pkg/apis/system/v1beta1" \
Expand Down
113 changes: 53 additions & 60 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ import (
utilnet "k8s.io/utils/net"

"antrea.io/antrea/pkg/agent/config"
"antrea.io/antrea/pkg/agent/servicecidr"
"antrea.io/antrea/pkg/agent/types"
"antrea.io/antrea/pkg/agent/util"
"antrea.io/antrea/pkg/agent/util/ipset"
"antrea.io/antrea/pkg/agent/util/iptables"
utilnetlink "antrea.io/antrea/pkg/agent/util/netlink"
Expand Down Expand Up @@ -73,9 +73,6 @@ const (
antreaPostRoutingChain = "ANTREA-POSTROUTING"
antreaOutputChain = "ANTREA-OUTPUT"
antreaMangleChain = "ANTREA-MANGLE"

ipv4AddrLength = 32
ipv6AddrLength = 128
)

// Client implements Interface.
Expand Down Expand Up @@ -118,18 +115,20 @@ type Client struct {
nodePortsIPv4 sync.Map
// nodePortsIPv6 caches all existing IPv6 NodePorts.
nodePortsIPv6 sync.Map
// clusterIPv4CIDR stores the calculated ClusterIP CIDR for IPv4.
clusterIPv4CIDR *net.IPNet
// clusterIPv6CIDR stores the calculated ClusterIP CIDR for IPv6.
clusterIPv6CIDR *net.IPNet
// serviceIPv4CIDR stores the latest Service IPv4 CIDR from serviceCIDRProvider.
serviceIPv4CIDR *net.IPNet
// serviceIPv6CIDR stores the latest Service IPv6 CIDR from serviceCIDRProvider.
serviceIPv6CIDR *net.IPNet
// clusterNodeIPs stores the IPv4 of all other Nodes in the cluster
clusterNodeIPs sync.Map
// clusterNodeIP6s stores the IPv6 of all other Nodes in the cluster
clusterNodeIP6s sync.Map
// The latest calculated Service CIDRs can be got from serviceCIDRProvider.
serviceCIDRProvider servicecidr.Interface
}

// NewClient returns a route client.
func NewClient(networkConfig *config.NetworkConfig, noSNAT, proxyAll, connectUplinkToBridge, multicastEnabled bool) (*Client, error) {
func NewClient(networkConfig *config.NetworkConfig, noSNAT, proxyAll, connectUplinkToBridge, multicastEnabled bool, serviceCIDRProvider servicecidr.Interface) (*Client, error) {
return &Client{
networkConfig: networkConfig,
noSNAT: noSNAT,
Expand All @@ -139,6 +138,7 @@ func NewClient(networkConfig *config.NetworkConfig, noSNAT, proxyAll, connectUpl
ipset: ipset.NewClient(),
netlink: &netlink.Handle{},
isCloudEKS: env.IsCloudEKS(),
serviceCIDRProvider: serviceCIDRProvider,
}, nil
}

Expand Down Expand Up @@ -838,6 +838,13 @@ func (c *Client) initServiceIPRoutes() error {
return err
}
}
c.serviceCIDRProvider.AddEventHandler(func(serviceCIDRs []*net.IPNet) {
for _, serviceCIDR := range serviceCIDRs {
if err := c.addServiceCIDRRoute(serviceCIDR); err != nil {
klog.ErrorS(err, "Failed to install route for Service CIDR", "ServiceCIDR", serviceCIDR)
}
}
})
return nil
}

Expand Down Expand Up @@ -1259,10 +1266,10 @@ func (c *Client) DeleteSNATRule(mark uint32) error {
func (c *Client) addVirtualServiceIPRoute(isIPv6 bool) error {
linkIndex := c.nodeConfig.GatewayConfig.LinkIndex
svcIP := config.VirtualServiceIPv4
mask := ipv4AddrLength
mask := net.IPv4len * 8
if isIPv6 {
svcIP = config.VirtualServiceIPv6
mask = ipv6AddrLength
mask = net.IPv6len * 8
}

neigh := generateNeigh(svcIP, linkIndex)
Expand Down Expand Up @@ -1324,59 +1331,40 @@ func (c *Client) DeleteNodePort(nodePortAddresses []net.IP, port uint16, protoco
return nil
}

// AddClusterIPRoute is used to add or update a routing entry which is used to route ClusterIP traffic to Antrea gateway.
// TODO: remove it after unifying Windows and Linux functions.
func (c *Client) AddClusterIPRoute(svcIP net.IP) error {
isIPv6 := utilnet.IsIPv6(svcIP)
return nil
}

func (c *Client) addServiceCIDRRoute(serviceCIDR *net.IPNet) error {
isIPv6 := utilnet.IsIPv6(serviceCIDR.IP)
linkIndex := c.nodeConfig.GatewayConfig.LinkIndex
scope := netlink.SCOPE_UNIVERSE
curClusterIPCIDR := c.clusterIPv4CIDR
mask := ipv4AddrLength
curServiceCIDR := c.serviceIPv4CIDR
gw := config.VirtualServiceIPv4
if isIPv6 {
curClusterIPCIDR = c.clusterIPv6CIDR
mask = ipv6AddrLength
curServiceCIDR = c.serviceIPv6CIDR
gw = config.VirtualServiceIPv6
}

// If the route exists and its destination CIDR contains the ClusterIP, there is no need to update the route.
if curClusterIPCIDR != nil && curClusterIPCIDR.Contains(svcIP) {
klog.V(4).InfoS("Route with current ClusterIP CIDR can route the ClusterIP to Antrea gateway", "ClusterIP CIDR", curClusterIPCIDR, "ClusterIP", svcIP)
return nil
}

var newClusterIPCIDR *net.IPNet
var err error
if curClusterIPCIDR != nil {
// If the route exists and its destination CIDR doesn't contain the ClusterIP, generate a new destination CIDR by
// enlarging the current destination CIDR with the ClusterIP.
if newClusterIPCIDR, err = util.ExtendCIDRWithIP(curClusterIPCIDR, svcIP); err != nil {
return fmt.Errorf("enlarge the destination CIDR with an error: %w", err)
}
} else {
// If the route doesn't exist, generate a new destination CIDR with the ClusterIP. Note that, this is the first
// ClusterIP since the route doesn't exist.
newClusterIPCIDR = &net.IPNet{IP: svcIP, Mask: net.CIDRMask(mask, mask)}
}

// Generate a route with the new destination CIDR and install it.
newClusterIPCIDRMask, _ := newClusterIPCIDR.Mask.Size()
route := generateRoute(newClusterIPCIDR.IP, newClusterIPCIDRMask, gw, linkIndex, scope)
if err = c.netlink.RouteReplace(route); err != nil {
return fmt.Errorf("failed to install new ClusterIP route: %w", err)
// Generate a route with the new Service CIDR and install it.
serviceCIDRMask, _ := serviceCIDR.Mask.Size()
route := generateRoute(serviceCIDR.IP, serviceCIDRMask, gw, linkIndex, scope)
if err := c.netlink.RouteReplace(route); err != nil {
return fmt.Errorf("failed to install a new Service CIDR route: %w", err)
}
// Store the new destination CIDR.
// Store the new Service CIDR.
if isIPv6 {
c.clusterIPv6CIDR = route.Dst
c.serviceIPv6CIDR = serviceCIDR
} else {
c.clusterIPv4CIDR = route.Dst
c.serviceIPv4CIDR = serviceCIDR
}
klog.V(4).InfoS("Created a route to route the ClusterIP to Antrea gateway", "route", route, "ClusterIP", svcIP)

// Collect stale routes.
var staleRoutes []*netlink.Route
if curClusterIPCIDR != nil {
if curServiceCIDR != nil {
// If current destination CIDR is not nil, the route with current destination CIDR should be uninstalled.
route.Dst = curClusterIPCIDR
route.Dst = curServiceCIDR
staleRoutes = []*netlink.Route{route}
} else {
// If current destination CIDR is nil, which means that Antrea Agent has just started, then all existing routes
Expand All @@ -1388,18 +1376,23 @@ func (c *Client) AddClusterIPRoute(svcIP net.IP) error {
return fmt.Errorf("error listing ip routes: %w", err)
}
for i := 0; i < len(routes); i++ {
if routes[i].Gw.Equal(gw) && !routes[i].Dst.IP.Equal(svcIP) && routes[i].Dst.Contains(svcIP) {
if routes[i].Gw.Equal(gw) && !routes[i].Dst.IP.Equal(serviceCIDR.IP) && routes[i].Dst.Contains(serviceCIDR.IP) {
staleRoutes = append(staleRoutes, &routes[i])
}
}
}

// Remove stale routes.
for _, rt := range staleRoutes {
if err = c.netlink.RouteDel(rt); err != nil {
return fmt.Errorf("failed to uninstall stale ClusterIP route %s: %w", rt.String(), err)
if err := c.netlink.RouteDel(rt); err != nil {
if err.Error() == "no such process" {
klog.InfoS("Failed to delete stale Service CIDR route since the route has been deleted", "route", rt)
} else {
return fmt.Errorf("failed to uninstall stale Service CIDR route %s: %w", rt.String(), err)
}
} else {
klog.V(4).InfoS("Uninstalled stale Service CIDR route successfully", "route", rt)
}
klog.V(4).InfoS("Uninstalled stale ClusterIP route successfully", "stale route", rt)
}

return nil
Expand All @@ -1409,11 +1402,11 @@ func (c *Client) addVirtualNodePortDNATIPRoute(isIPv6 bool) error {
linkIndex := c.nodeConfig.GatewayConfig.LinkIndex
vIP := config.VirtualNodePortDNATIPv4
gw := config.VirtualServiceIPv4
mask := ipv4AddrLength
mask := net.IPv4len * 8
if isIPv6 {
vIP = config.VirtualNodePortDNATIPv6
gw = config.VirtualServiceIPv6
mask = ipv6AddrLength
mask = net.IPv6len * 8
}
route := generateRoute(vIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE)
if err := c.netlink.RouteReplace(route); err != nil {
Expand All @@ -1435,10 +1428,10 @@ func (c *Client) addLoadBalancerIngressIPRoute(svcIPStr string) error {
var mask int
if !isIPv6 {
gw = config.VirtualServiceIPv4
mask = ipv4AddrLength
mask = net.IPv4len * 8
} else {
gw = config.VirtualServiceIPv6
mask = ipv6AddrLength
mask = net.IPv6len * 8
}

route := generateRoute(svcIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE)
Expand All @@ -1461,10 +1454,10 @@ func (c *Client) deleteLoadBalancerIngressIPRoute(svcIPStr string) error {
var mask int
if !isIPv6 {
gw = config.VirtualServiceIPv4
mask = ipv4AddrLength
mask = net.IPv4len * 8
} else {
gw = config.VirtualServiceIPv6
mask = ipv6AddrLength
mask = net.IPv6len * 8
}

route := generateRoute(svcIP, mask, gw, linkIndex, netlink.SCOPE_UNIVERSE)
Expand Down Expand Up @@ -1621,9 +1614,9 @@ func isIPv6Protocol(protocol binding.Protocol) bool {
}

func generateRoute(ip net.IP, mask int, gw net.IP, linkIndex int, scope netlink.Scope) *netlink.Route {
addrBits := ipv4AddrLength
addrBits := net.IPv4len * 8
if ip.To4() == nil {
addrBits = ipv6AddrLength
addrBits = net.IPv6len * 8
}

route := &netlink.Route{
Expand Down
Loading

0 comments on commit be49b4a

Please sign in to comment.