Skip to content

Commit

Permalink
Run isolated tasks last (#4004)
Browse files Browse the repository at this point in the history
Rather  than executing tasks requiring the global read-write lock 
while forked tasks are still being executed and thus causing contention,
such isolated tasks are now executed after all other work is done.

Resolves #3928.
  • Loading branch information
marcphilipp committed Sep 18, 2024
1 parent 40907ee commit c8496a2
Show file tree
Hide file tree
Showing 3 changed files with 80 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ on GitHub.
[[release-notes-5.11.1-junit-platform-new-features-and-improvements]]
==== New Features and Improvements

* ❓
* Improve parallelism and reduce number of blocked threads used by
`HierarchicalTestEngine` implementations when parallel execution is enabled and the
global read-write lock is used.


[[release-notes-5.11.1-junit-jupiter]]
Expand All @@ -51,7 +53,8 @@ on GitHub.
[[release-notes-5.11.1-junit-jupiter-new-features-and-improvements]]
==== New Features and Improvements

* ❓
* Improve parallelism and reduce number of blocked threads in the presence of `@Isolated`
tests when parallel execution is enabled


[[release-notes-5.11.1-junit-vintage]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.apiguardian.api.API.Status.STABLE;
import static org.junit.platform.engine.support.hierarchical.Node.ExecutionMode.CONCURRENT;
import static org.junit.platform.engine.support.hierarchical.Node.ExecutionMode.SAME_THREAD;

import java.lang.Thread.UncaughtExceptionHandler;
import java.lang.reflect.Constructor;
Expand All @@ -38,6 +39,7 @@
import org.junit.platform.commons.logging.LoggerFactory;
import org.junit.platform.commons.util.ExceptionUtils;
import org.junit.platform.engine.ConfigurationParameters;
import org.junit.platform.engine.support.hierarchical.SingleLock.GlobalReadWriteLock;

/**
* A {@link ForkJoinPool}-based
Expand Down Expand Up @@ -155,29 +157,34 @@ public void invokeAll(List<? extends TestTask> tasks) {
new ExclusiveTask(tasks.get(0)).execSync();
return;
}
Deque<ExclusiveTask> nonConcurrentTasks = new LinkedList<>();
Deque<ExclusiveTask> isolatedTasks = new LinkedList<>();
Deque<ExclusiveTask> sameThreadTasks = new LinkedList<>();
Deque<ExclusiveTask> concurrentTasksInReverseOrder = new LinkedList<>();
forkConcurrentTasks(tasks, nonConcurrentTasks, concurrentTasksInReverseOrder);
executeNonConcurrentTasks(nonConcurrentTasks);
forkConcurrentTasks(tasks, isolatedTasks, sameThreadTasks, concurrentTasksInReverseOrder);
executeSync(sameThreadTasks);
joinConcurrentTasksInReverseOrderToEnableWorkStealing(concurrentTasksInReverseOrder);
executeSync(isolatedTasks);
}

private void forkConcurrentTasks(List<? extends TestTask> tasks, Deque<ExclusiveTask> nonConcurrentTasks,
Deque<ExclusiveTask> concurrentTasksInReverseOrder) {
private void forkConcurrentTasks(List<? extends TestTask> tasks, Deque<ExclusiveTask> isolatedTasks,
Deque<ExclusiveTask> sameThreadTasks, Deque<ExclusiveTask> concurrentTasksInReverseOrder) {
for (TestTask testTask : tasks) {
ExclusiveTask exclusiveTask = new ExclusiveTask(testTask);
if (testTask.getExecutionMode() == CONCURRENT) {
exclusiveTask.fork();
concurrentTasksInReverseOrder.addFirst(exclusiveTask);
if (testTask.getResourceLock() instanceof GlobalReadWriteLock) {
isolatedTasks.add(exclusiveTask);
}
else if (testTask.getExecutionMode() == SAME_THREAD) {
sameThreadTasks.add(exclusiveTask);
}
else {
nonConcurrentTasks.add(exclusiveTask);
exclusiveTask.fork();
concurrentTasksInReverseOrder.addFirst(exclusiveTask);
}
}
}

private void executeNonConcurrentTasks(Deque<ExclusiveTask> nonConcurrentTasks) {
for (ExclusiveTask task : nonConcurrentTasks) {
private void executeSync(Deque<ExclusiveTask> tasks) {
for (ExclusiveTask task : tasks) {
task.execSync();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@
import static org.junit.jupiter.api.parallel.ExecutionMode.SAME_THREAD;
import static org.junit.jupiter.engine.Constants.DEFAULT_CLASSES_EXECUTION_MODE_PROPERTY_NAME;
import static org.junit.jupiter.engine.Constants.DEFAULT_PARALLEL_EXECUTION_MODE;
import static org.junit.jupiter.engine.Constants.PARALLEL_CONFIG_FIXED_MAX_POOL_SIZE_PROPERTY_NAME;
import static org.junit.jupiter.engine.Constants.PARALLEL_CONFIG_FIXED_PARALLELISM_PROPERTY_NAME;
import static org.junit.jupiter.engine.Constants.PARALLEL_CONFIG_STRATEGY_PROPERTY_NAME;
import static org.junit.jupiter.engine.Constants.PARALLEL_EXECUTION_ENABLED_PROPERTY_NAME;
import static org.junit.platform.launcher.core.LauncherDiscoveryRequestBuilder.request;
import static org.junit.platform.commons.util.CollectionUtils.getOnlyElement;
import static org.junit.platform.testkit.engine.EventConditions.container;
import static org.junit.platform.testkit.engine.EventConditions.event;
import static org.junit.platform.testkit.engine.EventConditions.finishedSuccessfully;
Expand Down Expand Up @@ -65,15 +66,18 @@
import org.junit.jupiter.api.parallel.Isolated;
import org.junit.jupiter.api.parallel.ResourceLock;
import org.junit.platform.engine.TestDescriptor;
import org.junit.platform.engine.discovery.ClassSelector;
import org.junit.platform.engine.discovery.DiscoverySelectors;
import org.junit.platform.engine.reporting.ReportEntry;
import org.junit.platform.engine.support.descriptor.MethodSource;
import org.junit.platform.testkit.engine.EngineExecutionResults;
import org.junit.platform.testkit.engine.EngineTestKit;
import org.junit.platform.testkit.engine.Event;

/**
* @since 1.3
*/
@SuppressWarnings({ "JUnitMalformedDeclaration", "NewClassNamingConvention" })
class ParallelExecutionIntegrationTests {

@Test
Expand Down Expand Up @@ -250,6 +254,35 @@ void canRunTestsIsolatedFromEachOtherAcrossClassesWithOtherResourceLocks() {
assertThat(events.stream().filter(event(test(), finishedWithFailure())::matches)).isEmpty();
}

@Test
void runsIsolatedTestsLastToMaximizeParallelism() {
var configParams = Map.of( //
DEFAULT_PARALLEL_EXECUTION_MODE, "concurrent", //
PARALLEL_CONFIG_FIXED_MAX_POOL_SIZE_PROPERTY_NAME, "3" //
);
Class<?>[] testClasses = { IsolatedTestCase.class, SuccessfulParallelTestCase.class };
var events = executeWithFixedParallelism(3, configParams, testClasses) //
.allEvents();

assertThat(events.stream().filter(event(test(), finishedWithFailure())::matches)).isEmpty();

List<Event> parallelTestMethodEvents = events.reportingEntryPublished() //
.filter(e -> e.getTestDescriptor().getSource() //
.filter(it -> //
it instanceof MethodSource
&& SuccessfulParallelTestCase.class.equals(((MethodSource) it).getJavaClass()) //
).isPresent() //
) //
.toList();
assertThat(ThreadReporter.getThreadNames(parallelTestMethodEvents)).hasSize(3);

var parallelClassFinish = getOnlyElement(getTimestampsFor(events.list(),
event(container(SuccessfulParallelTestCase.class), finishedSuccessfully())));
var isolatedClassStart = getOnlyElement(
getTimestampsFor(events.list(), event(container(IsolatedTestCase.class), started())));
assertThat(isolatedClassStart).isAfterOrEqualTo(parallelClassFinish);
}

@Isolated("testing")
static class IsolatedTestCase {
static AtomicInteger sharedResource;
Expand Down Expand Up @@ -384,27 +417,30 @@ private List<Instant> getTimestampsFor(List<Event> events, Condition<Event> cond
}

private List<Event> executeConcurrently(int parallelism, Class<?>... testClasses) {
return executeWithFixedParallelism(parallelism, Map.of(DEFAULT_PARALLEL_EXECUTION_MODE, "concurrent"),
testClasses).allEvents().list();
Map<String, String> configParams = Map.of(DEFAULT_PARALLEL_EXECUTION_MODE, "concurrent");
return executeWithFixedParallelism(parallelism, configParams, testClasses) //
.allEvents() //
.list();
}

private EngineExecutionResults executeWithFixedParallelism(int parallelism, Map<String, String> configParams,
Class<?>... testClasses) {
// @formatter:off
var discoveryRequest = request()
.selectors(Arrays.stream(testClasses).map(DiscoverySelectors::selectClass).collect(toList()))
.configurationParameter(PARALLEL_EXECUTION_ENABLED_PROPERTY_NAME, String.valueOf(true))
.configurationParameter(PARALLEL_CONFIG_STRATEGY_PROPERTY_NAME, "fixed")
.configurationParameter(PARALLEL_CONFIG_FIXED_PARALLELISM_PROPERTY_NAME, String.valueOf(parallelism))
.configurationParameters(configParams)
.build();
// @formatter:on
return EngineTestKit.execute("junit-jupiter", discoveryRequest);
var classSelectors = Arrays.stream(testClasses) //
.map(DiscoverySelectors::selectClass) //
.toArray(ClassSelector[]::new);
return EngineTestKit.engine("junit-jupiter") //
.selectors(classSelectors) //
.configurationParameter(PARALLEL_EXECUTION_ENABLED_PROPERTY_NAME, String.valueOf(true)) //
.configurationParameter(PARALLEL_CONFIG_STRATEGY_PROPERTY_NAME, "fixed") //
.configurationParameter(PARALLEL_CONFIG_FIXED_PARALLELISM_PROPERTY_NAME, String.valueOf(parallelism)) //
.configurationParameters(configParams) //
.execute();
}

// -------------------------------------------------------------------------

@ExtendWith(ThreadReporter.class)
@Execution(SAME_THREAD)
static class SuccessfulParallelTestCase {

static AtomicInteger sharedResource;
Expand All @@ -417,16 +453,19 @@ static void initialize() {
}

@Test
@Execution(CONCURRENT)
void firstTest() throws Exception {
incrementAndBlock(sharedResource, countDownLatch);
}

@Test
@Execution(CONCURRENT)
void secondTest() throws Exception {
incrementAndBlock(sharedResource, countDownLatch);
}

@Test
@Execution(CONCURRENT)
void thirdTest() throws Exception {
incrementAndBlock(sharedResource, countDownLatch);
}
Expand Down Expand Up @@ -782,6 +821,7 @@ private static void incrementBlockAndCheck(AtomicInteger sharedResource, CountDo
assertEquals(value, sharedResource.get());
}

@SuppressWarnings("ResultOfMethodCallIgnored")
private static int incrementAndBlock(AtomicInteger sharedResource, CountDownLatch countDownLatch)
throws InterruptedException {
var value = sharedResource.incrementAndGet();
Expand All @@ -790,6 +830,7 @@ private static int incrementAndBlock(AtomicInteger sharedResource, CountDownLatc
return value;
}

@SuppressWarnings("ResultOfMethodCallIgnored")
private static void storeAndBlockAndCheck(AtomicInteger sharedResource, CountDownLatch countDownLatch)
throws InterruptedException {
var value = sharedResource.get();
Expand All @@ -798,18 +839,18 @@ private static void storeAndBlockAndCheck(AtomicInteger sharedResource, CountDow
assertEquals(value, sharedResource.get());
}

/**
/*
* To simulate tests running in parallel tests will modify a shared
* resource, simulate work by waiting, then check if the shared resource was
* not modified by any other thread.
*
* Depending on system performance the simulation of work needs to be longer
* on slower systems to ensure tests can run in parallel.
*
* Currently CI is known to be slow.
* Currently, CI is known to be slow.
*/
private static long estimateSimulatedTestDurationInMiliseconds() {
var runningInCi = Boolean.valueOf(System.getenv("CI"));
var runningInCi = Boolean.parseBoolean(System.getenv("CI"));
return runningInCi ? 1000 : 100;
}

Expand Down

0 comments on commit c8496a2

Please sign in to comment.