Commit
[BEAM-5980] Remove redundant Combine Performance tests for Python SDK running on Flink

It turned out that these three tests tested nothing useful: they mapped records
of different sizes to Long, which made the record size irrelevant for
the rest of the pipeline.
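To make the point concrete, the pattern described above can be sketched roughly as follows (an illustrative reconstruction, not the actual apache_beam.testing.load_tests.combine_test code):

```python
# Illustrative sketch only (assumed shape, not the real combine_test code):
# once every record is mapped to the same constant integer, the combine stage
# does identical work regardless of the configured key_size / value_size.
import apache_beam as beam

with beam.Pipeline() as p:
    _ = (
        p
        | 'Read' >> beam.Create([b'x' * 10, b'y' * 100, b'z' * 100_000])  # records of very different sizes
        | 'MapToLong' >> beam.Map(lambda record: 1)                       # size information is dropped here
        | 'Combine' >> beam.CombineGlobally(sum)                          # combine only ever sees identical longs
    )
```

Whatever record size is configured, the combine stage receives the same stream of constants, so the three size-based variants measured essentially the same thing.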
lgajowy committed Aug 7, 2019
1 parent 3833479 commit c02a05a
Showing 1 changed file with 6 additions and 80 deletions.
86 changes: 6 additions & 80 deletions .test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
@@ -31,74 +31,9 @@ String pythonHarnessImageTag = "${dockerRegistryRoot}/python:${dockerTag}"

String flinkVersion = '1.7'
String flinkDownloadUrl = 'https://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.11.tgz'
Integer parallelism = 16

def loadTestConfigurationsFiveWorkers = { datasetName -> [
[
title : 'Combine Python Load test: 2GB 10 byte records',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
runner : CommonTestProperties.Runner.PORTABLE,
jobProperties: [
job_name : 'load-tests-python-flink-batch-combine-1-' + now,
project : 'apache-beam-testing',
publish_to_big_query: true,
metrics_dataset : datasetName,
metrics_table : 'python_flink_batch_combine_1',
input_options : '\'{' +
'"num_records": 200000000,' +
'"key_size": 1,' +
'"value_size": 9}\'',
parallelism : 5,
job_endpoint : 'localhost:8099',
environment_config : pythonHarnessImageTag,
environment_type : 'DOCKER',
top_count : 20,
]
],
[
title : 'Combine Python Load test: 2GB 100 byte records',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
runner : CommonTestProperties.Runner.PORTABLE,
jobProperties: [
job_name : 'load-tests-python-flink-batch-combine-2-' + now,
project : 'apache-beam-testing',
publish_to_big_query: true,
metrics_dataset : datasetName,
metrics_table : 'python_flink_batch_combine_2',
input_options : '\'{' +
'"num_records": 20000000,' +
'"key_size": 10,' +
'"value_size": 90}\'',
parallelism : 5,
job_endpoint : 'localhost:8099',
environment_config : pythonHarnessImageTag,
environment_type : 'DOCKER',
top_count : 20,
]
],
[
title : 'Combine Python Load test: 2GB 100 kilobyte records',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
runner : CommonTestProperties.Runner.PORTABLE,
jobProperties: [
job_name : 'load-tests-python-flink-batch-combine-3-' + now,
project : 'apache-beam-testing',
publish_to_big_query: true,
metrics_dataset : datasetName,
metrics_table : 'python_flink_batch_combine_3',
input_options : '\'{' +
'"num_records": 2000,' +
'"key_size": 100000,' +
'"value_size": 90}\'',
parallelism : 5,
job_endpoint : 'localhost:8099',
environment_config : pythonHarnessImageTag,
environment_type : 'DOCKER',
top_count : 20,
]
],
]}

def loadTestConfigurationsSixteenWorkers = { datasetName -> [
def loadTestConfigurations = { datasetName -> [
[
title : 'Combine Python Load test: 2GB Fanout 4',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
@@ -113,7 +48,7 @@ def loadTestConfigurationsSixteenWorkers = { datasetName -> [
'"num_records": 5000000,' +
'"key_size": 10,' +
'"value_size": 90}\'',
parallelism : 16,
parallelism : parallelism,
job_endpoint : 'localhost:8099',
environment_config : pythonHarnessImageTag,
environment_type : 'DOCKER',
@@ -135,7 +70,7 @@
'"num_records": 2500000,' +
'"key_size": 10,' +
'"value_size": 90}\'',
parallelism : 16,
parallelism : parallelism,
job_endpoint : 'localhost:8099',
environment_config : pythonHarnessImageTag,
environment_type : 'DOCKER',
@@ -149,22 +84,13 @@ def batchLoadTestJob = { scope, triggeringContext ->
scope.description('Runs Python Combine load tests on Flink runner in batch mode')
commonJobProperties.setTopLevelMainJobProperties(scope, 'master', 240)

def numberOfWorkers = 16
def scaledNumberOfWorkers = 5
def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', triggeringContext)

infra.prepareSDKHarness(scope, CommonTestProperties.SDK.PYTHON, dockerRegistryRoot, dockerTag)
infra.prepareFlinkJobServer(scope, flinkVersion, dockerRegistryRoot, dockerTag)
infra.setupFlinkCluster(scope, jenkinsJobName, flinkDownloadUrl, pythonHarnessImageTag, jobServerImageTag, numberOfWorkers)

def testConfigs = loadTestConfigurationsSixteenWorkers(datasetName)
for (config in testConfigs) {
loadTestsBuilder.loadTest(scope, config.title, config.runner, CommonTestProperties.SDK.PYTHON, config.jobProperties, config.itClass)
}

infra.scaleCluster(scope, jenkinsJobName, scaledNumberOfWorkers)
infra.setupFlinkCluster(scope, jenkinsJobName, flinkDownloadUrl, pythonHarnessImageTag, jobServerImageTag, parallelism)

testConfigs = loadTestConfigurationsFiveWorkers(datasetName)
def testConfigs = loadTestConfigurations(datasetName)
for (config in testConfigs) {
loadTestsBuilder.loadTest(scope, config.title, config.runner, CommonTestProperties.SDK.PYTHON, config.jobProperties, config.itClass)
}
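As a side note on the configurations that remain after this change, the "Fanout" titles still work out to roughly 2 GB per test, assuming the fanout factor multiplies the amount of data the combine stage processes (an interpretation, not something stated in this diff):

```python
# Back-of-the-envelope check (assumption: fanout multiplies the data the
# combine stage processes) of the 2 GB figure in the remaining test titles.
record_bytes = 10 + 90                           # key_size + value_size from input_options

fanout_4_bytes = 5_000_000 * record_bytes * 4    # 2_000_000_000 bytes ≈ 2 GB
fanout_8_bytes = 2_500_000 * record_bytes * 8    # 2_000_000_000 bytes ≈ 2 GB

print(fanout_4_bytes, fanout_8_bytes)
```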
