Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Trim unneeded fields stored in informers to reduce memory footprint #6317

Merged
merged 1 commit into from
May 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ func run(o *Options) error {
}
k8s.OverrideKubeAPIServer(o.config.KubeAPIServerOverride)

informerFactory := informers.NewSharedInformerFactory(k8sClient, informerDefaultResync)
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync)
informerFactory := informers.NewSharedInformerFactoryWithOptions(k8sClient, informerDefaultResync, informers.WithTransform(k8s.NewTrimmer()))
crdInformerFactory := crdinformers.NewSharedInformerFactoryWithOptions(crdClient, informerDefaultResync, crdinformers.WithTransform(k8s.NewTrimmer()))
traceflowInformer := crdInformerFactory.Crd().V1beta1().Traceflows()
egressInformer := crdInformerFactory.Crd().V1beta1().Egresses()
externalIPPoolInformer := crdInformerFactory.Crd().V1beta1().ExternalIPPools()
Expand Down Expand Up @@ -347,13 +347,15 @@ func run(o *Options) error {
listOptions := func(options *metav1.ListOptions) {
options.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", nodeConfig.Name).String()
}
return coreinformers.NewFilteredPodInformer(
informer := coreinformers.NewFilteredPodInformer(
k8sClient,
metav1.NamespaceAll,
resyncPeriodDisabled,
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, // NamespaceIndex is used in NPLController.
listOptions,
)
informer.SetTransform(k8s.NewTrimmer(k8s.TrimPod))
return informer
})

var mcDefaultRouteController *mcroute.MCDefaultRouteController
Expand All @@ -370,6 +372,7 @@ func run(o *Options) error {
mcInformerFactoryWithNamespaceOption = mcinformers.NewSharedInformerFactoryWithOptions(mcClient,
informerDefaultResync,
mcinformers.WithNamespace(o.config.Multicluster.Namespace),
mcinformers.WithTransform(k8s.NewTrimmer()),
)
gwInformer := mcInformerFactoryWithNamespaceOption.Multicluster().V1alpha1().Gateways()
ciImportInformer := mcInformerFactoryWithNamespaceOption.Multicluster().V1alpha1().ClusterInfoImports()
Expand All @@ -393,7 +396,7 @@ func run(o *Options) error {
}
}
if enableMulticlusterNP {
mcInformerFactory = mcinformers.NewSharedInformerFactory(mcClient, informerDefaultResync)
mcInformerFactory = mcinformers.NewSharedInformerFactoryWithOptions(mcClient, informerDefaultResync, mcinformers.WithTransform(k8s.NewTrimmer()))
labelIDInformer := mcInformerFactory.Multicluster().V1alpha1().LabelIdentities()
mcStrechedNetworkPolicyController = mcroute.NewMCAgentStretchedNetworkPolicyController(
ofClient,
Expand Down Expand Up @@ -605,6 +608,7 @@ func run(o *Options) error {
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
listOptions,
)
localExternalNodeInformer.SetTransform(k8s.NewTrimmer())
externalNodeController, err = externalnode.NewExternalNodeController(ovsBridgeClient, ofClient, localExternalNodeInformer,
ifaceStore, externalEntityUpdateChannel, o.config.ExternalNode.ExternalNodeNamespace, o.config.ExternalNode.PolicyBypassRules)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions cmd/antrea-controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,8 +131,8 @@ func run(o *Options) error {
return fmt.Errorf("error creating K8s clients: %v", err)
}
k8s.OverrideKubeAPIServer(o.config.KubeAPIServerOverride)
informerFactory := informers.NewSharedInformerFactory(client, informerDefaultResync)
crdInformerFactory := crdinformers.NewSharedInformerFactory(crdClient, informerDefaultResync)
informerFactory := informers.NewSharedInformerFactoryWithOptions(client, informerDefaultResync, informers.WithTransform(k8s.NewTrimmer(k8s.TrimPod)))
crdInformerFactory := crdinformers.NewSharedInformerFactoryWithOptions(crdClient, informerDefaultResync, crdinformers.WithTransform(k8s.NewTrimmer()))
policyInformerFactory := policyv1a1informers.NewSharedInformerFactory(policyClient, informerDefaultResync)
podInformer := informerFactory.Core().V1().Pods()
namespaceInformer := informerFactory.Core().V1().Namespaces()
Expand Down
91 changes: 91 additions & 0 deletions pkg/util/k8s/transform.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
// Copyright 2024 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8s

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/client-go/tools/cache"
)

// NewTrimmer returns a cache.TransformFunc that can be used to trim objects stored in informers.
// The function must be idempotent before client-go v0.31.0 to avoid a race condition happening when objects were
// accessed during Resync operation, see https://github.com/kubernetes/kubernetes/issues/124337.
// But it's generally more efficient to avoid trimming the same object more than once.
//
// Note: be cautious when adding fields to be trimmed, ensuring they do not inadvertently clear the original values when
// objects are updated by Antrea to kube-apiserver.
func NewTrimmer(extraTrimmers ...cache.TransformFunc) cache.TransformFunc {
return func(obj interface{}) (interface{}, error) {
accessor, err := meta.Accessor(obj)
if err != nil {
return obj, nil
}
// It means the objects has been trimmed.
if accessor.GetManagedFields() == nil {
return obj, nil
}
// Trim common fields for all objects.
// According to https://kubernetes.io/docs/reference/using-api/server-side-apply/#clearing-managedfields,
// setting the managedFields to an empty list will not reset the field when the objects are updated, so it's
// safe to trim it for all objects.
accessor.SetManagedFields(nil)
antoninbas marked this conversation as resolved.
Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So just to confirm, we don't have the same issue for managedFields as for ownerReference?
Is it because of this:

Note that setting the managedFields to an empty list will not reset the field. This is on purpose, so managedFields never get stripped by clients not aware of the field.

From https://kubernetes.io/docs/reference/using-api/server-side-apply/#clearing-managedfields

If yes, maybe we could add a comment explaining why this is safe?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, added a few notes and explanations.


// Trim type specific fields for each type.
for _, trimmer := range extraTrimmers {
trimmer(obj)
}
return obj, nil
}
}

// TrimPod clears unused fields from a Pod that are not required by Antrea.
// It's safe to do so because Antrea only patches Pod.
func TrimPod(obj interface{}) (interface{}, error) {
pod, ok := obj.(*corev1.Pod)
if !ok {
return obj, nil
}

pod.OwnerReferences = nil
pod.Spec.Volumes = nil
pod.Spec.InitContainers = nil
for i := range pod.Spec.Containers {
container := &pod.Spec.Containers[i]
container.Command = nil
container.Args = nil
container.EnvFrom = nil
container.Env = nil
container.VolumeMounts = nil
container.VolumeDevices = nil
container.LivenessProbe = nil
container.ReadinessProbe = nil
container.StartupProbe = nil
container.Lifecycle = nil
container.SecurityContext = nil
}
pod.Spec.EphemeralContainers = nil
pod.Spec.Affinity = nil
pod.Spec.Tolerations = nil
pod.Spec.ResourceClaims = nil

pod.Status.Conditions = nil
pod.Status.StartTime = nil
pod.Status.InitContainerStatuses = nil
pod.Status.ContainerStatuses = nil
pod.Status.EphemeralContainerStatuses = nil
pod.Status.ResourceClaimStatuses = nil
return pod, nil
}
112 changes: 112 additions & 0 deletions pkg/util/k8s/transform_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
// Copyright 2024 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package k8s

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/tools/cache"
)

