Skip to content

Commit

Permalink
Multi-cluster support with more modes
Browse files Browse the repository at this point in the history
In order to support multi-cluster traffic when the member cluster is
deployed with networkPolicyOnly, noEncap or hybrid mode, antrea-agent
will be responsible for the following things:

1. Create tunnel interface `antrea-tun0` for cross-cluster traffic
2. Watch all Pods on the Gateway and set up one rule per Pod in the L3Forwarding
table as long as the Pod is running on a regular Node instead of the Gateway.
3. Update container interface's MTU with the tunnel header size deducted.

Signed-off-by: Lan Luo <luola@vmware.com>
  • Loading branch information
luolanzone committed Mar 14, 2023
1 parent c34a47b commit ff78c5b
Show file tree
Hide file tree
Showing 26 changed files with 1,421 additions and 210 deletions.
2 changes: 0 additions & 2 deletions build/yamls/antrea-prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,6 @@ spec:
configMap:
defaultMode: 420
name: prometheus-server-conf

- name: prometheus-storage-volume
emptyDir: {}
---
Expand All @@ -166,7 +165,6 @@ metadata:
annotations:
prometheus.io/scrape: 'true'
prometheus.io/port: '9090'

spec:
selector:
app: prometheus-server
Expand Down
58 changes: 40 additions & 18 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ import (
"antrea.io/antrea/pkg/ovs/ovsctl"
"antrea.io/antrea/pkg/signals"
"antrea.io/antrea/pkg/util/channel"
"antrea.io/antrea/pkg/util/env"
"antrea.io/antrea/pkg/util/k8s"
"antrea.io/antrea/pkg/version"
)
Expand Down Expand Up @@ -177,6 +176,7 @@ func run(o *Options) error {
encryptionMode = config.TrafficEncryptionModeIPSec
}
_, ipsecAuthenticationMode := config.GetIPsecAuthenticationModeFromStr(o.config.IPsec.AuthenticationMode)

