
Commit

Add the ability to generate markdown documentation from a set of providers.
robertwb committed Dec 6, 2023
1 parent 9090b5b commit e7473f2
Showing 8 changed files with 293 additions and 60 deletions.
35 changes: 18 additions & 17 deletions sdks/python/apache_beam/transforms/external.py
@@ -373,7 +373,7 @@ def _has_constructor(self):
# Information regarding a SchemaTransform available in an external SDK.
SchemaTransformsConfig = namedtuple(
    'SchemaTransformsConfig',
-    ['identifier', 'configuration_schema', 'inputs', 'outputs'])
+    ['identifier', 'configuration_schema', 'inputs', 'outputs', 'description'])


class SchemaAwareExternalTransform(ptransform.PTransform):
@@ -444,22 +444,23 @@ def discover_iter(expansion_service, ignore_errors=True):
      discover_response = service.DiscoverSchemaTransform(
          beam_expansion_api_pb2.DiscoverSchemaTransformRequest())

-      for identifier in discover_response.schema_transform_configs:
-        proto_config = discover_response.schema_transform_configs[identifier]
-        try:
-          schema = named_tuple_from_schema(proto_config.config_schema)
-        except Exception as exn:
-          if ignore_errors:
-            logging.info("Bad schema for %s: %s", identifier, str(exn)[:250])
-            continue
-          else:
-            raise
-
-        yield SchemaTransformsConfig(
-            identifier=identifier,
-            configuration_schema=schema,
-            inputs=proto_config.input_pcollection_names,
-            outputs=proto_config.output_pcollection_names)
+      for identifier in discover_response.schema_transform_configs:
+        proto_config = discover_response.schema_transform_configs[identifier]
+        try:
+          schema = named_tuple_from_schema(proto_config.config_schema)
+        except Exception as exn:
+          if ignore_errors:
+            logging.info("Bad schema for %s: %s", identifier, str(exn)[:250])
+            continue
+          else:
+            raise
+
+        yield SchemaTransformsConfig(
+            identifier=identifier,
+            configuration_schema=schema,
+            inputs=proto_config.input_pcollection_names,
+            outputs=proto_config.output_pcollection_names,
+            description=proto_config.description)
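
With `description` now plumbed through discovery, callers can surface human-readable docs for each transform. A minimal sketch of consuming it (the expansion service address below is a placeholder):

```python
from apache_beam.transforms.external import SchemaAwareExternalTransform

# Placeholder address of a running expansion service.
for config in SchemaAwareExternalTransform.discover_iter('localhost:8097'):
  print(config.identifier, '-', config.description)
```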

  @staticmethod
  def discover_config(expansion_service, name):
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/typehints/schemas.py
@@ -245,6 +245,10 @@ def typing_to_runner_api(self, type_: type) -> schema_pb2.FieldType:
    if isinstance(type_, schema_pb2.Schema):
      return schema_pb2.FieldType(row_type=schema_pb2.RowType(schema=type_))

    if hasattr(type_, '_beam_schema_proto'):
      return schema_pb2.FieldType(
          row_type=schema_pb2.RowType(schema=type_._beam_schema_proto))

    if isinstance(type_, row_type.RowTypeConstraint):
      if type_.schema_id is None:
        schema_id = SCHEMA_REGISTRY.generate_new_id()
@@ -557,6 +561,7 @@ def named_tuple_from_schema(self, schema: schema_pb2.Schema) -> type:
        '__reduce__',
        _named_tuple_reduce_method(schema.SerializeToString()))
    setattr(user_type, row_type._BEAM_SCHEMA_ID, schema.id)
    user_type._beam_schema_proto = schema

    self.schema_registry.add(user_type, schema)
    coders.registry.register_coder(user_type, coders.RowCoder)
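
The effect of stashing the proto on the generated type: converting the named tuple back through `typing_to_runner_api` can now return the original schema verbatim. A rough sketch (the schema here is made up):

```python
from apache_beam.portability.api import schema_pb2
from apache_beam.typehints.schemas import named_tuple_from_schema
from apache_beam.typehints.schemas import typing_to_runner_api

schema = schema_pb2.Schema(
    id='example_schema',
    fields=[
        schema_pb2.Field(
            name='value',
            type=schema_pb2.FieldType(atomic_type=schema_pb2.INT64)),
    ])
ExampleRow = named_tuple_from_schema(schema)
# With _beam_schema_proto set, the round trip preserves the proto exactly.
assert typing_to_runner_api(ExampleRow).row_type.schema == schema
```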
188 changes: 188 additions & 0 deletions sdks/python/apache_beam/yaml/generate_yaml_docs.py
@@ -0,0 +1,188 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import argparse
import re

import yaml

from apache_beam.portability.api import schema_pb2
from apache_beam.utils import subprocess_server
from apache_beam.yaml import yaml_provider


def _fake_value(name, beam_type):
  type_info = beam_type.WhichOneof("type_info")
  if type_info == "atomic_type":
    if beam_type.atomic_type == schema_pb2.STRING:
      return f'"{name}"'
    elif beam_type.atomic_type == schema_pb2.BOOLEAN:
      return "true|false"
    else:
      return name
  elif type_info == "array_type":
    return [_fake_value(name, beam_type.array_type.element_type), '...']
  elif type_info == "iterable_type":
    return [_fake_value(name, beam_type.iterable_type.element_type), '...']
  elif type_info == "map_type":
    if beam_type.map_type.key_type.atomic_type == schema_pb2.STRING:
      return {
          'a': _fake_value(name + '_value_a', beam_type.map_type.value_type),
          'b': _fake_value(name + '_value_b', beam_type.map_type.value_type),
          'c': '...',
      }
    else:
      return {
          _fake_value(name + '_key', beam_type.map_type.key_type): _fake_value(
              name + '_value', beam_type.map_type.value_type)
      }
  elif type_info == "row_type":
    return _fake_row(beam_type.row_type.schema)
  elif type_info == "logical_type":
    return name
  else:
    raise ValueError(f"Unrecognized type_info: {type_info!r}")


def _fake_row(schema):
  if schema is None:
    return '...'
  return {f.name: _fake_value(f.name, f.type) for f in schema.fields}
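
For intuition, two illustrative calls following the branches above:

```python
# A string field renders as its own name in quotes.
string_type = schema_pb2.FieldType(atomic_type=schema_pb2.STRING)
assert _fake_value('topic', string_type) == '"topic"'

# An array field renders one placeholder element plus an ellipsis.
array_type = schema_pb2.FieldType(
    array_type=schema_pb2.ArrayType(element_type=string_type))
assert _fake_value('attributes', array_type) == ['"attributes"', '...']
```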


def pretty_example(provider, t):
  spec = {'type': t}
  try:
    requires_inputs = provider.requires_inputs(t, {})
  except Exception:
    requires_inputs = False
  if requires_inputs:
    spec['input'] = '...'
  config_schema = provider.config_schema(t)
  if config_schema is None or config_schema.fields:
    spec['config'] = _fake_row(config_schema)
  s = yaml.dump(spec, sort_keys=False)
  return s.replace("'", "")
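
A sketch of the output, using a stub in place of a real provider (only the two methods called above are assumed):

```python
class StubProvider:
  def requires_inputs(self, t, inputs):
    return True

  def config_schema(self, t):
    return schema_pb2.Schema(fields=[
        schema_pb2.Field(
            name='topic',
            type=schema_pb2.FieldType(atomic_type=schema_pb2.STRING)),
    ])

print(pretty_example(StubProvider(), 'WriteToPubSub'))
# type: WriteToPubSub
# input: ...
# config:
#   topic: "topic"
```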


def config_docs(schema):
  if schema is None:
    return ''
  elif not schema.fields:
    return 'No configuration parameters.'

  def pretty_type(beam_type):
    type_info = beam_type.WhichOneof("type_info")
    if type_info == "atomic_type":
      return schema_pb2.AtomicType.Name(beam_type.atomic_type).lower()
    elif type_info == "array_type":
      return f'Array[{pretty_type(beam_type.array_type.element_type)}]'
    elif type_info == "iterable_type":
      return f'Iterable[{pretty_type(beam_type.iterable_type.element_type)}]'
    elif type_info == "map_type":
      return (
          f'Map[{pretty_type(beam_type.map_type.key_type)}, '
          f'{pretty_type(beam_type.map_type.value_type)}]')
    elif type_info == "row_type":
      return 'Row'
    else:
      return '?'

  def maybe_row_parameters(t):
    if t.WhichOneof("type_info") == "row_type":
      return indent('\n\nRow fields:\n\n' + config_docs(t.row_type.schema), 4)
    else:
      return ''

  def maybe_optional(t):
    return " (Optional)" if t.nullable else ""

  def lines():
    for f in schema.fields:
      yield ''.join([
          f'**{f.name}** `{pretty_type(f.type)}`',
          maybe_optional(f.type),
          ': ' + f.description if f.description else '',
          maybe_row_parameters(f.type),
      ])

  return '\n\n'.join('*' + indent(line, 2) for line in lines()).strip()
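
Illustratively, a schema with one nullable, documented string field renders as a single bullet (this assumes the field's `description` is populated, which the discovery change above now enables):

```python
schema = schema_pb2.Schema(fields=[
    schema_pb2.Field(
        name='topic',
        description='Pub/Sub topic to write to.',
        type=schema_pb2.FieldType(
            atomic_type=schema_pb2.STRING, nullable=True)),
])
print(config_docs(schema))
# *  **topic** `string` (Optional): Pub/Sub topic to write to.
```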


def indent(lines, size):
  return '\n'.join(' ' * size + line for line in lines.split('\n'))


def longest(func, xs):
  return max([func(x) or '' for x in xs], key=len)


def io_grouping_key(transform_name):
  """Place reads and writes next to each other, after all other transforms."""
  if transform_name.startswith('ReadFrom'):
    return 1, transform_name[8:], 0
  elif transform_name.startswith('WriteTo'):
    return 1, transform_name[7:], 1
  else:
    return 0, transform_name
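
The resulting order, illustratively:

```python
# Non-IO transforms sort first; reads and writes for the same system pair up.
names = ['WriteToKafka', 'LogForTesting', 'ReadFromKafka']
assert sorted(names, key=io_grouping_key) == [
    'LogForTesting', 'ReadFromKafka', 'WriteToKafka']
```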


SKIP = [
    'Combine',
    'Filter',
    'MapToFields',
]


def transform_docs(t, providers):
  return '\n'.join([
      f'## {t}',
      '',
      longest(lambda p: p.description(t), providers),
      '',
      '### Configuration',
      '',
      longest(lambda p: config_docs(p.config_schema(t)), providers),
      '',
      '### Usage',
      '',
      indent(longest(lambda p: pretty_example(p, t), providers), 4),
  ])


def main():
  parser = argparse.ArgumentParser()
  parser.add_argument('output_file')
  parser.add_argument('--include', default='.*')
  parser.add_argument(
      '--exclude', default='(Combine)|(Filter)|(MapToFields)-.*')
  options = parser.parse_args()
  include = re.compile(options.include).match
  exclude = re.compile(options.exclude).match

  with subprocess_server.SubprocessServer.cache_subprocesses():
    with open(options.output_file, 'w') as fout:
      providers = yaml_provider.standard_providers()
      for transform in sorted(providers.keys(), key=io_grouping_key):
        if include(transform) and not exclude(transform):
          print(transform)
          fout.write(transform_docs(transform, providers[transform]))
          fout.write('\n\n')


if __name__ == '__main__':
  main()
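
A hypothetical invocation (module path inferred from the file's location, flags as defined above):

```
python -m apache_beam.yaml.generate_yaml_docs transforms.md \
    --include 'ReadFrom.*|WriteTo.*'
```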
3 changes: 1 addition & 2 deletions sdks/python/apache_beam/yaml/standard_io.yaml
@@ -68,8 +68,7 @@
        'format': 'format'
        'topic': 'topic'
        'bootstrap_servers': 'bootstrapServers'
-       'producer_config_updates': 'ProducerConfigUpdates'
-       'error_handling': 'errorHandling'
+       'producer_config_updates': 'producerConfigUpdates'
        'file_descriptor_path': 'fileDescriptorPath'
        'message_name': 'messageName'
    underlying_provider:
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/yaml/yaml_io.py
@@ -303,7 +303,7 @@ def write_to_pubsub(
    attributes_map: Optional[str] = None,
    id_attribute: Optional[str] = None,
    timestamp_attribute: Optional[str] = None):
-  """Writes messages from Cloud Pub/Sub.
+  """Writes messages to Cloud Pub/Sub.

  Args:
    topic: Cloud Pub/Sub topic in the form "/topics/<project>/<topic>".
22 changes: 22 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_mapping.py
@@ -16,13 +16,16 @@
#

"""This module defines the basic MapToFields operation."""
import functools
import inspect
import itertools
from collections import abc
from typing import Any
from typing import Callable
from typing import Collection
from typing import Dict
from typing import Mapping
from typing import NamedTuple
from typing import Optional
from typing import Union

@@ -267,6 +270,11 @@ def checking_func(row):
  return func


class ErrorHandlingConfig(NamedTuple):
  output: str
  # TODO: Other parameters are valid here too, but not common to Java.
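
This mirrors the YAML-level `error_handling` block; an illustrative pipeline snippet (transform config and field names here are made up):

```yaml
- type: MapToFields
  config:
    language: python
    fields:
      amount: int(raw_amount)
    error_handling:
      output: bad_records
```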


def exception_handling_args(error_handling_spec):
  if error_handling_spec:
    return {
@@ -294,12 +302,26 @@ def expand(self, pcoll):


def maybe_with_exception_handling_transform_fn(transform_fn):
  @functools.wraps(transform_fn)
  def expand(pcoll, error_handling=None, **kwargs):
    wrapped_pcoll = beam.core._MaybePValueWithErrors(
        pcoll, exception_handling_args(error_handling))
    return transform_fn(wrapped_pcoll,
                        **kwargs).as_result(_map_errors_to_standard_format())

  original_signature = inspect.signature(transform_fn)
  new_parameters = list(original_signature.parameters.values())
  error_handling_param = inspect.Parameter(
      'error_handling',
      inspect.Parameter.KEYWORD_ONLY,
      default=None,
      annotation=ErrorHandlingConfig)
  if new_parameters[-1].kind == inspect.Parameter.VAR_KEYWORD:
    new_parameters.insert(-1, error_handling_param)
  else:
    new_parameters.append(error_handling_param)
  expand.__signature__ = original_signature.replace(parameters=new_parameters)

  return expand
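
The signature rewrite above makes `error_handling` visible to introspection, which the doc generator relies on. A sketch with a made-up transform function:

```python
import inspect

def stub_transform(pcoll, *, fields):  # hypothetical transform_fn
  return pcoll

wrapped = maybe_with_exception_handling_transform_fn(stub_transform)
print(inspect.signature(wrapped))
# (pcoll, *, fields, error_handling: ErrorHandlingConfig = None)
```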


