Skip to content

Commit

Permalink
Add error handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
robertwb committed Sep 21, 2023
1 parent 917edeb commit da3e417
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 31 deletions.
37 changes: 26 additions & 11 deletions sdks/python/apache_beam/transforms/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -2262,6 +2262,10 @@ def __init__(self, pcoll, exception_handling_args, upstream_errors=()):
def element_type(self):
return self._pcoll.element_type

@element_type.setter
def element_type(self, value):
  """Set the element type by forwarding ``value`` to the wrapped ``_pcoll``."""
  self._pcoll.element_type = value

def main_output_tag(self):
  """Return the tag of the main output.

  The tag is read from the ``'main_tag'`` entry of the exception handling
  arguments and defaults to ``'good'`` when that entry is absent.
  """
  return self._exception_handling_args.get('main_tag', 'good')

Expand All @@ -2272,17 +2276,24 @@ def __or__(self, transform):
return self.apply(transform)

def apply(self, transform):
  """Apply ``transform`` to the wrapped PCollection, tracking its errors.

  When ``transform`` exposes ``with_exception_handling`` it is applied with
  this object's exception handling arguments, and its error output is
  appended to the accumulated upstream errors. Any other transform is
  applied as-is and contributes no error output of its own.

  Args:
    transform: the transform to apply to the wrapped PCollection.

  Returns:
    A new ``_PValueWithErrors`` wrapping the main output, carrying the same
    exception handling arguments and the (possibly extended) upstream
    errors.
  """
  if not hasattr(transform, 'with_exception_handling'):
    # Nothing to catch for this transform; just keep threading the
    # already-collected upstream errors through.
    return _PValueWithErrors(
        self._pcoll | transform,
        self._exception_handling_args,
        self._upstream_errors)

  result = self._pcoll | transform.with_exception_handling(
      **self._exception_handling_args)
  main_output = result[self.main_output_tag()]
  if main_output.element_type == typehints.Any:
    # No declared output type: fall back to what the transform infers from
    # the input element type.
    main_output.element_type = transform.infer_output_type(
        self._pcoll.element_type)
  error_output = result[self.error_output_tag()]
  # TODO(BEAM-18957): Add support for tagged type hints.
  error_output.element_type = typehints.Any
  return _PValueWithErrors(
      main_output,
      self._exception_handling_args,
      self._upstream_errors + (error_output, ))

def accumulated_errors(self):
if len(self._upstream_errors) == 1:
Expand Down Expand Up @@ -2317,6 +2328,10 @@ def __init__(self, pvalue, exception_handling_args=None):
def element_type(self):
return self._pvalue.element_type

@element_type.setter
def element_type(self, value):
  """Set the element type by forwarding ``value`` to the wrapped ``_pvalue``."""
  self._pvalue.element_type = value

def __or__(self, transform):
  """Support ``self | transform`` by delegating to ``self.apply``."""
  return self.apply(transform)

Expand Down
38 changes: 18 additions & 20 deletions sdks/python/apache_beam/yaml/yaml_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
from apache_beam.io.gcp.bigquery import BigQueryDisposition
from apache_beam.portability.api import schema_pb2
from apache_beam.typehints import schemas
from apache_beam.yaml import yaml_mapping
from apache_beam.yaml import yaml_provider


Expand Down Expand Up @@ -141,7 +142,20 @@ def _create_parser(format, schema):
raise ValueError(f'Unknown format: {format}')


def _create_formatter(format, schema, beam_schema):
  """Return a callable that extracts the payload from a row.

  Only the ``'raw'`` format is handled here: the returned callable reads the
  single field named by ``beam_schema`` from the row it is given.

  Args:
    format: name of the payload format; only ``'raw'`` is accepted.
    schema: format-specific schema; must be falsy for ``'raw'``.
    beam_schema: object whose ``fields`` each expose a ``name``; must
      describe exactly one field for ``'raw'``.

  Returns:
    A one-argument callable mapping a row to the value of its sole field.

  Raises:
    ValueError: if ``format`` is not ``'raw'``, if a schema is supplied for
      the raw format, or if ``beam_schema`` does not have exactly one field.
  """
  if format != 'raw':
    raise ValueError(f'Unknown format: {format}')
  if schema:
    raise ValueError('raw format does not take a schema')
  field_names = [field.name for field in beam_schema.fields]
  if len(field_names) != 1:
    raise ValueError(f'Expecting exactly one field, found {field_names}')
  # Resolve the field name once rather than indexing the list on every row.
  payload_field = field_names[0]
  return lambda row: getattr(row, payload_field)


