Skip to content

Commit

Permalink
Store NetworkPolicy in filesystem as fallback data source (antrea-io#…
Browse files Browse the repository at this point in the history
…5739)

In the previous implementation, traffic from/to a Pod may bypass
NetworkPolicies applied to the Pod in a time window when the agent
restarts because realizing NetworkPolicies and enabling forwarding are
asynchronous.

This patch stores NetworkPolicy data in files when they are received,
and makes antre-agent fallback to use the files as data source if it
can't connect to antrea-controller on startup. This prevents security
regression: a NetworkPolicy that has been realized on a Node will
continue to work even if antrea-controller is not available after
antrea-agent restarts.

The benchmark results of the storage's operations are as below:

BenchmarkFileStoreAddNetworkPolicy-40              70383             16102 ns/op             520 B/op          9 allocs/op
BenchmarkFileStoreAddAppliedToGroup-40             45382             25880 ns/op            3019 B/op          9 allocs/op
BenchmarkFileStoreAddAddressGroup-40                7400            180000 ns/op           49538 B/op          9 allocs/op
BenchmarkFileStoreReplaceAll-40                       10         127088004 ns/op        17815943 B/op      33099 allocs/op

The disk usage when storing 1k NetworkPolicies, AddressGroups, and
AppliedToGroups created by BenchmarkFileStoreReplaceAll is as below:

16M     /var/run/antrea-test/file-store/address-groups
4.0M    /var/run/antrea-test/file-store/applied-to-groups
4.0M    /var/run/antrea-test/file-store/network-policies

Signed-off-by: Quan Tian <qtian@vmware.com>
  • Loading branch information
tnqn committed Nov 29, 2023
1 parent b1fa272 commit f9fc979
Show file tree
Hide file tree
Showing 8 changed files with 781 additions and 24 deletions.
2 changes: 2 additions & 0 deletions cmd/antrea-agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net"
"time"

"github.com/spf13/afero"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -459,6 +460,7 @@ func run(o *Options) error {
antreaClientProvider,
ofClient,
ifaceStore,
afero.NewOsFs(),
nodeKey,
podUpdateChannel,
externalEntityUpdateChannel,
Expand Down
32 changes: 26 additions & 6 deletions pkg/agent/controller/networkpolicy/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -551,13 +551,14 @@ func (c *ruleCache) addAddressGroupLocked(group *v1beta.AddressGroup) error {

// PatchAddressGroup updates a cached *v1beta.AddressGroup.
// The rules referencing it will be regarded as dirty.
func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) error {
// It returns a copy of the patched AddressGroup, or an error if the AddressGroup doesn't exist.
func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) (*v1beta.AddressGroup, error) {
c.addressSetLock.Lock()
defer c.addressSetLock.Unlock()

groupMemberSet, exists := c.addressSetByGroup[patch.Name]
if !exists {
return fmt.Errorf("AddressGroup %v doesn't exist in cache, can't be patched", patch.Name)
return nil, fmt.Errorf("AddressGroup %v doesn't exist in cache, can't be patched", patch.Name)
}
for i := range patch.AddedGroupMembers {
groupMemberSet.Insert(&patch.AddedGroupMembers[i])
Expand All @@ -567,7 +568,16 @@ func (c *ruleCache) PatchAddressGroup(patch *v1beta.AddressGroupPatch) error {
}

c.onAddressGroupUpdate(patch.Name)
return nil

members := make([]v1beta.GroupMember, 0, len(groupMemberSet))
for _, member := range groupMemberSet {
members = append(members, *member)
}
group := &v1beta.AddressGroup{
ObjectMeta: patch.ObjectMeta,
GroupMembers: members,
}
return group, nil
}

// DeleteAddressGroup deletes a cached *v1beta.AddressGroup.
Expand Down Expand Up @@ -639,13 +649,14 @@ func (c *ruleCache) addAppliedToGroupLocked(group *v1beta.AppliedToGroup) error

// PatchAppliedToGroup updates a cached *v1beta.AppliedToGroupPatch.
// The rules referencing it will be regarded as dirty.
func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error {
// It returns a copy of the patched AppliedToGroup, or an error if the AppliedToGroup doesn't exist.
func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) (*v1beta.AppliedToGroup, error) {
c.appliedToSetLock.Lock()
defer c.appliedToSetLock.Unlock()

memberSet, exists := c.appliedToSetByGroup[patch.Name]
if !exists {
return fmt.Errorf("AppliedToGroup %v doesn't exist in cache, can't be patched", patch.Name)
return nil, fmt.Errorf("AppliedToGroup %v doesn't exist in cache, can't be patched", patch.Name)
}
for i := range patch.AddedGroupMembers {
memberSet.Insert(&patch.AddedGroupMembers[i])
Expand All @@ -654,7 +665,16 @@ func (c *ruleCache) PatchAppliedToGroup(patch *v1beta.AppliedToGroupPatch) error
memberSet.Delete(&patch.RemovedGroupMembers[i])
}
c.onAppliedToGroupUpdate(patch.Name)
return nil

members := make([]v1beta.GroupMember, 0, len(memberSet))
for _, member := range memberSet {
members = append(members, *member)
}
group := &v1beta.AppliedToGroup{
ObjectMeta: patch.ObjectMeta,
GroupMembers: members,
}
return group, nil
}

// DeleteAppliedToGroup deletes a cached *v1beta.AppliedToGroup.
Expand Down
10 changes: 8 additions & 2 deletions pkg/agent/controller/networkpolicy/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1039,7 +1039,7 @@ func TestRuleCachePatchAppliedToGroup(t *testing.T) {
for _, rule := range tt.rules {
c.rules.Add(rule)
}
err := c.PatchAppliedToGroup(tt.args)
ret, err := c.PatchAppliedToGroup(tt.args)
if (err == nil) == tt.expectedErr {
t.Fatalf("Got error %v, expected %t", err, tt.expectedErr)
}
Expand All @@ -1048,6 +1048,9 @@ func TestRuleCachePatchAppliedToGroup(t *testing.T) {
}
actualPods, _ := c.appliedToSetByGroup[tt.args.Name]
assert.ElementsMatch(t, tt.expectedPods, actualPods.Items(), "stored Pods not equal")
if !tt.expectedErr {
assert.Equal(t, len(ret.GroupMembers), len(actualPods))
}
})
}
}
Expand Down Expand Up @@ -1116,7 +1119,7 @@ func TestRuleCachePatchAddressGroup(t *testing.T) {
for _, rule := range tt.rules {
c.rules.Add(rule)
}
err := c.PatchAddressGroup(tt.args)
ret, err := c.PatchAddressGroup(tt.args)
if (err == nil) == tt.expectedErr {
t.Fatalf("Got error %v, expected %t", err, tt.expectedErr)
}
Expand All @@ -1125,6 +1128,9 @@ func TestRuleCachePatchAddressGroup(t *testing.T) {
}
actualAddresses, _ := c.addressSetByGroup[tt.args.Name]
assert.ElementsMatch(t, tt.expectedAddresses, actualAddresses.Items(), "stored addresses not equal")
if !tt.expectedErr {
assert.Equal(t, len(ret.GroupMembers), len(actualAddresses))
}
})
}
}
Expand Down
134 changes: 134 additions & 0 deletions pkg/agent/controller/networkpolicy/filestore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright 2023 Antrea Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package networkpolicy

