Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reconciler/managed: avoid requeuing if an update event is pending #527

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 40 additions & 29 deletions pkg/reconciler/managed/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,6 +710,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Cannot get managed resource", "error", err)
return reconcile.Result{}, errors.Wrap(resource.IgnoreNotFound(err), errGetManaged)
}
orig := managed.DeepCopyObject().(resource.Managed)

record := r.record.WithAnnotations("external-name", meta.GetExternalName(managed))
log = log.WithValues(
Expand Down Expand Up @@ -738,7 +739,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
managed.SetConditions(xpv1.ReconcilePaused())
// if the pause annotation is removed or the management policies changed, we will have a chance to reconcile
// again and resume and if status update fails, we will reconcile again to retry to update the status
return reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

// Check if the ManagementPolicies is set to a non-default value while the
Expand All @@ -753,7 +754,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug(err.Error())
record.Event(managed, event.Warning(reasonManagementPolicyInvalid, err))
managed.SetConditions(xpv1.ReconcileError(err))
return reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

// If managed resource has a deletion timestamp and a deletion policy of
Expand All @@ -775,7 +776,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Cannot unpublish connection details", "error", err)
record.Event(managed, event.Warning(reasonCannotUnpublish, err))
managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(err))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}
if err := r.managed.RemoveFinalizer(ctx, managed); err != nil {
// If this is the first time we encounter this issue we'll be
Expand All @@ -784,7 +785,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
// backoff.
log.Debug("Cannot remove managed resource finalizer", "error", err)
managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(err))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

// We've successfully unpublished our managed resource's connection
Expand All @@ -802,7 +803,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Cannot initialize managed resource", "error", err)
record.Event(managed, event.Warning(reasonCannotInitialize, err))
managed.SetConditions(xpv1.ReconcileError(err))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

// If we started but never completed creation of an external resource we
Expand All @@ -813,7 +814,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug(errCreateIncomplete)
record.Event(managed, event.Warning(reasonCannotInitialize, errors.New(errCreateIncomplete)))
managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(errors.New(errCreateIncomplete)))
return reconcile.Result{Requeue: false}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: false}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

// We resolve any references before observing our external resource because
Expand All @@ -835,7 +836,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Cannot resolve managed resource references", "error", err)
record.Event(managed, event.Warning(reasonCannotResolveRefs, err))
managed.SetConditions(xpv1.ReconcileError(err))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}
}

Expand All @@ -849,7 +850,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Cannot connect to provider", "error", err)
record.Event(managed, event.Warning(reasonCannotConnect, err))
managed.SetConditions(xpv1.ReconcileError(errors.Wrap(err, errReconcileConnect)))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}
defer func() {
if err := r.external.Disconnect(ctx); err != nil {
Expand All @@ -869,15 +870,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Cannot observe external resource", "error", err)
record.Event(managed, event.Warning(reasonCannotObserve, err))
managed.SetConditions(xpv1.ReconcileError(errors.Wrap(err, errReconcileObserve)))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

// In the observe-only mode, !observation.ResourceExists will be an error
// case, and we will explicitly return this information to the user.
if !observation.ResourceExists && policy.ShouldOnlyObserve() {
record.Event(managed, event.Warning(reasonCannotObserve, errors.New(errExternalResourceNotExist)))
managed.SetConditions(xpv1.ReconcileError(errors.Wrap(errors.New(errExternalResourceNotExist), errReconcileObserve)))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

// If this resource has a non-zero creation grace period we want to wait
Expand All @@ -888,7 +889,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
if !observation.ResourceExists && meta.ExternalCreateSucceededDuring(managed, r.creationGracePeriod) {
log.Debug("Waiting for external resource existence to be confirmed")
record.Event(managed, event.Normal(reasonPending, "Waiting for external resource existence to be confirmed"))
return reconcile.Result{Requeue: true}, nil
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, nil)
}

if meta.WasDeleted(managed) {
Expand All @@ -905,7 +906,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Cannot delete external resource", "error", err)
record.Event(managed, event.Warning(reasonCannotDelete, err))
managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(errors.Wrap(err, errReconcileDelete)))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

// We've successfully requested deletion of our external resource.
Expand All @@ -918,7 +919,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Successfully requested deletion of external resource")
record.Event(managed, event.Normal(reasonDeleted, "Successfully requested deletion of external resource"))
managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileSuccess())
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}
if err := r.managed.UnpublishConnection(ctx, managed, observation.ConnectionDetails); err != nil {
// If this is the first time we encounter this issue we'll be
Expand All @@ -928,7 +929,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Cannot unpublish connection details", "error", err)
record.Event(managed, event.Warning(reasonCannotUnpublish, err))
managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(err))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}
if err := r.managed.RemoveFinalizer(ctx, managed); err != nil {
// If this is the first time we encounter this issue we'll be
Expand All @@ -937,15 +938,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
// backoff.
log.Debug("Cannot remove managed resource finalizer", "error", err)
managed.SetConditions(xpv1.Deleting(), xpv1.ReconcileError(err))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

// We've successfully deleted our external resource (if necessary) and
// removed our finalizer. If we assume we were the only controller that
// added a finalizer to this resource then it should no longer exist and
// thus there is no point trying to update its status.
log.Debug("Successfully deleted managed resource")
return reconcile.Result{Requeue: false}, nil
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: false}, nil)
}

