Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#30083][prism] Stabilize additional teststream cases. #31046

Merged
merged 4 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 24 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/engine/elementmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,8 @@ type Config struct {
type ElementManager struct {
config Config

stages map[string]*stageState // The state for each stage.
impulses set[string] // List of impulse stages.
stages map[string]*stageState // The state for each stage.

consumers map[string][]string // Map from pcollectionID to stageIDs that consumes them as primary input.
sideConsumers map[string][]LinkID // Map from pcollectionID to the stage+transform+input that consumes them as side input.
Expand Down Expand Up @@ -254,6 +255,14 @@ func (em *ElementManager) Impulse(stageID string) {
em.addPending(count)
}
refreshes := stage.updateWatermarks(em)

// Since impulses are synthetic, we need to simulate them properly
// if a pipeline is only test stream driven.
if em.impulses == nil {
em.impulses = refreshes
} else {
em.impulses.merge(refreshes)
}
em.addRefreshes(refreshes)
}

Expand Down Expand Up @@ -286,6 +295,13 @@ func (em *ElementManager) Bundles(ctx context.Context, nextBundID func() string)
// Watermark evaluation goroutine.
go func() {
defer close(runStageCh)

// If we have a test stream, clear out existing refreshes, so the test stream can
// insert any elements it needs.
if em.testStreamHandler != nil {
em.watermarkRefreshes = singleSet(em.testStreamHandler.ID)
}

for {
em.refreshCond.L.Lock()
// If there are no watermark refreshes available, we wait until there are.
Expand Down Expand Up @@ -370,7 +386,13 @@ func (em *ElementManager) checkForQuiescence(advanced set[string]) {
nextEvent.Execute(em)
// Decrement pending for the event being processed.
em.addPending(-1)
return
// If there are refreshes scheduled, then test stream permitted execution to continue.
// Note: it's a prism bug if test stream never causes a refresh to occur for a given event.
// It's not correct to move to the next event if no refreshes would occur.
if len(em.watermarkRefreshes) > 0 {
return
}
// If there are no refreshes, then there's no mechanism to make progress, so it's time to fast fail.
}

v := em.livePending.Load()
Expand Down
5 changes: 5 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,11 @@ func TestTestStream(t *testing.T) {
{pipeline: primitives.TestStreamTwoFloat64Sequences},
{pipeline: primitives.TestStreamTwoInt64Sequences},
{pipeline: primitives.TestStreamTwoUserTypeSequences},

{pipeline: primitives.TestStreamSimple},
{pipeline: primitives.TestStreamSimple_InfinityDefault},
{pipeline: primitives.TestStreamToGBK},
{pipeline: primitives.TestStreamTimersEventTime},
}

configs := []struct {
Expand Down
49 changes: 45 additions & 4 deletions sdks/go/pkg/beam/runners/prism/internal/engine/teststream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package engine

import (
"container/heap"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
Expand Down Expand Up @@ -46,13 +47,15 @@ type testStreamHandler struct {

tagState map[string]tagState // Map from event tag to related outputs.

completed bool // indicates that no further test stream events exist, and all watermarks are advanced to infinity. Used to send the final event, once.
currentHold mtime.Time // indicates if the default watermark hold has been lifted.
completed bool // indicates that no further test stream events exist, and all watermarks are advanced to infinity. Used to send the final event, once.
}

func makeTestStreamHandler(id string) *testStreamHandler {
return &testStreamHandler{
ID: id,
tagState: map[string]tagState{},
ID: id,
tagState: map[string]tagState{},
currentHold: mtime.MinTimestamp,
}
}

Expand Down Expand Up @@ -124,6 +127,35 @@ func (ts *testStreamHandler) NextEvent() tsEvent {
return ev
}

// UpdateHold restrains the watermark based on upcoming elements in the test stream queue.
// This uses the element manager's normal hold mechanisms to avoid premature pipeline termination,
// when there are still remaining events to process.
//
// The handler's previous hold (ts.currentHold) is released from the test stream
// stage's hold bookkeeping, newHold is recorded in its place, and the test stream
// stage plus every stage in em.impulses are marked for a watermark refresh.
// Calling UpdateHold on a nil handler is a no-op.
func (ts *testStreamHandler) UpdateHold(em *ElementManager, newHold mtime.Time) {
if ts == nil {
return
}

// The hold heap and counts belong to the test stream's own stage; they are
// mutated while holding that stage's mutex.
ss := em.stages[ts.ID]
ss.mu.Lock()
defer ss.mu.Unlock()

// Release the previous hold, if one is still counted.
// NOTE(review): heap.Pop removes whichever element the heap orders first, which is
// only the previous test stream hold if no other hold on this stage sorts ahead of
// it — confirm against the heap's ordering and other users of this heap.
if ss.watermarkHoldsCounts[ts.currentHold] > 0 {
heap.Pop(&ss.watermarkHoldHeap)
ss.watermarkHoldsCounts[ts.currentHold] = ss.watermarkHoldsCounts[ts.currentHold] - 1
}
// Install the new hold.
// NOTE(review): the count is assigned, not incremented, so any existing count
// at newHold is overwritten — confirm that is intended.
ts.currentHold = newHold
heap.Push(&ss.watermarkHoldHeap, ts.currentHold)
ss.watermarkHoldsCounts[ts.currentHold] = 1

// kick the TestStream and Impulse stages too.
kick := singleSet(ts.ID)
kick.merge(em.impulses)

// This executes under the refreshCond lock, so we can't call em.addRefreshes.
// Instead the refresh set is merged directly and waiters are woken.
em.watermarkRefreshes.merge(kick)
em.refreshCond.Broadcast()
}

// TestStreamElement wraps the provided bytes and timestamp for ingestion and use.
type TestStreamElement struct {
Encoded []byte
Expand Down Expand Up @@ -195,6 +227,8 @@ func (ev tsWatermarkEvent) Execute(em *ElementManager) {
ss.updateUpstreamWatermark(ss.inputID, t.watermark)
em.watermarkRefreshes.insert(sID)
}
// Clear the default hold after the inserts have occurred.
em.testStreamHandler.UpdateHold(em, t.watermark)
}

// tsProcessingTimeEvent implements advancing the synthetic processing time.
Expand All @@ -215,7 +249,7 @@ type tsFinalEvent struct {
}

func (ev tsFinalEvent) Execute(em *ElementManager) {
em.addPending(1) // We subtrack a pending after event execution, so add one now.
em.testStreamHandler.UpdateHold(em, mtime.MaxTimestamp)
ss := em.stages[ev.stageID]
kickSet := ss.updateWatermarks(em)
kickSet.insert(ev.stageID)
Expand All @@ -242,6 +276,13 @@ var (
// initHandler lazily creates the element manager's test stream handler for the
// stage with the given id. If a handler already exists, it does nothing.
// On first creation it also adds one pending count and places the handler's
// initial hold on the stage's watermark hold heap.
func (tsi *testStreamImpl) initHandler(id string) {
if tsi.em.testStreamHandler == nil {
tsi.em.testStreamHandler = makeTestStreamHandler(id)

ss := tsi.em.stages[id]
tsi.em.addPending(1) // We subtract a pending after event execution, so add one now for the final event to avoid a race condition.

// Arrest the watermark initially to prevent terminal advancement.
// The hold value is whatever makeTestStreamHandler set as currentHold.
heap.Push(&ss.watermarkHoldHeap, tsi.em.testStreamHandler.currentHold)
ss.watermarkHoldsCounts[tsi.em.testStreamHandler.currentHold] = 1
}
}

Expand Down
5 changes: 3 additions & 2 deletions sdks/go/pkg/beam/runners/prism/internal/worker/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,10 @@ type B struct {
func (b *B) Init() {
// We need to see final data and timer signals that match the number of
// outputs the stage this bundle executes possesses
b.dataSema.Store(int32(b.OutputCount + len(b.HasTimers)))
outCap := int32(b.OutputCount + len(b.HasTimers))
b.dataSema.Store(outCap)
b.DataWait = make(chan struct{})
if b.OutputCount == 0 {
if outCap == 0 {
close(b.DataWait) // Can happen if there are no outputs for the bundle.
}
b.Resp = make(chan *fnpb.ProcessBundleResponse, 1)
Expand Down
74 changes: 74 additions & 0 deletions sdks/go/test/integration/primitives/teststream.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
package primitives

import (
"fmt"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/register"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/teststream"
)
Expand Down Expand Up @@ -172,3 +175,74 @@ func TestStreamInt16Sequence(s beam.Scope) {
passert.Count(s, col, "teststream int15", 3)
passert.EqualsList(s, col, ele)
}

// panicIfNot42 accepts only the value 42; any other input causes a panic
// whose message reports the value that was received.
func panicIfNot42(v int) {
	if v == 42 {
		return
	}
	panic(fmt.Sprintf("got %v, want 42", v))
}

// dropKeyEmitValues ignores the key and emits each of the grouped values.
lostluck marked this conversation as resolved.
Show resolved Hide resolved
func dropKeyEmitValues(_ int, vs func(*int) bool, emit func(int)) {
	// The key is discarded; every value the iterator yields is forwarded
	// to emit in iteration order.
	var cur int
	for more := vs(&cur); more; more = vs(&cur) {
		emit(cur)
	}
}

// init hands the DoFns used by the test stream pipelines in this file to the
// register package: panicIfNot42 (one input, no outputs) via Function1x0 and
// dropKeyEmitValues (three inputs, no outputs) via Function3x0.
func init() {
register.Function1x0(panicIfNot42)
register.Function3x0(dropKeyEmitValues)
}

// TestStreamSimple builds a minimal pipeline in which a teststream emits the
// single element 42 (at timestamp 100), advances the watermark to infinity,
// and feeds a DoFn that panics on any other value.
// Intended for runner validation.
func TestStreamSimple(s beam.Scope) {
	cfg := teststream.NewConfig()
	cfg.AddElementList(100, []int{42})
	cfg.AdvanceWatermarkToInfinity()

	beam.ParDo0(s, panicIfNot42, teststream.Create(s, cfg))
}

// TestStreamSimple_InfinityDefault builds the same minimal pipeline as
// TestStreamSimple but never explicitly advances the watermark, so it
// validates that the runner advances it to infinity on its own.
// Intended for runner validation.
func TestStreamSimple_InfinityDefault(s beam.Scope) {
	cfg := teststream.NewConfig()
	cfg.AddElementList(100, []int{42})

	beam.ParDo0(s, panicIfNot42, teststream.Create(s, cfg))
}

// TestStreamToGBK builds a minimal pipeline in which a teststream emits the
// single element 42, which is given a fixed key, grouped by key, un-keyed
// again, and finally checked by panicIfNot42.
func TestStreamToGBK(s beam.Scope) {
	cfg := teststream.NewConfig()
	cfg.AddElementList(100, []int{42})
	cfg.AdvanceWatermarkToInfinity()

	keyed := beam.AddFixedKey(s, teststream.Create(s, cfg))
	values := beam.ParDo(s, dropKeyEmitValues, beam.GroupByKey(s, keyed))
	beam.ParDo0(s, panicIfNot42, values)
}

// TestStreamTimersEventTime validates event time timers in a pipeline whose
// only source is a test stream: the shared event-time timer pipeline builder
// is given a teststream in place of an impulse.
func TestStreamTimersEventTime(s beam.Scope) {
	// source stands in for the impulse: one element, then watermark to infinity.
	source := func(s beam.Scope) beam.PCollection {
		cfg := teststream.NewConfig()
		cfg.AddElements(123456, []byte{42})
		cfg.AdvanceWatermarkToInfinity()
		return teststream.Create(s, cfg)
	}

	build := timersEventTimePipelineBuilder(source)
	build(s)
}
20 changes: 20 additions & 0 deletions sdks/go/test/integration/primitives/teststream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,23 @@ func TestTestStreamTwoUserTypeSequences(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TestStreamTwoUserTypeSequences)
}

// TestTestStreamSimple builds and runs the TestStreamSimple pipeline.
// integration.CheckFilters is called first; presumably it skips the test when
// it is filtered out for the current runner — confirm in the integration package.
func TestTestStreamSimple(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TestStreamSimple)
}

// TestTestStreamTestStreamSimple_InfinityDefault builds and runs the
// TestStreamSimple_InfinityDefault pipeline after calling integration.CheckFilters.
func TestTestStreamTestStreamSimple_InfinityDefault(t *testing.T) {
lostluck marked this conversation as resolved.
Show resolved Hide resolved
integration.CheckFilters(t)
ptest.BuildAndRun(t, TestStreamSimple_InfinityDefault)
}

// TestTestStreamToGBK builds and runs the TestStreamToGBK pipeline after
// calling integration.CheckFilters.
func TestTestStreamToGBK(t *testing.T) {
integration.CheckFilters(t)
ptest.BuildAndRun(t, TestStreamToGBK)
}

// TestTestStreamTimersEventTimeTestStream builds and runs the
// TestStreamTimersEventTime pipeline after calling integration.CheckFilters.
func TestTestStreamTimersEventTimeTestStream(t *testing.T) {
lostluck marked this conversation as resolved.
Show resolved Hide resolved
integration.CheckFilters(t)
ptest.BuildAndRun(t, TestStreamTimersEventTime)
}
2 changes: 1 addition & 1 deletion sdks/go/test/integration/primitives/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (fn *eventTimeFn) OnTimer(ctx context.Context, ts beam.EventTime, sp state.
}
}

// TimersEventTime takes in an impulse transform and produces a pipeline construction
// timersEventTimePipelineBuilder takes in an impulse transform and produces a pipeline construction
// function that validates EventTime timers.
//
// The impulse is provided outside to swap between a bounded impulse, and
Expand Down
Loading