Skip to content

Commit

Permalink
Add @yields_batches and @yields_elements (#19268)
Browse files (browse the repository at this point in the history)
* Add yields_batches and yields_elements decorators

* Handle timestamp propagation for element-to-batch and batch-to-element

* fixup! Handle timestamp propagation for element-to-batch and batch-to-element
  • Loading branch information
TheNeuralBit authored Jun 10, 2022
1 parent c975450 commit 0de9821
Show file tree
Hide file tree
Showing 7 changed files with 387 additions and 177 deletions.
37 changes: 30 additions & 7 deletions sdks/python/apache_beam/runners/common.pxd
Original file line number | Diff line number | Diff line change
Expand Up @@ -71,7 +71,7 @@ cdef class DoFnSignature(object):

cdef class DoFnInvoker(object):
cdef public DoFnSignature signature
cdef OutputProcessor output_processor
cdef OutputHandler output_handler
cdef object user_state_context
cdef public object bundle_finalizer_param

Expand Down Expand Up @@ -124,24 +124,47 @@ cdef class DoFnRunner:
cpdef process(self, WindowedValue windowed_value)


cdef class OutputProcessor(object):
cdef class OutputHandler(object):
@cython.locals(windowed_value=WindowedValue,
output_element_count=int64_t)
cpdef process_outputs(self, WindowedValue element, results,
watermark_estimator=*)
cpdef handle_process_outputs(self, WindowedValue element, results,
watermark_estimator=*)

@cython.locals(windowed_batch=WindowedBatch,
output_element_count=int64_t)
cpdef handle_process_batch_outputs(self, WindowedBatch input_batch, results,
watermark_estimator=*)


cdef class _OutputProcessor(OutputProcessor):
cdef class _OutputHandler(OutputHandler):
cdef object window_fn
cdef Receiver main_receivers
cdef object tagged_receivers
cdef DataflowDistributionCounter per_element_output_counter
cdef object output_batch_converter
cdef bint _process_batch_yields_elements
cdef bint _process_yields_batches

@cython.locals(windowed_value=WindowedValue,
windowed_batch=WindowedBatch,
output_element_count=int64_t)
cpdef handle_process_outputs(self, WindowedValue element, results,
watermark_estimator=*)

@cython.locals(windowed_value=WindowedValue,
windowed_batch=WindowedBatch,
output_element_count=int64_t)
cpdef process_outputs(self, WindowedValue element, results,
watermark_estimator=*)
cpdef handle_process_batch_outputs(self, WindowedBatch input_batch, results,
watermark_estimator=*)

@cython.locals(windowed_value=WindowedValue)
cdef inline WindowedValue _maybe_propagate_windowing_info(self, WindowedValue input_element, result)
cdef inline tuple _handle_tagged_output(self, result)
cdef inline _write_value_to_tag(self, tag, WindowedValue windowed_value,
watermark_estimator)
cdef inline _write_batch_to_tag(self, tag, WindowedBatch windowed_batch,
watermark_estimator)
cdef inline _verify_batch_output(self, result)

cdef class DoFnContext(object):
cdef object label
Expand Down
Loading

0 comments on commit 0de9821

Please sign in to comment.