Skip to content

Commit

Permalink
feat: support for graceful shutdown based on configuration (#2479)
Browse files Browse the repository at this point in the history
---------

Signed-off-by: 10000-ki <10000ki6472@gmail.com>
  • Loading branch information
10000-ki authored and csviri committed Aug 8, 2024
1 parent 657301f commit dd104ab
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 42 deletions.
12 changes: 12 additions & 0 deletions docs/content/en/docs/patterns-and-best-practices/_index.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,15 @@ might be a permission issue for some resources in another namespace.
The `stopOnInformerErrorDuringStartup` has implication on [cache sync timeout](https://github.com/java-operator-sdk/java-operator-sdk/blob/114c4312c32b34688811df8dd7cea275878c9e73/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L177-L179)
behavior. If `true`, the operator will stop on cache sync timeout. If `false`, after the timeout the controller will start
reconciling resources even if one or more event source caches have not synced yet.

## Graceful Shutdown

You can give the reconciler sufficient time to finish processing the events that are currently in progress before the operator shuts down.
The configuration is simple: set an appropriate duration value for `reconciliationTerminationTimeout` using `ConfigurationServiceOverrider`. If no value is set, the default is `Duration.ZERO`, which means immediate shutdown.

```java
final var overridden = new ConfigurationServiceOverrider(config)
.withReconciliationTerminationTimeout(Duration.ofSeconds(5));

final var operator = new Operator(overridden);
```
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ private static ConfigurationService initConfigurationService(KubernetesClient cl
@SuppressWarnings("unused")
public void installShutdownHook(Duration gracefulShutdownTimeout) {
if (!leaderElectionManager.isLeaderElectionEnabled()) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> stop(gracefulShutdownTimeout)));
Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
} else {
log.warn("Leader election is on, shutdown hook will not be installed.");
}
Expand Down Expand Up @@ -145,15 +145,18 @@ public synchronized void start() {
}
}

