Merge pull request #3 from apache/master
update fork 3rd Dec 2019, 14:55PM
amoght authored Dec 3, 2019
2 parents 802c2b6 + 72a2d04 commit 4f89355
Showing 22 changed files with 528 additions and 133 deletions.
6 changes: 5 additions & 1 deletion .test-infra/jenkins/job_Release_Gradle_Build.groovy
@@ -25,7 +25,11 @@ job('beam_Release_Gradle_Build') {
description('Verify Gradle build against the official release version.')

// Set common parameters.
commonJobProperties.setTopLevelMainJobProperties(delegate)
commonJobProperties
.setTopLevelMainJobProperties(
delegate,
defaultBranch='master',
defaultTimeout=300)

// Allows triggering this build against pull requests.
commonJobProperties.enablePhraseTriggeringFromPullRequest(
@@ -83,6 +83,7 @@
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Materializations;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
@@ -710,8 +711,11 @@ private void addOutput(String name, PValue value, Coder<?> valueCoder) {
String generatedName = String.format("%s.out%d", stepName, outputInfoList.size());

addString(outputInfo, PropertyNames.USER_NAME, generatedName);
if (value instanceof PCollection
&& translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value)) {
if ((value instanceof PCollection
&& translator.runner.doesPCollectionRequireIndexedFormat((PCollection<?>) value))
|| ((value instanceof PCollectionView)
&& (Materializations.MULTIMAP_MATERIALIZATION_URN.equals(
((PCollectionView) value).getViewFn().getMaterialization().getUrn())))) {
addBoolean(outputInfo, PropertyNames.USE_INDEXED_FORMAT, true);
}
if (valueCoder != null) {
@@ -736,8 +736,8 @@ public void testStreamingSplittableParDoTranslation() throws Exception {
@Test
public void testSplittableParDoTranslationFnApi() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
DataflowRunner runner = DataflowRunner.fromOptions(options);
options.setExperiments(Arrays.asList("beam_fn_api"));
DataflowRunner runner = DataflowRunner.fromOptions(options);
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);

Pipeline pipeline = Pipeline.create(options);
@@ -851,6 +851,66 @@ public void testToIterableTranslationWithIsmSideInput() throws Exception {
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}

@Test
public void testToSingletonTranslationWithFnApiSideInput() throws Exception {
// A "change detector" test that makes sure the translation
// of getting a PCollectionView<T> does not change
// in bad ways during refactor

DataflowPipelineOptions options = buildPipelineOptions();
options.setExperiments(Arrays.asList("beam_fn_api"));
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(Create.of(1)).apply(View.asSingleton());
DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(pipeline);
Job job = translator.translate(pipeline, runner, Collections.emptyList()).getJob();
assertAllStepOutputsHaveUniqueIds(job);

List<Step> steps = job.getSteps();
assertEquals(14, steps.size());

Step collectionToSingletonStep = steps.get(steps.size() - 1);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());

@SuppressWarnings("unchecked")
List<Map<String, Object>> ctsOutputs =
(List<Map<String, Object>>)
steps.get(steps.size() - 1).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), "use_indexed_format"));
}

@Test
public void testToIterableTranslationWithFnApiSideInput() throws Exception {
// A "change detector" test that makes sure the translation
// of getting a PCollectionView<Iterable<T>> does not change
// in bad ways during refactor

DataflowPipelineOptions options = buildPipelineOptions();
options.setExperiments(Arrays.asList("beam_fn_api"));
DataflowPipelineTranslator translator = DataflowPipelineTranslator.fromOptions(options);

Pipeline pipeline = Pipeline.create(options);
pipeline.apply(Create.of(1, 2, 3)).apply(View.asIterable());

DataflowRunner runner = DataflowRunner.fromOptions(options);
runner.replaceTransforms(pipeline);
Job job = translator.translate(pipeline, runner, Collections.emptyList()).getJob();
assertAllStepOutputsHaveUniqueIds(job);

List<Step> steps = job.getSteps();
assertEquals(10, steps.size());

@SuppressWarnings("unchecked")
List<Map<String, Object>> ctsOutputs =
(List<Map<String, Object>>)
steps.get(steps.size() - 1).getProperties().get(PropertyNames.OUTPUT_INFO);
assertTrue(Structs.getBoolean(Iterables.getOnlyElement(ctsOutputs), "use_indexed_format"));
Step collectionToSingletonStep = steps.get(steps.size() - 1);
assertEquals("CollectionToSingleton", collectionToSingletonStep.getKind());
}

@Test
public void testStepDisplayData() throws Exception {
DataflowPipelineOptions options = buildPipelineOptions();
7 changes: 4 additions & 3 deletions sdks/go/pkg/beam/runners/dataflow/dataflowlib/messages.go
@@ -97,9 +97,10 @@ type propertiesWithPubSubMessage struct {
}

type output struct {
UserName string `json:"user_name,omitempty"`
OutputName string `json:"output_name,omitempty"`
Encoding *graphx.CoderRef `json:"encoding,omitempty"`
UserName string `json:"user_name,omitempty"`
OutputName string `json:"output_name,omitempty"`
Encoding *graphx.CoderRef `json:"encoding,omitempty"`
UseIndexedFormat bool `json:"use_indexed_format,omitempty"`
}

type integer struct {
20 changes: 14 additions & 6 deletions sdks/go/pkg/beam/runners/dataflow/dataflowlib/translate.go
@@ -137,7 +137,7 @@ func (x *translator) translateTransform(trunk string, id string) ([]*df.Step, er
rem := reflectx.ShallowClone(t.Inputs).(map[string]string)

prop.NonParallelInputs = make(map[string]*outputReference)
for key := range payload.SideInputs {
for key, sideInput := range payload.SideInputs {
// Side input require an additional conversion step, which must
// be before the present one.
delete(rem, key)
@@ -146,16 +146,24 @@
ref := x.pcollections[t.Inputs[key]]
c := x.translateCoder(pcol, pcol.CoderId)

var outputInfo output
outputInfo = output{
UserName: "i0",
OutputName: "i0",
Encoding: graphx.WrapIterable(c),
}
if graphx.URNMultimapSideInput == sideInput.GetAccessPattern().GetUrn() {
outputInfo.UseIndexedFormat = true
}

side := &df.Step{
Name: fmt.Sprintf("view%v_%v", id, key),
Kind: sideInputKind,
Properties: newMsg(properties{
ParallelInput: ref,
OutputInfo: []output{{
UserName: "i0",
OutputName: "i0",
Encoding: graphx.WrapIterable(c),
}},
OutputInfo: []output{
outputInfo,
},
UserName: userName(trunk, fmt.Sprintf("AsView%v_%v", id, key)),
}),
}
1 change: 0 additions & 1 deletion sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -615,7 +615,6 @@ def setUp(self):
self.dataset_id, self.project)

@attr('IT')
@unittest.skip('BEAM-8842: Disabled due to reliance on old retry behavior.')
def test_multiple_destinations_transform(self):
output_table_1 = '%s%s' % (self.output_table, 1)
output_table_2 = '%s%s' % (self.output_table, 2)
1 change: 1 addition & 0 deletions sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -647,6 +647,7 @@ def test_value_provider_transform(self):
method='FILE_LOADS'))

@attr('IT')
@unittest.skip('BEAM-8842: Disabled due to reliance on old retry behavior.')
def test_multiple_destinations_transform(self):
streaming = self.test_pipeline.options.view_as(StandardOptions).streaming
if streaming and isinstance(self.test_pipeline.runner, TestDataflowRunner):
24 changes: 11 additions & 13 deletions sdks/python/apache_beam/io/textio_test.py
@@ -20,7 +20,6 @@
from __future__ import division

import bz2
import datetime
import glob
import gzip
import logging
@@ -101,17 +100,19 @@ def write_data(
return f.name, [line.decode('utf-8') for line in all_data]


def write_pattern(lines_per_file, no_data=False):
def write_pattern(lines_per_file, no_data=False, return_filenames=False):
"""Writes a pattern of temporary files.
Args:
lines_per_file (List[int]): The number of lines to write per file.
no_data (bool): If :data:`True`, empty lines will be written, otherwise
each line will contain a concatenation of b'line' and the line number.
return_filenames (bool): If True, returned list will contain
(filename, data) pairs.
Returns:
Tuple[str, List[str]]: A tuple of the filename pattern and a list of the
utf-8 decoded written data.
Tuple[str, List[Union[str, (str, str)]]]: A tuple of the filename pattern
and a list of the utf-8 decoded written data or (filename, data) pairs.
"""
temp_dir = tempfile.mkdtemp()

@@ -121,7 +122,10 @@ def write_pattern(lines_per_file, no_data=False):
for i in range(len(lines_per_file)):
file_name, data = write_data(lines_per_file[i], no_data=no_data,
directory=temp_dir, prefix='mytemp')
all_data.extend(data)
if return_filenames:
all_data.extend(zip([file_name] * len(data), data))
else:
all_data.extend(data)
start_index += lines_per_file[i]

assert file_name
@@ -502,14 +506,8 @@ def test_read_from_text_file_pattern(self):
pipeline.run()

def test_read_from_text_with_file_name_file_pattern(self):
prefix = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
file_name_1, data_1 = write_data(5, prefix=prefix)
file_name_2, data_2 = write_data(5, prefix=prefix)
expected_data = []
expected_data.extend([(file_name_1, el) for el in data_1])
expected_data.extend([(file_name_2, el) for el in data_2])
folder = file_name_1[:file_name_1.rfind(os.path.sep)]
pattern = folder + os.path.sep + prefix + '*'
pattern, expected_data = write_pattern(
lines_per_file=[5, 5], return_filenames=True)
assert len(expected_data) == 10
pipeline = TestPipeline()
pcoll = pipeline | 'Read' >> ReadFromTextWithFilename(pattern)
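The rewritten test_read_from_text_with_file_name_file_pattern above now leans on write_pattern(..., return_filenames=True) instead of assembling the expected (filename, line) pairs by hand. As a rough illustration of the data shape involved — the file names and contents below are hypothetical stand-ins, not the real temporary files the helper writes — the pairs are built the same way the helper does it, by zipping each file name against its lines:

```python
# Hypothetical stand-ins for two temporary files created by the test helper;
# only the shape of the result matters here.
file_1, data_1 = '/tmp/mytemp_a.txt', ['line0', 'line1']
file_2, data_2 = '/tmp/mytemp_b.txt', ['line2', 'line3']

expected_data = []
for file_name, data in [(file_1, data_1), (file_2, data_2)]:
    # Same construction as write_pattern(return_filenames=True):
    # zip([file_name] * len(data), data)
    expected_data.extend(zip([file_name] * len(data), data))

print(expected_data)
# [('/tmp/mytemp_a.txt', 'line0'), ('/tmp/mytemp_a.txt', 'line1'),
#  ('/tmp/mytemp_b.txt', 'line2'), ('/tmp/mytemp_b.txt', 'line3')]
```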
6 changes: 5 additions & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
@@ -465,7 +465,11 @@ def _add_argparse_args(cls, parser):
parser.add_argument('--service_account_email',
default=None,
help='Identity to run virtual machines as.')
parser.add_argument('--no_auth', dest='no_auth', type=bool, default=False)
parser.add_argument('--no_auth',
dest='no_auth',
action='store_true',
default=False,
help='Skips authorizing credentials with Google Cloud.')
# Option to run templated pipelines
parser.add_argument('--template_location',
default=None,
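The switch from type=bool to action='store_true' above works around a common argparse pitfall: type=bool simply calls bool() on the raw string, so any supplied value — including --no_auth=False — parses as True. A standalone sketch of the two behaviors (plain argparse, not Beam code):

```python
import argparse

# Old style: type=bool requires a value, and bool('False') is still True.
old = argparse.ArgumentParser()
old.add_argument('--no_auth', dest='no_auth', type=bool, default=False)
print(old.parse_args(['--no_auth=False']).no_auth)  # True (surprising)

# New style: a real boolean flag that is either present or absent.
new = argparse.ArgumentParser()
new.add_argument('--no_auth', dest='no_auth', action='store_true', default=False)
print(new.parse_args([]).no_auth)             # False
print(new.parse_args(['--no_auth']).no_auth)  # True
```

This is also why the option lists in the runner tests further down change from '--no_auth=True' to the bare '--no_auth' flag.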
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/pvalue.py
@@ -260,7 +260,7 @@ def __getitem__(self, tag):

if tag is not None:
self._transform.output_tags.add(tag)
pcoll = PCollection(self._pipeline, tag=tag)
pcoll = PCollection(self._pipeline, tag=tag, element_type=typehints.Any)
# Transfer the producer from the DoOutputsTuple to the resulting
# PCollection.
pcoll.producer = self.producer.parts[0]
22 changes: 14 additions & 8 deletions sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -607,7 +607,8 @@ def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
return step

def _add_singleton_step(
self, label, full_label, tag, input_step, windowing_strategy):
self, label, full_label, tag, input_step, windowing_strategy,
access_pattern):
"""Creates a CollectionToSingleton step used to handle ParDo side inputs."""
# Import here to avoid adding the dependency for local running scenarios.
from apache_beam.runners.dataflow.internal import apiclient
@@ -620,12 +621,16 @@ def _add_singleton_step(
PropertyNames.STEP_NAME: input_step.proto.name,
PropertyNames.OUTPUT_NAME: input_step.get_output(tag)})
step.encoding = self._get_side_input_encoding(input_step.encoding)
step.add_property(
PropertyNames.OUTPUT_INFO,
[{PropertyNames.USER_NAME: (
'%s.%s' % (full_label, PropertyNames.OUTPUT)),
PropertyNames.ENCODING: step.encoding,
PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])

output_info = {
PropertyNames.USER_NAME: '%s.%s' % (full_label, PropertyNames.OUTPUT),
PropertyNames.ENCODING: step.encoding,
PropertyNames.OUTPUT_NAME: PropertyNames.OUT
}
if common_urns.side_inputs.MULTIMAP.urn == access_pattern:
output_info[PropertyNames.USE_INDEXED_FORMAT] = True
step.add_property(PropertyNames.OUTPUT_INFO, [output_info])

step.add_property(
PropertyNames.WINDOWING_STRATEGY,
self.serialize_windowing_strategy(windowing_strategy))
@@ -820,7 +825,8 @@ def run_ParDo(self, transform_node, options):
self._add_singleton_step(
step_name, si_full_label, side_pval.pvalue.tag,
self._cache.get_pvalue(side_pval.pvalue),
side_pval.pvalue.windowing)
side_pval.pvalue.windowing,
side_pval._side_input_data().access_pattern)
si_dict[si_label] = {
'@type': 'OutputReference',
PropertyNames.STEP_NAME: step_name,
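The Python runner change mirrors the Java and Go translator changes earlier in this commit: when a side input uses the multimap access pattern, the output info of the generated CollectionToSingleton step is flagged with use_indexed_format. A stripped-down sketch of just that decision — the dict keys and the URN below are assumed literals standing in for PropertyNames.* and common_urns.side_inputs.MULTIMAP.urn, not the runner's actual code:

```python
# Minimal sketch of the new branch in _add_singleton_step.
MULTIMAP_SIDE_INPUT_URN = 'beam:side_input:multimap:v1'  # assumed literal


def make_side_input_output_info(full_label, encoding, access_pattern):
    output_info = {
        'user_name': '%s.out' % full_label,
        'encoding': encoding,
        'output_name': 'out',
    }
    if access_pattern == MULTIMAP_SIDE_INPUT_URN:
        # Multimap side inputs are served to Dataflow in its indexed format.
        output_info['use_indexed_format'] = True
    return output_info


print(make_side_input_output_info(
    'ParDo(DoFn)/view', '<coder-ref>', MULTIMAP_SIDE_INPUT_URN))
```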
@@ -96,7 +96,7 @@ def setUp(self):
'--project=test-project',
'--staging_location=ignored',
'--temp_location=/dev/null',
'--no_auth=True',
'--no_auth',
'--dry_run=True']

@mock.patch('time.sleep', return_value=None)
@@ -295,7 +295,7 @@ def test_no_group_by_key_directly_after_bigquery(self):
'--project=test-project',
'--staging_location=ignored',
'--temp_location=/dev/null',
'--no_auth=True'
'--no_auth'
]))
rows = p | beam.io.Read(beam.io.BigQuerySource('dataset.faketable'))
with self.assertRaises(ValueError,
1 change: 1 addition & 0 deletions sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -117,6 +117,7 @@ class PropertyNames(object):
SOURCE_STEP_INPUT = 'custom_source_step_input'
SERIALIZED_TEST_STREAM = 'serialized_test_stream'
STEP_NAME = 'step_name'
USE_INDEXED_FORMAT = 'use_indexed_format'
USER_FN = 'user_fn'
USER_NAME = 'user_name'
VALIDATE_SINK = 'validate_sink'
@@ -61,7 +61,7 @@ def test_full_completion(self):
'--staging_location=' + dummy_dir,
'--temp_location=/dev/null',
'--template_location=' + dummy_file_name,
'--no_auth=True']))
'--no_auth']))

pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned
pipeline.run().wait_until_finish()
@@ -86,7 +86,7 @@ def test_bad_path(self):
'--staging_location=ignored',
'--temp_location=/dev/null',
'--template_location=/bad/path',
'--no_auth=True']))
'--no_auth']))
remote_runner.job = apiclient.Job(pipeline._options,
pipeline.to_runner_api())

2 changes: 1 addition & 1 deletion sdks/python/apache_beam/runners/runner_test.py
@@ -39,7 +39,7 @@ class RunnerTest(unittest.TestCase):
'--project=test-project',
'--staging_location=ignored',
'--temp_location=/dev/null',
'--no_auth=True']
'--no_auth']

def test_create_runner(self):
self.assertTrue(
14 changes: 6 additions & 8 deletions sdks/python/apache_beam/runners/worker/sdk_worker.py
@@ -145,16 +145,12 @@ def _execute(self, task, request):
self._responses.put(response)

def _request_register(self, request):
self._request_execute(request)
# registration request is handled synchronously
self._execute(
lambda: self.create_worker().do_instruction(request), request)

def _request_process_bundle(self, request):

def task():
self._execute(
lambda: self.create_worker().do_instruction(request), request)
self._worker_thread_pool.submit(task)
_LOGGER.debug(
"Currently using %s threads." % len(self._worker_thread_pool._workers))
self._request_execute(request)

def _request_process_bundle_split(self, request):
self._request_process_bundle_action(request)
@@ -190,6 +186,8 @@ def task():
lambda: self.create_worker().do_instruction(request), request)

self._worker_thread_pool.submit(task)
_LOGGER.debug(
"Currently using %s threads." % len(self._worker_thread_pool._workers))

def create_worker(self):
return SdkWorker(
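The sdk_worker.py change above makes registration run synchronously through _execute, while bundle processing keeps going through the shared _request_execute path and its worker thread pool (the thread-count debug log appears to move into that shared path as well). A small self-contained sketch of the dispatch pattern, using concurrent.futures in place of Beam's own thread pool — the class and method names here are illustrative, not the real SdkHarness API:

```python
import logging
from concurrent.futures import ThreadPoolExecutor


class MiniHarness(object):
    """Illustrative control-plane dispatcher; not the real SdkHarness."""

    def __init__(self, do_instruction, max_workers=12):
        self._do_instruction = do_instruction  # callable that does the work
        self._max_workers = max_workers
        self._pool = ThreadPoolExecutor(max_workers=max_workers)

    def request_register(self, request):
        # Handled synchronously, matching the change above; a plausible
        # reason is that registration should finish before any bundle
        # that depends on it is processed.
        return self._do_instruction(request)

    def request_process_bundle(self, request):
        # Bundle work is submitted to the pool so the control thread can
        # keep receiving instructions.
        future = self._pool.submit(self._do_instruction, request)
        logging.debug('Currently using up to %s threads.', self._max_workers)
        return future
```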
