Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix race conditions in NetworkPolicyController #4028

Merged
merged 1 commit into from
Aug 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 67 additions & 109 deletions pkg/controller/networkpolicy/antreanetworkpolicy.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,70 +24,38 @@ import (
"antrea.io/antrea/pkg/apis/controlplane"
crdv1alpha1 "antrea.io/antrea/pkg/apis/crd/v1alpha1"
antreatypes "antrea.io/antrea/pkg/controller/types"
"antrea.io/antrea/pkg/util/k8s"
)

// getANPReference builds the controlplane.NetworkPolicyReference that
// identifies the given Antrea NetworkPolicy. The reference Type is always
// controlplane.AntreaNetworkPolicy; Namespace, Name and UID are copied
// from the policy object.
func getANPReference(anp *crdv1alpha1.NetworkPolicy) *controlplane.NetworkPolicyReference {
	ref := controlplane.NetworkPolicyReference{
		Type:      controlplane.AntreaNetworkPolicy,
		Namespace: anp.Namespace,
		Name:      anp.Name,
		UID:       anp.UID,
	}
	return &ref
}

// addANP receives AntreaNetworkPolicy ADD events and enqueues a reference of
// the AntreaNetworkPolicy to trigger its process.
func (n *NetworkPolicyController) addANP(obj interface{}) {
defer n.heartbeat("addANP")
np := obj.(*crdv1alpha1.NetworkPolicy)
klog.Infof("Processing Antrea NetworkPolicy %s/%s ADD event", np.Namespace, np.Name)
// Create an internal NetworkPolicy object corresponding to this
// NetworkPolicy and enqueue task to internal NetworkPolicy Workqueue.
internalNP := n.processAntreaNetworkPolicy(np)
klog.V(2).Infof("Creating new internal NetworkPolicy %s for %s", internalNP.Name, internalNP.SourceRef.ToString())
n.internalNetworkPolicyStore.Create(internalNP)
key := internalNetworkPolicyKeyFunc(np)
n.enqueueInternalNetworkPolicy(key)
n.enqueueInternalNetworkPolicy(getANPReference(np))
}

// updateANP receives AntreaNetworkPolicy UPDATE events and updates resources
// which can be consumed by agents to configure corresponding rules on the Nodes.
// updateANP receives AntreaNetworkPolicy UPDATE events and enqueues a reference
// of the AntreaNetworkPolicy to trigger its process.
func (n *NetworkPolicyController) updateANP(old, cur interface{}) {
defer n.heartbeat("updateANP")
curNP := cur.(*crdv1alpha1.NetworkPolicy)
klog.Infof("Processing Antrea NetworkPolicy %s/%s UPDATE event", curNP.Namespace, curNP.Name)
// Update an internal NetworkPolicy, corresponding to this NetworkPolicy and
// enqueue task to internal NetworkPolicy Workqueue.
curInternalNP := n.processAntreaNetworkPolicy(curNP)
klog.V(2).Infof("Updating existing internal NetworkPolicy %s for %s", curInternalNP.Name, curInternalNP.SourceRef.ToString())
// Retrieve old crdv1alpha1.NetworkPolicy object.
oldNP := old.(*crdv1alpha1.NetworkPolicy)
// Old and current NetworkPolicy share the same key.
key := internalNetworkPolicyKeyFunc(oldNP)
// Lock access to internal NetworkPolicy store such that concurrent access
// to an internal NetworkPolicy is not allowed. This will avoid the
// case in which an Update to an internal NetworkPolicy object may
// cause the SpanMeta member to be overridden with stale SpanMeta members
// from an older internal NetworkPolicy.
n.internalNetworkPolicyMutex.Lock()
oldInternalNPObj, _, _ := n.internalNetworkPolicyStore.Get(key)
oldInternalNP := oldInternalNPObj.(*antreatypes.NetworkPolicy)
// Must preserve old internal NetworkPolicy Span.
curInternalNP.SpanMeta = oldInternalNP.SpanMeta
n.internalNetworkPolicyStore.Update(curInternalNP)
// Unlock the internal NetworkPolicy store.
n.internalNetworkPolicyMutex.Unlock()
// Enqueue addressGroup keys to update their Node span.
for _, rule := range curInternalNP.Rules {
for _, addrGroupName := range rule.From.AddressGroups {
n.enqueueAddressGroup(addrGroupName)
}
for _, addrGroupName := range rule.To.AddressGroups {
n.enqueueAddressGroup(addrGroupName)
}
}
n.enqueueInternalNetworkPolicy(key)
for _, atg := range oldInternalNP.AppliedToGroups {
// Delete the old AppliedToGroup object if it is not referenced
// by any internal NetworkPolicy.
n.deleteDereferencedAppliedToGroup(atg)
}
n.deleteDereferencedAddressGroups(oldInternalNP)
n.enqueueInternalNetworkPolicy(getANPReference(curNP))
}

