Skip to content

Commit

Permalink
App restart is synchronous
Browse files Browse the repository at this point in the history
* Introduce `CFApp.Status.ActualState` to reflect the actual state of
  the app. The start/stop/restart operations waits for that field to
  become equal to the desired app state
* Introduce `CFProcess.Status.ActualInstances` to reflect the actual app
  workload instances
* Introduce `AppWorkload.Status.ActualInstances` to reflect the actual
  statefulset replicas
* The state chain is tested by a test in the `crds` suite
* The existing crd test is deleted as it has become obsolete by the test
  above

fixes #3036

Co-authored-by: Georgi Sabev <georgethebeatle@gmail.com>
  • Loading branch information
danail-branekov and georgethebeatle committed Apr 5, 2024
1 parent 43d794b commit 5beea86
Show file tree
Hide file tree
Showing 42 changed files with 687 additions and 438 deletions.
14 changes: 7 additions & 7 deletions api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ func main() {
privilegedCRClient,
userClientFactory,
nsPermissions,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFOrg, korifiv1alpha1.CFOrgList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFOrg, korifiv1alpha1.CFOrgList](conditionTimeout),
)
spaceRepo := repositories.NewSpaceRepo(
namespaceRetriever,
orgRepo,
userClientFactory,
nsPermissions,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFSpace, korifiv1alpha1.CFSpaceList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFSpace, korifiv1alpha1.CFSpaceList](conditionTimeout),
)
processRepo := repositories.NewProcessRepo(
namespaceRetriever,
Expand All @@ -148,7 +148,7 @@ func main() {
namespaceRetriever,
userClientFactory,
nsPermissions,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFApp, korifiv1alpha1.CFAppList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFApp, korifiv1alpha1.CFAppList](conditionTimeout),
)
dropletRepo := repositories.NewDropletRepo(
userClientFactory,
Expand Down Expand Up @@ -184,19 +184,19 @@ func main() {
nsPermissions,
toolsregistry.NewRepositoryCreator(cfg.ContainerRegistryType),
cfg.ContainerRepositoryPrefix,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFPackage, korifiv1alpha1.CFPackageList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFPackage, korifiv1alpha1.CFPackageList](conditionTimeout),
)
serviceInstanceRepo := repositories.NewServiceInstanceRepo(
namespaceRetriever,
userClientFactory,
nsPermissions,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFServiceInstance, korifiv1alpha1.CFServiceInstanceList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFServiceInstance, korifiv1alpha1.CFServiceInstanceList](conditionTimeout),
)
serviceBindingRepo := repositories.NewServiceBindingRepo(
namespaceRetriever,
userClientFactory,
nsPermissions,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFServiceBinding, korifiv1alpha1.CFServiceBindingList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFServiceBinding, korifiv1alpha1.CFServiceBindingList](conditionTimeout),
)
buildpackRepo := repositories.NewBuildpackRepository(cfg.BuilderName,
userClientFactory,
Expand All @@ -223,7 +223,7 @@ func main() {
userClientFactory,
namespaceRetriever,
nsPermissions,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFTask, korifiv1alpha1.CFTaskList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFTask, korifiv1alpha1.CFTaskList](conditionTimeout),
)
metricsRepo := repositories.NewMetricsRepo(userClientFactory)

Expand Down
20 changes: 15 additions & 5 deletions api/repositories/app_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,20 @@ type AppRepo struct {
namespaceRetriever NamespaceRetriever
userClientFactory authorization.UserK8sClientFactory
namespacePermissions *authorization.NamespacePermissions
appConditionAwaiter ConditionAwaiter[*korifiv1alpha1.CFApp]
appAwaiter Awaiter[*korifiv1alpha1.CFApp]
}

func NewAppRepo(
namespaceRetriever NamespaceRetriever,
userClientFactory authorization.UserK8sClientFactory,
authPerms *authorization.NamespacePermissions,
appConditionAwaiter ConditionAwaiter[*korifiv1alpha1.CFApp],
appAwaiter Awaiter[*korifiv1alpha1.CFApp],
) *AppRepo {
return &AppRepo{
namespaceRetriever: namespaceRetriever,
userClientFactory: userClientFactory,
namespacePermissions: authPerms,
appConditionAwaiter: appConditionAwaiter,
appAwaiter: appAwaiter,
}
}

Expand Down Expand Up @@ -431,7 +431,7 @@ func (f *AppRepo) SetCurrentDroplet(ctx context.Context, authInfo authorization.
return CurrentDropletRecord{}, fmt.Errorf("failed to set app droplet: %w", apierrors.FromK8sError(err, AppResourceType))
}

