[BEAM-14334] Remove remaining forkEvery 1 from all Spark tests and stop mixing unit tests with runner validations. #17662

Merged
84 changes: 22 additions & 62 deletions runners/spark/spark_runner.gradle
@@ -48,6 +48,17 @@ configurations {
examplesJavaIntegrationTest
}

def sparkTestProperties(overrides = [:]) {
Contributor: Is it just refactoring (extract method)?

Member Author: yes 👍

def defaults = ["--runner": "TestSparkRunner"]
[
"spark.sql.shuffle.partitions": "4",
"spark.ui.enabled" : "false",
"spark.ui.showConsoleProgress": "false",
"beamTestPipelineOptions" :
JsonOutput.toJson((defaults + overrides).collect { k, v -> "$k=$v" })
]
}
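
For illustration (the values below are only an assumed example, not part of the change): the helper merges the default TestSparkRunner option with any per-task overrides and serializes them into beamTestPipelineOptions, so each test task can configure itself in a single line:

    // Assumed example: override one pipeline option, keep the Spark defaults.
    systemProperties sparkTestProperties(["--enableSparkMetricSinks": "false"])
    // Resulting system properties (roughly):
    //   spark.sql.shuffle.partitions = 4
    //   spark.ui.enabled = false
    //   spark.ui.showConsoleProgress = false
    //   beamTestPipelineOptions = ["--runner=TestSparkRunner","--enableSparkMetricSinks=false"]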

def hadoopVersions = [
"285" : "2.8.5",
"292" : "2.9.2",
@@ -94,14 +105,7 @@ if (copySourceBase) {
}

test {
systemProperty "spark.sql.shuffle.partitions", "4"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
systemProperty "beamTestPipelineOptions", """[
"--runner=TestSparkRunner",
"--streaming=false",
"--enableSparkMetricSinks=true"
]"""
systemProperties sparkTestProperties()
systemProperty "log4j.configuration", "log4j-test.properties"
// Change log level to debug:
// systemProperty "org.slf4j.simpleLogger.defaultLogLevel", "debug"
@@ -113,10 +117,6 @@
}

maxParallelForks 4
useJUnit {
Contributor: Why was it removed?

Member Author (@mosche, May 17, 2022): Previously these unit tests were excluded from the test task because they failed and were instead run as part of validatesRunnerStreaming. I fixed them so they can run as normal unit tests, so this exclusion isn't needed any more.

excludeCategories "org.apache.beam.runners.spark.StreamingTest"
excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery"
}

// easily re-run all tests (to deal with flaky tests / SparkContext leaks)
if(project.hasProperty("rerun-tests")) { outputs.upToDateWhen {false} }
@@ -218,29 +218,17 @@ def validatesRunnerBatch = tasks.register("validatesRunnerBatch", Test) {
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
def pipelineOptions = JsonOutput.toJson([
"--runner=TestSparkRunner",
"--streaming=false",
"--enableSparkMetricSinks=false",
])
systemProperty "beamTestPipelineOptions", pipelineOptions
systemProperty "beam.spark.test.reuseSparkContext", "true"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
systemProperties sparkTestProperties(["--enableSparkMetricSinks":"false"])

classpath = configurations.validatesRunner
testClassesDirs = files(
project(":sdks:java:core").sourceSets.test.output.classesDirs,
project(":runners:core-java").sourceSets.test.output.classesDirs,
)
testClassesDirs += files(project.sourceSets.test.output.classesDirs)
Member Author: All unit tests should be run as part of the test task.

// Only one SparkContext may be running in a JVM (SPARK-2243)
forkEvery 1
maxParallelForks 4
useJUnit {
includeCategories 'org.apache.beam.sdk.testing.ValidatesRunner'
includeCategories 'org.apache.beam.runners.spark.UsesCheckpointRecovery'
Member Author: Unit test!

Contributor: What do you mean?

Member Author: As said above, tests of this custom category are normal unit tests and are already run during test; there's no runner validation for such a category.

// Should be run only in a properly configured SDK harness environment
excludeCategories 'org.apache.beam.sdk.testing.UsesSdkHarnessEnvironment'
excludeCategories 'org.apache.beam.sdk.testing.UsesCustomWindowMerging'
@@ -293,15 +281,7 @@ tasks.register("validatesStructuredStreamingRunnerBatch", Test) {
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
def pipelineOptions = JsonOutput.toJson([
"--runner=SparkStructuredStreamingRunner",
"--testMode=true",
"--streaming=false",
])
systemProperty "beamTestPipelineOptions", pipelineOptions
systemProperty "spark.sql.shuffle.partitions", "4"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
systemProperties sparkTestProperties(["--runner":"SparkStructuredStreamingRunner", "--testMode":"true"])

classpath = configurations.validatesRunner
testClassesDirs = files(
@@ -310,8 +290,6 @@ tasks.register("validatesStructuredStreamingRunnerBatch", Test) {
)
testClassesDirs += files(project.sourceSets.test.output.classesDirs)

// Only one SparkContext may be running in a JVM (SPARK-2243)
forkEvery 1
maxParallelForks 4
// Increase memory heap in order to avoid OOM errors
jvmArgs '-Xmx7g'
@@ -373,27 +351,18 @@ createJavaExamplesArchetypeValidationTask(type: 'Quickstart', runner: 'Spark')

tasks.register("hadoopVersionsTest") {
group = "Verification"
def taskNames = hadoopVersions.keySet().stream()
.map { num -> "hadoopVersion${num}Test" }
.collect(Collectors.toList())
dependsOn taskNames
dependsOn hadoopVersions.collect{k,v -> "hadoopVersion${k}Test"}
}
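
For reference (expansion assumed from the hadoopVersions map near the top of this file): the collect above produces one dependency per configured Hadoop version, equivalent to the removed Collectors-based stream:

    // Assumed expansion for the entries shown above ("285", "292", ...):
    dependsOn(["hadoopVersion285Test", "hadoopVersion292Test" /* , ... */])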

tasks.register("examplesIntegrationTest", Test) {
group = "Verification"
// Disable gradle cache
outputs.upToDateWhen { false }
def pipelineOptions = JsonOutput.toJson([
"--runner=TestSparkRunner",
"--enableSparkMetricSinks=true",
"--tempLocation=${tempLocation}",
"--tempRoot=${tempLocation}",
"--project=${gcpProject}",
systemProperties sparkTestProperties([
"--tempLocation": "${tempLocation}",
"--tempRoot" : "${tempLocation}",
"--project" : "${gcpProject}"
])
systemProperty "beamTestPipelineOptions", pipelineOptions
systemProperty "beam.spark.test.reuseSparkContext", "true"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"

include '**/*IT.class'
maxParallelForks 4
@@ -414,19 +383,10 @@ hadoopVersions.each { kv ->
group = "Verification"
description = "Runs Spark tests with Hadoop version $kv.value"
classpath = configurations."hadoopVersion$kv.key" + sourceSets.test.runtimeClasspath
systemProperty "beam.spark.test.reuseSparkContext", "true"
systemProperty "spark.sql.shuffle.partitions", "4"
systemProperty "spark.ui.enabled", "false"
systemProperty "spark.ui.showConsoleProgress", "false"
systemProperty "beamTestPipelineOptions", """[
"--runner=TestSparkRunner",
"--streaming=false",
"--enableSparkMetricSinks=true"
]"""
// Only one SparkContext may be running in a JVM (SPARK-2243)
forkEvery 1
maxParallelForks 4
systemProperties sparkTestProperties()

include "**/*Test.class"
maxParallelForks 4
useJUnit {
excludeCategories "org.apache.beam.runners.spark.StreamingTest"
excludeCategories "org.apache.beam.runners.spark.UsesCheckpointRecovery"
@@ -24,6 +24,7 @@
import org.apache.beam.runners.spark.SparkContextRule;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.coders.StringUtf8Coder;
@@ -81,6 +82,7 @@ public void testInBatchMode() throws Exception {
@Category(StreamingTest.class)
@Test
public void testInStreamingMode() throws Exception {
pipeline.getOptions().as(TestSparkPipelineOptions.class).setForceStreaming(true);
assertThat(InMemoryMetrics.valueOf("emptyLines"), is(nullValue()));

Instant instant = new Instant(0);
@@ -23,6 +23,7 @@
import org.apache.beam.runners.core.metrics.TestMetricsSink;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.metrics.Counter;
@@ -68,6 +69,8 @@ public void init() {
@Category(StreamingTest.class)
@Test
public void testInStreamingMode() throws Exception {
pipeline.getOptions().as(TestSparkPipelineOptions.class).setForceStreaming(true);

Instant instant = new Instant(0);
CreateStream<Integer> source =
CreateStream.of(VarIntCoder.of(), batchDuration())
@@ -31,11 +31,13 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Combine;
@@ -84,7 +86,7 @@
@Category(StreamingTest.class)
public class CreateStreamTest implements Serializable {

@Rule public final transient TestPipeline p = TestPipeline.create();
@Rule public final transient TestPipeline p = TestPipeline.fromOptions(streamingOptions());
@Rule public final transient ExpectedException thrown = ExpectedException.none();

@Test
@@ -524,4 +526,10 @@ public void process(ProcessContext context) {
context.output(element);
}
}

private static PipelineOptions streamingOptions() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
return options;
}
}
@@ -24,9 +24,11 @@

import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -52,7 +54,7 @@ public class SparkCoGroupByKeyStreamingTest {
private static final TupleTag<Integer> INPUT1_TAG = new TupleTag<>("input1");
private static final TupleTag<Integer> INPUT2_TAG = new TupleTag<>("input2");

@Rule public final TestPipeline pipeline = TestPipeline.create();
@Rule public final TestPipeline pipeline = TestPipeline.fromOptions(streamingOptions());

private Duration batchDuration() {
return Duration.millis(
@@ -165,4 +167,10 @@ public void testInStreamingMode() throws Exception {
});
pipeline.run();
}

private static PipelineOptions streamingOptions() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
return options;
}
}
@@ -25,6 +25,7 @@

import java.io.Serializable;
import org.apache.beam.runners.spark.StreamingTest;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.Source;
@@ -33,6 +34,7 @@
import org.apache.beam.sdk.metrics.MetricQueryResults;
import org.apache.beam.sdk.metrics.MetricsFilter;
import org.apache.beam.sdk.metrics.SourceMetrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.PCollection;
@@ -47,7 +49,7 @@ public class StreamingSourceMetricsTest implements Serializable {
private static final MetricName ELEMENTS_READ = SourceMetrics.elementsRead().getName();

// Force streaming pipeline using pipeline rule.
@Rule public final transient TestPipeline pipeline = TestPipeline.create();
@Rule public final transient TestPipeline pipeline = TestPipeline.fromOptions(streamingOptions());

@Test
@Category(StreamingTest.class)
@@ -87,4 +89,10 @@ public void testUnboundedSourceMetrics() {
greaterThanOrEqualTo(minElements),
false)));
}

private static PipelineOptions streamingOptions() {
PipelineOptions options = TestPipeline.testingPipelineOptions();
options.as(TestSparkPipelineOptions.class).setForceStreaming(true);
return options;
}
}