// deleteANP receives AntreaNetworkPolicy DELETED events and deletes resources
// which can be consumed by agents to delete corresponding rules on the Nodes.
// deleteANP receives AntreaNetworkPolicy DELETE events and enqueues a reference
// of the AntreaNetworkPolicy to trigger its process.
func (n *NetworkPolicyController) deleteANP(old interface{}) {
np, ok := old.(*crdv1alpha1.NetworkPolicy)
if !ok {
Expand All @@ -104,33 +72,20 @@ func (n *NetworkPolicyController) deleteANP(old interface{}) {
}
defer n.heartbeat("deleteANP")
klog.Infof("Processing Antrea NetworkPolicy %s/%s DELETE event", np.Namespace, np.Name)
key := internalNetworkPolicyKeyFunc(np)
oldInternalNPObj, _, _ := n.internalNetworkPolicyStore.Get(key)
oldInternalNP := oldInternalNPObj.(*antreatypes.NetworkPolicy)
klog.V(2).Infof("Deleting internal NetworkPolicy %s for %s", oldInternalNP.Name, oldInternalNP.SourceRef.ToString())
err := n.internalNetworkPolicyStore.Delete(key)
if err != nil {
klog.Errorf("Error deleting internal NetworkPolicy during Antrea NetworkPolicy %s delete: %v", np.Name, err)
return
}
for _, atg := range oldInternalNP.AppliedToGroups {
n.deleteDereferencedAppliedToGroup(atg)
}
n.deleteDereferencedAddressGroups(oldInternalNP)
n.enqueueInternalNetworkPolicy(getANPReference(np))
}

// processAntreaNetworkPolicy creates an internal NetworkPolicy instance
// corresponding to the crdv1alpha1.NetworkPolicy object. This method
// does not commit the internal NetworkPolicy in store, instead returns an
// instance to the caller wherein, it will be either stored as a new Object
// in case of ADD event or modified and store the updated instance, in case
// of an UPDATE event.
func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.NetworkPolicy) *antreatypes.NetworkPolicy {
// instance to the caller.
func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.NetworkPolicy) (*antreatypes.NetworkPolicy, map[string]*antreatypes.AppliedToGroup, map[string]*antreatypes.AddressGroup) {
appliedToPerRule := len(np.Spec.AppliedTo) == 0
// appliedToGroupNames tracks all distinct appliedToGroups referred to by the Antrea NetworkPolicy,
// appliedToGroups tracks all distinct appliedToGroups referred to by the Antrea NetworkPolicy,
// either in the spec section or in ingress/egress rules.
// The span calculation and stale appliedToGroup cleanup logic would work seamlessly for both cases.
appliedToGroupNamesSet := sets.String{}
appliedToGroups := map[string]*antreatypes.AppliedToGroup{}
addressGroups := map[string]*antreatypes.AddressGroup{}
rules := make([]controlplane.NetworkPolicyRule, 0, len(np.Spec.Ingress)+len(np.Spec.Egress))
newUnrealizableInternalNetworkPolicy := func(err error) *antreatypes.NetworkPolicy {
return &antreatypes.NetworkPolicy{
Expand All @@ -147,54 +102,61 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net
}
}
// Create AppliedToGroup for each AppliedTo present in AntreaNetworkPolicy spec.
_, err := n.processAppliedTo(np.Namespace, np.Spec.AppliedTo, appliedToGroupNamesSet)
atgs, err := n.processAppliedTo(np.Namespace, np.Spec.AppliedTo)
if err != nil {
return newUnrealizableInternalNetworkPolicy(err)
return newUnrealizableInternalNetworkPolicy(err), nil, nil
}
appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...)
// Compute NetworkPolicyRule for Ingress Rule.
for idx, ingressRule := range np.Spec.Ingress {
// Set default action to ALLOW to allow traffic.
services, namedPortExists := toAntreaServicesForCRD(ingressRule.Ports, ingressRule.Protocols)
// Create AppliedToGroup for each AppliedTo present in the ingress rule.
atGroups, err := n.processAppliedTo(np.Namespace, ingressRule.AppliedTo, appliedToGroupNamesSet)
atgs, err := n.processAppliedTo(np.Namespace, ingressRule.AppliedTo)
if err != nil {
return newUnrealizableInternalNetworkPolicy(err)
return newUnrealizableInternalNetworkPolicy(err), nil, nil
}
appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...)
peer, ags := n.toAntreaPeerForCRD(ingressRule.From, np, controlplane.DirectionIn, namedPortExists)
addressGroups = mergeAddressGroups(addressGroups, ags...)
rules = append(rules, controlplane.NetworkPolicyRule{
Direction: controlplane.DirectionIn,
From: *n.toAntreaPeerForCRD(ingressRule.From, np, controlplane.DirectionIn, namedPortExists),
From: *peer,
Services: services,
Name: ingressRule.Name,
Action: ingressRule.Action,
Priority: int32(idx),
EnableLogging: ingressRule.EnableLogging,
AppliedToGroups: atGroups,
AppliedToGroups: getAppliedToGroupNames(atgs),
})
}
// Compute NetworkPolicyRule for Egress Rule.
for idx, egressRule := range np.Spec.Egress {
// Set default action to ALLOW to allow traffic.
services, namedPortExists := toAntreaServicesForCRD(egressRule.Ports, egressRule.Protocols)
// Create AppliedToGroup for each AppliedTo present in the egress rule.
atGroups, err := n.processAppliedTo(np.Namespace, egressRule.AppliedTo, appliedToGroupNamesSet)
atgs, err := n.processAppliedTo(np.Namespace, egressRule.AppliedTo)
if err != nil {
return newUnrealizableInternalNetworkPolicy(err)
return newUnrealizableInternalNetworkPolicy(err), nil, nil
}
var peers *controlplane.NetworkPolicyPeer
appliedToGroups = mergeAppliedToGroups(appliedToGroups, atgs...)
var peer *controlplane.NetworkPolicyPeer
if egressRule.ToServices != nil {
peers = n.svcRefToPeerForCRD(egressRule.ToServices, np.Namespace)
peer = n.svcRefToPeerForCRD(egressRule.ToServices, np.Namespace)
} else {
peers = n.toAntreaPeerForCRD(egressRule.To, np, controlplane.DirectionOut, namedPortExists)
var ags []*antreatypes.AddressGroup
peer, ags = n.toAntreaPeerForCRD(egressRule.To, np, controlplane.DirectionOut, namedPortExists)
addressGroups = mergeAddressGroups(addressGroups, ags...)
}
rules = append(rules, controlplane.NetworkPolicyRule{
Direction: controlplane.DirectionOut,
To: *peers,
To: *peer,
Services: services,
Name: egressRule.Name,
Action: egressRule.Action,
Priority: int32(idx),
EnableLogging: egressRule.EnableLogging,
AppliedToGroups: atGroups,
AppliedToGroups: getAppliedToGroupNames(atgs),
})
}
tierPriority := n.getTierPriority(np.Spec.Tier)
Expand All @@ -208,34 +170,33 @@ func (n *NetworkPolicyController) processAntreaNetworkPolicy(np *crdv1alpha1.Net
Name: internalNetworkPolicyKeyFunc(np),
UID: np.UID,
Generation: np.Generation,
AppliedToGroups: appliedToGroupNamesSet.List(),
AppliedToGroups: sets.StringKeySet(appliedToGroups).List(),
Rules: rules,
Priority: &np.Spec.Priority,
TierPriority: &tierPriority,
AppliedToPerRule: appliedToPerRule,
}
return internalNetworkPolicy
return internalNetworkPolicy, appliedToGroups, addressGroups
}

