Skip to content

Commit

Permalink
Refactor task summary params and add unit tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sumit Bansal <sumitsb@amazon.com>
  • Loading branch information
sumitasr committed Jul 18, 2024
1 parent 9a58edc commit f3eaeb4
Show file tree
Hide file tree
Showing 4 changed files with 192 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -221,15 +222,10 @@ protected void onTimeout(List<? extends BatchedTask> tasks, TimeValue timeout) {
}

@Override
protected void run(
Object batchingKey,
List<? extends BatchedTask> tasks,
Supplier<String> tasksSummarySupplier,
String tasksShortSummary
) {
protected void run(Object batchingKey, List<? extends BatchedTask> tasks, Function<Boolean, String> taskSummaryGenerator) {
ClusterStateTaskExecutor<Object> taskExecutor = (ClusterStateTaskExecutor<Object>) batchingKey;
List<UpdateTask> updateTasks = (List<UpdateTask>) tasks;
runTasks(new TaskInputs(taskExecutor, updateTasks, tasksSummarySupplier, tasksShortSummary));
runTasks(new TaskInputs(taskExecutor, updateTasks, taskSummaryGenerator));
}

class UpdateTask extends BatchedTask {
Expand Down Expand Up @@ -302,8 +298,8 @@ public static boolean assertNotMasterUpdateThread(String reason) {
}

private void runTasks(TaskInputs taskInputs) {
final String longSummary = logger.isTraceEnabled() ? taskInputs.summarySupplier.get() : "";
final String shortSummary = taskInputs.shortSummary;
final String longSummary = logger.isTraceEnabled() ? taskInputs.taskSummaryGenerator.apply(true) : "";
final String shortSummary = taskInputs.taskSummaryGenerator.apply(false);

if (!lifecycle.started()) {
logger.debug("processing [{}]: ignoring, cluster-manager service not started", shortSummary);
Expand Down Expand Up @@ -464,8 +460,8 @@ private void handleException(String summary, long startTimeMillis, ClusterState
// TODO: do we want to call updateTask.onFailure here?
}

private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, String longSummary) {
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState, longSummary);
private TaskOutputs calculateTaskOutputs(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) {
ClusterTasksResult<Object> clusterTasksResult = executeTasks(taskInputs, previousClusterState, taskSummary);
ClusterState newClusterState = patchVersions(previousClusterState, clusterTasksResult);
return new TaskOutputs(
taskInputs,
Expand Down Expand Up @@ -909,7 +905,7 @@ public void onTimeout() {
}
}

private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState, String longSummary) {
private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterState previousClusterState, String taskSummary) {
ClusterTasksResult<Object> clusterTasksResult;
try {
List<Object> inputs = taskInputs.updateTasks.stream().map(tUpdateTask -> tUpdateTask.task).collect(Collectors.toList());
Expand All @@ -925,7 +921,7 @@ private ClusterTasksResult<Object> executeTasks(TaskInputs taskInputs, ClusterSt
"failed to execute cluster state update (on version: [{}], uuid: [{}]) for [{}]\n{}{}{}",
previousClusterState.version(),
previousClusterState.stateUUID(),
longSummary,
taskSummary,
previousClusterState.nodes(),
previousClusterState.routingTable(),
previousClusterState.getRoutingNodes()
Expand Down Expand Up @@ -970,19 +966,16 @@ private class TaskInputs {

final List<Batcher.UpdateTask> updateTasks;
final ClusterStateTaskExecutor<Object> executor;
final Supplier<String> summarySupplier;
final String shortSummary;
final Function<Boolean, String> taskSummaryGenerator;

TaskInputs(
ClusterStateTaskExecutor<Object> executor,
List<Batcher.UpdateTask> updateTasks,
Supplier<String> summarySupplier,
String shortSummary
final Function<Boolean, String> taskSummaryGenerator
) {
this.executor = executor;
this.updateTasks = updateTasks;
this.summarySupplier = summarySupplier;
this.shortSummary = shortSummary;
this.taskSummaryGenerator = taskSummaryGenerator;
}

boolean runOnlyWhenClusterManager() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand Down Expand Up @@ -196,13 +195,17 @@ void runIfNotProcessed(BatchedTask updateTask) {
}

if (toExecute.isEmpty() == false) {
final Supplier<String> tasksSummarySupplier = () -> processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
final String tasksShortSummary = buildShortSummary(updateTask.batchingKey);
Function<Boolean, String> taskSummaryGenerator = (longSummaryRequired) -> {
if (longSummaryRequired == null || !longSummaryRequired) {
return buildShortSummary(updateTask.batchingKey);
}
return processTasksBySource.entrySet().stream().map(entry -> {
String tasks = updateTask.describeTasks(entry.getValue());
return tasks.isEmpty() ? entry.getKey() : entry.getKey() + "[" + tasks + "]";
}).reduce((s1, s2) -> s1 + ", " + s2).orElse("");
};
taskBatcherListener.onBeginProcessing(toExecute);
run(updateTask.batchingKey, toExecute, tasksSummarySupplier, tasksShortSummary);
run(updateTask.batchingKey, toExecute, taskSummaryGenerator);
}
}
}
Expand All @@ -215,12 +218,7 @@ private String buildShortSummary(final Object batchingKey) {
* Action to be implemented by the specific batching implementation
* All tasks have the given batching key.
*/
protected abstract void run(
Object batchingKey,
List<? extends BatchedTask> tasks,
Supplier<String> tasksSummary,
String tasksShortSummary
);
protected abstract void run(Object batchingKey, List<? extends BatchedTask> tasks, Function<Boolean, String> taskSummaryGenerator);

/**
* Represents a runnable task that supports batching.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -376,7 +376,7 @@ public void onFailure(String source, Exception e) {}
}

@TestLogging(value = "org.opensearch.cluster.service:TRACE", reason = "to ensure that we log cluster state events on TRACE level")
public void testClusterStateUpdateLogging() throws Exception {
public void testClusterStateUpdateLoggingWithTraceEnabled() throws Exception {
try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) {
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
Expand Down Expand Up @@ -540,6 +540,171 @@ public void onFailure(String source, Exception e) {
}
}

/**
 * Checks the cluster state update log output of {@code MasterService} when its logger is set to
 * DEBUG (not TRACE) via {@code @TestLogging}.
 *
 * Four update tasks ("test1" .. "test4") are submitted. Every expected DEBUG message refers to the
 * batch by the short summary text ("Tasks batched with key: ...") and not by the individual task
 * source names. The "failed to execute cluster state update" message for the throwing task is
 * registered as an unseen expectation, i.e. it must not be logged at DEBUG level.
 *
 * @throws Exception if the log expectations are not met in time or the service fails
 */
@TestLogging(value = "org.opensearch.cluster.service:DEBUG", reason = "to ensure that we log cluster state events on DEBUG level")
public void testClusterStateUpdateLoggingWithDebugEnabled() throws Exception {
try (MockLogAppender mockAppender = MockLogAppender.createForLoggers(LogManager.getLogger(MasterService.class))) {
// --- expectations for "test1": unchanged state, 1s spent in execute() ---
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test1 start",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test1 computation",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [1s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test1 notification",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests]"
)
);

// --- expectations for "test2": execute() throws after 2s ---
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test2 start",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests]"
)
);
// Unseen expectation: the failure message must NOT show up at DEBUG level.
// NOTE(review): presumably MasterService only logs it at TRACE - confirm in executeTasks.
mockAppender.addExpectation(
new MockLogAppender.UnseenEventExpectation(
"test2 failure",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"failed to execute cluster state update (on version: [*], uuid: [*]) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests]*"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test2 computation",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [2s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test2 notification",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [0s] to notify listeners on unchanged cluster state for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests]"
)
);

// --- expectations for "test3": new state version, 3s in execute(), 4s in the listener ---
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test3 start",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test3 computation",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [3s] to compute cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests]"
)
);
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test3 notification",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"took [4s] to notify listeners on successful publication of cluster state (version: *, uuid: *) for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests]"
)
);

