Skip to content

Commit

Permalink
Fix bug with sdf bundle finalization
Browse files Browse the repository at this point in the history
  • Loading branch information
damccorm committed Jun 2, 2022
1 parent ae52acb commit 04b24dc
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 3 deletions.
3 changes: 2 additions & 1 deletion sdks/go/examples/snippets/04transforms.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/sdf"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/rtrackers/offsetrange"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
Expand Down Expand Up @@ -96,7 +97,7 @@ type weDoFn struct{}

// [START bundlefinalization_simplecallback]

func (fn *splittableDoFn) ProcessElement(bf beam.BundleFinalization, element string) {
func (fn *splittableDoFn) ProcessElement(bf beam.BundleFinalization, rt *sdf.LockRTracker, element string) {
// ... produce output ...

bf.RegisterCallback(5*time.Minute, func() error {
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ type bundleFinalizer struct {
lastValidCallback time.Time // Used to track when we can safely gc the bundleFinalizer
}

type needsBundleFinalization interface {
AttachFinalizer(*bundleFinalizer)
}

// RegisterCallback is used to register callbacks during DoFn execution.
func (bf *bundleFinalizer) RegisterCallback(t time.Duration, cb func() error) {
callback := bundleFinalizationCallback{
Expand Down
4 changes: 4 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/pardo.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ func (n *ParDo) Up(ctx context.Context) error {
return nil
}

func (n *ParDo) AttachFinalizer(bf *bundleFinalizer) {
n.bf = bf
}

// StartBundle does pre-bundle processing operation for the DoFn.
func (n *ParDo) StartBundle(ctx context.Context, id string, data DataContext) error {
if n.status != Up {
Expand Down
4 changes: 2 additions & 2 deletions sdks/go/pkg/beam/core/runtime/exec/plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func NewPlan(id string, units []Unit) (*Plan, error) {
if p, ok := u.(*PCollection); ok {
pcols = append(pcols, p)
}
if p, ok := u.(*ParDo); ok {
p.bf = &bf
if p, ok := u.(needsBundleFinalization); ok {
p.AttachFinalizer(&bf)
}
}
if len(roots) == 0 {
Expand Down
8 changes: 8 additions & 0 deletions sdks/go/pkg/beam/core/runtime/exec/sdf.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,10 @@ func (n *ProcessSizedElementsAndRestrictions) Up(ctx context.Context) error {
return n.PDo.Up(ctx)
}

func (n *ProcessSizedElementsAndRestrictions) AttachFinalizer(bf *bundleFinalizer) {
n.PDo.bf = bf
}

// StartBundle calls the ParDo's StartBundle method.
func (n *ProcessSizedElementsAndRestrictions) StartBundle(ctx context.Context, id string, data DataContext) error {
return n.PDo.StartBundle(ctx, id, data)
Expand Down Expand Up @@ -950,6 +954,10 @@ func (n *SdfFallback) Up(ctx context.Context) error {
return n.PDo.Up(ctx)
}

func (n *SdfFallback) AttachFinalizer(bf *bundleFinalizer) {
n.PDo.bf = bf
}

// StartBundle calls the ParDo's StartBundle method.
func (n *SdfFallback) StartBundle(ctx context.Context, id string, data DataContext) error {
return n.PDo.StartBundle(ctx, id, data)
Expand Down

0 comments on commit 04b24dc

Please sign in to comment.