public void stop(Duration gracefulShutdownTimeout) throws OperatorException {
@Override
public void stop() throws OperatorException {
Duration reconciliationTerminationTimeout =
configurationService.reconciliationTerminationTimeout();
if (!started) {
return;
}
log.info(
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
log.info("Operator SDK {} is shutting down...",
configurationService.getVersion().getSdkVersion());
controllerManager.stop();

configurationService.getExecutorServiceManager().stop(gracefulShutdownTimeout);
configurationService.getExecutorServiceManager().stop(reconciliationTerminationTimeout);
leaderElectionManager.stop();
if (configurationService.closeClientOnStop()) {
getKubernetesClient().close();
Expand All @@ -162,11 +165,6 @@ public void stop(Duration gracefulShutdownTimeout) throws OperatorException {
started = false;
}

@Override
public void stop() throws OperatorException {
stop(Duration.ZERO);
}

/**
* Add a registration requests for the specified reconciler with this operator. The effective
* registration of the reconciler is delayed till the operator is started.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,8 @@ static ConfigurationService newOverriddenConfigurationService(
*
* @param reconciler the reconciler we want the configuration of
* @param <R> the {@code CustomResource} type associated with the specified reconciler
* @return the {@link ControllerConfiguration} associated with the specified reconciler or {@code
* null} if no configuration exists for the reconciler
* @return the {@link ControllerConfiguration} associated with the specified reconciler or
* {@code null} if no configuration exists for the reconciler
*/
<R extends HasMetadata> ControllerConfiguration<R> getConfigurationFor(Reconciler<R> reconciler);

Expand Down Expand Up @@ -211,7 +211,7 @@ default int concurrentWorkflowExecutorThreads() {

/**
* Override to provide a custom {@link Metrics} implementation
*
*
* @return the {@link Metrics} implementation
*/
default Metrics getMetrics() {
Expand All @@ -221,7 +221,7 @@ default Metrics getMetrics() {
/**
* Override to provide a custom {@link ExecutorService} implementation to change how threads
* handle concurrent reconciliations
*
*
* @return the {@link ExecutorService} implementation to use for concurrent reconciliation
* processing
*/
Expand All @@ -232,7 +232,7 @@ default ExecutorService getExecutorService() {
/**
* Override to provide a custom {@link ExecutorService} implementation to change how dependent
* workflows are processed in parallel
*
*
* @return the {@link ExecutorService} implementation to use for dependent workflow processing
*/
default ExecutorService getWorkflowExecutorService() {
Expand All @@ -242,7 +242,7 @@ default ExecutorService getWorkflowExecutorService() {
/**
* Determines whether the associated Kubernetes client should be closed when the associated
* {@link io.javaoperatorsdk.operator.Operator} is stopped.
*
*
* @return {@code true} if the Kubernetes should be closed on stop, {@code false} otherwise
*/
default boolean closeClientOnStop() {
Expand All @@ -252,7 +252,7 @@ default boolean closeClientOnStop() {
/**
* Override to provide a custom {@link DependentResourceFactory} implementation to change how
* {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} are instantiated
*
*
* @return the custom {@link DependentResourceFactory} implementation
*/
@SuppressWarnings("rawtypes")
Expand All @@ -264,7 +264,7 @@ default DependentResourceFactory dependentResourceFactory() {
* Retrieves the optional {@link LeaderElectionConfiguration} to specify how the associated
* {@link io.javaoperatorsdk.operator.Operator} handles leader election to ensure only one
* instance of the operator runs on the cluster at any given time
*
*
* @return the {@link LeaderElectionConfiguration}
*/
default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
Expand Down Expand Up @@ -299,6 +299,17 @@ default Duration cacheSyncTimeout() {
return Duration.ofMinutes(2);
}

/**
* This is the timeout value that allows the reconciliation threads to gracefully shut down. If no
* value is set, the default is immediate shutdown.
*
* @return The duration of time to wait before terminating the reconciliation threads
* @since 5.0.0
*/
default Duration reconciliationTerminationTimeout() {
return Duration.ZERO;
}

/**
* Handler for an informer stop. Informer stops if there is a non-recoverable error. Like received
* a resource that cannot be deserialized.
Expand Down Expand Up @@ -326,7 +337,7 @@ default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
* Override to provide a custom {@link ManagedWorkflowFactory} implementation to change how
* {@link io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow} are
* instantiated
*
*
* @return the custom {@link ManagedWorkflowFactory} implementation
*/
@SuppressWarnings("rawtypes")
Expand All @@ -336,7 +347,7 @@ default ManagedWorkflowFactory getWorkflowFactory() {

/**
* Override to provide a custom {@link ExecutorServiceManager} implementation
*
*
* @return the custom {@link ExecutorServiceManager} implementation
*/
default ExecutorServiceManager getExecutorServiceManager() {
Expand All @@ -353,9 +364,8 @@ default ExecutorServiceManager getExecutorServiceManager() {
* SSA based create/update can be still used with the legacy matching, just overriding the match
* method of Kubernetes Dependent Resource.
*
* @since 4.4.0
*
* @return if SSA should be used for dependent resources
* @since 4.4.0
*/
default boolean ssaBasedCreateUpdateMatchForDependentResources() {
return true;
Expand Down Expand Up @@ -383,9 +393,8 @@ default Set<Class<? extends HasMetadata>> defaultNonSSAResource() {
* <p>
* Disable this if you want to react to your own dependent resource updates
*
* @since 4.5.0
*
* @return if special annotation should be used for dependent resource to filter events
* @since 4.5.0
*/
default boolean previousAnnotationForDependentResourcesEventFiltering() {
return true;
Expand All @@ -400,9 +409,8 @@ default boolean previousAnnotationForDependentResourcesEventFiltering() {
* logic, and you want to further minimize the amount of work done / updates issued by the
* operator.
*
* @since 4.5.0
*
* @return if resource version should be parsed (as integer)
* @since 4.5.0
*/
default boolean parseResourceVersionsForEventFilteringAndCaching() {
return false;
Expand All @@ -415,8 +423,8 @@ default boolean parseResourceVersionsForEventFilteringAndCaching() {
*
* @return {@code true} if Server-Side Apply (SSA) should be used when patching the primary
* resources, {@code false} otherwise
* @since 5.0.0
* @see ConfigurationServiceOverrider#withUseSSAToPatchPrimaryResource(boolean)
* @since 5.0.0
*/
default boolean useSSAToPatchPrimaryResource() {
return true;
Expand All @@ -427,18 +435,17 @@ default boolean useSSAToPatchPrimaryResource() {
* Determines whether resources retrieved from caches such as via calls to
* {@link Context#getSecondaryResource(Class)} should be defensively cloned first.
* </p>
*
*
* <p>
* Defensive cloning to prevent problematic cache modifications (modifying the resource would
* otherwise modify the stored copy in the cache) was transparently done in previous JOSDK
* versions. This might have performance consequences and, with the more prevalent use of
* Server-Side Apply, where you should create a new copy of your resource with only modified
* fields, such modifications of these resources are less likely to occur.
* </p>
*
*
* @return {@code true} if resources should be defensively cloned before returning them from
* caches, {@code false} otherwise
*
* @since 5.0.0
*/
default boolean cloneSecondaryResourcesWhenGettingFromCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ public class ConfigurationServiceOverrider {
private InformerStoppedHandler informerStoppedHandler;
private Boolean stopOnInformerErrorDuringStartup;
private Duration cacheSyncTimeout;
private Duration reconciliationTerminationTimeout;
private Boolean ssaBasedCreateUpdateMatchForDependentResources;
private Set<Class<? extends HasMetadata>> defaultNonSSAResource;
private Boolean previousAnnotationForDependentResources;
Expand Down Expand Up @@ -127,6 +128,12 @@ public ConfigurationServiceOverrider withCacheSyncTimeout(Duration cacheSyncTime
return this;
}

/**
* Stores the reconciliation termination timeout to be used by the overridden configuration's
* {@code reconciliationTerminationTimeout()} accessor.
*
* @param reconciliationTerminationTimeout the timeout value to record on this overrider
* @return this overrider, so that further {@code with...} calls can be chained
*/
public ConfigurationServiceOverrider withReconciliationTerminationTimeout(
Duration reconciliationTerminationTimeout) {
this.reconciliationTerminationTimeout = reconciliationTerminationTimeout;
return this;
}

public ConfigurationServiceOverrider withSSABasedCreateUpdateMatchForDependentResources(
boolean value) {
this.ssaBasedCreateUpdateMatchForDependentResources = value;
Expand Down Expand Up @@ -251,6 +258,12 @@ public Duration cacheSyncTimeout() {
return overriddenValueOrDefault(cacheSyncTimeout, ConfigurationService::cacheSyncTimeout);
}

// Hands the value recorded by withReconciliationTerminationTimeout (possibly unset) and the
// original ConfigurationService accessor to overriddenValueOrDefault.
// NOTE(review): presumably the recorded value wins when set and the original configuration's
// value is used otherwise — confirm against overriddenValueOrDefault.
@Override
public Duration reconciliationTerminationTimeout() {
return overriddenValueOrDefault(reconciliationTerminationTimeout,
ConfigurationService::reconciliationTerminationTimeout);
}

@Override
public boolean ssaBasedCreateUpdateMatchForDependentResources() {
return overriddenValueOrDefault(ssaBasedCreateUpdateMatchForDependentResources,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.api.config;

import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;

Expand Down Expand Up @@ -63,6 +64,7 @@ public <R extends HasMetadata> R clone(R object) {
.withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS"))
.withInformerStoppedHandler((informer, ex) -> {
})
.withReconciliationTerminationTimeout(Duration.ofSeconds(30))
.build();

assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop());
Expand All @@ -77,6 +79,8 @@ public <R extends HasMetadata> R clone(R object) {
overridden.getLeaderElectionConfiguration());
assertNotEquals(config.getInformerStoppedHandler(),
overridden.getLeaderElectionConfiguration());
assertNotEquals(config.reconciliationTerminationTimeout(),
overridden.reconciliationTerminationTimeout());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,21 @@
public class GracefulStopIT {

public static final String TEST_1 = "test1";
public static final String TEST_2 = "test2";

@RegisterExtension
LocallyRunOperatorExtension operator =
LocallyRunOperatorExtension.builder()
.withConfigurationService(o -> o.withCloseClientOnStop(false))
.withConfigurationService(o -> o.withCloseClientOnStop(false)
.withReconciliationTerminationTimeout(Duration.ofMillis(RECONCILER_SLEEP)))
.withReconciler(new GracefulStopTestReconciler())
.build();

@Test
void stopsGracefullyWIthTimeout() {
testGracefulStop(TEST_1, RECONCILER_SLEEP, 2);
void stopsGracefullyWithTimeoutConfiguration() {
testGracefulStop(TEST_1, 2);
}

@Test
void stopsGracefullyWithExpiredTimeout() {
testGracefulStop(TEST_2, RECONCILER_SLEEP / 5, 1);
}

private void testGracefulStop(String resourceName, int stopTimeout, int expectedFinalGeneration) {
private void testGracefulStop(String resourceName, int expectedFinalGeneration) {
var testRes = operator.create(testResource(resourceName));
await().untilAsserted(() -> {
var r = operator.get(GracefulStopTestCustomResource.class, resourceName);
Expand All @@ -54,7 +49,7 @@ private void testGracefulStop(String resourceName, int stopTimeout, int expected
() -> assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class)
.getNumberOfExecutions()).isEqualTo(2));

operator.getOperator().stop(Duration.ofMillis(stopTimeout));
operator.getOperator().stop();

await().untilAsserted(() -> {
var r = operator.get(GracefulStopTestCustomResource.class, resourceName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ void beforeEach(TestInfo testInfo) {
@AfterEach
void cleanup() {
if (operator != null) {
operator.stop(Duration.ofSeconds(1));
operator.stop();
}
adminClient.resource(dependentConfigMap()).delete();
adminClient.resource(testCustomResource()).delete();
Expand Down Expand Up @@ -321,6 +321,7 @@ Operator startOperator(boolean stopOnInformerErrorDuringStartup, boolean addStop
co.withKubernetesClient(clientUsingServiceAccount());
co.withStopOnInformerErrorDuringStartup(stopOnInformerErrorDuringStartup);
co.withCacheSyncTimeout(Duration.ofMillis(3000));
co.withReconciliationTerminationTimeout(Duration.ofSeconds(1));
if (addStopHandler) {
co.withInformerStoppedHandler((informer, ex) -> replacementStopHandlerCalled = true);
}
Expand Down

0 comments on commit dd104ab

Please sign in to comment.