Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Composite triggers and unit tests for Go SDK #21756

Merged
merged 8 commits into from
Jun 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 146 additions & 8 deletions sdks/go/pkg/beam/core/graph/window/trigger/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ type Trigger interface {
// DefaultTrigger fires once after the end of window. Late Data is discarded.
type DefaultTrigger struct{}

// String implements the Stringer interface and returns trigger details as a string.
func (t DefaultTrigger) String() string {
return fmt.Sprintf("%#v", t)
}

func (t DefaultTrigger) trigger() {}

// Default constructs a default trigger that fires once after the end of window.
Expand All @@ -43,6 +48,11 @@ type AlwaysTrigger struct{}

func (t AlwaysTrigger) trigger() {}

// String implements the Stringer interface and returns trigger details as a string.
func (t *AlwaysTrigger) String() string {
return fmt.Sprintf("%#v", t)
}

// Always constructs a trigger that fires immediately
// whenever an element is received.
//
Expand All @@ -56,6 +66,11 @@ type AfterCountTrigger struct {
elementCount int32
}

// String implements the Stringer interface and returns trigger details as a string.
func (t *AfterCountTrigger) String() string {
return fmt.Sprintf("%#v", t)
}

func (t AfterCountTrigger) trigger() {}

// ElementCount returns the elementCount.
Expand All @@ -77,6 +92,11 @@ type AfterProcessingTimeTrigger struct {
timestampTransforms []TimestampTransform
}

// String implements the Stringer interface and returns trigger details as a string.
func (t *AfterProcessingTimeTrigger) String() string {
return fmt.Sprintf("%#v", t)
}

func (t AfterProcessingTimeTrigger) trigger() {}

// TimestampTransforms returns the timestampTransforms.
Expand Down Expand Up @@ -108,6 +128,11 @@ type DelayTransform struct {
Delay int64 // in milliseconds
}

// String implements the Stringer interface and returns trigger details as a string.
func (t *DelayTransform) String() string {
return fmt.Sprintf("%#v", t)
}

func (DelayTransform) timestampTransform() {}

// AlignToTransform takes the timestamp and transforms it to the lowest
Expand All @@ -120,6 +145,11 @@ type AlignToTransform struct {
Period, Offset int64 // in milliseconds
}

// String implements the Stringer interface and returns trigger details as a string.
func (t *AlignToTransform) String() string {
return fmt.Sprintf("%#v", t)
}

func (AlignToTransform) timestampTransform() {}

// PlusDelay configures an AfterProcessingTime trigger to fire after a specified delay,
Expand Down Expand Up @@ -158,6 +188,11 @@ type RepeatTrigger struct {
subtrigger Trigger
}

// String implements the Stringer interface and returns trigger details as a string.
func (t *RepeatTrigger) String() string {
return fmt.Sprintf("%#v", t)
}

func (t RepeatTrigger) trigger() {}

// SubTrigger returns the trigger to be repeated.
Expand All @@ -170,6 +205,9 @@ func (t *RepeatTrigger) SubTrigger() Trigger {
//
// Ex: trigger.Repeat(trigger.AfterCount(1)) is same as trigger.Always().
func Repeat(t Trigger) *RepeatTrigger {
if t == nil {
panic("trigger argument to trigger.Repeat() cannot be nil")
}
return &RepeatTrigger{subtrigger: t}
}

Expand All @@ -179,6 +217,11 @@ type AfterEndOfWindowTrigger struct {
lateFiring Trigger
}

// String implements the Stringer interface and returns trigger details as a string.
func (t *AfterEndOfWindowTrigger) String() string {
return fmt.Sprintf("%#v", t)
}

func (t AfterEndOfWindowTrigger) trigger() {}

// Early returns the Early Firing trigger for AfterEndOfWindowTrigger.
Expand Down Expand Up @@ -217,45 +260,140 @@ func (t *AfterEndOfWindowTrigger) LateFiring(late Trigger) *AfterEndOfWindowTrig
}

// AfterAnyTrigger fires after any of sub-trigger fires.
// NYI(BEAM-3304). Intended for framework use only.
type AfterAnyTrigger struct {
subtriggers []Trigger
}

// String implements the Stringer interface and returns trigger details as a string.
func (t *AfterAnyTrigger) String() string {
return fmt.Sprintf("%#v", t)
}

func (t AfterAnyTrigger) trigger() {}

// SubTriggers returns the component triggers.
func (t *AfterAnyTrigger) SubTriggers() []Trigger {
return t.subtriggers
}

// AfterAny returns a new AfterAny trigger with subtriggers set to passed argument.
func AfterAny(triggers []Trigger) *AfterAnyTrigger {
if len(triggers) <= 1 {
panic("empty slice passed as an argument to trigger.AfterAny()")
}
return &AfterAnyTrigger{subtriggers: triggers}
}

// AfterAllTrigger fires after all subtriggers are fired.
// NYI(BEAM-3304). Intended for framework use only.
type AfterAllTrigger struct {
subtriggers []Trigger
}

// String implements the Stringer interface and returns trigger details as a string.
func (t *AfterAllTrigger) String() string {
return fmt.Sprintf("%#v", t)
}

func (t AfterAllTrigger) trigger() {}

// SubTriggers returns the component triggers.
func (t *AfterAllTrigger) SubTriggers() []Trigger {
return t.subtriggers
}

// OrFinallyTrigger serves as final condition to cause any trigger to fire.
// NYI(BEAM-3304). Intended for framework use only.
type OrFinallyTrigger struct{}
// AfterAll returns a new AfterAll trigger with subtriggers set to the passed argument.
func AfterAll(triggers []Trigger) *AfterAllTrigger {
if len(triggers) <= 1 {
panic(fmt.Sprintf("number of subtriggers to trigger.AfterAll() should be greater than 1, got: %v", len(triggers)))
}
return &AfterAllTrigger{subtriggers: triggers}
}

// OrFinallyTrigger is ready whenever either of its subtriggers are ready, but finishes output
// when the finally subtrigger fires.
type OrFinallyTrigger struct {
main Trigger // Trigger governing main output; may fire repeatedly.
finally Trigger // Trigger governing termination of output.
}

// String implements the Stringer interface and returns trigger details as a string.
func (t *OrFinallyTrigger) String() string {
return fmt.Sprintf("%#v", t)
}

func (t OrFinallyTrigger) trigger() {}

// OrFinally trigger has main trigger which may fire repeatedly and the finally trigger.
// Output is produced when the finally trigger fires.
func OrFinally(main, finally Trigger) *OrFinallyTrigger {
if main == nil || finally == nil {
panic("main and finally trigger arguments to trigger.OrFinally() cannot be nil")
}
return &OrFinallyTrigger{main: main, finally: finally}
}

// Main returns the main trigger of OrFinallyTrigger.
func (t *OrFinallyTrigger) Main() Trigger {
return t.main
}

// Finally returns the finally trigger of OrFinallyTrigger.
func (t *OrFinallyTrigger) Finally() Trigger {
return t.finally
}

// NeverTrigger is never ready to fire.
// NYI(BEAM-3304). Intended for framework use only.
type NeverTrigger struct{}

// String implements the Stringer interface and returns trigger details as a string.
func (t *NeverTrigger) String() string {
return fmt.Sprintf("%#v", t)
}

func (t NeverTrigger) trigger() {}

// AfterSynchronizedProcessingTimeTrigger fires when processing time synchronises with arrival time.
// NYI(BEAM-3304). Intended for framework use only.
// Never creates a Never Trigger that is never ready to fire.
// There will only be an ON_TIME output and a final output at window expiration.
func Never() *NeverTrigger {
return &NeverTrigger{}
}

// AfterSynchronizedProcessingTimeTrigger fires when processing time synchronizes with arrival time.
type AfterSynchronizedProcessingTimeTrigger struct{}

// String implements the Stringer interface and returns trigger details as a string.
func (t *AfterSynchronizedProcessingTimeTrigger) String() string {
return fmt.Sprintf("%#v", t)
}

func (t AfterSynchronizedProcessingTimeTrigger) trigger() {}

// AfterSynchronizedProcessingTime creates a new AfterSynchronizedProcessingTime trigger that fires when
// processing time synchronizes with arrival time.
func AfterSynchronizedProcessingTime() *AfterSynchronizedProcessingTimeTrigger {
return &AfterSynchronizedProcessingTimeTrigger{}
}

// AfterEachTrigger fires when each trigger is ready. Order of triggers matters.
type AfterEachTrigger struct {
subtriggers []Trigger
}

func (t AfterEachTrigger) trigger() {}

// AfterEach creates a new AfterEach trigger that fires after each trigger is ready. It follows the order
// of triggers passed in as arguments. Let's say if the second trigger gets ready but the first one is not ready
// then it won't be fired until first triggered is ready and fired.
func AfterEach(subtriggers []Trigger) *AfterEachTrigger {
return &AfterEachTrigger{subtriggers: subtriggers}
}

// Subtriggers returns the list of subtriggers for the current AfterEach trigger.
func (t *AfterEachTrigger) Subtriggers() []Trigger {
return t.subtriggers
}

// String implements the Stringer interface and returns trigger details as a string.
func (t *AfterEachTrigger) String() string {
return fmt.Sprintf("%#v", t)
}
Loading