// --- expectation for "test4": only the start message is checked ---
mockAppender.addExpectation(
new MockLogAppender.SeenEventExpectation(
"test4",
MasterService.class.getCanonicalName(),
Level.DEBUG,
"executing cluster state update for [Tasks batched with key: org.opensearch.cluster.service.MasterServiceTests]"
)
);

try (ClusterManagerService clusterManagerService = createClusterManagerService(true)) {
// test1: returns the current state unchanged; onFailure must never be called.
clusterManagerService.submitStateUpdateTask("test1", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
// NOTE(review): timeDiffInMillis presumably drives the "took [Ns]" values in the
// expected messages - confirm against the time source used by createClusterManagerService.
timeDiffInMillis += TimeValue.timeValueSeconds(1).millis();
return currentState;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {}

@Override
public void onFailure(String source, Exception e) {
fail();
}
});
// test2: execute() throws; the task must be reported through onFailure, never as processed.
clusterManagerService.submitStateUpdateTask("test2", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
timeDiffInMillis += TimeValue.timeValueSeconds(2).millis();
throw new IllegalArgumentException("Testing handling of exceptions in the cluster state task");
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
fail();
}

@Override
public void onFailure(String source, Exception e) {}
});
// test3: the only task that returns a modified state (incremented version).
clusterManagerService.submitStateUpdateTask("test3", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
timeDiffInMillis += TimeValue.timeValueSeconds(3).millis();
return ClusterState.builder(currentState).incrementVersion().build();
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
// Adds 4s while the listener runs, matching the "took [4s] to notify listeners" expectation.
timeDiffInMillis += TimeValue.timeValueSeconds(4).millis();
}

