[prism] TestStream support (Elements + Watermarks) #29917

Closed
Tracked by #29650
lostluck opened this issue Jan 4, 2024 · 2 comments

lostluck commented Jan 4, 2024

With Event Time timers largely sorted out in #29900, the next thing to work on is TestStream, since it should inform correct handling of ProcessingTime in general.

Largely described in this blog post, TestStream allows pipeline authors to deterministically validate their pipelines and code under a variety of conditions. TestStream does this by providing a sequence of three different event types: ElementEvents, WatermarkEvents, and ProcessingTime events.

  • ElementEvents specify elements timestamped in event time.
  • WatermarkEvents set a new watermark.
  • ProcessingTime events advance the "real time clock" by a specified duration.
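As a rough illustration of the three event kinds above, here is a minimal Go sketch. All type and function names (`Event`, `ElementEvent`, `WatermarkEvent`, `ProcessingTimeEvent`, `Describe`) are hypothetical and chosen for this example only; they are not the Beam protos or prism's internal types.

```go
package main

import (
	"fmt"
	"time"
)

// Event is a hypothetical marker interface for the three TestStream event kinds.
type Event interface{ isEvent() }

// TimestampedElement pairs an encoded element with its event time.
type TimestampedElement struct {
	Encoded   []byte
	EventTime time.Time
}

// ElementEvent adds event-time timestamped elements to the stream.
type ElementEvent struct {
	Elements []TimestampedElement
}

// WatermarkEvent sets a new watermark.
type WatermarkEvent struct {
	NewWatermark time.Time
}

// ProcessingTimeEvent advances the "real time clock" by a duration.
type ProcessingTimeEvent struct {
	Advance time.Duration
}

func (ElementEvent) isEvent()        {}
func (WatermarkEvent) isEvent()      {}
func (ProcessingTimeEvent) isEvent() {}

// Describe returns a short human-readable summary of an event.
func Describe(ev Event) string {
	switch e := ev.(type) {
	case ElementEvent:
		return fmt.Sprintf("add %d element(s)", len(e.Elements))
	case WatermarkEvent:
		return fmt.Sprintf("set watermark to %s", e.NewWatermark.UTC().Format(time.RFC3339))
	case ProcessingTimeEvent:
		return fmt.Sprintf("advance processing time by %s", e.Advance)
	default:
		return "unknown event"
	}
}

func main() {
	start := time.Date(2024, 1, 4, 0, 0, 0, 0, time.UTC)
	events := []Event{
		ElementEvent{Elements: []TimestampedElement{{Encoded: []byte("a"), EventTime: start}}},
		WatermarkEvent{NewWatermark: start.Add(time.Minute)},
		ProcessingTimeEvent{Advance: 30 * time.Second},
	}
	for _, ev := range events {
		fmt.Println(Describe(ev))
	}
}
```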

WRT the implementation in protos, TestStream is a Primitive Transform configured by a TestStreamPayload.

The TestStream is to enact a single event at a time, and only proceed to the next event after the runner has reached quiescence. That is, all "side effects" of that event have occurred, and processing has reached a standstill. This requires integration with the runner's watermark and bundle tracking infrastructure.
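The "one event at a time, then wait for quiescence" rule can be sketched with a toy model. This is only an illustration under stated assumptions: `toyRunner`, `work`, and `runTestStream` are hypothetical names, and a real runner tracks bundles and watermarks rather than a simple queue.

```go
package main

import "fmt"

// work is a hypothetical unit of processing. followUp models the
// "side effects" that processing this unit schedules.
type work struct {
	name     string
	followUp []work
}

// toyRunner is a stand-in for a runner's bundle tracking.
type toyRunner struct {
	pending []work   // work that has not been processed yet
	log     []string // order in which work was processed
}

// quiescent reports whether no outstanding work remains.
func (r *toyRunner) quiescent() bool { return len(r.pending) == 0 }

// step processes the oldest pending unit and queues its follow-ups.
func (r *toyRunner) step() {
	w := r.pending[0]
	r.pending = r.pending[1:]
	r.log = append(r.log, w.name)
	r.pending = append(r.pending, w.followUp...)
}

// runTestStream injects one event at a time, and only moves on to the
// next event once the runner has reached quiescence.
func runTestStream(r *toyRunner, events []work) []string {
	for _, ev := range events {
		r.pending = append(r.pending, ev)
		for !r.quiescent() {
			r.step()
		}
	}
	return r.log
}

func main() {
	events := []work{
		{name: "elements", followUp: []work{{name: "process bundle"}}},
		{name: "watermark", followUp: []work{
			{name: "fire timer", followUp: []work{{name: "emit pane"}}},
		}},
	}
	for _, entry := range runTestStream(&toyRunner{}, events) {
		fmt.Println(entry)
	}
}
```

In this model every side effect of the first event is logged before the second event is injected, which is the ordering guarantee the paragraph above describes.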

While in principle it's possible for a single pipeline to have multiple TestStream transforms, in practice this is not going to yield correct results, since it's not possible to specify a total ordering of events across multiple test streams without belabored alternations with WatermarkEvents, which are the only real global constraint on ordering.

As such, this initial pass will only permit a single TestStream to exist in a pipeline, and will reject pipelines that have more than one TestStream instance at job submission time. This would be in the Prepare request from the JobManagement service.
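A submission-time check along these lines might look like the following sketch. The `transform` struct and `checkSingleTestStream` function are hypothetical stand-ins for illustration, not prism's actual Prepare handling; the URN constants are assumed to match the Beam runner API definitions.

```go
package main

import "fmt"

// testStreamURN is assumed to be the URN Beam uses for the TestStream
// primitive transform.
const testStreamURN = "beam:transform:teststream:v1"

// transform is a hypothetical, trimmed-down stand-in for a pipeline
// transform: just an ID and the URN of its spec.
type transform struct {
	ID  string
	URN string
}

// checkSingleTestStream returns an error if the pipeline contains more
// than one TestStream transform.
func checkSingleTestStream(transforms []transform) error {
	var found []string
	for _, t := range transforms {
		if t.URN == testStreamURN {
			found = append(found, t.ID)
		}
	}
	if len(found) > 1 {
		return fmt.Errorf("pipeline has %d TestStream transforms %v; at most one is supported", len(found), found)
	}
	return nil
}

func main() {
	pipeline := []transform{
		{ID: "t1", URN: testStreamURN},
		{ID: "t2", URN: "beam:transform:pardo:v1"},
		{ID: "t3", URN: testStreamURN},
	}
	if err := checkSingleTestStream(pipeline); err != nil {
		fmt.Println("rejecting job:", err)
	}
}
```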


Prism already knows when it has "reached quiescence" and currently treats this as an error case to avoid hanging indefinitely. In particular, in ElementManager.Bundles, after watermarks have refreshed, we check whether there are no further in-progress bundles or scheduled watermark refreshes. This is a natural point to trigger/insert any events from TestStream. If there are no test stream events,


The protos indicate it's possible to "tag" the Watermark and Element events, in order to specify the total ordering of events, and avoid adding multiple TestStreams to a pipeline in the first place. This facility is currently only implemented in the Python SDK, in order to help service the Interactive Beam runner.

Blocking on adding TestStream tags to the Go SDK implementation is out of scope, so while tags will be kept in mind during the prism implementation, it cannot be deemed complete until the Python ValidatesRunner tests are executed against prism.

As such there are interesting code and tests from the Python SDK to look at for validating this feature set.

Tests:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/testing/test_stream_test.py

The Python direct runner implementation: https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/direct/test_stream_impl.py

The WatermarkControllerEvaluator (which the direct runner substitutes into the above implementation to control the watermark precisely):
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L377


Structurally the Prism implementation will be similar, but without the additional "transform" wrappings the Python runners use. Prism will approach test stream like impulse, in that there will be a single TestStreamHandler within the ElementManager. Each output tag will have a synthetic stage associated with it, permitting it to accept TestStreamEvents.

The synthetic stage approach allows tag-level control of the output watermarks, and avoids adding special-case handling for output watermarks, which are usually all the same for a given stage (that is, a single output watermark rather than multiple).
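To make the per-tag idea concrete, here is a minimal sketch in which each output tag gets its own synthetic stage with exactly one output watermark. The names (`tagStage`, `testStreamHandler`, `newHandler`, `setWatermark`, `addElements`) are hypothetical and do not reflect prism's actual types; the rule that watermarks only move forward is an assumption of this sketch.

```go
package main

import (
	"fmt"
	"math"
	"sort"
)

// minWatermark is the initial watermark of every synthetic stage.
const minWatermark int64 = math.MinInt64

// tagStage is a hypothetical synthetic stage for one TestStream output
// tag. It has a single output watermark, like an ordinary stage.
type tagStage struct {
	outputWatermark int64    // milliseconds since the epoch
	pending         []string // elements waiting to be bundled
}

// testStreamHandler routes events to the stage for each output tag.
type testStreamHandler struct {
	stages map[string]*tagStage
}

// newHandler creates one synthetic stage per output tag.
func newHandler(tags ...string) *testStreamHandler {
	h := &testStreamHandler{stages: map[string]*tagStage{}}
	for _, tag := range tags {
		h.stages[tag] = &tagStage{outputWatermark: minWatermark}
	}
	return h
}

// addElements queues elements on the stage for the given tag.
func (h *testStreamHandler) addElements(tag string, elms ...string) error {
	s, ok := h.stages[tag]
	if !ok {
		return fmt.Errorf("unknown TestStream output tag %q", tag)
	}
	s.pending = append(s.pending, elms...)
	return nil
}

// setWatermark advances the output watermark of one tag only.
// Assumption: watermarks never move backwards, so lower values are ignored.
func (h *testStreamHandler) setWatermark(tag string, wm int64) error {
	s, ok := h.stages[tag]
	if !ok {
		return fmt.Errorf("unknown TestStream output tag %q", tag)
	}
	if wm > s.outputWatermark {
		s.outputWatermark = wm
	}
	return nil
}

func main() {
	h := newHandler("a", "b")
	if err := h.addElements("a", "x", "y"); err != nil {
		fmt.Println(err)
		return
	}
	if err := h.setWatermark("a", 100); err != nil {
		fmt.Println(err)
		return
	}
	tags := make([]string, 0, len(h.stages))
	for tag := range h.stages {
		tags = append(tags, tag)
	}
	sort.Strings(tags)
	for _, tag := range tags {
		s := h.stages[tag]
		fmt.Printf("%s: watermark=%d pending=%d\n", tag, s.outputWatermark, len(s.pending))
	}
}
```

Because each tag owns its stage, advancing the watermark for tag "a" leaves tag "b" untouched, without any multi-watermark special casing inside a single stage.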

When quiescence is reached, the TestStreamHandler can direct the next event to the appropriate synthetic stages, which then update their state via special methods on the stageState.

The overall pipeline execution should be fairly normal otherwise. TestStream can update "pending" elements and similar just as the ElementManager already does for ordinary data and timer handling.


TestStream also has the ability to connect to a TestStreamService. IIUC this is to allow for an external handler of test stream state for Interactive Beam. This is out of scope for Prism at this time, and test streams with the service configured will fail at pipeline submission.
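The submission-time rejection of service-backed test streams could be sketched as below. `testStreamPayload` and `validatePayload` are hypothetical, trimmed-down stand-ins for illustration and are not the real TestStreamPayload proto or prism's validation code.

```go
package main

import "fmt"

// testStreamPayload is a hypothetical stand-in for the TestStream
// configuration. ServiceURL is empty when events are embedded directly.
type testStreamPayload struct {
	ServiceURL string
	NumEvents  int
}

// validatePayload rejects test streams that are configured to read
// their events from an external TestStreamService.
func validatePayload(p testStreamPayload) error {
	if p.ServiceURL != "" {
		return fmt.Errorf("TestStream with a TestStreamService at %q is not supported", p.ServiceURL)
	}
	return nil
}

func main() {
	embedded := testStreamPayload{NumEvents: 3}
	external := testStreamPayload{ServiceURL: "localhost:12345"}
	fmt.Println(validatePayload(embedded))
	fmt.Println(validatePayload(external))
}
```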

lostluck self-assigned this Jan 4, 2024
lostluck changed the title from "TestStream" to "[prism] TestStream support" Jan 4, 2024
apache locked and limited conversation to collaborators Jan 4, 2024
lostluck changed the title from "[prism] TestStream support" to "[prism] TestStream support (Elements + Watermarks)" Jan 23, 2024
@lostluck

#30072 has the PR for this. TIL that locking comments also prevents the automatic github bot from referencing it here.

@lostluck

PR submitted. Processing time handling is being tracked in #30083.

@github-actions github-actions bot added this to the 2.55.0 Release milestone Feb 16, 2024