_, err = f.appConditionAwaiter.AwaitCondition(ctx, userClient, cfApp, shared.StatusConditionReady)
_, err = f.appAwaiter.AwaitCondition(ctx, userClient, cfApp, shared.StatusConditionReady)
if err != nil {
return CurrentDropletRecord{}, fmt.Errorf("failed to await the app staged condition: %w", apierrors.FromK8sError(err, AppResourceType))
}
Expand Down Expand Up @@ -462,6 +462,16 @@ func (f *AppRepo) SetAppDesiredState(ctx context.Context, authInfo authorization
return AppRecord{}, fmt.Errorf("failed to set app desired state: %w", apierrors.FromK8sError(err, AppResourceType))
}

if _, err := f.appAwaiter.AwaitState(ctx, userClient, cfApp, func(actualApp *korifiv1alpha1.CFApp) error {
desiredState := korifiv1alpha1.DesiredState(message.DesiredState)
if (actualApp.Spec.DesiredState == desiredState) && (actualApp.Status.ActualState == desiredState) {
return nil
}
return fmt.Errorf("expected actual state to be %s; it is currently %s", message.DesiredState, actualApp.Status.ActualState)
}); err != nil {
return AppRecord{}, apierrors.FromK8sError(err, AppResourceType)
}

return cfAppToAppRecord(*cfApp), nil
}

