Skip to content

Commit

Permalink
Install Multicast related iptables rules only on IPv4 cluster (#6123) (
Browse files Browse the repository at this point in the history
…#6175)

Add a pre-check on the Multicast feature gate status with IPv6-only cluster
settings in agent Initializer, and install the iptables rules only in the IPv4
related chains.

Signed-off-by: Wenying Dong <wenyingd@vmware.com>
  • Loading branch information
wenyingd committed Apr 2, 2024
1 parent 6bb84c1 commit b9f7b8c
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 21 deletions.
4 changes: 3 additions & 1 deletion cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -789,7 +789,9 @@ func run(o *Options) error {
o.igmpQueryVersions,
validator,
networkConfig.TrafficEncapMode.SupportsEncap(),
informerFactory)
informerFactory,
v4Enabled,
v6Enabled)
if err := mcastController.Initialize(); err != nil {
return err
}
Expand Down
19 changes: 18 additions & 1 deletion pkg/agent/multicast/mcast_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package multicast

import (
"fmt"
"net"
"sync"
"time"
Expand Down Expand Up @@ -267,6 +268,13 @@ type Controller struct {
// installedNodes is the installed Node set that the IGMP report message is sent to.
installedNodes sets.Set[string]
encapEnabled bool
// ipv4Enabled is the flag that if it is running on IPv4 cluster. An error is returned if IPv4Enabled is false
// in Initialize as Multicast does not support IPv6 for now.
// TODO: remove this flag after IPv6 is supported in Multicast.
ipv4Enabled bool
// ipv6Enabled is the flag that if it is running on IPv6 cluster.
// TODO: remove this flag after IPv6 is supported in Multicast.
ipv6Enabled bool
}

func NewMulticastController(ofClient openflow.Client,
Expand All @@ -281,7 +289,9 @@ func NewMulticastController(ofClient openflow.Client,
igmpQueryVersions []uint8,
validator types.McastNetworkPolicyController,
isEncap bool,
informerFactory informers.SharedInformerFactory) *Controller {
informerFactory informers.SharedInformerFactory,
ipv4Enabled bool,
ipv6Enabled bool) *Controller {
eventCh := make(chan *mcastGroupEvent, workerCount)
groupSnooper := newSnooper(ofClient, ifaceStore, eventCh, igmpQueryInterval, igmpQueryVersions, validator, isEncap)
groupCache := cache.NewIndexer(getGroupEventKey, cache.Indexers{
Expand All @@ -305,6 +315,8 @@ func NewMulticastController(ofClient openflow.Client,
mcastGroupTimeout: igmpQueryInterval * 3,
queryGroupId: v4GroupAllocator.Allocate(),
encapEnabled: isEncap,
ipv4Enabled: ipv4Enabled,
ipv6Enabled: ipv6Enabled,
}
if isEncap {
c.nodeGroupID = v4GroupAllocator.Allocate()
Expand Down Expand Up @@ -333,6 +345,11 @@ func NewMulticastController(ofClient openflow.Client,
}

func (c *Controller) Initialize() error {
if !c.ipv4Enabled {
return fmt.Errorf("Multicast is not supported on an IPv6-only cluster")
} else if c.ipv6Enabled {
klog.InfoS("Multicast only works with IPv4 traffic on a dual-stack cluster")
}
err := c.mRouteClient.Initialize()
if err != nil {
return err
Expand Down
70 changes: 55 additions & 15 deletions pkg/agent/multicast/mcast_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestAddGroupMemberStatus(t *testing.T) {
iface: if1,
}
mctrl := newMockMulticastController(t, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
mctrl.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{
{Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}},
}
Expand All @@ -117,7 +117,7 @@ func TestAddGroupMemberStatus(t *testing.T) {

func TestUpdateGroupMemberStatus(t *testing.T) {
mctrl := newMockMulticastController(t, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
assert.NoError(t, err)
mgroup := net.ParseIP("224.96.1.4")
event := &mcastGroupEvent{
Expand Down Expand Up @@ -183,7 +183,7 @@ func TestUpdateGroupMemberStatus(t *testing.T) {

func TestCheckNodeUpdate(t *testing.T) {
mockController := newMockMulticastController(t, false)
err := mockController.initialize(t)
err := mockController.initialize()
require.NoError(t, err)

for _, tc := range []struct {
Expand Down Expand Up @@ -351,7 +351,7 @@ func TestGetGroupPods(t *testing.T) {
now := time.Now()

mctrl := newMockMulticastController(t, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)
groupMemberStatuses := []*GroupMemberStatus{
{
Expand Down Expand Up @@ -387,7 +387,7 @@ func TestGetGroupPods(t *testing.T) {

func TestGetPodStats(t *testing.T) {
mctrl := newMockMulticastController(t, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)

iface := if1
Expand All @@ -404,7 +404,7 @@ func TestGetPodStats(t *testing.T) {

func TestGetAllPodStats(t *testing.T) {
mctrl := newMockMulticastController(t, false)
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)

for _, tc := range []struct {
Expand Down Expand Up @@ -449,7 +449,7 @@ func TestGetAllPodStats(t *testing.T) {
func TestClearStaleGroupsCreatingLeaveEvent(t *testing.T) {
mctrl := newMockMulticastController(t, false)
workerCount = 1
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)
now := time.Now()
staleTime := now.Add(-mctrl.mcastGroupTimeout - time.Second)
Expand Down Expand Up @@ -485,7 +485,7 @@ func TestClearStaleGroupsCreatingLeaveEvent(t *testing.T) {
func TestClearStaleGroups(t *testing.T) {
mctrl := newMockMulticastController(t, false)
workerCount = 1
err := mctrl.initialize(t)
err := mctrl.initialize()
require.NoError(t, err)
mctrl.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{
{Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}},
Expand Down Expand Up @@ -736,13 +736,13 @@ func TestProcessPacketIn(t *testing.T) {
func TestEncapModeInitialize(t *testing.T) {
mockController := newMockMulticastController(t, true)
assert.True(t, mockController.nodeGroupID != 0)
err := mockController.initialize(t)
err := mockController.initialize()
assert.NoError(t, err)
}

func TestEncapLocalReportAndNotifyRemote(t *testing.T) {
mockController := newMockMulticastController(t, true)
_ = mockController.initialize(t)
_ = mockController.initialize()
mockController.mRouteClient.multicastInterfaceConfigs = []multicastInterfaceConfig{
{Name: if1.InterfaceName, IPv4Addr: &net.IPNet{IP: nodeIf1IP, Mask: net.IPv4Mask(255, 255, 255, 0)}},
}
Expand Down Expand Up @@ -944,7 +944,7 @@ func TestNodeUpdate(t *testing.T) {

func TestMemberChanged(t *testing.T) {
mockController := newMockMulticastController(t, false)
_ = mockController.initialize(t)
_ = mockController.initialize()

containerA := &interfacestore.ContainerInterfaceConfig{PodNamespace: "nameA", PodName: "podA", ContainerID: "tttt"}
containerB := &interfacestore.ContainerInterfaceConfig{PodNamespace: "nameA", PodName: "podB", ContainerID: "mmmm"}
Expand Down Expand Up @@ -1087,7 +1087,7 @@ func TestConcurrentEventHandlerAndWorkers(t *testing.T) {

func TestRemoteMemberJoinLeave(t *testing.T) {
mockController := newMockMulticastController(t, true)
_ = mockController.initialize(t)
_ = mockController.initialize()
stopCh := make(chan struct{})
defer close(stopCh)

Expand Down Expand Up @@ -1257,11 +1257,52 @@ func newMockMulticastController(t *testing.T, isEncap bool) *Controller {

clientset = fake.NewSimpleClientset()
informerFactory = informers.NewSharedInformerFactory(clientset, 12*time.Hour)
mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), ovsClient, podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, informerFactory)
mctrl := NewMulticastController(mockOFClient, groupAllocator, nodeConfig, mockIfaceStore, mockMulticastSocket, sets.New[string](), ovsClient, podUpdateSubscriber, time.Second*5, []uint8{1, 2, 3}, mockMulticastValidator, isEncap, informerFactory, true, false)
return mctrl
}

func (c *Controller) initialize(t *testing.T) error {
func TestMulticastControllerOnIPv6Cluster(t *testing.T) {
for _, tc := range []struct {
name string
ipv4Enabled bool
ipv6Enabled bool
expErr string
}{
{
name: "Fails on IPv6-only cluster",
ipv4Enabled: false,
ipv6Enabled: true,
expErr: "Multicast is not supported on an IPv6-only cluster",
},
{
name: "Succeeds on dual-stack cluster",
ipv4Enabled: true,
ipv6Enabled: true,
},
} {
t.Run(tc.name, func(t *testing.T) {
mockController := newMockMulticastController(t, true)
mockController.ipv4Enabled = tc.ipv4Enabled
mockController.ipv6Enabled = tc.ipv6Enabled
if tc.expErr == "" {
mockController.initMocks()
}
err := mockController.Initialize()
if tc.expErr != "" {
assert.EqualError(t, err, tc.expErr)
} else {
assert.NoError(t, err)
}
})
}
}

func (c *Controller) initialize() error {
c.initMocks()
return c.Initialize()
}

func (c *Controller) initMocks() {
mockOFClient.EXPECT().InstallMulticastGroup(c.queryGroupId, gomock.Any(), gomock.Any()).Times(1)
mockOFClient.EXPECT().InstallMulticastFlows(gomock.Any(), gomock.Any())
mockIfaceStore.EXPECT().GetInterfacesByType(interfacestore.InterfaceType(0)).Times(1).Return([]*interfacestore.InterfaceConfig{})
Expand All @@ -1271,7 +1312,6 @@ func (c *Controller) initialize(t *testing.T) error {
mockOFClient.EXPECT().InstallMulticastGroup(c.nodeGroupID, gomock.Any(), gomock.Any()).Times(1)
mockOFClient.EXPECT().InstallMulticastRemoteReportFlows(c.nodeGroupID).Times(1)
}
return c.Initialize()
}

func createInterface(name string, ofport uint32) *interfacestore.InterfaceConfig {
Expand Down
11 changes: 8 additions & 3 deletions pkg/agent/route/route_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ type Client struct {
nodePortsIPv6 sync.Map
// clusterNodeIPs stores the IPv4 of all other Nodes in the cluster
clusterNodeIPs sync.Map
// clusterNodeIP6s stores the IPv6 of all other Nodes in the cluster
// clusterNodeIP6s stores the IPv6 address of all other Nodes in the cluster. It is maintained but not consumed
// until Multicast supports IPv6.
clusterNodeIP6s sync.Map
// The latest calculated Service CIDRs can be got from serviceCIDRProvider.
serviceCIDRProvider servicecidr.Interface
Expand Down Expand Up @@ -616,7 +617,9 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet,
}...)
}

if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsEncap() {
// Note: Multicast can only work with IPv4 for now. Remove condition "!isIPv6" in the future after
// IPv6 is supported.
if c.multicastEnabled && !isIPv6 && c.networkConfig.TrafficEncapMode.SupportsEncap() {
// Drop the multicast packets forwarded from other Nodes in the cluster. This is because
// the packet sent out from the sender Pod is already received via tunnel port with encap mode,
// and the one forwarded via the underlay network is to send to external receivers
Expand Down Expand Up @@ -722,7 +725,9 @@ func (c *Client) restoreIptablesData(podCIDR *net.IPNet,
writeLine(iptablesData, iptables.MakeChainLine(antreaPostRoutingChain))
// The masqueraded multicast traffic will become unicast so we
// stop traversing this antreaPostRoutingChain for multicast traffic.
if c.multicastEnabled && c.networkConfig.TrafficEncapMode.SupportsNoEncap() {
// Note: Multicast can only work with IPv4 for now. Remove condition "!isIPv6" in the future after
// IPv6 is supported.
if c.multicastEnabled && !isIPv6 && c.networkConfig.TrafficEncapMode.SupportsNoEncap() {
writeLine(iptablesData, []string{
"-A", antreaPostRoutingChain,
"-m", "comment", "--comment", `"Antrea: skip masquerade for multicast traffic"`,
Expand Down
1 change: 0 additions & 1 deletion pkg/agent/route/route_linux_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,6 @@ COMMIT
:ANTREA-OUTPUT - [0:0]
-A ANTREA-PREROUTING -m comment --comment "Antrea: do not track incoming encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --dst-type LOCAL -j NOTRACK
-A ANTREA-OUTPUT -m comment --comment "Antrea: do not track outgoing encapsulation packets" -m udp -p udp --dport 6081 -m addrtype --src-type LOCAL -j NOTRACK
-A ANTREA-PREROUTING -m comment --comment "Antrea: drop Pod multicast traffic forwarded via underlay network" -m set --match-set CLUSTER-NODE-IP6 src -d 224.0.0.0/4 -j DROP
COMMIT
*mangle
:ANTREA-MANGLE - [0:0]
Expand Down

0 comments on commit b9f7b8c

Please sign in to comment.