Skip to content

Commit

Permalink
[#29772][prism] Handle EventTime Timers. (#29900)
Browse files Browse the repository at this point in the history
  • Loading branch information
lostluck authored Jan 11, 2024
1 parent 96d092b commit 812684f
Show file tree
Hide file tree
Showing 17 changed files with 778 additions and 96 deletions.
7 changes: 5 additions & 2 deletions sdks/go/pkg/beam/core/timers/timers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,10 @@ type TimerMap struct {
FireTimestamp, HoldTimestamp mtime.Time
}

// timerConfig is used transiently to hold configuration from the functional options.
type timerConfig struct {
Tag string
HoldSet bool // Whether the HoldTimestamp was set.
HoldTimestamp mtime.Time
}

Expand All @@ -68,6 +70,7 @@ func WithTag(tag string) timerOptions {
// WithOutputTimestamp sets the output timestamp for the timer.
func WithOutputTimestamp(outputTimestamp time.Time) timerOptions {
return func(tm *timerConfig) {
tm.HoldSet = true
tm.HoldTimestamp = mtime.FromTime(outputTimestamp)
}
}
Expand Down Expand Up @@ -108,7 +111,7 @@ func (et EventTime) Set(p Provider, FiringTimestamp time.Time, opts ...timerOpti
opt(&tc)
}
tm := TimerMap{Family: et.Family, Tag: tc.Tag, FireTimestamp: mtime.FromTime(FiringTimestamp), HoldTimestamp: mtime.FromTime(FiringTimestamp)}
if !tc.HoldTimestamp.ToTime().IsZero() {
if tc.HoldSet {
tm.HoldTimestamp = tc.HoldTimestamp
}
p.Set(tm)
Expand Down Expand Up @@ -142,7 +145,7 @@ func (pt ProcessingTime) Set(p Provider, FiringTimestamp time.Time, opts ...time
opt(&tc)
}
tm := TimerMap{Family: pt.Family, Tag: tc.Tag, FireTimestamp: mtime.FromTime(FiringTimestamp), HoldTimestamp: mtime.FromTime(FiringTimestamp)}
if !tc.HoldTimestamp.ToTime().IsZero() {
if tc.HoldSet {
tm.HoldTimestamp = tc.HoldTimestamp
}

Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/runners/prism/internal/coders.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func pullDecoder(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reade
}
}

// pullDecoderNoAlloc returns a function that decodes a single eleemnt of the given coder.
// pullDecoderNoAlloc returns a function that decodes a single element of the given coder.
// Intended to only be used as an internal function for pullDecoder, which will use a io.TeeReader
// to extract the bytes.
func pullDecoderNoAlloc(c *pipepb.Coder, coders map[string]*pipepb.Coder) func(io.Reader) {
Expand Down
16 changes: 16 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,20 @@ type StateData struct {
Multimap map[string][][]byte
}

// TimerKey is for use as a key for timers.
type TimerKey struct {
Transform, Family string
}

// TentativeData is where data for in progress bundles is put
// until the bundle executes successfully.
type TentativeData struct {
Raw map[string][][]byte

// state is a map from transformID + UserStateID, to window, to userKey, to datavalues.
state map[LinkID]map[typex.Window]map[string]StateData
// timers is a map from the Timer transform+family to the encoded timer.
timers map[TimerKey][][]byte
}

// WriteData adds data to a given global collectionID.
Expand All @@ -49,6 +56,15 @@ func (d *TentativeData) WriteData(colID string, data []byte) {
d.Raw[colID] = append(d.Raw[colID], data)
}

// WriteTimers adds timers to the associated transform handler.
func (d *TentativeData) WriteTimers(transformID, familyID string, timers []byte) {
if d.timers == nil {
d.timers = map[TimerKey][][]byte{}
}
link := TimerKey{Transform: transformID, Family: familyID}
d.timers[link] = append(d.timers[link], timers)
}

func (d *TentativeData) toWindow(wKey []byte) typex.Window {
if len(wKey) == 0 {
return window.GlobalWindow{}
Expand Down
Loading

0 comments on commit 812684f

Please sign in to comment.