@Override
public void onFailure(String source, Exception e) {
fail();
}
});
// test4: unchanged state, no time added; onFailure must never be called.
clusterManagerService.submitStateUpdateTask("test4", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
}

@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {}

@Override
public void onFailure(String source, Exception e) {
fail();
}
});
// Tasks are processed asynchronously, so retry until every expectation has matched.
assertBusy(mockAppender::assertAllExpectationsMatched);
// verify stats values after state is published
// Exactly one successful update and no failed update are expected
// (presumably the single success is test3, the only task that changes the state).
assertEquals(1, clusterManagerService.getClusterStateStats().getUpdateSuccess());
assertEquals(0, clusterManagerService.getClusterStateStats().getUpdateFailed());
}
}
}

public void testClusterStateBatchedUpdates() throws BrokenBarrierException, InterruptedException {
AtomicInteger counter = new AtomicInteger();
class Task {
Expand Down Expand Up @@ -1237,7 +1402,7 @@ public void onFailure(String source, Exception e) {
});
// Additional update task to make sure all previous logging made it to the loggerName
// We don't check logging for this one since there is no guarantee that it will occur before our check
clusterManagerService.submitStateUpdateTask("test7", new ClusterStateUpdateTask() {
clusterManagerService.submitStateUpdateTask("test6", new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) {
return currentState;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -82,8 +83,7 @@ static class TestTaskBatcher extends TaskBatcher {
protected void run(
Object batchingKey,
List<? extends BatchedTask> tasks,
Supplier<String> tasksSummarySupplier,
String tasksShortSummary
Function<Boolean, String> taskSummaryGenerator
) {
List<UpdateTask> updateTasks = (List) tasks;
((TestExecutor) batchingKey).execute(updateTasks.stream().map(t -> t.task).collect(Collectors.toList()));
Expand Down

0 comments on commit f3eaeb4

Please sign in to comment.