@beam.ptransform_fn
@yaml_mapping.maybe_with_exception_handling_transform_fn
def read_from_pubsub(
root,
*,
Expand All @@ -151,8 +165,7 @@ def read_from_pubsub(
schema: Optional[Any] = None,
attributes: Optional[Iterable[str]] = None,
attributes_map: Optional[str] = None,
timestamp_attribute: Optional[str] = None,
error_handling: Optional[Mapping[str, str]] = None):
timestamp_attribute: Optional[str] = None):
if topic and subscription:
raise TypeError('Only one of topic and subscription may be specified.')
elif not topic and not subscription:
Expand Down Expand Up @@ -181,8 +194,6 @@ def mapper(msg):
values[attributes_map] = msg.attributes
return beam.Row(**values)

if error_handling:
raise ValueError('waiting for https://github.com/apache/beam/pull/28462')
output = (
root
| beam.io.ReadFromPubSub(
Expand All @@ -196,19 +207,8 @@ def mapper(msg):
return output


def _create_formatter(format, schema, beam_schema):
  """Return a callable that extracts the payload from a row.

  Only the ``'raw'`` format is handled here: the returned callable reads the
  single field named by ``beam_schema`` from the row it is given.

  Args:
    format: name of the payload format; only ``'raw'`` is accepted.
    schema: format-specific schema; must be falsy for ``'raw'``.
    beam_schema: object whose ``fields`` each expose a ``name``; must
      describe exactly one field for ``'raw'``.

  Returns:
    A one-argument callable mapping a row to the value of its sole field.

  Raises:
    ValueError: if ``format`` is not ``'raw'``, if a schema is supplied for
      the raw format, or if ``beam_schema`` does not have exactly one field.
  """
  if format != 'raw':
    raise ValueError(f'Unknown format: {format}')
  if schema:
    raise ValueError('raw format does not take a schema')
  field_names = [field.name for field in beam_schema.fields]
  if len(field_names) != 1:
    raise ValueError(f'Expecting exactly one field, found {field_names}')
  # Resolve the field name once rather than indexing the list on every row.
  payload_field = field_names[0]
  return lambda row: getattr(row, payload_field)


@beam.ptransform_fn
@yaml_mapping.maybe_with_exception_handling_transform_fn
def write_to_pubsub(
pcoll,
*,
Expand All @@ -217,8 +217,7 @@ def write_to_pubsub(
schema: Optional[Any] = None,
attributes: Optional[Iterable[str]] = None,
attributes_map: Optional[str] = None,
timestamp_attribute: Optional[str] = None,
error_handling: Optional[Mapping[str, str]] = None):
timestamp_attribute: Optional[str] = None):

input_schema = schemas.schema_from_element_type(pcoll.element_type)

Expand Down Expand Up @@ -252,13 +251,12 @@ def attributes_extractor(row):
if field.name not in extra_fields
])
formatter = _create_formatter(format, schema, payload_schema)
_ = (
return (
pcoll | beam.Map(
lambda row: beam.io.gcp.pubsub.PubsubMessage(
formatter(row), attributes_extractor(row)))
| beam.io.WriteToPubSub(
topic, with_attributes=True, timestamp_attribute=timestamp_attribute))
return {}


def io_providers():
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,8 @@ def expand_leaf_transform(spec, scope):
return {f'out{ix}': pcoll for (ix, pcoll) in enumerate(outputs)}
elif isinstance(outputs, beam.PCollection):
return {'out': outputs}
elif outputs is None:
return {}
else:
raise ValueError(
f'Transform {identify_object(spec)} returned an unexpected type '
Expand Down

0 comments on commit da3e417

Please sign in to comment.