import (
"fmt"
"io"
"io/fs"
"os"
"path/filepath"

"github.com/spf13/afero"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2"
)

// fileStore encodes and stores runtime.Objects in files. Each object will be stored in a separate file under the given
// directory.
type fileStore struct {
fs afero.Fs
// The directory to store the files.
dir string
// serializer knows how to encode and decode the objects.
serializer runtime.Serializer
}

func newFileStore(fs afero.Fs, dir string, serializer runtime.Serializer) (*fileStore, error) {
s := &fileStore{
fs: fs,
dir: dir,
serializer: serializer,
}
klog.V(2).InfoS("Creating directory for NetworkPolicy cache", "dir", dir)
if err := s.fs.MkdirAll(dir, 0o600); err != nil {
return nil, err
}
return s, nil
}

// save stores the given object in file with the object's UID as the file name, overwriting any existing content if the
// file already exists. Note the method may update the object's GroupVersionKind in-place during serialization.
func (s fileStore) save(item runtime.Object) error {
object := item.(metav1.Object)
path := filepath.Join(s.dir, string(object.GetUID()))
file, err := s.fs.OpenFile(path, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0o600)
if err != nil {
return fmt.Errorf("error opening file for writing object %v: %w", object.GetUID(), err)
}
defer file.Close()
// Encode may update the object's GroupVersionKind in-place during serialization.
err = s.serializer.Encode(item, file)
if err != nil {
return fmt.Errorf("error writing object %v to file: %w", object.GetUID(), err)
}
return nil
}

// delete removes the file with the object's UID as the file name if it exists.
func (s fileStore) delete(item runtime.Object) error {
object := item.(metav1.Object)
path := filepath.Join(s.dir, string(object.GetUID()))
err := s.fs.Remove(path)
if err != nil {
if os.IsNotExist(err) {
return nil
}
return err
}
return nil
}

// replaceAll replaces all files under the directory with the given objects. Existing files not in the given objects
// will be removed. Note the method may update the object's GroupVersionKind in-place during serialization.
func (s fileStore) replaceAll(items []runtime.Object) error {
if err := s.fs.RemoveAll(s.dir); err != nil {
return err
}
if err := s.fs.MkdirAll(s.dir, 0o600); err != nil {
return err
}
for _, item := range items {
if err := s.save(item); err != nil {
return err
}
}
return nil
}

func (s fileStore) loadAll() ([]runtime.Object, error) {
var objects []runtime.Object
err := afero.Walk(s.fs, s.dir, func(path string, info fs.FileInfo, err error) error {
if info.IsDir() {
return nil
}
file, err2 := s.fs.Open(path)
if err2 != nil {
return err2
}
defer file.Close()
data, err2 := io.ReadAll(file)
if err2 != nil {
return err2
}

object, gkv, err2 := s.serializer.Decode(data, nil, nil)
// If the data is corrupted somehow, we still want to load other data and continue the process.
if err2 != nil {
klog.ErrorS(err2, "Failed to decode data from file, ignore it", "file", path)
return nil
}
// Note: we haven't stored a different version so far but version conversion should be performed when the used
// version is upgraded in the future.
klog.V(2).InfoS("Loaded object from file", "gkv", gkv, "object", object)
objects = append(objects, object)
return nil
})
if err != nil {
return nil, err
}
return objects, nil
}
Loading

0 comments on commit f9fc979

Please sign in to comment.