Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add @yields_batches and @yields_elements #19268

Merged
merged 3 commits into from
Jun 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions sdks/python/apache_beam/runners/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ cdef class DoFnSignature(object):

cdef class DoFnInvoker(object):
cdef public DoFnSignature signature
cdef OutputProcessor output_processor
cdef OutputHandler output_handler
cdef object user_state_context
cdef public object bundle_finalizer_param

Expand Down Expand Up @@ -124,24 +124,47 @@ cdef class DoFnRunner:
cpdef process(self, WindowedValue windowed_value)


cdef class OutputProcessor(object):
cdef class OutputHandler(object):
@cython.locals(windowed_value=WindowedValue,
output_element_count=int64_t)
cpdef process_outputs(self, WindowedValue element, results,
watermark_estimator=*)
cpdef handle_process_outputs(self, WindowedValue element, results,
watermark_estimator=*)

@cython.locals(windowed_batch=WindowedBatch,
output_element_count=int64_t)
cpdef handle_process_batch_outputs(self, WindowedBatch input_batch, results,
watermark_estimator=*)


cdef class _OutputProcessor(OutputProcessor):
cdef class _OutputHandler(OutputHandler):
cdef object window_fn
cdef Receiver main_receivers
cdef object tagged_receivers
cdef DataflowDistributionCounter per_element_output_counter
cdef object output_batch_converter
cdef bint _process_batch_yields_elements
cdef bint _process_yields_batches

@cython.locals(windowed_value=WindowedValue,
windowed_batch=WindowedBatch,
output_element_count=int64_t)
cpdef handle_process_outputs(self, WindowedValue element, results,
watermark_estimator=*)

@cython.locals(windowed_value=WindowedValue,
windowed_batch=WindowedBatch,
output_element_count=int64_t)
cpdef process_outputs(self, WindowedValue element, results,
watermark_estimator=*)
cpdef handle_process_batch_outputs(self, WindowedBatch input_batch, results,
watermark_estimator=*)

@cython.locals(windowed_value=WindowedValue)
cdef inline WindowedValue _maybe_propagate_windowing_info(self, WindowedValue input_element, result)
cdef inline tuple _handle_tagged_output(self, result)
cdef inline _write_value_to_tag(self, tag, WindowedValue windowed_value,
watermark_estimator)
cdef inline _write_batch_to_tag(self, tag, WindowedBatch windowed_batch,
watermark_estimator)
cdef inline _verify_batch_output(self, result)

cdef class DoFnContext(object):
cdef object label
Expand Down
Loading