Skip to content

Commit

Permalink
Look up Pods by name to fetch labels in Flow Aggregator (#4942)
Browse files Browse the repository at this point in the history
Currently Flow Aggregator looks up Pods by IP address to fetch labels,
it is very possible to obtain incorrect Pods when Pod turnover is high.
In this commit, we instead to look up Pods by Pod Name and Namespace
to fetch labels in Flow Aggregator.

Signed-off-by: Yongming Ding <dyongming@vmware.com>
  • Loading branch information
dreamtalen committed May 16, 2023
1 parent 1953c9f commit 197d282
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 47 deletions.
67 changes: 34 additions & 33 deletions pkg/flowaggregator/flowaggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
corev1 "k8s.io/api/core/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/kubernetes"
corelisters "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"

Expand Down Expand Up @@ -126,6 +127,7 @@ type flowAggregator struct {
s3Exporter exporter.Interface
logExporter exporter.Interface
logTickerDuration time.Duration
podLister corelisters.PodLister
}

func NewFlowAggregator(
Expand Down Expand Up @@ -175,6 +177,7 @@ func NewFlowAggregator(
configData: data,
APIServer: opt.Config.APIServer,
logTickerDuration: time.Minute,
podLister: podInformer.Lister(),
}
err = fa.InitCollectingProcess()
if err != nil {
Expand Down Expand Up @@ -405,7 +408,7 @@ func (fa *flowAggregator) sendFlowKeyRecord(key ipfixintermediate.FlowKey, recor
fa.aggregationProcess.SetCorrelatedFieldsFilled(record, true)
}
if fa.includePodLabels && !fa.aggregationProcess.AreExternalFieldsFilled(*record) {
fa.fillPodLabels(key, record.Record)
fa.fillPodLabels(record.Record)
fa.aggregationProcess.SetExternalFieldsFilled(record, true)
}
if fa.ipfixExporter != nil {
Expand Down Expand Up @@ -482,55 +485,53 @@ func (fa *flowAggregator) fillK8sMetadata(key ipfixintermediate.FlowKey, record
}
}

func (fa *flowAggregator) fetchPodLabels(podAddress string) string {
pods, err := fa.podInformer.Informer().GetIndexer().ByIndex(podInfoIndex, podAddress)
func (fa *flowAggregator) fetchPodLabels(podNamespace string, podName string) string {
pod, err := fa.podLister.Pods(podNamespace).Get(podName)
if err != nil {
klog.Warning(err)
klog.InfoS("Failed to get Pod", "namespace", podNamespace, "name", podName, "err", err)
return ""
} else if len(pods) == 0 {
klog.InfoS("No Pod objects found for Pod Address", "podAddress", podAddress)
return ""
}
pod, ok := pods[0].(*corev1.Pod)
if !ok {
klog.Warningf("Invalid Pod obj in cache")
}
labelsJSON, err := json.Marshal(pod.GetLabels())
if err != nil {
klog.Warningf("JSON encoding of Pod labels failed: %v", err)
klog.ErrorS(err, "Error when JSON encoding of Pod labels")
return ""
}
return string(labelsJSON)
}

func (fa *flowAggregator) fillPodLabels(key ipfixintermediate.FlowKey, record ipfixentities.Record) {
podLabelString := fa.fetchPodLabels(key.SourceAddress)
sourcePodLabelsElement, err := fa.registry.GetInfoElement("sourcePodLabels", ipfixregistry.AntreaEnterpriseID)
if err == nil {
sourcePodLabelsIE, err := ipfixentities.DecodeAndCreateInfoElementWithValue(sourcePodLabelsElement, bytes.NewBufferString(podLabelString).Bytes())
if err != nil {
klog.Warningf("Create sourcePodLabels InfoElementWithValue failed: %v", err)
}
err = record.AddInfoElement(sourcePodLabelsIE)
if err != nil {
klog.Warningf("Add sourcePodLabels InfoElementWithValue failed: %v", err)
func (fa *flowAggregator) fillPodLabelsForSide(record ipfixentities.Record, podNamespaceIEName, podNameIEName, podLabelsIEName string) error {
podLabelsString := ""
if podName, _, ok := record.GetInfoElementWithValue(podNameIEName); ok {
podNameString := podName.GetStringValue()
if podNamespace, _, ok := record.GetInfoElementWithValue(podNamespaceIEName); ok {
podNamespaceString := podNamespace.GetStringValue()
if podNameString != "" && podNamespaceString != "" {
podLabelsString = fa.fetchPodLabels(podNamespaceString, podNameString)
}
}
} else {
klog.Warningf("Get sourcePodLabels InfoElement failed: %v", err)
}
podLabelString = fa.fetchPodLabels(key.DestinationAddress)
destinationPodLabelsElement, err := fa.registry.GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID)
podLabelsElement, err := fa.registry.GetInfoElement(podLabelsIEName, ipfixregistry.AntreaEnterpriseID)
if err == nil {
destinationPodLabelsIE, err := ipfixentities.DecodeAndCreateInfoElementWithValue(destinationPodLabelsElement, bytes.NewBufferString(podLabelString).Bytes())
podLabelsIE, err := ipfixentities.DecodeAndCreateInfoElementWithValue(podLabelsElement, bytes.NewBufferString(podLabelsString).Bytes())
if err != nil {
klog.Warningf("Create destinationPodLabelsIE InfoElementWithValue failed: %v", err)
return fmt.Errorf("error when creating podLabels InfoElementWithValue: %v", err)
}
err = record.AddInfoElement(destinationPodLabelsIE)
if err != nil {
klog.Warningf("Add destinationPodLabels InfoElementWithValue failed: %v", err)
if err := record.AddInfoElement(podLabelsIE); err != nil {
return fmt.Errorf("error when adding podLabels InfoElementWithValue: %v", err)
}
} else {
klog.Warningf("Get destinationPodLabels InfoElement failed: %v", err)
return fmt.Errorf("error when getting podLabels InfoElementWithValue: %v", err)
}

return nil
}

func (fa *flowAggregator) fillPodLabels(record ipfixentities.Record) {
if err := fa.fillPodLabelsForSide(record, "sourcePodNamespace", "sourcePodName", "sourcePodLabels"); err != nil {
klog.ErrorS(err, "Error when filling pod labels", "side", "source")
}
if err := fa.fillPodLabelsForSide(record, "destinationPodNamespace", "destinationPodName", "destinationPodLabels"); err != nil {
klog.ErrorS(err, "Error when filling pod labels", "side", "destination")
}
}

Expand Down
38 changes: 24 additions & 14 deletions pkg/flowaggregator/flowaggregator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
informerFactory := informers.NewSharedInformerFactory(client, informerDefaultResync)

newFlowAggregator := func(includePodLabels bool) *flowAggregator {
podInformer := informerFactory.Core().V1().Pods()
return &flowAggregator{
aggregatorTransportProtocol: "tcp",
aggregationProcess: mockAggregationProcess,
Expand All @@ -79,7 +80,8 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
registry: mockIPFIXRegistry,
flowAggregatorAddress: "",
includePodLabels: includePodLabels,
podInformer: informerFactory.Core().V1().Pods(),
podInformer: podInformer,
podLister: podInformer.Lister(),
}
}

Expand Down Expand Up @@ -158,12 +160,14 @@ func TestFlowAggregator_sendFlowKeyRecord(t *testing.T) {
mockAggregationProcess.EXPECT().SetCorrelatedFieldsFilled(tc.flowRecord, true)
if tc.includePodLabels {
mockAggregationProcess.EXPECT().AreExternalFieldsFilled(*tc.flowRecord).Return(false)
sourcePodLabelsElement := ipfixentities.NewInfoElement("sourcePodLabels", 0, 0, ipfixregistry.AntreaEnterpriseID, 0)
mockRecord.EXPECT().GetInfoElementWithValue("sourcePodName").Return(sourcePodNameElem, 0, false)
sourcePodLabelsElement := ipfixentities.NewInfoElement("sourcePodLabels", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0)
mockIPFIXRegistry.EXPECT().GetInfoElement("sourcePodLabels", ipfixregistry.AntreaEnterpriseID).Return(sourcePodLabelsElement, nil)
sourcePodLabelsIE, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(sourcePodLabelsElement, bytes.NewBufferString("").Bytes())
mockRecord.EXPECT().AddInfoElement(sourcePodLabelsIE).Return(nil)
destinationPodLabelsElement := ipfixentities.NewInfoElement("destinationPodLabels", 0, 0, ipfixregistry.AntreaEnterpriseID, 0)
mockIPFIXRegistry.EXPECT().GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID).Return(ipfixentities.NewInfoElement("destinationPodLabels", 0, 0, ipfixregistry.AntreaEnterpriseID, 0), nil)
mockRecord.EXPECT().GetInfoElementWithValue("destinationPodName").Return(destPodNameElem, 0, false)
destinationPodLabelsElement := ipfixentities.NewInfoElement("destinationPodLabels", 0, ipfixentities.String, ipfixregistry.AntreaEnterpriseID, 0)
mockIPFIXRegistry.EXPECT().GetInfoElement("destinationPodLabels", ipfixregistry.AntreaEnterpriseID).Return(destinationPodLabelsElement, nil)
destinationPodLabelsIE, _ := ipfixentities.DecodeAndCreateInfoElementWithValue(destinationPodLabelsElement, bytes.NewBufferString("").Bytes())
mockRecord.EXPECT().AddInfoElement(destinationPodLabelsIE).Return(nil)
mockAggregationProcess.EXPECT().SetExternalFieldsFilled(tc.flowRecord, true)
Expand Down Expand Up @@ -747,29 +751,35 @@ func TestFlowAggregator_fetchPodLabels(t *testing.T) {
informerFactory.Core().V1().Pods().Informer().GetIndexer().Add(pod)

tests := []struct {
name string
podAddress string
want string
name string
podName string
podNamespace string
want string
}{
{
name: "no pod object",
podAddress: "192.168.1.3",
name: "no pod object",
podName: "",
podNamespace: "",
want: "",
},
{
name: "pod with label",
podAddress: "192.168.1.2",
want: "{\"test\":\"ut\"}",
name: "pod with label",
podName: "testPod",
podNamespace: "default",
want: "{\"test\":\"ut\"}",
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
podInformer := informerFactory.Core().V1().Pods()
fa := &flowAggregator{
k8sClient: client,
includePodLabels: true,
podInformer: informerFactory.Core().V1().Pods(),
podInformer: podInformer,
podLister: podInformer.Lister(),
}
got := fa.fetchPodLabels(tt.podAddress)
got := fa.fetchPodLabels(tt.podNamespace, tt.podName)
assert.Equal(t, tt.want, got)
})
}
Expand Down

0 comments on commit 197d282

Please sign in to comment.