Expand Down Expand Up @@ -653,7 +663,7 @@ func cfAppToAppRecord(cfApp korifiv1alpha1.CFApp) AppRecord {
DropletGUID: cfApp.Spec.CurrentDropletRef.Name,
Labels: cfApp.Labels,
Annotations: cfApp.Annotations,
State: DesiredState(cfApp.Spec.DesiredState),
State: DesiredState(cfApp.Status.ActualState),
Lifecycle: Lifecycle{
Type: string(cfApp.Spec.Lifecycle.Type),
Data: LifecycleData{
Expand Down
13 changes: 7 additions & 6 deletions api/repositories/app_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"code.cloudfoundry.org/korifi/api/authorization"
apierrors "code.cloudfoundry.org/korifi/api/errors"
. "code.cloudfoundry.org/korifi/api/repositories"
"code.cloudfoundry.org/korifi/api/repositories/fakeawaiter"
korifiv1alpha1 "code.cloudfoundry.org/korifi/controllers/api/v1alpha1"
"code.cloudfoundry.org/korifi/controllers/controllers/shared"
"code.cloudfoundry.org/korifi/controllers/controllers/workloads/env"
Expand All @@ -37,7 +38,7 @@ const (

var _ = Describe("AppRepository", func() {
var (
conditionAwaiter *FakeAwaiter[
appAwaiter *fakeawaiter.FakeAwaiter[
*korifiv1alpha1.CFApp,
korifiv1alpha1.CFAppList,
*korifiv1alpha1.CFAppList,
Expand All @@ -49,12 +50,12 @@ var _ = Describe("AppRepository", func() {
)

BeforeEach(func() {
conditionAwaiter = &FakeAwaiter[
appAwaiter = &fakeawaiter.FakeAwaiter[
*korifiv1alpha1.CFApp,
korifiv1alpha1.CFAppList,
*korifiv1alpha1.CFAppList,
]{}
appRepo = NewAppRepo(namespaceRetriever, userClientFactory, nsPerms, conditionAwaiter)
appRepo = NewAppRepo(namespaceRetriever, userClientFactory, nsPerms, appAwaiter)

cfOrg = createOrgWithCleanup(ctx, prefixedGUID("org"))
cfSpace = createSpaceWithCleanup(ctx, cfOrg.Name, prefixedGUID("space1"))
Expand Down Expand Up @@ -1115,8 +1116,8 @@ var _ = Describe("AppRepository", func() {
})

It("awaits the ready condition", func() {
Expect(conditionAwaiter.AwaitConditionCallCount()).To(Equal(1))
obj, conditionType := conditionAwaiter.AwaitConditionArgsForCall(0)
Expect(appAwaiter.AwaitConditionCallCount()).To(Equal(1))
obj, conditionType := appAwaiter.AwaitConditionArgsForCall(0)
Expect(obj.GetName()).To(Equal(appGUID))
Expect(obj.GetNamespace()).To(Equal(cfSpace.Name))
Expect(conditionType).To(Equal(shared.StatusConditionReady))
Expand All @@ -1139,7 +1140,7 @@ var _ = Describe("AppRepository", func() {

When("the app never becomes ready", func() {
BeforeEach(func() {
conditionAwaiter.AwaitConditionReturns(&korifiv1alpha1.CFApp{}, errors.New("time-out-err"))
appAwaiter.AwaitConditionReturns(&korifiv1alpha1.CFApp{}, errors.New("time-out-err"))
})

It("returns an error", func() {
Expand Down
21 changes: 16 additions & 5 deletions api/repositories/conditions/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ type Awaiter[T RuntimeObjectWithStatusConditions, L any, PL ObjectList[L]] struc
timeout time.Duration
}

func NewConditionAwaiter[T RuntimeObjectWithStatusConditions, L any, PL ObjectList[L]](timeout time.Duration) *Awaiter[T, L, PL] {
func NewStateAwaiter[T RuntimeObjectWithStatusConditions, L any, PL ObjectList[L]](timeout time.Duration) *Awaiter[T, L, PL] {
return &Awaiter[T, L, PL]{
timeout: timeout,
}
}

func (a *Awaiter[T, L, PL]) AwaitCondition(ctx context.Context, k8sClient client.WithWatch, object client.Object, conditionType string) (T, error) {
func (a *Awaiter[T, L, PL]) AwaitState(ctx context.Context, k8sClient client.WithWatch, object client.Object, checkState func(T) error) (T, error) {
var empty T
objList := PL(new(L))

Expand All @@ -47,18 +47,29 @@ func (a *Awaiter[T, L, PL]) AwaitCondition(ctx context.Context, k8sClient client
}
defer watch.Stop()

var stateCheckErr error
for e := range watch.ResultChan() {
obj, ok := e.Object.(T)
if !ok {
continue
}

if meta.IsStatusConditionTrue(obj.StatusConditions(), conditionType) {
stateCheckErr = checkState(obj)
if stateCheckErr == nil {
return obj, nil
}
}

return empty, fmt.Errorf("object %s:%s did not get the %s condition within timeout period %d ms",
object.GetNamespace(), object.GetName(), conditionType, a.timeout.Milliseconds(),
return empty, fmt.Errorf("object %s/%s did not match desired state within %d ms: %s",
object.GetNamespace(), object.GetName(), a.timeout.Milliseconds(), stateCheckErr.Error(),
)
}

func (a *Awaiter[T, L, PL]) AwaitCondition(ctx context.Context, k8sClient client.WithWatch, object client.Object, conditionType string) (T, error) {
return a.AwaitState(ctx, k8sClient, object, func(obj T) error {
if meta.IsStatusConditionTrue(obj.StatusConditions(), conditionType) {
return nil
}
return fmt.Errorf("expected the %s condition to be true", conditionType)
})
}
104 changes: 71 additions & 33 deletions api/repositories/conditions/await_test.go
Original file line number Diff line number Diff line change
@@ -1,29 +1,47 @@
package conditions_test

import (
"context"
"errors"
"sync"
"time"

"code.cloudfoundry.org/korifi/api/repositories/conditions"
korifiv1alpha1 "code.cloudfoundry.org/korifi/controllers/api/v1alpha1"
"code.cloudfoundry.org/korifi/tools/k8s"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("Await", func() {
var _ = Describe("StateAwaiter", func() {
var (
awaiter *conditions.Awaiter[*korifiv1alpha1.CFTask, korifiv1alpha1.CFTaskList, *korifiv1alpha1.CFTaskList]
task *korifiv1alpha1.CFTask
awaitedTask *korifiv1alpha1.CFTask
awaitErr error
)

asyncPatchTask := func(patchTask func(*korifiv1alpha1.CFTask)) {
wg := &sync.WaitGroup{}
wg.Add(1)

go func() {
defer GinkgoRecover()
defer wg.Done()

Expect(k8s.Patch(ctx, k8sClient, task, func() {
patchTask(task)
})).To(Succeed())
}()

DeferCleanup(func() {
wg.Wait()
})
}

BeforeEach(func() {
awaiter = conditions.NewConditionAwaiter[*korifiv1alpha1.CFTask, korifiv1alpha1.CFTaskList](time.Second)
awaiter = conditions.NewStateAwaiter[*korifiv1alpha1.CFTask, korifiv1alpha1.CFTaskList](time.Second)
awaitedTask = nil
awaitErr = nil

Expand All @@ -34,48 +52,68 @@ var _ = Describe("Await", func() {
},
}

Expect(k8sClient.Create(context.Background(), task)).To(Succeed())
Expect(k8sClient.Create(ctx, task)).To(Succeed())
})

JustBeforeEach(func() {
awaitedTask, awaitErr = awaiter.AwaitCondition(context.Background(), k8sClient, task, korifiv1alpha1.TaskInitializedConditionType)
})
Describe("AwaitState", func() {
JustBeforeEach(func() {
awaitedTask, awaitErr = awaiter.AwaitState(ctx, k8sClient, task, func(actualTask *korifiv1alpha1.CFTask) error {
if actualTask.Status.DropletRef.Name == "" {
return errors.New("droplet ref not set")
}

It("returns an error as the condition never becomes true", func() {
Expect(awaitErr).To(MatchError(ContainSubstring("did not get the Initialized condition")))
})
return nil
})
})

When("the condition becomes true", func() {
var wg sync.WaitGroup
It("returns an error as the desired state is never reached", func() {
Expect(awaitErr).To(MatchError(ContainSubstring("droplet ref not set")))
})

BeforeEach(func() {
wg.Add(1)
When("the desired state is reached", func() {
BeforeEach(func() {
asyncPatchTask(func(cfTask *korifiv1alpha1.CFTask) {
cfTask.Status.DropletRef.Name = "some-droplet"
})
})

go func() {
defer GinkgoRecover()
defer wg.Done()
It("succeeds and returns the updated object", func() {
Expect(awaitErr).NotTo(HaveOccurred())
Expect(awaitedTask).NotTo(BeNil())

taskCopy := task.DeepCopy()
meta.SetStatusCondition(&taskCopy.Status.Conditions, metav1.Condition{
Type: korifiv1alpha1.TaskInitializedConditionType,
Status: metav1.ConditionTrue,
Reason: "initialized",
})
Expect(awaitedTask.Name).To(Equal(task.Name))
Expect(awaitedTask.Status.DropletRef.Name).To(Equal("some-droplet"))
})
})
})

Expect(k8sClient.Status().Patch(context.Background(), taskCopy, client.MergeFrom(task))).To(Succeed())
}()
Describe("AwaitCondition", func() {
JustBeforeEach(func() {
awaitedTask, awaitErr = awaiter.AwaitCondition(ctx, k8sClient, task, korifiv1alpha1.TaskInitializedConditionType)
})

AfterEach(func() {
wg.Wait()
It("returns an error as the condition never becomes true", func() {
Expect(awaitErr).To(MatchError(ContainSubstring("expected the Initialized condition to be true")))
})

It("succeeds and returns the updated object", func() {
Expect(awaitErr).NotTo(HaveOccurred())
Expect(awaitedTask).NotTo(BeNil())
When("the condition becomes true", func() {
BeforeEach(func() {
asyncPatchTask(func(cfTask *korifiv1alpha1.CFTask) {
meta.SetStatusCondition(&cfTask.Status.Conditions, metav1.Condition{
Type: korifiv1alpha1.TaskInitializedConditionType,
Status: metav1.ConditionTrue,
Reason: "initialized",
})
})
})

It("succeeds and returns the updated object", func() {
Expect(awaitErr).NotTo(HaveOccurred())
Expect(awaitedTask).NotTo(BeNil())

Expect(awaitedTask.Name).To(Equal(task.Name))
Expect(meta.IsStatusConditionTrue(awaitedTask.Status.Conditions, korifiv1alpha1.TaskInitializedConditionType)).To(BeTrue())
Expect(awaitedTask.Name).To(Equal(task.Name))
Expect(meta.IsStatusConditionTrue(awaitedTask.Status.Conditions, korifiv1alpha1.TaskInitializedConditionType)).To(BeTrue())
})
})
})
})
6 changes: 4 additions & 2 deletions api/repositories/conditions/conditions_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,14 @@ func TestConditions(t *testing.T) {
}

var (
ctx context.Context
testEnv *envtest.Environment
k8sClient client.WithWatch
namespace string
)

var _ = BeforeSuite(func() {
ctx = context.Background()
logf.SetLogger(zap.New(zap.WriteTo(GinkgoWriter), zap.UseDevMode(true)))

testEnv = &envtest.Environment{
Expand Down Expand Up @@ -58,9 +60,9 @@ var _ = AfterSuite(func() {

var _ = BeforeEach(func() {
namespace = "test-ns-" + uuid.NewString()[:8]
Expect(k8sClient.Create(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}})).To(Succeed())
Expect(k8sClient.Create(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}})).To(Succeed())
})

var _ = AfterEach(func() {
Expect(k8sClient.Delete(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}})).To(Succeed())
Expect(k8sClient.Delete(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}})).To(Succeed())
})
Loading

0 comments on commit 5beea86

Please sign in to comment.