func TestTrimK8sObject(t *testing.T) {
tests := []struct {
name string
trimmer cache.TransformFunc
obj interface{}
want interface{}
}{
{
name: "pod",
trimmer: NewTrimmer(TrimPod),
obj: &corev1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
UID: "test-uid",
OwnerReferences: []metav1.OwnerReference{
{
APIVersion: "apps/v1",
Kind: "DaemonSet",
Name: "test-daemonset",
UID: "5a39d3c8-0f5f-4aad-94bf-315c4fe11320",
},
},
ManagedFields: []metav1.ManagedFieldsEntry{
{
APIVersion: "v1",
FieldsType: "FieldsV1",
},
},
},
Spec: corev1.PodSpec{
InitContainers: []corev1.Container{{
Name: "container-0",
}},
Containers: []corev1.Container{{
Name: "container-1",
Command: []string{"foo", "bar"},
Args: []string{"--a=b"},
Env: []corev1.EnvVar{{Name: "foo", Value: "bar"}},
}},
NodeName: "nodeA",
},
Status: corev1.PodStatus{
Conditions: []corev1.PodCondition{
{
Type: corev1.PodReady,
Status: corev1.ConditionTrue,
},
},
PodIP: "1.2.3.4",
PodIPs: []corev1.PodIP{
{IP: "1.2.3.4"},
},
},
},
want: &corev1.Pod{
TypeMeta: metav1.TypeMeta{},
ObjectMeta: metav1.ObjectMeta{
Name: "test-pod",
Namespace: "default",
UID: "test-uid",
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{{
Name: "container-1",
}},
NodeName: "nodeA",
},
Status: corev1.PodStatus{
PodIP: "1.2.3.4",
PodIPs: []corev1.PodIP{
{IP: "1.2.3.4"},
},
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := tt.trimmer(tt.obj)
require.NoError(t, err)
assert.Equal(t, tt.want, got)
})
}
}
Loading