func (n *NetworkPolicyController) processAppliedTo(namespace string, appliedTo []crdv1alpha1.NetworkPolicyPeer, appliedToGroupNamesSet sets.String) ([]string, error) {
var appliedToGroupNames []string
func (n *NetworkPolicyController) processAppliedTo(namespace string, appliedTo []crdv1alpha1.NetworkPolicyPeer) ([]*antreatypes.AppliedToGroup, error) {
var appliedToGroups []*antreatypes.AppliedToGroup
for _, at := range appliedTo {
var atg string
var atg *antreatypes.AppliedToGroup
if at.Group != "" {
var err error
atg, err = n.processAppliedToGroupForNamespacedGroup(namespace, at.Group)
atg, err = n.createAppliedToGroupForNamespacedGroup(namespace, at.Group)
if err != nil {
return appliedToGroupNames, err
return nil, err
}
} else {
atg = n.createAppliedToGroup(namespace, at.PodSelector, at.NamespaceSelector, at.ExternalEntitySelector)
}
if atg != "" {
appliedToGroupNames = append(appliedToGroupNames, atg)
appliedToGroupNamesSet.Insert(atg)
if atg != nil {
appliedToGroups = append(appliedToGroups, atg)
}
}
return appliedToGroupNames, nil
return appliedToGroups, nil
}

// ErrNetworkPolicyAppliedToUnsupportedGroup is an error response when
Expand All @@ -249,26 +210,23 @@ func (e ErrNetworkPolicyAppliedToUnsupportedGroup) Error() string {
return fmt.Sprintf("group %s/%s with IPBlocks or NamespaceSelector can not be used as AppliedTo", e.namespace, e.groupName)
}

func (n *NetworkPolicyController) processAppliedToGroupForNamespacedGroup(namespace, groupName string) (string, error) {
// Retrieve Group for corresponding entry in the AppliedToGroup.
g, err := n.grpLister.Groups(namespace).Get(groupName)
if err != nil {
// The Group referred to has not been created yet.
return "", nil
}
key := internalGroupKeyFunc(g)
// Find the internal Group corresponding to this Group
func (n *NetworkPolicyController) createAppliedToGroupForNamespacedGroup(namespace, groupName string) (*antreatypes.AppliedToGroup, error) {
// Namespaced group uses NAMESPACE/NAME as the key of the corresponding internal group.
key := k8s.NamespacedName(namespace, groupName)
Dyanngg marked this conversation as resolved.
Show resolved Hide resolved
// Find the internal Group corresponding to this Group.
// There is no need to check if the namespaced group exists in groupLister because its existence will eventually be
// reflected in internalGroupStore.
ig, found, _ := n.internalGroupStore.Get(key)
if !found {
// Internal Group was not found. Once the internal Group is created, the sync
// worker for internal group will re-enqueue the ClusterNetworkPolicy processing
// which will trigger the creation of AddressGroup.
return "", nil
// Internal Group is not found, which means the corresponding namespaced group is either not created yet or not
// processed yet. Once the internal Group is created and processed, the sync worker for internal group will
// re-enqueue the AntreaNetworkPolicy processing which will trigger the creation of AppliedToGroup.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// re-enqueue the ClusterNetworkPolicy processing which will trigger the creation of AppliedToGroup.
// re-enqueue the AntreaNetworkPolicy processing which will trigger the creation of AppliedToGroup.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for pointing it out, will fix it in other PR

return nil, nil
}
intGrp := ig.(*antreatypes.Group)
if len(intGrp.IPBlocks) > 0 || (intGrp.Selector != nil && intGrp.Selector.NamespaceSelector != nil) {
klog.V(2).InfoS("Group with IPBlocks or NamespaceSelector can not be used as AppliedTo", "Group", g)
return "", ErrNetworkPolicyAppliedToUnsupportedGroup{namespace: namespace, groupName: groupName}
klog.V(2).InfoS("Group with IPBlocks or NamespaceSelector can not be used as AppliedTo", "Group", key)
return nil, ErrNetworkPolicyAppliedToUnsupportedGroup{namespace: namespace, groupName: groupName}
}
return n.createAppliedToGroupForInternalGroup(intGrp), nil
return &antreatypes.AppliedToGroup{UID: intGrp.UID, Name: key}, nil
}
Loading