Skip to content

Commit

Permalink
App restart is synchronous
Browse files Browse the repository at this point in the history
* Introduce `CFApp.Status.ActualState` to reflect the actual state of
  the app. The start/stop/restart operations waits for that field to
  become equal to the desired app state
* Introduce `CFProcess.Status.ActualInstances` to reflect the actual app
  workload instances
* Introduce `AppWorkload.Status.ActualInstances` to reflect the actual
  statefulset replicas
* The state chain is tested by a test in the `crds` suite
* The existing crd test is deleted as it has become obsolete by the test
  above

fixes #3036

Co-authored-by: Georgi Sabev <georgethebeatle@gmail.com>
  • Loading branch information
danail-branekov and georgethebeatle committed Apr 9, 2024
1 parent 3adb162 commit 6a52d81
Show file tree
Hide file tree
Showing 48 changed files with 877 additions and 573 deletions.
14 changes: 7 additions & 7 deletions api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,14 +127,14 @@ func main() {
privilegedCRClient,
userClientFactory,
nsPermissions,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFOrg, korifiv1alpha1.CFOrgList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFOrg, korifiv1alpha1.CFOrgList](conditionTimeout),
)
spaceRepo := repositories.NewSpaceRepo(
namespaceRetriever,
orgRepo,
userClientFactory,
nsPermissions,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFSpace, korifiv1alpha1.CFSpaceList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFSpace, korifiv1alpha1.CFSpaceList](conditionTimeout),
)
processRepo := repositories.NewProcessRepo(
namespaceRetriever,
Expand All @@ -148,7 +148,7 @@ func main() {
namespaceRetriever,
userClientFactory,
nsPermissions,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFApp, korifiv1alpha1.CFAppList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFApp, korifiv1alpha1.CFAppList](conditionTimeout),
)
dropletRepo := repositories.NewDropletRepo(
userClientFactory,
Expand Down Expand Up @@ -184,19 +184,19 @@ func main() {
nsPermissions,
toolsregistry.NewRepositoryCreator(cfg.ContainerRegistryType),
cfg.ContainerRepositoryPrefix,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFPackage, korifiv1alpha1.CFPackageList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFPackage, korifiv1alpha1.CFPackageList](conditionTimeout),
)
serviceInstanceRepo := repositories.NewServiceInstanceRepo(
namespaceRetriever,
userClientFactory,
nsPermissions,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFServiceInstance, korifiv1alpha1.CFServiceInstanceList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFServiceInstance, korifiv1alpha1.CFServiceInstanceList](conditionTimeout),
)
serviceBindingRepo := repositories.NewServiceBindingRepo(
namespaceRetriever,
userClientFactory,
nsPermissions,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFServiceBinding, korifiv1alpha1.CFServiceBindingList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFServiceBinding, korifiv1alpha1.CFServiceBindingList](conditionTimeout),
)
buildpackRepo := repositories.NewBuildpackRepository(cfg.BuilderName,
userClientFactory,
Expand All @@ -223,7 +223,7 @@ func main() {
userClientFactory,
namespaceRetriever,
nsPermissions,
conditions.NewConditionAwaiter[*korifiv1alpha1.CFTask, korifiv1alpha1.CFTaskList](conditionTimeout),
conditions.NewStateAwaiter[*korifiv1alpha1.CFTask, korifiv1alpha1.CFTaskList](conditionTimeout),
)
metricsRepo := repositories.NewMetricsRepo(userClientFactory)

Expand Down
22 changes: 16 additions & 6 deletions api/repositories/app_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,20 +42,20 @@ type AppRepo struct {
namespaceRetriever NamespaceRetriever
userClientFactory authorization.UserK8sClientFactory
namespacePermissions *authorization.NamespacePermissions
appConditionAwaiter ConditionAwaiter[*korifiv1alpha1.CFApp]
appAwaiter Awaiter[*korifiv1alpha1.CFApp]
}

func NewAppRepo(
namespaceRetriever NamespaceRetriever,
userClientFactory authorization.UserK8sClientFactory,
authPerms *authorization.NamespacePermissions,
appConditionAwaiter ConditionAwaiter[*korifiv1alpha1.CFApp],
appAwaiter Awaiter[*korifiv1alpha1.CFApp],
) *AppRepo {
return &AppRepo{
namespaceRetriever: namespaceRetriever,
userClientFactory: userClientFactory,
namespacePermissions: authPerms,
appConditionAwaiter: appConditionAwaiter,
appAwaiter: appAwaiter,
}
}

Expand Down Expand Up @@ -431,7 +431,7 @@ func (f *AppRepo) SetCurrentDroplet(ctx context.Context, authInfo authorization.
return CurrentDropletRecord{}, fmt.Errorf("failed to set app droplet: %w", apierrors.FromK8sError(err, AppResourceType))
}

_, err = f.appConditionAwaiter.AwaitCondition(ctx, userClient, cfApp, shared.StatusConditionReady)
_, err = f.appAwaiter.AwaitCondition(ctx, userClient, cfApp, shared.StatusConditionReady)
if err != nil {
return CurrentDropletRecord{}, fmt.Errorf("failed to await the app staged condition: %w", apierrors.FromK8sError(err, AppResourceType))
}
Expand All @@ -456,12 +456,22 @@ func (f *AppRepo) SetAppDesiredState(ctx context.Context, authInfo authorization
}

err = k8s.PatchResource(ctx, userClient, cfApp, func() {
cfApp.Spec.DesiredState = korifiv1alpha1.DesiredState(message.DesiredState)
cfApp.Spec.DesiredState = korifiv1alpha1.AppState(message.DesiredState)
})
if err != nil {
return AppRecord{}, fmt.Errorf("failed to set app desired state: %w", apierrors.FromK8sError(err, AppResourceType))
}

if _, err := f.appAwaiter.AwaitState(ctx, userClient, cfApp, func(actualApp *korifiv1alpha1.CFApp) error {
desiredState := korifiv1alpha1.AppState(message.DesiredState)
if (actualApp.Spec.DesiredState == desiredState) && (actualApp.Status.ActualState == desiredState) {
return nil
}
return fmt.Errorf("expected actual state to be %s; it is currently %s", message.DesiredState, actualApp.Status.ActualState)
}); err != nil {
return AppRecord{}, apierrors.FromK8sError(err, AppResourceType)
}

return cfAppToAppRecord(*cfApp), nil
}