networkConfig := &config.NetworkConfig{
TunnelType: ovsconfig.TunnelType(o.config.TunnelType),
TunnelPort: o.config.TunnelPort,
Expand All @@ -188,6 +188,7 @@ func run(o *Options) error {
IPsecConfig: config.IPsecConfig{
AuthenticationMode: ipsecAuthenticationMode,
},
EnableMulticlusterGW: enableMulticlusterGW,
}

wireguardConfig := &config.WireGuardConfig{
Expand Down Expand Up @@ -323,31 +324,48 @@ func run(o *Options) error {
)
}

var mcRouteController *mcroute.MCRouteController
var mcDefaultRouteController *mcroute.MCDefaultRouteController
var mcStrechedNetworkPolicyController *mcroute.StretchedNetworkPolicyController
var mcPodRouteController *mcroute.MCPodRouteController
var mcInformerFactory mcinformers.SharedInformerFactory
var mcInformerFactoryWithOption mcinformers.SharedInformerFactory

if enableMulticlusterGW {
mcNamespace := env.GetPodNamespace()
if o.config.Multicluster.Namespace != "" {
mcNamespace = o.config.Multicluster.Namespace
if !networkConfig.IPv4Enabled {
return fmt.Errorf("Antrea Mutli-cluster doesn't not support IPv6 only cluster")
}
mcInformerFactory = mcinformers.NewSharedInformerFactory(mcClient, informerDefaultResync)
gwInformer := mcInformerFactory.Multicluster().V1alpha1().Gateways()
ciImportInformer := mcInformerFactory.Multicluster().V1alpha1().ClusterInfoImports()
mcRouteController = mcroute.NewMCRouteController(
mcInformerFactoryWithOption = mcinformers.NewSharedInformerFactoryWithOptions(mcClient,
informerDefaultResync,
mcinformers.WithTweakListOptions(func(lo *metav1.ListOptions) {
lo.FieldSelector = fields.Set{"metadata.namespace": o.config.Multicluster.Namespace}.String()
}),
)
mcDefaultRouteController = mcroute.NewMCDefaultRouteController(
mcClient,
gwInformer,
ciImportInformer,
mcInformerFactoryWithOption,
ofClient,
ovsBridgeClient,
ifaceStore,
nodeConfig,
mcNamespace,
o.config.Multicluster.Namespace,
o.config.Multicluster.EnableStretchedNetworkPolicy,
o.config.Multicluster.EnablePodToPodConnectivity,
)
if networkConfig.TrafficEncapMode != config.TrafficEncapModeEncap {
mcPodRouteController = mcroute.NewMCPodRouteController(
k8sClient,
mcClient,
mcInformerFactoryWithOption.Multicluster().V1alpha1().Gateways().Informer(),
ofClient,
ovsBridgeClient,
ifaceStore,
nodeConfig,
o.config.Multicluster.Namespace,
)
}
}
if enableMulticlusterNP {
mcInformerFactory = mcinformers.NewSharedInformerFactory(mcClient, informerDefaultResync)
labelIDInformer := mcInformerFactory.Multicluster().V1alpha1().LabelIdentities()
mcStrechedNetworkPolicyController = mcroute.NewMCAgentStretchedNetworkPolicyController(
ofClient,
Expand Down Expand Up @@ -490,11 +508,9 @@ func run(o *Options) error {
var cniPodInfoStore cnipodcache.CNIPodInfoStore
var externalNodeController *externalnode.ExternalNodeController
var localExternalNodeInformer cache.SharedIndexInformer

if o.nodeType == config.K8sNode {
isChaining := false
if networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
isChaining = true
}
isChaining := networkConfig.TrafficEncapMode.IsNetworkPolicyOnly()
cniServer = cniserver.New(
o.config.CNISocket,
o.config.HostProcPathPrefix,
Expand All @@ -505,6 +521,7 @@ func run(o *Options) error {
enableBridgingMode,
enableAntreaIPAM,
o.config.DisableTXChecksumOffload,
networkConfig,
networkReadyCh)

if features.DefaultFeatureGate.Enabled(features.SecondaryNetwork) {
Expand Down Expand Up @@ -751,10 +768,15 @@ func run(o *Options) error {
}

if enableMulticlusterGW {
mcInformerFactory.Start(stopCh)
go mcRouteController.Run(stopCh)
mcInformerFactoryWithOption.Start(stopCh)
go mcDefaultRouteController.Run(stopCh)
if mcPodRouteController != nil {
go mcPodRouteController.Run(stopCh)
}
}

if enableMulticlusterNP {
mcInformerFactory.Start(stopCh)
go mcStrechedNetworkPolicyController.Run(stopCh)
}

Expand Down
25 changes: 15 additions & 10 deletions cmd/antrea-agent/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,7 +307,7 @@ func (o *Options) validateAntreaIPAMConfig() error {
return nil
}

func (o *Options) validateMulticlusterConfig(encapMode config.TrafficEncapModeType) error {
func (o *Options) validateMulticlusterConfig(encapMode config.TrafficEncapModeType, encryptionMode config.TrafficEncryptionModeType) error {
if !o.config.Multicluster.EnableGateway && !o.config.Multicluster.EnableStretchedNetworkPolicy {
return nil
}
Expand All @@ -321,9 +321,8 @@ func (o *Options) validateMulticlusterConfig(encapMode config.TrafficEncapModeTy
return fmt.Errorf("Multi-cluster Gateway must be enabled to enable StretchedNetworkPolicy")
}

if encapMode != config.TrafficEncapModeEncap {
// Only Encap mode is supported for Multi-cluster Gateway.
return fmt.Errorf("Multicluster is only applicable to the %s mode", config.TrafficEncapModeEncap)
if encapMode.SupportsEncap() && encryptionMode == config.TrafficEncryptionModeWireGuard {
return fmt.Errorf("Multi-cluster Gateway doesn't support in-cluster WireGuard encryption")
}
return nil
}
Expand Down Expand Up @@ -404,11 +403,17 @@ func (o *Options) setK8sNodeDefaultOptions() {
}
}

if features.DefaultFeatureGate.Enabled(features.Multicluster) && o.config.Multicluster.Enable {
// Multicluster.Enable is deprecated but it may be set by an earlier version
// deployment manifest. If it is set to true, pass the value to
// Multicluster.EnableGateway.
o.config.Multicluster.EnableGateway = true
if features.DefaultFeatureGate.Enabled(features.Multicluster) {
if o.config.Multicluster.Enable {
// Multicluster.Enable is deprecated but it may be set by an earlier version
// deployment manifest. If it is set to true, pass the value to
// Multicluster.EnableGateway.
o.config.Multicluster.EnableGateway = true
}

if o.config.Multicluster.EnableGateway && o.config.Multicluster.Namespace == "" {
o.config.Multicluster.Namespace = env.GetPodNamespace()
}
}

if features.DefaultFeatureGate.Enabled(features.Egress) {
Expand Down Expand Up @@ -488,7 +493,7 @@ func (o *Options) validateK8sNodeOptions() error {
}
}
}
if err := o.validateMulticlusterConfig(encapMode); err != nil {
if err := o.validateMulticlusterConfig(encapMode, encryptionMode); err != nil {
return err
}

Expand Down
23 changes: 14 additions & 9 deletions cmd/antrea-agent/options_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,12 @@ import (

func TestMulticlusterOptions(t *testing.T) {
tests := []struct {
name string
mcConfig agentconfig.MulticlusterConfig
featureGate bool
encapMode string
expectedErr string
name string
mcConfig agentconfig.MulticlusterConfig
featureGate bool
encapMode string
encryptionMode string
expectedErr string
}{
{
name: "empty input",
Expand Down Expand Up @@ -80,13 +81,14 @@ func TestMulticlusterOptions(t *testing.T) {
expectedErr: "Multi-cluster Gateway must be enabled to enable StretchedNetworkPolicy",
},
{
name: "NoEncap",
name: "Multicluster with in-cluster WireGuard Encryption",
mcConfig: agentconfig.MulticlusterConfig{
EnableGateway: true,
},
featureGate: true,
encapMode: "NoEncap",
expectedErr: "Multicluster is only applicable to the encap mode",
featureGate: true,
encapMode: "encap",
encryptionMode: "wireguard",
expectedErr: "Multi-cluster Gateway doesn't support in-cluster WireGuard encryption",
},
{
name: "NoEncap and feature disabled",
Expand All @@ -104,6 +106,9 @@ func TestMulticlusterOptions(t *testing.T) {
TrafficEncapMode: tt.encapMode,
Multicluster: tt.mcConfig,
}
if tt.encryptionMode != "" {
config.TrafficEncryptionMode = tt.encryptionMode
}
o := &Options{config: config}
features.DefaultMutableFeatureGate.SetFromMap(o.config.FeatureGates)
o.setDefaults()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,12 @@ func (r *ResourceExportReconciler) refreshEndpointsResourceImport(
}
if len(svcResExport.Status.Conditions) > 0 {
if svcResExport.Status.Conditions[0].Status != corev1.ConditionTrue {
return newResImport, false, fmt.Errorf("corresponding Service type of ResourceExport " + svcResExportName.String() +
"has not been converged successfully, retry later")
err := fmt.Errorf("the Service type of ResourceExport %s has not been converged successfully, retry later", svcResExportName.String())
return newResImport, false, err
}
} else {
return newResImport, false, fmt.Errorf("corresponding Service type of ResourceExport " + svcResExportName.String() +
"has not been converged yet, retry later")
err := fmt.Errorf("the Service type of ResourceExport %s has not been converged yet, retry later", svcResExportName.String())
return newResImport, false, err
}
}

Expand Down
37 changes: 15 additions & 22 deletions pkg/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,12 +716,12 @@ func (i *Initializer) setupGatewayInterface() error {
// Idempotent operation to set the gateway's MTU: we perform this operation regardless of
// whether the gateway interface already exists, as the desired MTU may change across
// restarts.
klog.V(4).Infof("Setting gateway interface %s MTU to %d", i.hostGateway, i.nodeConfig.NodeMTU)
klog.V(4).Infof("Setting gateway interface %s MTU to %d", i.hostGateway, i.networkConfig.InterfaceMTU)

if err := i.configureGatewayInterface(gatewayIface); err != nil {
return err
}
if err := i.setInterfaceMTU(i.hostGateway, i.nodeConfig.NodeMTU); err != nil {
if err := i.setInterfaceMTU(i.hostGateway, i.networkConfig.InterfaceMTU); err != nil {
return err
}

Expand Down Expand Up @@ -819,10 +819,11 @@ func (i *Initializer) setupDefaultTunnelInterface() error {
// It's not necessary for new Linux kernel versions with the following patch:
// https://github.com/torvalds/linux/commit/89e5c58fc1e2857ccdaae506fb8bc5fed57ee063.
shouldEnableCsum := i.networkConfig.TunnelCsum && (i.networkConfig.TunnelType == ovsconfig.GeneveTunnel || i.networkConfig.TunnelType == ovsconfig.VXLANTunnel)
createTunnelInterface := i.networkConfig.NeedsTunnelInterface()

// Check the default tunnel port.
if portExists {
if i.networkConfig.TrafficEncapMode.SupportsEncap() &&
if createTunnelInterface &&
tunnelIface.TunnelInterfaceConfig.Type == i.networkConfig.TunnelType &&
tunnelIface.TunnelInterfaceConfig.DestinationPort == i.networkConfig.TunnelPort &&
tunnelIface.TunnelInterfaceConfig.LocalIP.Equal(localIP) {
Expand All @@ -839,7 +840,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error {
}

if err := i.ovsBridgeClient.DeletePort(tunnelIface.PortUUID); err != nil {
if i.networkConfig.TrafficEncapMode.SupportsEncap() {
if createTunnelInterface {
return fmt.Errorf("failed to remove tunnel port %s with wrong tunnel type: %s", tunnelPortName, err)
}
klog.Errorf("Failed to remove tunnel port %s in NoEncapMode: %v", tunnelPortName, err)
Expand All @@ -850,7 +851,7 @@ func (i *Initializer) setupDefaultTunnelInterface() error {
}

// Create the default tunnel port and interface.
if i.networkConfig.TrafficEncapMode.SupportsEncap() {
if createTunnelInterface {
if tunnelPortName != defaultTunInterfaceName {
// Reset the tunnel interface name to the desired name before
// recreating the tunnel port and interface.
Expand Down Expand Up @@ -1012,12 +1013,11 @@ func (i *Initializer) initK8sNodeLocalConfig(nodeName string) error {
WireGuardConfig: i.wireGuardConfig,
}

mtu, err := i.getNodeMTU(transportInterface)
i.networkConfig.InterfaceMTU, err = i.getInterfaceMTU(transportInterface)
if err != nil {
return err
}
i.nodeConfig.NodeMTU = mtu
klog.InfoS("Setting Node MTU", "MTU", mtu)
klog.InfoS("Getting Interface MTU", "MTU", i.networkConfig.InterfaceMTU)

if i.networkConfig.TrafficEncapMode.IsNetworkPolicyOnly() {
return nil
Expand Down Expand Up @@ -1164,27 +1164,20 @@ func getRoundInfo(bridgeClient ovsconfig.OVSBridgeClient) types.RoundInfo {
return roundInfo
}

func (i *Initializer) getNodeMTU(transportInterface *net.Interface) (int, error) {
func (i *Initializer) getInterfaceMTU(transportInterface *net.Interface) (int, error) {
if i.mtu != 0 {
return i.mtu, nil
}
mtu := transportInterface.MTU
// Make sure mtu is set on the interface.
// Make sure MTU is set on the interface.
if mtu <= 0 {
return 0, fmt.Errorf("Failed to fetch Node MTU : %v", mtu)
}
if i.networkConfig.TrafficEncapMode.SupportsEncap() {
if i.networkConfig.TunnelType == ovsconfig.VXLANTunnel {
mtu -= config.VXLANOverhead
} else if i.networkConfig.TunnelType == ovsconfig.GeneveTunnel {
mtu -= config.GeneveOverhead
} else if i.networkConfig.TunnelType == ovsconfig.GRETunnel {
mtu -= config.GREOverhead
}
if i.nodeConfig.NodeIPv6Addr != nil {
mtu -= config.IPv6ExtraOverhead
}
}

isIPv6 := i.nodeConfig.NodeIPv6Addr != nil
i.networkConfig.CalculateMTUDeduction(isIPv6)
mtu -= i.networkConfig.MTUDeduction

if i.networkConfig.TrafficEncryptionMode == config.TrafficEncryptionModeIPSec {
mtu -= config.IPSecESPOverhead
}
Expand Down
5 changes: 2 additions & 3 deletions pkg/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,6 @@ func TestInitNodeLocalConfig(t *testing.T) {
NodeTransportInterfaceName: ipDevice.Name,
NodeTransportIPv4Addr: nodeIPNet,
NodeTransportInterfaceMTU: tt.expectedNodeLocalIfaceMTU,
NodeMTU: tt.expectedMTU,
UplinkNetConfig: new(config.AdapterNetConfig),
}

Expand Down Expand Up @@ -596,12 +595,12 @@ func TestSetupGatewayInterface(t *testing.T) {
Type: config.K8sNode,
OVSBridge: "br-int",
PodIPv4CIDR: podCIDR,
NodeMTU: 1450,
}
networkConfig := &config.NetworkConfig{
TrafficEncapMode: config.TrafficEncapModeEncap,
TunnelType: ovsconfig.GeneveTunnel,
TunnelCsum: false,
InterfaceMTU: 1450,
}

mockOVSBridgeClient := ovsconfigtest.NewMockOVSBridgeClient(controller)
Expand All @@ -624,7 +623,7 @@ func TestSetupGatewayInterface(t *testing.T) {
mockOVSBridgeClient.EXPECT().CreateInternalPort(initializer.hostGateway, ofport, mock.Any(), mock.Any()).Return(portUUID, nil)
mockOVSBridgeClient.EXPECT().SetInterfaceMAC(initializer.hostGateway, fakeMAC).Return(nil)
mockOVSBridgeClient.EXPECT().GetOFPort(initializer.hostGateway, false).Return(ofport, nil)
mockOVSBridgeClient.EXPECT().SetInterfaceMTU(initializer.hostGateway, nodeConfig.NodeMTU).Return(nil)
mockOVSBridgeClient.EXPECT().SetInterfaceMTU(initializer.hostGateway, networkConfig.InterfaceMTU).Return(nil)
err := initializer.setupGatewayInterface()
assert.NoError(t, err)
}
Expand Down
Loading

0 comments on commit ff78c5b

Please sign in to comment.