if _, err := r.managed.PublishConnection(ctx, managed, observation.ConnectionDetails); err != nil {
Expand All @@ -955,7 +956,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Cannot publish connection details", "error", err)
record.Event(managed, event.Warning(reasonCannotPublish, err))
managed.SetConditions(xpv1.ReconcileError(err))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

if err := r.managed.AddFinalizer(ctx, managed); err != nil {
Expand All @@ -964,7 +965,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
// not, we requeue explicitly, which will trigger backoff.
log.Debug("Cannot add finalizer", "error", err)
managed.SetConditions(xpv1.ReconcileError(err))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

if !observation.ResourceExists && policy.ShouldCreate() {
Expand All @@ -980,7 +981,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug(errUpdateManaged, "error", err)
record.Event(managed, event.Warning(reasonCannotUpdateManaged, errors.Wrap(err, errUpdateManaged)))
managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(errors.Wrap(err, errUpdateManaged)))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

creation, err := external.Create(externalCtx, managed)
Expand Down Expand Up @@ -1012,7 +1013,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
}

managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(errors.Wrap(err, errReconcileCreate)))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

// In some cases our external-name may be set by Create above.
Expand All @@ -1034,7 +1035,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug(errUpdateManagedAnnotations, "error", err)
record.Event(managed, event.Warning(reasonCannotUpdateManaged, errors.Wrap(err, errUpdateManagedAnnotations)))
managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(errors.Wrap(err, errUpdateManagedAnnotations)))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

if _, err := r.managed.PublishConnection(ctx, managed, creation.ConnectionDetails); err != nil {
Expand All @@ -1044,7 +1045,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Cannot publish connection details", "error", err)
record.Event(managed, event.Warning(reasonCannotPublish, err))
managed.SetConditions(xpv1.Creating(), xpv1.ReconcileError(err))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

// We've successfully created our external resource. In many cases the
Expand All @@ -1054,7 +1055,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Successfully requested creation of external resource")
record.Event(managed, event.Normal(reasonCreated, "Successfully requested creation of external resource"))
managed.SetConditions(xpv1.Creating(), xpv1.ReconcileSuccess())
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

if observation.ResourceLateInitialized && policy.ShouldLateInitialize() {
Expand All @@ -1069,7 +1070,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug(errUpdateManaged, "error", err)
record.Event(managed, event.Warning(reasonCannotUpdateManaged, err))
managed.SetConditions(xpv1.ReconcileError(errors.Wrap(err, errUpdateManaged)))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}
}

Expand All @@ -1082,7 +1083,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
// https://github.com/crossplane/crossplane/issues/289
log.Debug("External resource is up to date", "requeue-after", time.Now().Add(r.pollInterval))
managed.SetConditions(xpv1.ReconcileSuccess())
return reconcile.Result{RequeueAfter: r.pollInterval}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{RequeueAfter: r.pollInterval}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

if observation.Diff != "" {
Expand All @@ -1093,7 +1094,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
if !policy.ShouldUpdate() {
log.Debug("Skipping update due to managementPolicies. Reconciliation succeeded", "requeue-after", time.Now().Add(r.pollInterval))
managed.SetConditions(xpv1.ReconcileSuccess())
return reconcile.Result{RequeueAfter: r.pollInterval}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{RequeueAfter: r.pollInterval}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

update, err := external.Update(externalCtx, managed)
Expand All @@ -1106,7 +1107,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Cannot update external resource")
record.Event(managed, event.Warning(reasonCannotUpdate, err))
managed.SetConditions(xpv1.ReconcileError(errors.Wrap(err, errReconcileUpdate)))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

if _, err := r.managed.PublishConnection(ctx, managed, update.ConnectionDetails); err != nil {
Expand All @@ -1116,7 +1117,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Cannot publish connection details", "error", err)
record.Event(managed, event.Warning(reasonCannotPublish, err))
managed.SetConditions(xpv1.ReconcileError(err))
return reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{Requeue: true}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

// We've successfully updated our external resource. Per the below issue
Expand All @@ -1127,5 +1128,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, req reconcile.Request) (resu
log.Debug("Successfully requested update of external resource", "requeue-after", time.Now().Add(r.pollInterval))
record.Event(managed, event.Normal(reasonUpdated, "Successfully requested update of external resource"))
managed.SetConditions(xpv1.ReconcileSuccess())
return reconcile.Result{RequeueAfter: r.pollInterval}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus)
return skipRequeueOnUpdate(orig, managed, reconcile.Result{RequeueAfter: r.pollInterval}, errors.Wrap(r.client.Status().Update(ctx, managed), errUpdateManagedStatus))
}

func skipRequeueOnUpdate(orig, o resource.Managed, result reconcile.Result, err error) (reconcile.Result, error) {
if err == nil && orig.GetResourceVersion() != o.GetResourceVersion() {
// the object has changed. We get a watch event and do not need to
// requeue. This helps to avoid a reconcile on a stale read when the
// informer has not caught up.
Comment on lines +1136 to +1138
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was under the impression that controller-runtime would deduplicate multiple events for the same object in the queue. i.e. If we return Requeue: true and a watch event is triggered we'd only reconcile once, not twice - https://github.com/kubernetes-sigs/controller-runtime/blob/c20ea143/pkg/doc.go#L184

Why isn't this working? Is the issue that the watch-triggered reconcile has potentially already been popped from the queue and processed by another goroutine before we return Requeue: true?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and a watch event is triggered we'd only reconcile once, not twice

The requeue is instant. The informer has delay.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the issue that the watch-triggered reconcile has potentially already been popped from the queue and processed by another goroutine before we return Requeue: true

No. There is always just one reconcile per key. Only when we return it to the queue, another go routine could take over immediately. But this PR is about the case when there is no event yet, but the key is immediately popped from the queue. That work is both unnecessary and will very likely run into another conflict error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The requeue is instant. The informer has delay.

I think I understand now. You're saying the issue is that we'll requeue so fast (i.e. instantly) that we'll just read the same stale resource from the informer on the next reconcile, and essentially keep doing that until the informer cache is updated.

result.Requeue = false
}
return result, err
}
Loading