diff --git a/sdks/go/pkg/beam/core/graph/window/trigger/trigger.go b/sdks/go/pkg/beam/core/graph/window/trigger/trigger.go index 148a672fc695b..d3c21f30f157a 100644 --- a/sdks/go/pkg/beam/core/graph/window/trigger/trigger.go +++ b/sdks/go/pkg/beam/core/graph/window/trigger/trigger.go @@ -30,6 +30,11 @@ type Trigger interface { // DefaultTrigger fires once after the end of window. Late Data is discarded. type DefaultTrigger struct{} +// String implements the Stringer interface and returns trigger details as a string. +func (t DefaultTrigger) String() string { + return fmt.Sprintf("%#v", t) +} + func (t DefaultTrigger) trigger() {} // Default constructs a default trigger that fires once after the end of window. @@ -43,6 +48,11 @@ type AlwaysTrigger struct{} func (t AlwaysTrigger) trigger() {} +// String implements the Stringer interface and returns trigger details as a string. +func (t *AlwaysTrigger) String() string { + return fmt.Sprintf("%#v", t) +} + // Always constructs a trigger that fires immediately // whenever an element is received. // @@ -56,6 +66,11 @@ type AfterCountTrigger struct { elementCount int32 } +// String implements the Stringer interface and returns trigger details as a string. +func (t *AfterCountTrigger) String() string { + return fmt.Sprintf("%#v", t) +} + func (t AfterCountTrigger) trigger() {} // ElementCount returns the elementCount. @@ -77,6 +92,11 @@ type AfterProcessingTimeTrigger struct { timestampTransforms []TimestampTransform } +// String implements the Stringer interface and returns trigger details as a string. +func (t *AfterProcessingTimeTrigger) String() string { + return fmt.Sprintf("%#v", t) +} + func (t AfterProcessingTimeTrigger) trigger() {} // TimestampTransforms returns the timestampTransforms. @@ -108,6 +128,11 @@ type DelayTransform struct { Delay int64 // in milliseconds } +// String implements the Stringer interface and returns trigger details as a string. +func (t *DelayTransform) String() string { + return fmt.Sprintf("%#v", t) +} + func (DelayTransform) timestampTransform() {} // AlignToTransform takes the timestamp and transforms it to the lowest @@ -120,6 +145,11 @@ type AlignToTransform struct { Period, Offset int64 // in milliseconds } +// String implements the Stringer interface and returns trigger details as a string. +func (t *AlignToTransform) String() string { + return fmt.Sprintf("%#v", t) +} + func (AlignToTransform) timestampTransform() {} // PlusDelay configures an AfterProcessingTime trigger to fire after a specified delay, @@ -158,6 +188,11 @@ type RepeatTrigger struct { subtrigger Trigger } +// String implements the Stringer interface and returns trigger details as a string. +func (t *RepeatTrigger) String() string { + return fmt.Sprintf("%#v", t) +} + func (t RepeatTrigger) trigger() {} // SubTrigger returns the trigger to be repeated. @@ -170,6 +205,9 @@ func (t *RepeatTrigger) SubTrigger() Trigger { // // Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always(). func Repeat(t Trigger) *RepeatTrigger { + if t == nil { + panic("trigger argument to trigger.Repeat() cannot be nil") + } return &RepeatTrigger{subtrigger: t} } @@ -179,6 +217,11 @@ type AfterEndOfWindowTrigger struct { lateFiring Trigger } +// String implements the Stringer interface and returns trigger details as a string. +func (t *AfterEndOfWindowTrigger) String() string { + return fmt.Sprintf("%#v", t) +} + func (t AfterEndOfWindowTrigger) trigger() {} // Early returns the Early Firing trigger for AfterEndOfWindowTrigger. @@ -217,11 +260,15 @@ func (t *AfterEndOfWindowTrigger) LateFiring(late Trigger) *AfterEndOfWindowTrig } // AfterAnyTrigger fires after any of sub-trigger fires. -// NYI(BEAM-3304). Intended for framework use only. type AfterAnyTrigger struct { subtriggers []Trigger } +// String implements the Stringer interface and returns trigger details as a string. +func (t *AfterAnyTrigger) String() string { + return fmt.Sprintf("%#v", t) +} + func (t AfterAnyTrigger) trigger() {} // SubTriggers returns the component triggers. @@ -229,12 +276,24 @@ func (t *AfterAnyTrigger) SubTriggers() []Trigger { return t.subtriggers } +// AfterAny returns a new AfterAny trigger with subtriggers set to passed argument. +func AfterAny(triggers []Trigger) *AfterAnyTrigger { + if len(triggers) <= 1 { + panic("empty slice passed as an argument to trigger.AfterAny()") + } + return &AfterAnyTrigger{subtriggers: triggers} +} + // AfterAllTrigger fires after all subtriggers are fired. -// NYI(BEAM-3304). Intended for framework use only. type AfterAllTrigger struct { subtriggers []Trigger } +// String implements the Stringer interface and returns trigger details as a string. +func (t *AfterAllTrigger) String() string { + return fmt.Sprintf("%#v", t) +} + func (t AfterAllTrigger) trigger() {} // SubTriggers returns the component triggers. @@ -242,20 +301,99 @@ func (t *AfterAllTrigger) SubTriggers() []Trigger { return t.subtriggers } -// OrFinallyTrigger serves as final condition to cause any trigger to fire. -// NYI(BEAM-3304). Intended for framework use only. -type OrFinallyTrigger struct{} +// AfterAll returns a new AfterAll trigger with subtriggers set to the passed argument. +func AfterAll(triggers []Trigger) *AfterAllTrigger { + if len(triggers) <= 1 { + panic(fmt.Sprintf("number of subtriggers to trigger.AfterAll() should be greater than 1, got: %v", len(triggers))) + } + return &AfterAllTrigger{subtriggers: triggers} +} + +// OrFinallyTrigger is ready whenever either of its subtriggers are ready, but finishes output +// when the finally subtrigger fires. +type OrFinallyTrigger struct { + main Trigger // Trigger governing main output; may fire repeatedly. + finally Trigger // Trigger governing termination of output. +} + +// String implements the Stringer interface and returns trigger details as a string. +func (t *OrFinallyTrigger) String() string { + return fmt.Sprintf("%#v", t) +} func (t OrFinallyTrigger) trigger() {} +// OrFinally trigger has main trigger which may fire repeatedly and the finally trigger. +// Output is produced when the finally trigger fires. +func OrFinally(main, finally Trigger) *OrFinallyTrigger { + if main == nil || finally == nil { + panic("main and finally trigger arguments to trigger.OrFinally() cannot be nil") + } + return &OrFinallyTrigger{main: main, finally: finally} +} + +// Main returns the main trigger of OrFinallyTrigger. +func (t *OrFinallyTrigger) Main() Trigger { + return t.main +} + +// Finally returns the finally trigger of OrFinallyTrigger. +func (t *OrFinallyTrigger) Finally() Trigger { + return t.finally +} + // NeverTrigger is never ready to fire. -// NYI(BEAM-3304). Intended for framework use only. type NeverTrigger struct{} +// String implements the Stringer interface and returns trigger details as a string. +func (t *NeverTrigger) String() string { + return fmt.Sprintf("%#v", t) +} + func (t NeverTrigger) trigger() {} -// AfterSynchronizedProcessingTimeTrigger fires when processing time synchronises with arrival time. -// NYI(BEAM-3304). Intended for framework use only. +// Never creates a Never Trigger that is never ready to fire. +// There will only be an ON_TIME output and a final output at window expiration. +func Never() *NeverTrigger { + return &NeverTrigger{} +} + +// AfterSynchronizedProcessingTimeTrigger fires when processing time synchronizes with arrival time. type AfterSynchronizedProcessingTimeTrigger struct{} +// String implements the Stringer interface and returns trigger details as a string. +func (t *AfterSynchronizedProcessingTimeTrigger) String() string { + return fmt.Sprintf("%#v", t) +} + func (t AfterSynchronizedProcessingTimeTrigger) trigger() {} + +// AfterSynchronizedProcessingTime creates a new AfterSynchronizedProcessingTime trigger that fires when +// processing time synchronizes with arrival time. +func AfterSynchronizedProcessingTime() *AfterSynchronizedProcessingTimeTrigger { + return &AfterSynchronizedProcessingTimeTrigger{} +} + +// AfterEachTrigger fires when each trigger is ready. Order of triggers matters. +type AfterEachTrigger struct { + subtriggers []Trigger +} + +func (t AfterEachTrigger) trigger() {} + +// AfterEach creates a new AfterEach trigger that fires after each trigger is ready. It follows the order +// of triggers passed in as arguments. Let's say if the second trigger gets ready but the first one is not ready +// then it won't be fired until first triggered is ready and fired. +func AfterEach(subtriggers []Trigger) *AfterEachTrigger { + return &AfterEachTrigger{subtriggers: subtriggers} +} + +// Subtriggers returns the list of subtriggers for the current AfterEach trigger. +func (t *AfterEachTrigger) Subtriggers() []Trigger { + return t.subtriggers +} + +// String implements the Stringer interface and returns trigger details as a string. +func (t *AfterEachTrigger) String() string { + return fmt.Sprintf("%#v", t) +} diff --git a/sdks/go/pkg/beam/core/graph/window/trigger/trigger_test.go b/sdks/go/pkg/beam/core/graph/window/trigger/trigger_test.go index a23de354bfd61..2974e47b51ecc 100644 --- a/sdks/go/pkg/beam/core/graph/window/trigger/trigger_test.go +++ b/sdks/go/pkg/beam/core/graph/window/trigger/trigger_test.go @@ -15,6 +15,7 @@ package trigger import ( + "reflect" "testing" "time" @@ -58,6 +59,13 @@ func TestRepeatTrigger(t *testing.T) { t.Errorf("subtrigger not configured correctly. got %v, want %v", got, subTr) } } +func TestRepeatTrigger_SubTrigger(t1 *testing.T) { + subTr := AfterCount(2) + t := &RepeatTrigger{subtrigger: subTr} + if got, want := t.SubTrigger(), subTr; !reflect.DeepEqual(got, want) { + t1.Errorf("SubTrigger() = %v, want %v", got, want) + } +} func TestAfterEndOfWindowTrigger(t *testing.T) { earlyTr := AfterCount(50) @@ -71,3 +79,209 @@ func TestAfterEndOfWindowTrigger(t *testing.T) { t.Errorf("late firing trigger not configured correctly. got %v, want %v", got, lateTr) } } + +func TestAfterAll(t *testing.T) { + args := []Trigger{AfterCount(int32(10)), AfterProcessingTime()} + want := &AfterAllTrigger{subtriggers: args} + if got := AfterAll(args); !reflect.DeepEqual(got, want) { + t.Errorf("AfterAll() = %v, want %v", got, want) + } +} + +func TestAfterAllTrigger_SubTriggers(t1 *testing.T) { + args := []Trigger{AfterCount(int32(10)), AfterProcessingTime()} + t := &AfterAllTrigger{subtriggers: args} + + if got, want := t.SubTriggers(), args; !reflect.DeepEqual(got, want) { + t1.Errorf("SubTriggers() = %v, want %v", got, want) + } +} + +func TestAfterAny(t *testing.T) { + args := []Trigger{AfterCount(int32(10)), AfterProcessingTime()} + want := &AfterAnyTrigger{subtriggers: args} + + if got := AfterAny(args); !reflect.DeepEqual(got, want) { + t.Errorf("AfterAny() = %v, want %v", got, want) + } +} + +func TestAfterAnyTrigger_SubTriggers(t1 *testing.T) { + args := []Trigger{AfterCount(int32(10)), AfterProcessingTime()} + + t := &AfterAnyTrigger{subtriggers: args} + if got, want := t.SubTriggers(), args; !reflect.DeepEqual(got, want) { + t1.Errorf("SubTriggers() = %v, want %v", got, want) + } +} + +func TestAfterEach(t *testing.T) { + args := []Trigger{AfterCount(int32(10)), AfterProcessingTime()} + want := &AfterEachTrigger{subtriggers: args} + + if got := AfterEach(args); !reflect.DeepEqual(got, want) { + t.Errorf("AfterEach() = %v, want %v", got, want) + } +} + +func TestAfterEachTrigger_Subtriggers(t1 *testing.T) { + args := []Trigger{AfterCount(int32(10)), AfterProcessingTime()} + + t := &AfterAnyTrigger{subtriggers: args} + if got, want := t.SubTriggers(), args; !reflect.DeepEqual(got, want) { + t1.Errorf("SubTriggers() = %v, want %v", got, want) + } +} + +func TestAfterEndOfWindowTrigger_Early(t1 *testing.T) { + early := AfterCount(int32(50)) + t := &AfterEndOfWindowTrigger{ + earlyFiring: early, + lateFiring: Always(), + } + if got, want := t.Early(), early; !reflect.DeepEqual(got, want) { + t1.Errorf("Early() = %v, want %v", got, want) + } +} + +func TestAfterEndOfWindowTrigger_EarlyFiring(t1 *testing.T) { + early := AfterCount(int32(50)) + t := &AfterEndOfWindowTrigger{ + earlyFiring: Default(), + lateFiring: Always(), + } + want := &AfterEndOfWindowTrigger{ + earlyFiring: early, + lateFiring: Always(), + } + if got := t.EarlyFiring(early); !reflect.DeepEqual(got, want) { + t1.Errorf("EarlyFiring() = %v, want %v", got, want) + } +} + +func TestAfterEndOfWindowTrigger_Late(t1 *testing.T) { + late := Always() + t := &AfterEndOfWindowTrigger{ + earlyFiring: AfterCount(int32(50)), + lateFiring: late, + } + + if got, want := t.Late(), late; !reflect.DeepEqual(got, want) { + t1.Errorf("Late() = %v, want %v", got, want) + } +} + +func TestAfterEndOfWindowTrigger_LateFiring(t1 *testing.T) { + late := Always() + t := &AfterEndOfWindowTrigger{ + earlyFiring: AfterCount(int32(50)), + lateFiring: Default(), + } + want := &AfterEndOfWindowTrigger{ + earlyFiring: AfterCount(int32(50)), + lateFiring: late, + } + + if got := t.LateFiring(late); !reflect.DeepEqual(got, want) { + t1.Errorf("LateFiring() = %v, want %v", got, want) + } +} + +func TestAfterProcessingTimeTrigger_AlignedTo(t1 *testing.T) { + t := &AfterProcessingTimeTrigger{} + period, offset := int64(1), int64(0) + want := &AfterProcessingTimeTrigger{ + timestampTransforms: []TimestampTransform{AlignToTransform{Period: period, Offset: offset}}, + } + if got := t.AlignedTo(time.Millisecond, time.Time{}); !reflect.DeepEqual(got, want) { + t1.Errorf("AlignedTo() = %v, want %v", got, want) + } +} + +func TestAfterProcessingTimeTrigger_PlusDelay(t1 *testing.T) { + t := &AfterProcessingTimeTrigger{} + + want := &AfterProcessingTimeTrigger{ + timestampTransforms: []TimestampTransform{DelayTransform{Delay: int64(1)}}, + } + + if got := t.PlusDelay(time.Millisecond); !reflect.DeepEqual(got, want) { + t1.Errorf("PlusDelay() = %v, want %v", got, want) + } +} + +func TestAfterProcessingTimeTrigger_TimestampTransforms(t1 *testing.T) { + period, offset := int64(1), int64(0) + tt := []TimestampTransform{AlignToTransform{Period: period, Offset: offset}} + t := &AfterProcessingTimeTrigger{ + timestampTransforms: tt, + } + if got, want := t.TimestampTransforms(), tt; !reflect.DeepEqual(got, want) { + t1.Errorf("TimestampTransforms() = %v, want %v", got, want) + } +} + +func TestAfterSynchronizedProcessingTime(t *testing.T) { + want := &AfterSynchronizedProcessingTimeTrigger{} + if got := AfterSynchronizedProcessingTime(); !reflect.DeepEqual(got, want) { + t.Errorf("AfterSynchronizedProcessingTime() = %v, want %v", got, want) + } +} + +func TestAlways(t *testing.T) { + want := &AlwaysTrigger{} + if got := Always(); !reflect.DeepEqual(got, want) { + t.Errorf("Always() = %v, want %v", got, want) + } +} +func TestDefault(t *testing.T) { + want := &DefaultTrigger{} + if got := Default(); !reflect.DeepEqual(got, want) { + t.Errorf("Default() = %v, want %v", got, want) + } +} + +func TestNever(t *testing.T) { + want := &NeverTrigger{} + if got := Never(); !reflect.DeepEqual(got, want) { + t.Errorf("Never() = %v, want %v", got, want) + } +} + +func TestOrFinally(t *testing.T) { + main := AfterCount(50) + finally := Always() + want := &OrFinallyTrigger{ + main: main, + finally: finally, + } + if got := OrFinally(main, finally); !reflect.DeepEqual(got, want) { + t.Errorf("OrFinally() = %v, want %v", got, want) + } +} + +func TestOrFinallyTrigger_Finally(t1 *testing.T) { + main := AfterCount(50) + finally := Always() + t := &OrFinallyTrigger{ + main: main, + finally: finally, + } + + if got, want := t.Finally(), finally; !reflect.DeepEqual(got, want) { + t1.Errorf("Finally() = %v, want %v", got, want) + } +} + +func TestOrFinallyTrigger_Main(t1 *testing.T) { + main := AfterCount(50) + finally := Always() + t := &OrFinallyTrigger{ + main: main, + finally: finally, + } + + if got, want := t.Main(), main; !reflect.DeepEqual(got, want) { + t1.Errorf("Main() = %v, want %v", got, want) + } +} diff --git a/sdks/go/pkg/beam/core/runtime/graphx/translate.go b/sdks/go/pkg/beam/core/runtime/graphx/translate.go index 6d3b48733e2af..bde9f9b676ab6 100644 --- a/sdks/go/pkg/beam/core/runtime/graphx/translate.go +++ b/sdks/go/pkg/beam/core/runtime/graphx/translate.go @@ -1170,6 +1170,23 @@ func makeTrigger(t trigger.Trigger) *pipepb.Trigger { AfterSynchronizedProcessingTime: &pipepb.Trigger_AfterSynchronizedProcessingTime{}, }, } + case *trigger.OrFinallyTrigger: + return &pipepb.Trigger{ + Trigger: &pipepb.Trigger_OrFinally_{ + OrFinally: &pipepb.Trigger_OrFinally{ + Main: makeTrigger(t.Main()), + Finally: makeTrigger(t.Finally()), + }, + }, + } + case *trigger.AfterEachTrigger: + return &pipepb.Trigger{ + Trigger: &pipepb.Trigger_AfterEach_{ + AfterEach: &pipepb.Trigger_AfterEach{ + Subtriggers: extractSubtriggers(t.Subtriggers()), + }, + }, + } default: return &pipepb.Trigger{ Trigger: &pipepb.Trigger_Default_{ diff --git a/sdks/go/test/integration/integration.go b/sdks/go/test/integration/integration.go index 673603916ceb0..fe899d872269b 100644 --- a/sdks/go/test/integration/integration.go +++ b/sdks/go/test/integration/integration.go @@ -103,8 +103,8 @@ var flinkFilters = []string{ // TODO(BEAM-12815): Test fails on post commits: "Insufficient number of network buffers". "TestXLang_Multi", "TestDebeziumIO_BasicRead", - // Triggers are not yet supported - "TestTrigger.*", + // The number of produced outputs in AfterSynchronizedProcessingTime varies in different runs. + "TestTriggerAfterSynchronizedProcessingTime", // The flink runner does not support pipeline drain for SDF. "TestDrain", } diff --git a/sdks/go/test/integration/primitives/windowinto.go b/sdks/go/test/integration/primitives/windowinto.go index ded0587c2f0ed..fb29bea4a1ab5 100644 --- a/sdks/go/test/integration/primitives/windowinto.go +++ b/sdks/go/test/integration/primitives/windowinto.go @@ -267,3 +267,127 @@ func TriggerAfterEndOfWindow(s beam.Scope) { beam.Trigger(trigger), }, 2) } + +// TriggerAfterAll tests AfterAll trigger. The output pane is fired when all triggers in the subtriggers +// are ready. In this test, since trigger.AfterCount(int32(5)) won't be ready unless we see 5 elements, +// trigger.Always() won't fire until we meet that condition. So we fire only once when we see the 5th element. +func TriggerAfterAll(s beam.Scope) { + con := teststream.NewConfig() + con.AddElements(1000, 1.0, 2.0, 3.0, 5.0, 8.0) + con.AdvanceWatermark(11000) + + col := teststream.Create(s, con) + trigger := trigger.Repeat( + trigger.AfterAll( + []trigger.Trigger{ + trigger.Always(), + trigger.AfterCount(int32(5)), + }, + ), + ) + + validateCount(s.Scope("Global"), window.NewFixedWindows(10*time.Second), col, + []beam.WindowIntoOption{ + beam.Trigger(trigger), + }, 1) +} + +// TriggerAfterEach tests AfterEach trigger. The output pane is fired after each trigger +// is ready in the order set in subtriggers. In this test, since trigger.AfterCount(int32(3)) is first, +// first pane is fired after 3 elements, then a pane is fired each for trigger.Always() for +// element 5.0 and 8.0 +func TriggerAfterEach(s beam.Scope) { + con := teststream.NewConfig() + con.AddElements(1000, 1.0, 2.0, 3.0, 5.0, 8.0) + con.AdvanceWatermark(11000) + + col := teststream.Create(s, con) + trigger := trigger.Repeat( + trigger.AfterEach( + []trigger.Trigger{ + trigger.AfterCount(int32(3)), + trigger.Always(), + }, + ), + ) + + validateCount(s.Scope("Global"), window.NewGlobalWindows(), col, + []beam.WindowIntoOption{ + beam.Trigger(trigger), + }, 3) +} + +// TriggerAfterAny tests AfterAny trigger. In this test, trigger.Always() gets ready everytime. +// So we would expect panes to be fired at every element irrespective of checking for other triggers. +func TriggerAfterAny(s beam.Scope) { + con := teststream.NewConfig() + con.AddElements(1000, 1.0, 2.0, 3.0) + con.AdvanceWatermark(11000) + con.AddElements(12000, 5.0, 8.0) + + col := teststream.Create(s, con) + trigger := trigger.Repeat( + trigger.AfterAny( + []trigger.Trigger{ + trigger.AfterCount(int32(3)), + trigger.Always(), + }, + ), + ) + windowSize := 10 * time.Second + validateCount(s.Scope("Global"), window.NewFixedWindows(windowSize), col, + []beam.WindowIntoOption{ + beam.Trigger(trigger), + }, 5) +} + +// TriggerAfterSynchronizedProcessingTime tests AfterSynchronizedProcessingTime trigger. It fires at the window +// expiration since the times doesn't synchronize in this test case. +func TriggerAfterSynchronizedProcessingTime(s beam.Scope) { + con := teststream.NewConfig() + con.AddElements(1000, 1.0, 2.0, 3.0) + con.AdvanceWatermark(11000) + con.AddElements(12000, 5.0, 8.0) + + col := teststream.Create(s, con) + trigger := trigger.Repeat(trigger.AfterSynchronizedProcessingTime()) + windowSize := 10 * time.Second + validateCount(s.Scope("Global"), window.NewFixedWindows(windowSize), col, + []beam.WindowIntoOption{ + beam.Trigger(trigger), + }, 2) +} + +// TriggerNever tests Never Trigger. It fires at the window expiration. +func TriggerNever(s beam.Scope) { + con := teststream.NewConfig() + con.AddElements(1000, 1.0, 2.0, 3.0) + con.AdvanceWatermark(11000) + con.AddElements(12000, 5.0, 8.0) + + col := teststream.Create(s, con) + trigger := trigger.Never() + windowSize := 10 * time.Second + validateCount(s.Scope("Global"), window.NewFixedWindows(windowSize), col, + []beam.WindowIntoOption{ + beam.Trigger(trigger), + }, 2) +} + +// TriggerOrFinally tests OrFinally trigger. The main trigger in this test case trigger.Always() +// is always ready. But the output is produced only when finally trigger is ready. So it is ready at second +// element in first window and produces two output panes. Similarly, for the second window. +func TriggerOrFinally(s beam.Scope) { + con := teststream.NewConfig() + con.AddElements(1000, 1.0, 2.0, 3.0) + con.AdvanceWatermark(11000) + con.AddElements(12000, 5.0, 8.0) + + col := teststream.Create(s, con) + trigger := trigger.OrFinally(trigger.Always(), trigger.AfterCount(int32(2))) + windowSize := 10 * time.Second + validateCount(s.Scope("Global"), window.NewFixedWindows(windowSize), col, + []beam.WindowIntoOption{ + beam.Trigger(trigger), + }, 4) +} diff --git a/sdks/go/test/integration/primitives/windowinto_test.go b/sdks/go/test/integration/primitives/windowinto_test.go index 3bdccc4ec4f65..16405ebb4b2b7 100644 --- a/sdks/go/test/integration/primitives/windowinto_test.go +++ b/sdks/go/test/integration/primitives/windowinto_test.go @@ -78,3 +78,45 @@ func TestTriggerAfterEndOfWindow(t *testing.T) { TriggerAfterEndOfWindow(s) ptest.RunAndValidate(t, p) } + +func TestTriggerAfterAll(t *testing.T) { + integration.CheckFilters(t) + p, s := beam.NewPipelineWithRoot() + TriggerAfterAll(s) + ptest.RunAndValidate(t, p) +} + +func TestTriggerAfterEach(t *testing.T) { + integration.CheckFilters(t) + p, s := beam.NewPipelineWithRoot() + TriggerAfterEach(s) + ptest.RunAndValidate(t, p) +} + +func TestTriggerAfterAny(t *testing.T) { + integration.CheckFilters(t) + p, s := beam.NewPipelineWithRoot() + TriggerAfterAny(s) + ptest.RunAndValidate(t, p) +} + +func TestTriggerAfterSynchronizedProcessingTime(t *testing.T) { + integration.CheckFilters(t) + p, s := beam.NewPipelineWithRoot() + TriggerAfterSynchronizedProcessingTime(s) + ptest.RunAndValidate(t, p) +} + +func TestTriggerNever(t *testing.T) { + integration.CheckFilters(t) + p, s := beam.NewPipelineWithRoot() + TriggerNever(s) + ptest.RunAndValidate(t, p) +} + +func TestTriggerOrFinally(t *testing.T) { + integration.CheckFilters(t) + p, s := beam.NewPipelineWithRoot() + TriggerOrFinally(s) + ptest.RunAndValidate(t, p) +}