Expand Down Expand Up @@ -608,7 +618,7 @@ func (m *CreateAppMessage) toCFApp() korifiv1alpha1.CFApp {
},
Spec: korifiv1alpha1.CFAppSpec{
DisplayName: m.Name,
DesiredState: korifiv1alpha1.DesiredState(m.State),
DesiredState: korifiv1alpha1.AppState(m.State),
EnvSecretName: GenerateEnvSecretName(guid),
Lifecycle: korifiv1alpha1.Lifecycle{
Type: korifiv1alpha1.LifecycleType(m.Lifecycle.Type),
Expand Down
40 changes: 24 additions & 16 deletions api/repositories/app_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"code.cloudfoundry.org/korifi/api/authorization"
apierrors "code.cloudfoundry.org/korifi/api/errors"
. "code.cloudfoundry.org/korifi/api/repositories"
"code.cloudfoundry.org/korifi/api/repositories/fakeawaiter"
korifiv1alpha1 "code.cloudfoundry.org/korifi/controllers/api/v1alpha1"
"code.cloudfoundry.org/korifi/controllers/controllers/shared"
"code.cloudfoundry.org/korifi/controllers/controllers/workloads/env"
Expand All @@ -37,7 +38,7 @@ const (

var _ = Describe("AppRepository", func() {
var (
conditionAwaiter *FakeAwaiter[
appAwaiter *fakeawaiter.FakeAwaiter[
*korifiv1alpha1.CFApp,
korifiv1alpha1.CFAppList,
*korifiv1alpha1.CFAppList,
Expand All @@ -49,12 +50,12 @@ var _ = Describe("AppRepository", func() {
)

BeforeEach(func() {
conditionAwaiter = &FakeAwaiter[
appAwaiter = &fakeawaiter.FakeAwaiter[
*korifiv1alpha1.CFApp,
korifiv1alpha1.CFAppList,
*korifiv1alpha1.CFAppList,
]{}
appRepo = NewAppRepo(namespaceRetriever, userClientFactory, nsPerms, conditionAwaiter)
appRepo = NewAppRepo(namespaceRetriever, userClientFactory, nsPerms, appAwaiter)

cfOrg = createOrgWithCleanup(ctx, prefixedGUID("org"))
cfSpace = createSpaceWithCleanup(ctx, cfOrg.Name, prefixedGUID("space1"))
Expand Down Expand Up @@ -1115,8 +1116,8 @@ var _ = Describe("AppRepository", func() {
})

It("awaits the ready condition", func() {
Expect(conditionAwaiter.AwaitConditionCallCount()).To(Equal(1))
obj, conditionType := conditionAwaiter.AwaitConditionArgsForCall(0)
Expect(appAwaiter.AwaitConditionCallCount()).To(Equal(1))
obj, conditionType := appAwaiter.AwaitConditionArgsForCall(0)
Expect(obj.GetName()).To(Equal(appGUID))
Expect(obj.GetNamespace()).To(Equal(cfSpace.Name))
Expect(conditionType).To(Equal(shared.StatusConditionReady))
Expand All @@ -1139,7 +1140,7 @@ var _ = Describe("AppRepository", func() {

When("the app never becomes ready", func() {
BeforeEach(func() {
conditionAwaiter.AwaitConditionReturns(&korifiv1alpha1.CFApp{}, errors.New("time-out-err"))
appAwaiter.AwaitConditionReturns(&korifiv1alpha1.CFApp{}, errors.New("time-out-err"))
})

It("returns an error", func() {
Expand Down Expand Up @@ -1167,7 +1168,6 @@ var _ = Describe("AppRepository", func() {

Describe("SetDesiredState", func() {
const (
appName = "some-app"
appStartedValue = "STARTED"
appStoppedValue = "STOPPED"
)
Expand All @@ -1186,8 +1186,10 @@ var _ = Describe("AppRepository", func() {
})

JustBeforeEach(func() {
appGUID = uuid.NewString()
_ = createAppCR(ctx, k8sClient, appName, appGUID, cfSpace.Name, initialAppState)
appGUID = cfApp.Name
Expect(k8s.PatchResource(ctx, k8sClient, cfApp, func() {
cfApp.Spec.DesiredState = korifiv1alpha1.AppState(initialAppState)
})).To(Succeed())
appRecord, err := appRepo.SetAppDesiredState(ctx, authInfo, SetAppDesiredStateMessage{
AppGUID: appGUID,
SpaceGUID: cfSpace.Name,
Expand All @@ -1213,9 +1215,15 @@ var _ = Describe("AppRepository", func() {

It("returns the updated app record", func() {
Expect(returnedAppRecord.GUID).To(Equal(appGUID))
Expect(returnedAppRecord.Name).To(Equal(appName))
Expect(returnedAppRecord.Name).To(Equal(cfApp.Spec.DisplayName))
Expect(returnedAppRecord.SpaceGUID).To(Equal(cfSpace.Name))
Expect(returnedAppRecord.State).To(Equal(DesiredState("STARTED")))
})

It("waits for the desired state", func() {
Expect(appAwaiter.AwaitStateCallCount()).To(Equal(1))
actualCFApp := appAwaiter.AwaitStateArgsForCall(0)
Expect(actualCFApp.GetName()).To(Equal(cfApp.Name))
Expect(actualCFApp.GetNamespace()).To(Equal(cfApp.Namespace))
})

It("changes the desired state of the App", func() {
Expand All @@ -1235,11 +1243,11 @@ var _ = Describe("AppRepository", func() {
Expect(returnedErr).ToNot(HaveOccurred())
})

It("returns the updated app record", func() {
Expect(returnedAppRecord.GUID).To(Equal(appGUID))
Expect(returnedAppRecord.Name).To(Equal(appName))
Expect(returnedAppRecord.SpaceGUID).To(Equal(cfSpace.Name))
Expect(returnedAppRecord.State).To(Equal(DesiredState("STOPPED")))
It("waits for the desired state", func() {
Expect(appAwaiter.AwaitStateCallCount()).To(Equal(1))
actualCFApp := appAwaiter.AwaitStateArgsForCall(0)
Expect(actualCFApp.GetName()).To(Equal(cfApp.Name))
Expect(actualCFApp.GetNamespace()).To(Equal(cfApp.Namespace))
})

It("changes the desired state of the App", func() {
Expand Down
21 changes: 16 additions & 5 deletions api/repositories/conditions/await.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,13 @@ type Awaiter[T RuntimeObjectWithStatusConditions, L any, PL ObjectList[L]] struc
timeout time.Duration
}

func NewConditionAwaiter[T RuntimeObjectWithStatusConditions, L any, PL ObjectList[L]](timeout time.Duration) *Awaiter[T, L, PL] {
func NewStateAwaiter[T RuntimeObjectWithStatusConditions, L any, PL ObjectList[L]](timeout time.Duration) *Awaiter[T, L, PL] {
return &Awaiter[T, L, PL]{
timeout: timeout,
}
}

func (a *Awaiter[T, L, PL]) AwaitCondition(ctx context.Context, k8sClient client.WithWatch, object client.Object, conditionType string) (T, error) {
func (a *Awaiter[T, L, PL]) AwaitState(ctx context.Context, k8sClient client.WithWatch, object client.Object, checkState func(T) error) (T, error) {
var empty T
objList := PL(new(L))

Expand All @@ -47,18 +47,29 @@ func (a *Awaiter[T, L, PL]) AwaitCondition(ctx context.Context, k8sClient client
}
defer watch.Stop()

var stateCheckErr error
for e := range watch.ResultChan() {
obj, ok := e.Object.(T)
if !ok {
continue
}

if meta.IsStatusConditionTrue(obj.StatusConditions(), conditionType) {
stateCheckErr = checkState(obj)
if stateCheckErr == nil {
return obj, nil
}
}

return empty, fmt.Errorf("object %s:%s did not get the %s condition within timeout period %d ms",
object.GetNamespace(), object.GetName(), conditionType, a.timeout.Milliseconds(),
return empty, fmt.Errorf("object %s/%s did not match desired state within %d ms: %s",
object.GetNamespace(), object.GetName(), a.timeout.Milliseconds(), stateCheckErr.Error(),
)
}

func (a *Awaiter[T, L, PL]) AwaitCondition(ctx context.Context, k8sClient client.WithWatch, object client.Object, conditionType string) (T, error) {
return a.AwaitState(ctx, k8sClient, object, func(obj T) error {
if meta.IsStatusConditionTrue(obj.StatusConditions(), conditionType) {
return nil
}
return fmt.Errorf("expected the %s condition to be true", conditionType)
})
}
Loading

0 comments on commit 6a52d81

Please sign in to comment.