Skip to content

Commit

Permalink
[Bugfix] Install Multicast related iptables rules only on IPv4 cluster
Browse files Browse the repository at this point in the history
Add a pre-check in the agent Initializer that returns an error when the
Multicast feature gate is enabled on an IPv6-only cluster, and install the
Multicast iptables rules only in the IPv4 chains.

Signed-off-by: Wenying Dong <wenyingd@vmware.com>
  • Loading branch information
wenyingd committed Mar 22, 2024
1 parent df82b76 commit 7af7f6d
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 65 deletions.
3 changes: 2 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -818,7 +818,8 @@ func run(o *Options) error {
validator,
networkConfig.TrafficEncapMode.SupportsEncap(),
nodeInformer,
enableBridgingMode)
enableBridgingMode,
v4Enabled)
if err := mcastController.Initialize(); err != nil {
return err
}
Expand Down
12 changes: 11 additions & 1 deletion pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package multicast

import (
"fmt"
"net"
"sync"
"time"
Expand Down Expand Up @@ -265,6 +266,10 @@ type Controller struct {
installedNodes sets.Set[string]
encapEnabled bool
flexibleIPAMEnabled bool
// ipv4Enabled indicates whether the Controller is running on an IPv4 cluster.
// Initialize returns an error when ipv4Enabled is false, as Multicast does not support IPv6 for now.
// TODO: remove this flag after IPv6 is supported in Multicast.
ipv4Enabled bool
}

func NewMulticastController(ofClient openflow.Client,
Expand All @@ -279,7 +284,8 @@ func NewMulticastController(ofClient openflow.Client,
validator types.McastNetworkPolicyController,
isEncap bool,
nodeInformer coreinformers.NodeInformer,
enableFlexibleIPAM bool) *Controller {
enableFlexibleIPAM bool,
ipv4Enabled bool) *Controller {
eventCh := make(chan *mcastGroupEvent, workerCount)
groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, igmpQueryVersions, validator, isEncap)
groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{
Expand All @@ -303,6 +309,7 @@ func NewMulticastController(ofClient openflow.Client,
queryGroupId: v4GroupAllocator.Allocate(),
encapEnabled: isEncap,
flexibleIPAMEnabled: enableFlexibleIPAM,
ipv4Enabled: ipv4Enabled,
}
if isEncap {
c.nodeGroupID = v4GroupAllocator.Allocate()
Expand Down Expand Up @@ -331,6 +338,9 @@ func NewMulticastController(ofClient openflow.Client,
}

func (c *Controller) Initialize() error {
if !c.ipv4Enabled {
return fmt.Errorf("multicast is not supported on the IPv6-only cluster")
}
err := c.mRouteClient.Initialize()
if err != nil {
return err
Expand Down
9 changes: 8 additions & 1 deletion pkg/agent/multicast/mcast_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,7 +1255,7 @@ func newMockMulticastController(t *testing.T, isEncap bool, enableFlexibleIPAM b
clientset = fake.NewSimpleClientset()
informerFactory = informers.NewSharedInformerFactory(clientset, 12*time.Hour)
nodeInformer := informerFactory.Core().V1().Nodes()
mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, nodeInformer, enableFlexibleIPAM)
mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, nodeInformer, enableFlexibleIPAM, true)
return mctrl
}

Expand All @@ -1265,6 +1265,13 @@ func TestFlexibleIPAMModeInitialize(t *testing.T) {
assert.NoError(t, err)
}

// TestMulticastControllerOnIPv6Cluster verifies that Initialize fails with a
// descriptive error when the Controller is configured for an IPv6-only cluster,
// since Multicast does not support IPv6 yet.
func TestMulticastControllerOnIPv6Cluster(t *testing.T) {
	mockController := newMockMulticastController(t, true, false)
	// Simulate an IPv6-only cluster by clearing the IPv4 flag set by the helper.
	mockController.ipv4Enabled = false
	err := mockController.Initialize()
	// Use EqualError to compare the error message directly: the original
	// assert.Equal(t, err, fmt.Errorf(...)) both swapped testify's
	// (expected, actual) argument order and relied on fragile deep equality
	// between two independently constructed error values.
	assert.EqualError(t, err, "multicast is not supported on the IPv6-only cluster")
}

func (c *Controller) initialize(t *testing.T) error {
mockOFClient.EXPECT().InstallMulticastGroup(c.queryGroupId, gomock.Any(), gomock.Any()).Times(1)
mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any())
Expand Down
53 changes: 14 additions & 39 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,6 @@ type Client struct {
nodePortsIPv6 sync.Map
// clusterNodeIPs stores the IPv4 of all other Nodes in the cluster
clusterNodeIPs sync.Map
// clusterNodeIP6s stores the IPv6 of all other Nodes in the cluster
clusterNodeIP6s sync.Map
// egressRoutes caches ip routes about Egresses.
egressRoutes sync.Map
// The latest calculated Service CIDRs can be got from serviceCIDRProvider.
Expand Down Expand Up @@ -418,26 +416,18 @@ func (c *Client) syncIPSet() error {
}

if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() {
if err := c.ipset.CreateIPSet(clusterNodeIPSet, ipset.HashIP, false); err != nil {
return err
}
if err := c.ipset.CreateIPSet(clusterNodeIP6Set, ipset.HashIP, true); err != nil {
return err
}
c.clusterNodeIPs.Range(func(_, v interface{}) bool {
ipsetEntry := v.(string)
if err := c.ipset.AddEntry(clusterNodeIPSet, ipsetEntry); err != nil {
return false
}
return true
})
c.clusterNodeIP6s.Range(func(_, v interface{}) bool {
ipSetEntry := v.(string)
if err := c.ipset.AddEntry(clusterNodeIP6Set, ipSetEntry); err != nil {
return false
if c.networkConfig.IPv4Enabled {
if err := c.ipset.CreateIPSet(clusterNodeIPSet, ipset.HashIP, false); err != nil {
return err
}
return true
})
c.clusterNodeIPs.Range(func(_, v interface{}) bool {
ipsetEntry := v.(string)
if err := c.ipset.AddEntry(clusterNodeIPSet, ipsetEntry); err != nil {
return false
}
return true
})
}
}

if c.nodeNetworkPolicyEnabled {
Expand Down Expand Up @@ -709,7 +699,7 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet,
}...)
}

if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() {
if c.multicastEnabled && !isIPv6 && c.networkConfig.TrafficEncapMode.SupportsEncap() {
// Drop the multicast packets forwarded from other Nodes in the cluster. This is because
// the packet sent out from the sender Pod is already received via tunnel port with encap mode,
// and the one forwarded via the underlay network is to send to external receivers
Expand Down Expand Up @@ -832,7 +822,7 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet,
writeLine(iptablesData, iptables.MakeChainLine(antreaPostRoutingChain))
// The masqueraded multicast traffic will become unicast so we
// stop traversing this antreaPostRoutingChain for multicast traffic.
if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsNoEncap() {
if c.multicastEnabled && !isIPv6 && c.networkConfig.TrafficEncapMode.SupportsNoEncap() {
writeLine(iptablesData, []string{
"-A", antreaPostRoutingChain,
"-m", "comment", "--comment", `"Antrea: skip masquerade for multicast traffic"`,
Expand Down Expand Up @@ -1837,17 +1827,12 @@ func (c *Client) addNodeIP(podCIDR *net.IPNet, nodeIP net.IP) error {
if nodeIP == nil {
return nil
}
ipSetEntry := nodeIP.String()
if nodeIP.To4() != nil {
ipSetEntry := nodeIP.String()
if err := c.ipset.AddEntry(clusterNodeIPSet, ipSetEntry); err != nil {
return err
}
c.clusterNodeIPs.Store(podCIDR.String(), ipSetEntry)
} else {
if err := c.ipset.AddEntry(clusterNodeIP6Set, ipSetEntry); err != nil {
return err
}
c.clusterNodeIP6s.Store(podCIDR.String(), ipSetEntry)
}
return nil
}
Expand All @@ -1870,16 +1855,6 @@ func (c *Client) deleteNodeIP(podCIDR *net.IPNet) error {
return err
}
c.clusterNodeIPs.Delete(podCIDRStr)
} else {
obj, exists := c.clusterNodeIP6s.Load(podCIDRStr)
if !exists {
return nil
}
ipSetEntry := obj.(string)
if err := c.ipset.DelEntry(clusterNodeIP6Set, ipSetEntry); err != nil {
return err
}
c.clusterNodeIP6s.Delete(podCIDRStr)
}
return nil
}
Expand Down
30 changes: 7 additions & 23 deletions pkg/agent/route/route_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ func TestSyncIPSet(t *testing.T) {
nodePortsIPv4 []string
nodePortsIPv6 []string
clusterNodeIPs map[string]string
clusterNodeIP6s map[string]string
nodeNetworkPolicyIPSetsIPv4 map[string]sets.Set[string]
nodeNetworkPolicyIPSetsIPv6 map[string]sets.Set[string]
expectedCalls func(ipset *ipsettest.MockInterfaceMockRecorder)
Expand Down Expand Up @@ -205,7 +204,6 @@ func TestSyncIPSet(t *testing.T) {
nodePortsIPv4: []string{"192.168.0.2,tcp:10000", "127.0.0.1,tcp:10000"},
nodePortsIPv6: []string{"fe80::e643:4bff:fe44:ee,tcp:10000", "::1,tcp:10000"},
clusterNodeIPs: map[string]string{"172.16.3.0/24": "192.168.0.3", "172.16.4.0/24": "192.168.0.4"},
clusterNodeIP6s: map[string]string{"2001:ab03:cd04:5503::/64": "fe80::e643:4bff:fe03", "2001:ab03:cd04:5504::/64": "fe80::e643:4bff:fe04"},
nodeNetworkPolicyIPSetsIPv4: map[string]sets.Set[string]{"ANTREA-POL-RULE1-4": sets.New[string]("1.1.1.1/32", "2.2.2.2/32")},
nodeNetworkPolicyIPSetsIPv6: map[string]sets.Set[string]{"ANTREA-POL-RULE1-6": sets.New[string]("fec0::1111/128", "fec0::2222/128")},
expectedCalls: func(mockIPSet *ipsettest.MockInterfaceMockRecorder) {
Expand All @@ -220,11 +218,8 @@ func TestSyncIPSet(t *testing.T) {
mockIPSet.AddEntry(antreaNodePortIP6Set, "fe80::e643:4bff:fe44:ee,tcp:10000")
mockIPSet.AddEntry(antreaNodePortIP6Set, "::1,tcp:10000")
mockIPSet.CreateIPSet(clusterNodeIPSet, ipset.HashIP, false)
mockIPSet.CreateIPSet(clusterNodeIP6Set, ipset.HashIP, true)
mockIPSet.AddEntry(clusterNodeIPSet, "192.168.0.3")
mockIPSet.AddEntry(clusterNodeIPSet, "192.168.0.4")
mockIPSet.AddEntry(clusterNodeIP6Set, "fe80::e643:4bff:fe03")
mockIPSet.AddEntry(clusterNodeIP6Set, "fe80::e643:4bff:fe04")
mockIPSet.CreateIPSet("ANTREA-POL-RULE1-4", ipset.HashNet, false)
mockIPSet.CreateIPSet("ANTREA-POL-RULE1-6", ipset.HashNet, true)
mockIPSet.AddEntry("ANTREA-POL-RULE1-4", "1.1.1.1/32")
Expand Down Expand Up @@ -269,7 +264,6 @@ func TestSyncIPSet(t *testing.T) {
nodePortsIPv4: sync.Map{},
nodePortsIPv6: sync.Map{},
clusterNodeIPs: sync.Map{},
clusterNodeIP6s: sync.Map{},
}
for _, nodePortIPv4 := range tt.nodePortsIPv4 {
c.nodePortsIPv4.Store(nodePortIPv4, struct{}{})
Expand All @@ -280,9 +274,6 @@ func TestSyncIPSet(t *testing.T) {
for cidr, nodeIP := range tt.clusterNodeIPs {
c.clusterNodeIPs.Store(cidr, nodeIP)
}
for cidr, nodeIP := range tt.clusterNodeIP6s {
c.clusterNodeIP6s.Store(cidr, nodeIP)
}
for set, ips := range tt.nodeNetworkPolicyIPSetsIPv4 {
c.nodeNetworkPolicyIPSetsIPv4.Store(set, ips)
}
Expand Down Expand Up @@ -402,7 +393,6 @@ COMMIT
:ANTREA-OUTPUT - [0:0]
-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track incoming encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --dst-type LOCAL -j NOTRACK
-A ANTREA-OUTPUT -m comment --comment "Antrea: do not track outgoing encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --src-type LOCAL -j NOTRACK
-A ANTREA-PREROUTING -m comment --comment "Antrea: drop Pod multicast traffic forwarded via underlay network" -m set --match-set CLUSTER-NODE-IP6 src -d 224.0.0.0/4 -j DROP
COMMIT
*mangle
:ANTREA-MANGLE - [0:0]
Expand Down Expand Up @@ -1790,10 +1780,6 @@ func TestAddAndDeleteNodeIP(t *testing.T) {
networkConfig: &config.NetworkConfig{TrafficEncapMode: config.TrafficEncapModeEncap},
podCIDR: ip.MustParseCIDR("1122:3344::/80"),
nodeIP: net.ParseIP("aabb:ccdd::1"),
expectedCalls: func(mockIPSet *ipsettest.MockInterfaceMockRecorder) {
mockIPSet.AddEntry(clusterNodeIP6Set, "aabb:ccdd::1")
mockIPSet.DelEntry(clusterNodeIP6Set, "aabb:ccdd::1")
},
},
}
for _, tt := range tests {
Expand All @@ -1805,25 +1791,23 @@ func TestAddAndDeleteNodeIP(t *testing.T) {
networkConfig: tt.networkConfig,
multicastEnabled: tt.multicastEnabled,
}
tt.expectedCalls(mockIPSet.EXPECT())
if tt.expectedCalls != nil {
tt.expectedCalls(mockIPSet.EXPECT())
}

ipv6 := tt.nodeIP.To4() == nil
assert.NoError(t, c.addNodeIP(tt.podCIDR, tt.nodeIP))
var exists bool
if ipv6 {
_, exists = c.clusterNodeIP6s.Load(tt.podCIDR.String())
} else {
if !ipv6 {
_, exists = c.clusterNodeIPs.Load(tt.podCIDR.String())
assert.True(t, exists)
}
assert.True(t, exists)

assert.NoError(t, c.deleteNodeIP(tt.podCIDR))
if ipv6 {
_, exists = c.clusterNodeIP6s.Load(tt.podCIDR.String())
} else {
if !ipv6 {
_, exists = c.clusterNodeIPs.Load(tt.podCIDR.String())
assert.False(t, exists)
}
assert.False(t, exists)
})
}
}
Expand Down

0 comments on commit 7af7f6d

Please sign in to comment.