Skip to content

Commit

Permalink
[BEAM-5980] Remove redundant Combine Performance tests for Python SDK…
Browse files Browse the repository at this point in the history
… running on Flink

It turned out we tested nothing with the three tests. They mapped records
of different sizes to Long which basically made the size irrelevant for
the rest of the pipeline.
  • Loading branch information
lgajowy committed Aug 7, 2019
1 parent 3833479 commit ce8be07
Showing 1 changed file with 4 additions and 77 deletions.
81 changes: 4 additions & 77 deletions .test-infra/jenkins/job_LoadTests_Combine_Flink_Python.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -32,73 +32,7 @@ String pythonHarnessImageTag = "${dockerRegistryRoot}/python:${dockerTag}"
String flinkVersion = '1.7'
String flinkDownloadUrl = 'https://archive.apache.org/dist/flink/flink-1.7.0/flink-1.7.0-bin-hadoop28-scala_2.11.tgz'

def loadTestConfigurationsFiveWorkers = { datasetName -> [
[
title : 'Combine Python Load test: 2GB 10 byte records',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
runner : CommonTestProperties.Runner.PORTABLE,
jobProperties: [
job_name : 'load-tests-python-flink-batch-combine-1-' + now,
project : 'apache-beam-testing',
publish_to_big_query: true,
metrics_dataset : datasetName,
metrics_table : 'python_flink_batch_combine_1',
input_options : '\'{' +
'"num_records": 200000000,' +
'"key_size": 1,' +
'"value_size": 9}\'',
parallelism : 5,
job_endpoint : 'localhost:8099',
environment_config : pythonHarnessImageTag,
environment_type : 'DOCKER',
top_count : 20,
]
],
[
title : 'Combine Python Load test: 2GB 100 byte records',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
runner : CommonTestProperties.Runner.PORTABLE,
jobProperties: [
job_name : 'load-tests-python-flink-batch-combine-2-' + now,
project : 'apache-beam-testing',
publish_to_big_query: true,
metrics_dataset : datasetName,
metrics_table : 'python_flink_batch_combine_2',
input_options : '\'{' +
'"num_records": 20000000,' +
'"key_size": 10,' +
'"value_size": 90}\'',
parallelism : 5,
job_endpoint : 'localhost:8099',
environment_config : pythonHarnessImageTag,
environment_type : 'DOCKER',
top_count : 20,
]
],
[
title : 'Combine Python Load test: 2GB 100 kilobyte records',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
runner : CommonTestProperties.Runner.PORTABLE,
jobProperties: [
job_name : 'load-tests-python-flink-batch-combine-3-' + now,
project : 'apache-beam-testing',
publish_to_big_query: true,
metrics_dataset : datasetName,
metrics_table : 'python_flink_batch_combine_3',
input_options : '\'{' +
'"num_records": 2000,' +
'"key_size": 100000,' +
'"value_size": 90}\'',
parallelism : 5,
job_endpoint : 'localhost:8099',
environment_config : pythonHarnessImageTag,
environment_type : 'DOCKER',
top_count : 20,
]
],
]}

def loadTestConfigurationsSixteenWorkers = { datasetName -> [
def loadTestConfigurations = { datasetName -> [
[
title : 'Combine Python Load test: 2GB Fanout 4',
itClass : 'apache_beam.testing.load_tests.combine_test:CombineTest.testCombineGlobally',
Expand Down Expand Up @@ -149,22 +83,15 @@ def batchLoadTestJob = { scope, triggeringContext ->
scope.description('Runs Python Combine load tests on Flink runner in batch mode')
commonJobProperties.setTopLevelMainJobProperties(scope, 'master', 240)

def numberOfWorkers = 16
def scaledNumberOfWorkers = 5
def datasetName = loadTestsBuilder.getBigQueryDataset('load_test', triggeringContext)

infra.prepareSDKHarness(scope, CommonTestProperties.SDK.PYTHON, dockerRegistryRoot, dockerTag)
infra.prepareFlinkJobServer(scope, flinkVersion, dockerRegistryRoot, dockerTag)
infra.setupFlinkCluster(scope, jenkinsJobName, flinkDownloadUrl, pythonHarnessImageTag, jobServerImageTag, numberOfWorkers)

def testConfigs = loadTestConfigurationsSixteenWorkers(datasetName)
for (config in testConfigs) {
loadTestsBuilder.loadTest(scope, config.title, config.runner, CommonTestProperties.SDK.PYTHON, config.jobProperties, config.itClass)
}

infra.scaleCluster(scope, jenkinsJobName, scaledNumberOfWorkers)
def numberOfWorkers = 16
infra.setupFlinkCluster(scope, jenkinsJobName, flinkDownloadUrl, pythonHarnessImageTag, jobServerImageTag, numberOfWorkers)

testConfigs = loadTestConfigurationsFiveWorkers(datasetName)
def testConfigs = loadTestConfigurations(datasetName)
for (config in testConfigs) {
loadTestsBuilder.loadTest(scope, config.title, config.runner, CommonTestProperties.SDK.PYTHON, config.jobProperties, config.itClass)
}
Expand Down

0 comments on commit ce8be07

Please sign in to comment.