diff --git a/pkg/agent/controller/networkpolicy/audit_logging_test.go b/pkg/agent/controller/networkpolicy/audit_logging_test.go index e2c33d8270b..67af5bd92c8 100644 --- a/pkg/agent/controller/networkpolicy/audit_logging_test.go +++ b/pkg/agent/controller/networkpolicy/audit_logging_test.go @@ -261,7 +261,7 @@ func TestGetNetworkPolicyInfo(t *testing.T) { testPriority, testRule := "61800", "test-rule" allowDispositionData := []byte{0x11, 0x00, 0x00, 0x11} dropDispositionData := []byte{0x11, 0x00, 0x08, 0x11} - redirectDispositionData := []byte{0x11, 0x08, 0x00, 0x11} + redirectDispositionData := []byte{0x11, 0x10, 0x00, 0x11} ingressData := []byte{0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11, 0x11} tests := []struct { name string diff --git a/pkg/agent/controller/networkpolicy/fqdn.go b/pkg/agent/controller/networkpolicy/fqdn.go index 7cbaa2e50c0..46282a965d3 100644 --- a/pkg/agent/controller/networkpolicy/fqdn.go +++ b/pkg/agent/controller/networkpolicy/fqdn.go @@ -756,7 +756,7 @@ func (f *fqdnController) handlePacketIn(pktIn *ofctrl.PacketIn) error { f.onDNSResponseMsg(&dnsMsg, time.Now(), waitCh) } go func() { - ethernetPkt, err := getEthernetPacket(pktIn) + ethernetPkt, err := openflow.GetEthernetPacket(pktIn) if err != nil { // Can't parse the packet. Forward it to the Pod. waitCh <- nil @@ -821,7 +821,7 @@ func (f *fqdnController) handlePacketIn(pktIn *ofctrl.PacketIn) error { // sendDNSPacketout forwards the DNS response packet to the original requesting client. func (f *fqdnController) sendDNSPacketout(pktIn *ofctrl.PacketIn) error { - ethernetPkt, err := getEthernetPacket(pktIn) + ethernetPkt, err := openflow.GetEthernetPacket(pktIn) if err != nil { return err } diff --git a/pkg/agent/controller/networkpolicy/packetin.go b/pkg/agent/controller/networkpolicy/packetin.go index 7730641f12e..abe0dbd0973 100644 --- a/pkg/agent/controller/networkpolicy/packetin.go +++ b/pkg/agent/controller/networkpolicy/packetin.go @@ -21,8 +21,6 @@ import ( "time" "antrea.io/libOpenflow/openflow15" - "antrea.io/libOpenflow/protocol" - "antrea.io/libOpenflow/util" "antrea.io/ofnet/ofctrl" "github.com/vmware/go-ipfix/pkg/registry" "k8s.io/klog/v2" @@ -210,11 +208,3 @@ func isAntreaPolicyEgressTable(tableID uint8) bool { } return false } - -func getEthernetPacket(pktIn *ofctrl.PacketIn) (*protocol.Ethernet, error) { - ethernetPkt := new(protocol.Ethernet) - if err := ethernetPkt.UnmarshalBinary(pktIn.Data.(*util.Buffer).Bytes()); err != nil { - return nil, fmt.Errorf("failed to parse ethernet packet from packet-in message: %v", err) - } - return ethernetPkt, nil -} diff --git a/pkg/agent/controller/networkpolicy/reject.go b/pkg/agent/controller/networkpolicy/reject.go index 5581ac15790..70cbcb732ad 100644 --- a/pkg/agent/controller/networkpolicy/reject.go +++ b/pkg/agent/controller/networkpolicy/reject.go @@ -15,7 +15,6 @@ package networkpolicy import ( - "encoding/binary" "fmt" "net" @@ -92,7 +91,7 @@ const ( func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { // All src/dst mean the source/destination of the reject packet, which are destination/source of the incoming packet. // Get ethernet data. - ethernetPkt, err := getEthernetPacket(pktIn) + ethernetPkt, err := openflow.GetEthernetPacket(pktIn) if err != nil { return err } @@ -191,47 +190,7 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { inPort, outPort := getRejectOFPorts(packetOutType, sIface, dIface, c.gwPort, c.tunPort) mutateFunc := getRejectPacketOutMutateFunc(packetOutType, c.nodeType, isFlexibleIPAMSrc, isFlexibleIPAMDst, ctZone) - if proto == protocol.Type_TCP { - // Get TCP data. - oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, _, _, _, err := binding.GetTCPHeaderData(ethernetPkt.Data) - if err != nil { - return err - } - // While sending TCP reject packet-out, switch original src/dst port, - // set the ackNum as original seqNum+1 and set the flag as ack+rst. - return c.ofClient.SendTCPPacketOut( - srcMAC, - dstMAC, - srcIP, - dstIP, - inPort, - outPort, - isIPv6, - oriTCPDstPort, - oriTCPSrcPort, - 0, - oriTCPSeqNum+1, - 0, - TCPAck|TCPRst, - 0, - nil, - mutateFunc) - } - // Use ICMP host administratively prohibited for ICMP, UDP, SCTP reject. - icmpType := ICMPDstUnreachableType - icmpCode := ICMPDstHostAdminProhibitedCode - ipHdrLen := IPv4HdrLen - if isIPv6 { - icmpType = ICMPv6DstUnreachableType - icmpCode = ICMPv6DstAdminProhibitedCode - ipHdrLen = IPv6HdrLen - } - ipHdr, _ := ethernetPkt.Data.MarshalBinary() - icmpData := make([]byte, int(ICMPUnusedHdrLen+ipHdrLen+8)) - // Put ICMP unused header in Data prop and set it to zero. - binary.BigEndian.PutUint32(icmpData[:ICMPUnusedHdrLen], 0) - copy(icmpData[ICMPUnusedHdrLen:], ipHdr[:ipHdrLen+8]) - return c.ofClient.SendICMPPacketOut( + return openflow.SendRejectPacketOut(c.ofClient, srcMAC, dstMAC, srcIP, @@ -239,9 +198,8 @@ func (c *Controller) rejectRequest(pktIn *ofctrl.PacketIn) error { inPort, outPort, isIPv6, - icmpType, - icmpCode, - icmpData, + ethernetPkt, + proto, mutateFunc) } diff --git a/pkg/agent/controller/traceflow/packetin_test.go b/pkg/agent/controller/traceflow/packetin_test.go index fdd712f2ed3..e497c87577f 100644 --- a/pkg/agent/controller/traceflow/packetin_test.go +++ b/pkg/agent/controller/traceflow/packetin_test.go @@ -226,8 +226,8 @@ func getTestPacketBytes() []byte { func TestParsePacketIn(t *testing.T) { xreg0 := make([]byte, 8) - binary.BigEndian.PutUint32(xreg0[0:4], 262144) // RemoteSNATRegMark in 32bit reg0 - binary.BigEndian.PutUint32(xreg0[4:8], 2) // outputPort in 32bit reg1 + binary.BigEndian.PutUint32(xreg0[0:4], openflow.RemoteSNATRegMark.GetValue()<reg3,set_field:0x50/0xffff->reg4,resubmit:EndpointDNAT", deleteOFEntriesError: fmt.Errorf("error when deleting Openflow entries for Service Endpoints Group 100"), }, + { + name: "No Endpoints", + endpoints: []proxy.Endpoint{}, + expectedGroup: "group_id=100,type=select," + + "bucket=bucket_id:0,weight:100,actions=set_field:0x40000/0x7e000->reg0,resubmit:EndpointDNAT", + }, } for _, tc := range testCases { @@ -1290,7 +1296,7 @@ func Test_client_InstallPodSNATFlows(t *testing.T) { { name: "SNAT on Remote", expectedFlows: []string{ - "cookie=0x1040000000000, table=EgressMark, priority=200,ip,in_port=100 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:ff->eth_dst,set_field:192.168.77.101->tun_dst,set_field:0x10/0xf0->reg0,set_field:0x40000/0x40000->reg0,goto_table:L2ForwardingCalc", + "cookie=0x1040000000000, table=EgressMark, priority=200,ip,in_port=100 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:ff->eth_dst,set_field:192.168.77.101->tun_dst,set_field:0x10/0xf0->reg0,set_field:0x80000/0x80000->reg0,goto_table:L2ForwardingCalc", }, }, } @@ -1842,7 +1848,7 @@ func Test_client_InstallMulticastRemoteReportFlows(t *testing.T) { groupID := binding.GroupIDType(102) expectedFlows := []string{ "cookie=0x1050000000000, table=Classifier, priority=210,ip,in_port=1,nw_dst=224.0.0.0/4 actions=set_field:0x1/0xf->reg0,goto_table:MulticastEgressRule", - "cookie=0x1050000000000, table=MulticastRouting, priority=210,igmp,in_port=4294967293 actions=set_field:0x20000/0x3e000->reg0,group:102", + "cookie=0x1050000000000, table=MulticastRouting, priority=210,igmp,in_port=4294967293 actions=set_field:0x20000/0x7e000->reg0,group:102", "cookie=0x1050000000000, table=Classifier, priority=200,in_port=4294967293 actions=goto_table:PipelineIPClassifier", } @@ -2116,7 +2122,7 @@ func Test_client_InstallMulticlusterNodeFlows(t *testing.T) { expectedFlows: []string{ "cookie=0x1060000000000, table=L3Forwarding, priority=200,ip,nw_dst=10.97.0.0/16 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1060000000000, table=L3Forwarding, priority=200,ct_state=+rpl+trk,ip,nw_dst=192.168.78.101 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", - "cookie=0x1060000000000, table=L3Forwarding, priority=199,ip,reg0=0x4000/0x3e000,nw_dst=192.168.78.101 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", + "cookie=0x1060000000000, table=L3Forwarding, priority=199,ip,reg0=0x4000/0x7e000,nw_dst=192.168.78.101 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", }, }, //TODO: IPv6 @@ -2168,7 +2174,7 @@ func Test_client_InstallMulticlusterGatewayFlows(t *testing.T) { "cookie=0x1060000000000, table=UnSNAT, priority=200,ip,nw_dst=192.168.77.100 actions=ct(table=ConntrackZone,zone=65521,nat)", "cookie=0x1060000000000, table=L3Forwarding, priority=200,ip,nw_dst=10.97.0.0/16 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1060000000000, table=L3Forwarding, priority=200,ct_state=+rpl+trk,ip,nw_dst=192.168.78.101 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", - "cookie=0x1060000000000, table=L3Forwarding, priority=199,ip,reg0=0x4000/0x3e000,nw_dst=192.168.78.101 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", + "cookie=0x1060000000000, table=L3Forwarding, priority=199,ip,reg0=0x4000/0x7e000,nw_dst=192.168.78.101 actions=set_field:0a:00:00:00:00:01->eth_src,set_field:aa:bb:cc:dd:ee:f0->eth_dst,set_field:192.168.78.101->tun_dst,set_field:0x10/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1060000000000, table=SNATMark, priority=210,ct_state=+new+trk,ip,nw_dst=10.97.0.0/16 actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark))", "cookie=0x1060000000000, table=SNAT, priority=200,ct_state=+new+trk,ip,nw_dst=10.97.0.0/16 actions=ct(commit,table=L2ForwardingCalc,zone=65521,nat(src=192.168.77.100))", }, diff --git a/pkg/agent/openflow/fields.go b/pkg/agent/openflow/fields.go index ab21ad84624..ef30679fa57 100644 --- a/pkg/agent/openflow/fields.go +++ b/pkg/agent/openflow/fields.go @@ -68,22 +68,24 @@ var ( DispositionAllowRegMark = binding.NewRegMark(APDispositionField, DispositionAllow) DispositionDropRegMark = binding.NewRegMark(APDispositionField, DispositionDrop) DispositionPassRegMark = binding.NewRegMark(APDispositionField, DispositionPass) - // reg0[13..17]: Field to indicate the reasons of sending packet to the controller. Marks in this field include: - // - 0b00001: logging - // - 0b00010: reject - // - 0b00100: deny (used by Flow Exporter) - // - 0b01000: DNS packet (used by FQDN) - // - 0b10000: IGMP packet (used by Multicast) - CustomReasonField = binding.NewRegField(0, 13, 17) - CustomReasonLoggingRegMark = binding.NewRegMark(CustomReasonField, CustomReasonLogging) - CustomReasonRejectRegMark = binding.NewRegMark(CustomReasonField, CustomReasonReject) - CustomReasonDenyRegMark = binding.NewRegMark(CustomReasonField, CustomReasonDeny) - CustomReasonDNSRegMark = binding.NewRegMark(CustomReasonField, CustomReasonDNS) - CustomReasonIGMPRegMark = binding.NewRegMark(CustomReasonField, CustomReasonIGMP) - // reg0[18]: Mark to indicate remote SNAT for Egress. - RemoteSNATRegMark = binding.NewOneBitRegMark(0, 18) - // reg0[19]: Field to indicate redirect action of layer 7 NetworkPolicy. - L7NPRegField = binding.NewRegField(0, 19, 19) + // reg0[13..18]: Field to indicate the reasons of sending packet to the controller. Marks in this field include: + // - 0b000001: logging + // - 0b000010: reject + // - 0b000100: deny (used by Flow Exporter) + // - 0b001000: DNS packet (used by FQDN) + // - 0b010000: IGMP packet (used by Multicast) + // - 0b100000: reject packet to a Service without any Endpoints (used by Proxy) + CustomReasonField = binding.NewRegField(0, 13, 18) + CustomReasonLoggingRegMark = binding.NewRegMark(CustomReasonField, CustomReasonLogging) + CustomReasonRejectRegMark = binding.NewRegMark(CustomReasonField, CustomReasonReject) + CustomReasonDenyRegMark = binding.NewRegMark(CustomReasonField, CustomReasonDeny) + CustomReasonDNSRegMark = binding.NewRegMark(CustomReasonField, CustomReasonDNS) + CustomReasonIGMPRegMark = binding.NewRegMark(CustomReasonField, CustomReasonIGMP) + CustomReasonRejectSvcNoEpMark = binding.NewRegMark(CustomReasonField, CustomReasonRejectSvcNoEp) + // reg0[19]: Mark to indicate remote SNAT for Egress. + RemoteSNATRegMark = binding.NewOneBitRegMark(0, 19) + // reg0[20]: Field to indicate redirect action of layer 7 NetworkPolicy. + L7NPRegField = binding.NewRegField(0, 20, 20) L7NPRedirectRegMark = binding.NewRegMark(L7NPRegField, DispositionL7NPRedirect) // reg1(NXM_NX_REG1) @@ -128,11 +130,11 @@ var ( // externalTrafficPolicy is Cluster. ToClusterServiceRegMark = binding.NewOneBitRegMark(4, 21) // reg4[22..23]: Field to store the action of a traffic control rule. Marks in this field include: - TrafficControlActionField = binding.NewRegField(4, 22, 23) - // reg4[24]: Mark to indicate whether the Endpoints of a Service includes another Service's ClusterIP. - NestedServiceRegMark = binding.NewOneBitRegMark(4, 24) + TrafficControlActionField = binding.NewRegField(4, 22, 23) TrafficControlMirrorRegMark = binding.NewRegMark(TrafficControlActionField, 0b01) TrafficControlRedirectRegMark = binding.NewRegMark(TrafficControlActionField, 0b10) + // reg4[24]: Mark to indicate whether the Endpoints of a Service includes another Service's ClusterIP. + NestedServiceRegMark = binding.NewOneBitRegMark(4, 24) // reg5(NXM_NX_REG5) // Field to cache the Egress conjunction ID hit by TraceFlow packet. diff --git a/pkg/agent/openflow/multicast_test.go b/pkg/agent/openflow/multicast_test.go index 0611c56d4f6..6fcb6ae266a 100644 --- a/pkg/agent/openflow/multicast_test.go +++ b/pkg/agent/openflow/multicast_test.go @@ -39,8 +39,8 @@ func Test_featureMulticast_initFlows(t *testing.T) { expectedFlows: []string{ "cookie=0x1050000000000, table=MulticastEgressRule, priority=64990,igmp,reg0=0x3/0xf actions=goto_table:MulticastRouting", "cookie=0x1050000000000, table=MulticastEgressPodMetric, priority=210,igmp actions=goto_table:MulticastRouting", - "cookie=0x1050000000000, table=MulticastRouting, priority=210,igmp,reg0=0x3/0xf actions=set_field:0x20000/0x3e000->reg0,controller:(reason=no_match,max_len=128,id=32776)", - "cookie=0x1050000000000, table=MulticastRouting, priority=210,igmp,reg0=0x1/0xf actions=set_field:0x20000/0x3e000->reg0,controller:(reason=no_match,max_len=128,id=32776)", + "cookie=0x1050000000000, table=MulticastRouting, priority=210,igmp,reg0=0x3/0xf actions=set_field:0x20000/0x7e000->reg0,controller:(reason=no_match,max_len=128,id=32776)", + "cookie=0x1050000000000, table=MulticastRouting, priority=210,igmp,reg0=0x1/0xf actions=set_field:0x20000/0x7e000->reg0,controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1050000000000, table=MulticastRouting, priority=190,ip actions=set_field:0x100/0x100->reg0,set_field:0x2->reg1,goto_table:MulticastOutput", "cookie=0x1050000000000, table=MulticastIngressPodMetric, priority=210,igmp actions=goto_table:MulticastOutput", "cookie=0x1050000000000, table=MulticastOutput, priority=210,reg0=0x101/0x10f,reg1=0x2 actions=drop", @@ -55,7 +55,7 @@ func Test_featureMulticast_initFlows(t *testing.T) { clientOptions: []clientOptionsFn{enableMulticast}, expectedFlows: []string{ "cookie=0x1050000000000, table=MulticastIngressPodMetric, priority=210,igmp actions=goto_table:MulticastOutput", - "cookie=0x1050000000000, table=MulticastRouting, priority=210,igmp,reg0=0x3/0xf actions=set_field:0x20000/0x3e000->reg0,controller:(reason=no_match,max_len=128,id=32776)", + "cookie=0x1050000000000, table=MulticastRouting, priority=210,igmp,reg0=0x3/0xf actions=set_field:0x20000/0x7e000->reg0,controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1050000000000, table=MulticastRouting, priority=190,ip actions=set_field:0x100/0x100->reg0,set_field:0x2->reg1,goto_table:MulticastOutput", "cookie=0x1050000000000, table=MulticastEgressPodMetric, priority=210,igmp actions=goto_table:MulticastRouting", "cookie=0x1050000000000, table=MulticastEgressRule, priority=64990,igmp,reg0=0x3/0xf actions=goto_table:MulticastRouting", diff --git a/pkg/agent/openflow/packetin.go b/pkg/agent/openflow/packetin.go index fde9ea88c75..2a5cac0c8a2 100644 --- a/pkg/agent/openflow/packetin.go +++ b/pkg/agent/openflow/packetin.go @@ -16,9 +16,12 @@ package openflow import ( "encoding/binary" + "errors" "fmt" "antrea.io/libOpenflow/openflow15" + "antrea.io/libOpenflow/protocol" + "antrea.io/libOpenflow/util" "antrea.io/ofnet/ofctrl" "golang.org/x/time/rate" "k8s.io/klog/v2" @@ -53,6 +56,10 @@ const ( // PacketInReasonMC shares PacketInReasonNP for IGMP packet_in message. This is because OVS "controller" action // only correctly supports reason 0 or 1. Change to another value after the OVS action is corrected. PacketInReasonMC = PacketInReasonNP + // PacketInReasonSvcReject shares PacketInReasonNP to process the Service packet not matching any Endpoints within + // packet_in message. This is because OVS "controller" action only correctly supports reason 0 or 1. Change to another + // value after the OVS action is corrected. + PacketInReasonSvcReject = PacketInReasonNP // PacketInQueueSize defines the size of PacketInQueue. // When PacketInQueue reaches PacketInQueueSize, new packet-in will be dropped. PacketInQueueSize = 200 @@ -81,7 +88,7 @@ type featureStartPacketIn struct { packetInQueue *openflow.PacketInQueue } -func newfeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn { +func newFeatureStartPacketIn(reason uint8, stopCh <-chan struct{}) *featureStartPacketIn { featurePacketIn := featureStartPacketIn{reason: reason, stopCh: stopCh} featurePacketIn.packetInQueue = openflow.NewPacketInQueue(PacketInQueueSize, rate.Limit(PacketInQueueRate)) @@ -96,7 +103,7 @@ func (c *client) StartPacketInHandler(stopCh <-chan struct{}) { // Iterate through each feature that starts packetin. Subscribe with their specified reason. for reason := range c.packetInHandlers { - featurePacketIn := newfeatureStartPacketIn(reason, stopCh) + featurePacketIn := newFeatureStartPacketIn(reason, stopCh) err := c.subscribeFeaturePacketIn(featurePacketIn) if err != nil { klog.Errorf("received error %+v while subscribing packetin for each feature", err) @@ -148,3 +155,22 @@ func GetMatchFieldByRegID(matchers *ofctrl.Matchers, regID int) *ofctrl.MatchFie } return &ofctrl.MatchField{MatchField: openflow15.NewRegMatchFieldWithMask(regID, data, mask)} } + +func GetInfoInReg(regMatch *ofctrl.MatchField, rng *openflow15.NXRange) (uint32, error) { + regValue, ok := regMatch.GetValue().(*ofctrl.NXRegister) + if !ok { + return 0, errors.New("register value cannot be retrieved") + } + if rng != nil { + return ofctrl.GetUint32ValueWithRange(regValue.Data, rng), nil + } + return regValue.Data, nil +} + +func GetEthernetPacket(pktIn *ofctrl.PacketIn) (*protocol.Ethernet, error) { + ethernetPkt := new(protocol.Ethernet) + if err := ethernetPkt.UnmarshalBinary(pktIn.Data.(*util.Buffer).Bytes()); err != nil { + return nil, fmt.Errorf("failed to parse ethernet packet from packet-in message: %v", err) + } + return ethernetPkt, nil +} diff --git a/pkg/agent/openflow/packetout.go b/pkg/agent/openflow/packetout.go new file mode 100644 index 00000000000..2ab0a4b89c6 --- /dev/null +++ b/pkg/agent/openflow/packetout.go @@ -0,0 +1,104 @@ +// Copyright 2023 Antrea Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package openflow + +import ( + "encoding/binary" + + "antrea.io/libOpenflow/protocol" + + binding "antrea.io/antrea/pkg/ovs/openflow" +) + +const ( + ipv4HdrLen uint16 = 20 + ipv6HdrLen uint16 = 40 + + icmpUnusedHdrLen uint16 = 4 + + tcpAck uint8 = 0b010000 + tcpRst uint8 = 0b000100 + + icmpDstUnreachableType uint8 = 3 + icmpDstHostAdminProhibitedCode uint8 = 10 + + icmpv6DstUnreachableType uint8 = 1 + icmpv6DstAdminProhibitedCode uint8 = 1 +) + +func SendRejectPacketOut(ofClient Client, + srcMAC string, + dstMAC string, + srcIP string, + dstIP string, + inPort uint32, + outPort uint32, + isIPv6 bool, + ethernetPkt *protocol.Ethernet, + proto uint8, + mutateFunc func(binding.PacketOutBuilder) binding.PacketOutBuilder) error { + if proto == protocol.Type_TCP { + // Get TCP data. + oriTCPSrcPort, oriTCPDstPort, oriTCPSeqNum, _, _, _, _, err := binding.GetTCPHeaderData(ethernetPkt.Data) + if err != nil { + return err + } + // While sending TCP reject packet-out, switch original src/dst port, + // set the ackNum as original seqNum+1 and set the flag as ack+rst. + return ofClient.SendTCPPacketOut( + srcMAC, + dstMAC, + srcIP, + dstIP, + inPort, + outPort, + isIPv6, + oriTCPDstPort, + oriTCPSrcPort, + 0, + oriTCPSeqNum+1, + 0, + tcpAck|tcpRst, + 0, + nil, + mutateFunc) + } + // Use ICMP host administratively prohibited for ICMP, UDP, SCTP reject. + icmpType := icmpDstUnreachableType + icmpCode := icmpDstHostAdminProhibitedCode + ipHdrLen := ipv4HdrLen + if isIPv6 { + icmpType = icmpv6DstUnreachableType + icmpCode = icmpv6DstAdminProhibitedCode + ipHdrLen = ipv6HdrLen + } + ipHdr, _ := ethernetPkt.Data.MarshalBinary() + icmpData := make([]byte, int(icmpUnusedHdrLen+ipHdrLen+8)) + // Put ICMP unused header in Data prop and set it to zero. + binary.BigEndian.PutUint32(icmpData[:icmpUnusedHdrLen], 0) + copy(icmpData[icmpUnusedHdrLen:], ipHdr[:ipHdrLen+8]) + return ofClient.SendICMPPacketOut( + srcMAC, + dstMAC, + srcIP, + dstIP, + inPort, + outPort, + isIPv6, + icmpType, + icmpCode, + icmpData, + mutateFunc) +} diff --git a/pkg/agent/openflow/pipeline.go b/pkg/agent/openflow/pipeline.go index 71891efa825..e49db455199 100644 --- a/pkg/agent/openflow/pipeline.go +++ b/pkg/agent/openflow/pipeline.go @@ -358,9 +358,10 @@ const ( // that the corresponding connection has been dropped or rejected. It can be consumed // by the Flow Exporter to export flow records for connections denied by network // policy rules. - CustomReasonDeny = 0b100 - CustomReasonDNS = 0b1000 - CustomReasonIGMP = 0b10000 + CustomReasonDeny = 0b100 + CustomReasonDNS = 0b1000 + CustomReasonIGMP = 0b10000 + CustomReasonRejectSvcNoEp = 0b100000 // DispositionL7NPRedirect is used when sending packet-in to controller for // logging layer 7 NetworkPolicy indicating that this packet is redirected to // l7 engine to determine the disposition. @@ -2480,6 +2481,14 @@ func (f *featureService) endpointDNATFlow(endpointIP net.IP, endpointPort uint16 // EndpointDNATTable. Otherwise, buckets will resubmit packets to EndpointDNATTable directly. func (f *featureService) serviceEndpointGroup(groupID binding.GroupIDType, withSessionAffinity bool, endpoints ...proxy.Endpoint) binding.Group { group := f.bridge.CreateGroup(groupID).ResetBuckets() + + if len(endpoints) == 0 { + return group.Bucket().Weight(100). + LoadToRegField(CustomReasonField, CustomReasonRejectSvcNoEp). + ResubmitToTable(EndpointDNATTable.GetID()). + Done() + } + var resubmitTableID uint8 if withSessionAffinity { resubmitTableID = ServiceLBTable.GetID() diff --git a/pkg/agent/openflow/service.go b/pkg/agent/openflow/service.go index 5f7e4048972..2284672a8c0 100644 --- a/pkg/agent/openflow/service.go +++ b/pkg/agent/openflow/service.go @@ -124,6 +124,15 @@ func newFeatureService( } } +// serviceNoEndpointFlow generates the flow to match the packets to Service without Endpoint and send them to controller. +func (f *featureService) serviceNoEndpointFlow() binding.Flow { + return EndpointDNATTable.ofTable.BuildFlow(priorityNormal). + Cookie(f.cookieAllocator.Request(f.category).Raw()). + MatchRegMark(CustomReasonRejectSvcNoEpMark). + Action().SendToController(uint8(PacketInReasonSvcReject)). + Done() +} + func (f *featureService) initFlows() []binding.Flow { var flows []binding.Flow if f.enableProxy { @@ -134,6 +143,7 @@ func (f *featureService) initFlows() []binding.Flow { flows = append(flows, f.snatConntrackFlows()...) flows = append(flows, f.serviceNeedLBFlow()) flows = append(flows, f.sessionAffinityReselectFlow()) + flows = append(flows, f.serviceNoEndpointFlow()) flows = append(flows, f.l2ForwardOutputHairpinServiceFlow()) if f.proxyAll { // This installs the flows to match the first packet of NodePort connection. The flows set a bit of a register diff --git a/pkg/agent/openflow/service_test.go b/pkg/agent/openflow/service_test.go index afe2f87e34b..5fcb29d6133 100644 --- a/pkg/agent/openflow/service_test.go +++ b/pkg/agent/openflow/service_test.go @@ -41,6 +41,7 @@ func Test_featureService_initFlows(t *testing.T) { "cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ip actions=resubmit:SessionAffinity,resubmit:ServiceLB", "cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4", "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB", + "cookie=0x1030000000000, table=EndpointDNAT, priority=200,reg0=0x40000/0x7e000 actions=controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark))", @@ -62,6 +63,7 @@ func Test_featureService_initFlows(t *testing.T) { "cookie=0x1030000000000, table=PreRoutingClassifier, priority=200,ipv6 actions=resubmit:SessionAffinity,resubmit:ServiceLB", "cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4", "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB", + "cookie=0x1030000000000, table=EndpointDNAT, priority=200,reg0=0x40000/0x7e000 actions=controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark))", @@ -86,6 +88,7 @@ func Test_featureService_initFlows(t *testing.T) { "cookie=0x1030000000000, table=NodePortMark, priority=200,ip,nw_dst=169.254.0.252 actions=set_field:0x80000/0x80000->reg4", "cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4", "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB", + "cookie=0x1030000000000, table=EndpointDNAT, priority=200,reg0=0x40000/0x7e000 actions=controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ip,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65520,exec(set_field:0x20/0x20->ct_mark))", @@ -110,6 +113,7 @@ func Test_featureService_initFlows(t *testing.T) { "cookie=0x1030000000000, table=NodePortMark, priority=200,ipv6,ipv6_dst=fc01::aabb:ccdd:eefe actions=set_field:0x80000/0x80000->reg4", "cookie=0x1030000000000, table=SessionAffinity, priority=0 actions=set_field:0x10000/0x70000->reg4", "cookie=0x1030000000000, table=EndpointDNAT, priority=190,reg4=0x20000/0x70000 actions=set_field:0x10000/0x70000->reg4,resubmit:ServiceLB", + "cookie=0x1030000000000, table=EndpointDNAT, priority=200,reg0=0x40000/0x7e000 actions=controller:(reason=no_match,max_len=128,id=32776)", "cookie=0x1030000000000, table=L3Forwarding, priority=190,ct_mark=0x10/0x10,reg0=0x202/0x20f actions=set_field:0a:00:00:00:00:01->eth_dst,set_field:0x20/0xf0->reg0,goto_table:L3DecTTL", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x22/0xff actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark,set_field:0x40/0x40->ct_mark))", "cookie=0x1030000000000, table=SNATMark, priority=200,ct_state=+new+trk,ipv6,reg0=0x12/0xff,reg4=0x200000/0x200000 actions=ct(commit,table=SNAT,zone=65510,exec(set_field:0x20/0x20->ct_mark))", diff --git a/pkg/agent/proxy/proxier.go b/pkg/agent/proxy/proxier.go index 09a80478d1e..1489e1c3a37 100644 --- a/pkg/agent/proxy/proxier.go +++ b/pkg/agent/proxy/proxier.go @@ -15,6 +15,7 @@ package proxy import ( + "errors" "fmt" "math" "net" @@ -23,6 +24,8 @@ import ( "sync" "time" + "antrea.io/libOpenflow/protocol" + "antrea.io/ofnet/ofctrl" corev1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/runtime" @@ -409,10 +412,6 @@ func (p *proxier) installServices() { p.endpointsInstalledMap[svcPortName] = endpointsInstalled } endpointsToInstall := p.endpointsMap[svcPortName] - // If both expected Endpoints number and installed Endpoints number are 0, we don't need to take care of this Service. - if len(endpointsToInstall) == 0 && len(endpointsInstalled) == 0 { - continue - } installedSvcPort, ok := p.serviceInstalledMap[svcPortName] var pSvcInfo *types.ServiceInfo @@ -960,6 +959,7 @@ func (p *proxier) deleteServiceByIP(serviceStr string) { func (p *proxier) Run(stopCh <-chan struct{}) { p.once.Do(func() { + p.ofClient.RegisterPacketInHandler(uint8(openflow.PacketInReasonSvcReject), "svc-reject", p) go p.serviceConfig.Run(stopCh) if p.endpointSliceEnabled { go p.endpointSliceConfig.Run(stopCh) @@ -1023,6 +1023,70 @@ func (p *proxier) GetServiceFlowKeys(serviceName, namespace string) ([]string, [ return flows, groups, found } +func (p *proxier) HandlePacketIn(pktIn *ofctrl.PacketIn) error { + if pktIn == nil { + return errors.New("empty packetin for Antrea Proxy") + } + matches := pktIn.GetMatches() + + match := openflow.GetMatchFieldByRegID(matches, openflow.CustomReasonField.GetRegID()) + if match == nil { + return fmt.Errorf("error getting match mark in CustomField") + } + + customReasons, err := openflow.GetInfoInReg(match, openflow.CustomReasonField.GetRange().ToNXRange()) + if err != nil { + klog.ErrorS(err, "Received error while unloading customReason from OVS reg") + return err + } + if customReasons&openflow.CustomReasonRejectSvcNoEp != openflow.CustomReasonRejectSvcNoEp { + return nil + } + + // Get Ethernet data. + ethernetPkt, err := openflow.GetEthernetPacket(pktIn) + if err != nil { + return err + } + srcMAC := ethernetPkt.HWDst.String() + dstMAC := ethernetPkt.HWSrc.String() + + var ( + srcIP, dstIP string + proto uint8 + isIPv6 bool + ) + switch ipPkt := ethernetPkt.Data.(type) { + case *protocol.IPv4: + srcIP = ipPkt.NWDst.String() + dstIP = ipPkt.NWSrc.String() + proto = ipPkt.Protocol + isIPv6 = false + case *protocol.IPv6: + srcIP = ipPkt.NWDst.String() + dstIP = ipPkt.NWSrc.String() + proto = ipPkt.NextHeader + isIPv6 = true + } + + inPortField := matches.GetMatchByName(binding.OxmFieldInPort) + if inPortField == nil { + return fmt.Errorf("error when getting match field inPort") + } + outPort := inPortField.GetValue().(uint32) + return openflow.SendRejectPacketOut(p.ofClient, + srcMAC, + dstMAC, + srcIP, + dstIP, + 0, + outPort, + isIPv6, + ethernetPkt, + proto, + nil) +} + func NewProxier( hostname string, informerFactory informers.SharedInformerFactory, diff --git a/pkg/agent/proxy/proxier_test.go b/pkg/agent/proxy/proxier_test.go index 52f88619ba1..f6e64577d2f 100644 --- a/pkg/agent/proxy/proxier_test.go +++ b/pkg/agent/proxy/proxier_test.go @@ -1360,10 +1360,20 @@ func testClusterIPNoEndpoint(t *testing.T, svcIP net.IP, isIPv6 bool) { fp := NewFakeProxier(mockRouteClient, mockOFClient, nil, groupAllocator, isIPv6) svc := makeTestClusterIPService(&svcPortName, svcIP, int32(svcPort), corev1.ProtocolTCP, nil, nil, false) + updatedSvc := makeTestClusterIPService(&svcPortName, svcIP, int32(svcPort+1), corev1.ProtocolTCP, nil, nil, false) makeServiceMap(fp, svc) makeEndpointSliceMap(fp) + + groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, []k8sproxy.Endpoint{}).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), gomock.Any(), uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) + fp.syncProxyRules() + assert.Contains(t, fp.serviceInstalledMap, svcPortName) + + mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) + fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) fp.syncProxyRules() - assert.NotContains(t, fp.serviceInstalledMap, svcPortName) } func TestClusterIPNoEndpoint(t *testing.T) { @@ -1375,6 +1385,137 @@ func TestClusterIPNoEndpoint(t *testing.T) { }) } +func testNodePortNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, isIPv6 bool) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockOFClient, mockRouteClient := getMockClients(ctrl) + groupAllocator := openflow.NewGroupAllocator(isIPv6) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll) + + svc := makeTestNodePortService(&svcPortName, svcIP, + int32(svcPort), + int32(svcNodePort), + corev1.ProtocolTCP, + nil, + corev1.ServiceInternalTrafficPolicyCluster, + corev1.ServiceExternalTrafficPolicyTypeLocal) + updatedSvc := makeTestNodePortService(&svcPortName, svcIP, + int32(svcPort+1), + int32(svcNodePort), + corev1.ProtocolTCP, + nil, + corev1.ServiceInternalTrafficPolicyCluster, + corev1.ServiceExternalTrafficPolicyTypeLocal) + makeServiceMap(fp, svc) + makeEndpointSliceMap(fp) + + vIP := agentconfig.VirtualNodePortDNATIPv4 + if isIPv6 { + vIP = agentconfig.VirtualNodePortDNATIPv6 + } + + groupIDCluster := fp.groupCounter.AllocateIfNotExist(svcPortName, false) + groupIDLocal := fp.groupCounter.AllocateIfNotExist(svcPortName, true) + mockOFClient.EXPECT().InstallServiceGroup(groupIDCluster, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(groupIDLocal, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort), gomock.Any(), uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) + mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + fp.syncProxyRules() + + mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) + mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) + mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) + fp.syncProxyRules() +} + +func TestNodePortNoEndpoint(t *testing.T) { + t.Run("IPv4", func(t *testing.T) { + testNodePortNoEndpoint(t, nodePortAddressesIPv4, svc1IPv4, false) + }) + t.Run("IPv6", func(t *testing.T) { + testNodePortNoEndpoint(t, nodePortAddressesIPv6, svc1IPv6, true) + }) +} + +func testLoadBalancerNoEndpoint(t *testing.T, nodePortAddresses []net.IP, svcIP net.IP, loadBalancerIP net.IP, isIPv6 bool) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + mockOFClient, mockRouteClient := getMockClients(ctrl) + groupAllocator := openflow.NewGroupAllocator(isIPv6) + fp := NewFakeProxier(mockRouteClient, mockOFClient, nodePortAddresses, groupAllocator, isIPv6, withProxyAll) + + internalTrafficPolicy := corev1.ServiceInternalTrafficPolicyCluster + externalTrafficPolicy := corev1.ServiceExternalTrafficPolicyTypeLocal + + svc := makeTestLoadBalancerService(&svcPortName, svcIP, + []net.IP{loadBalancerIP}, + int32(svcPort), + int32(svcNodePort), + corev1.ProtocolTCP, + nil, + &internalTrafficPolicy, + externalTrafficPolicy) + updatedSvc := makeTestLoadBalancerService(&svcPortName, svcIP, + []net.IP{loadBalancerIP}, + int32(svcPort+1), + int32(svcNodePort), + corev1.ProtocolTCP, + nil, + &internalTrafficPolicy, + externalTrafficPolicy) + makeServiceMap(fp, svc) + makeEndpointSliceMap(fp) + + vIP := agentconfig.VirtualNodePortDNATIPv4 + if isIPv6 { + vIP = agentconfig.VirtualNodePortDNATIPv6 + } + + groupIDCluster := fp.groupCounter.AllocateIfNotExist(svcPortName, false) + groupIDLocal := fp.groupCounter.AllocateIfNotExist(svcPortName, true) + mockOFClient.EXPECT().InstallServiceGroup(groupIDCluster, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceGroup(groupIDLocal, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort), gomock.Any(), uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, loadBalancerIP, uint16(svcPort), gomock.Any(), uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) + mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) + fp.syncProxyRules() + + mockOFClient.EXPECT().UninstallServiceFlows(svcIP, uint16(svcPort), gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(vIP, uint16(svcNodePort), gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallServiceFlows(loadBalancerIP, uint16(svcPort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().DeleteClusterIPRoute(svcIP).Times(1) + mockRouteClient.EXPECT().DeleteNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().DeleteLoadBalancer(loadBalancerIP).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDCluster, svcIP, uint16(svcPort+1), gomock.Any(), uint16(0), true, corev1.ServiceTypeClusterIP, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, vIP, uint16(svcNodePort), gomock.Any(), uint16(0), true, corev1.ServiceTypeNodePort, false).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupIDLocal, loadBalancerIP, uint16(svcPort+1), gomock.Any(), uint16(0), true, corev1.ServiceTypeLoadBalancer, false).Times(1) + mockRouteClient.EXPECT().AddClusterIPRoute(svcIP).Times(1) + mockRouteClient.EXPECT().AddNodePort(nodePortAddresses, uint16(svcNodePort), gomock.Any()).Times(1) + mockRouteClient.EXPECT().AddLoadBalancer(loadBalancerIP).Times(1) + fp.serviceChanges.OnServiceUpdate(svc, updatedSvc) + fp.syncProxyRules() +} + +func TestLoadBalancerNoEndpoint(t *testing.T) { + t.Run("IPv4", func(t *testing.T) { + testLoadBalancerNoEndpoint(t, nodePortAddressesIPv4, svc1IPv4, loadBalancerIPv4, false) + }) + t.Run("IPv6", func(t *testing.T) { + testLoadBalancerNoEndpoint(t, nodePortAddressesIPv6, svc1IPv6, loadBalancerIPv6, true) + }) +} + func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP, isIPv6 bool) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -1406,16 +1547,22 @@ func testClusterIPRemoveSamePortEndpoint(t *testing.T, svcIP net.IP, epIP net.IP groupID := fp.groupCounter.AllocateIfNotExist(svcPortNameTCP, false) groupIDUDP := fp.groupCounter.AllocateIfNotExist(svcPortNameUDP, false) mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) - mockOFClient.EXPECT().InstallServiceGroup(groupIDUDP, false, gomock.Any()).Times(2) + mockOFClient.EXPECT().InstallServiceGroup(groupIDUDP, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(protocolTCP, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(protocolUDP, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), protocolTCP, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupIDUDP, svcIP, uint16(svcPort), protocolUDP, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockOFClient.EXPECT().UninstallEndpointFlows(protocolUDP, gomock.Any()).Times(1) fp.syncProxyRules() + mockOFClient.EXPECT().InstallServiceGroup(groupIDUDP, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallEndpointFlows(protocolUDP, gomock.Any()).Times(1) fp.endpointsChanges.OnEndpointSliceUpdate(epsUDP, true) fp.syncProxyRules() + + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallEndpointFlows(protocolTCP, gomock.Any()).Times(1) + fp.endpointsChanges.OnEndpointSliceUpdate(epsTCP, true) + fp.syncProxyRules() } func TestClusterIPRemoveSamePortEndpoint(t *testing.T) { @@ -1447,15 +1594,16 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv } groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) - mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(2) + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), bindingProtocol, uint16(0), false, corev1.ServiceTypeClusterIP, false).Times(1) - mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) fp.syncProxyRules() assert.Contains(t, fp.serviceInstalledMap, svcPortName) assert.Contains(t, fp.endpointsInstalledMap, svcPortName) + mockOFClient.EXPECT().InstallServiceGroup(groupID, false, gomock.Any()).Times(1) + mockOFClient.EXPECT().UninstallEndpointFlows(bindingProtocol, gomock.Any()).Times(1) fp.endpointsChanges.OnEndpointSliceUpdate(eps, true) fp.syncProxyRules() @@ -1463,6 +1611,7 @@ func testClusterIPRemoveEndpoints(t *testing.T, svcIP net.IP, epIP net.IP, isIPv endpointsMap, ok := fp.endpointsInstalledMap[svcPortName] assert.True(t, ok) assert.Equal(t, 0, len(endpointsMap)) + fp.syncProxyRules() } func TestClusterIPRemoveEndpoints(t *testing.T) { @@ -1566,6 +1715,10 @@ func testSessionAffinityNoEndpoint(t *testing.T, svcExternalIPs net.IP, svcIP ne }) makeServiceMap(fp, svc) makeEndpointsMap(fp) + + groupID := fp.groupCounter.AllocateIfNotExist(svcPortName, false) + mockOFClient.EXPECT().InstallServiceGroup(groupID, true, []k8sproxy.Endpoint{}).Times(1) + mockOFClient.EXPECT().InstallServiceFlows(groupID, svcIP, uint16(svcPort), gomock.Any(), uint16(10800), false, gomock.Any(), false).Times(1) fp.syncProxyRules() } diff --git a/pkg/ovs/openflow/interfaces.go b/pkg/ovs/openflow/interfaces.go index f9656162a0a..bd1ded899ba 100644 --- a/pkg/ovs/openflow/interfaces.go +++ b/pkg/ovs/openflow/interfaces.go @@ -77,6 +77,7 @@ const ( NxmFieldDstIPv4 = "NXM_OF_IP_DST" NxmFieldSrcIPv6 = "NXM_NX_IPV6_SRC" NxmFieldDstIPv6 = "NXM_NX_IPV6_DST" + NxmFieldTunIPv4Src = "NXM_NX_TUN_IPV4_SRC" OxmFieldVLANVID = "OXM_OF_VLAN_VID" OxmFieldInPort = "OXM_OF_IN_PORT" diff --git a/pkg/ovs/openflow/testing/mock_openflow.go b/pkg/ovs/openflow/testing/mock_openflow.go index b0e78e64c2f..5d618fe8c5b 100644 --- a/pkg/ovs/openflow/testing/mock_openflow.go +++ b/pkg/ovs/openflow/testing/mock_openflow.go @@ -1,4 +1,4 @@ -// Copyright 2022 Antrea Authors +// Copyright 2023 Antrea Authors // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. diff --git a/test/integration/agent/openflow_test.go b/test/integration/agent/openflow_test.go index 3e4ddeca89b..e1702a221f9 100644 --- a/test/integration/agent/openflow_test.go +++ b/test/integration/agent/openflow_test.go @@ -1718,7 +1718,7 @@ func prepareEgressMarkFlows(snatIP net.IP, mark, podOFPort, podOFPortRemote uint }, { MatchStr: fmt.Sprintf("priority=200,%s,in_port=%d", ipProtoStr, podOFPortRemote), - ActStr: fmt.Sprintf("set_field:%s->eth_src,set_field:%s->eth_dst,set_field:%s->%s,set_field:0x10/0xf0->reg0,set_field:0x40000/0x40000->reg0,goto_table:L2ForwardingCalc", localGwMAC.String(), vMAC.String(), snatIP, tunDstFieldName), + ActStr: fmt.Sprintf("set_field:%s->eth_src,set_field:%s->eth_dst,set_field:%s->%s,set_field:0x10/0xf0->reg0,set_field:0x80000/0x80000->reg0,goto_table:L2ForwardingCalc", localGwMAC.String(), vMAC.String(), snatIP, tunDstFieldName), }, }, },