Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go SDK] Add more info to Worker Status API #21776

Merged
merged 7 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/go/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func main() {
"--options=" + options,
}
if info.GetStatusEndpoint() != nil {
args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl())
os.Setenv("STATUS_ENDPOINT", info.GetStatusEndpoint().GetUrl())
}

log.Fatalf("User program exited: %v", execx.Execute(prog, args...))
Expand Down
10 changes: 10 additions & 0 deletions sdks/go/pkg/beam/core/metrics/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,3 +218,13 @@ func (b *Store) storeMetric(pid string, n name, m userMetric) {
}
b.store[l] = m
}

// BundleState returns the bundle state.
func (b *Store) BundleState() *BundleState {
return b.bundleState
}

// StateRegistry returns the state registry that stores bundleID to executions states mapping.
func (b *Store) StateRegistry() map[string]*[4]ExecutionState {
return b.stateRegistry
}
41 changes: 16 additions & 25 deletions sdks/go/pkg/beam/core/runtime/harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"io"
"os"
"sync"
"sync/atomic"
"time"
Expand All @@ -37,26 +38,15 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)

// StatusAddress is a type of status endpoint address as an optional argument to harness.Main().
type StatusAddress string

// TODO(herohde) 2/8/2017: for now, assume we stage a full binary (not a plugin).

// Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
// "pipeline-construction time" -- on each worker. It is a FnAPI client and
// ultimately responsible for correctly executing user code.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add documentation around "expected" Environment variables. TBH, I don't mind the options approach to pass into main, but the fetching form the Env vars would then happen in the init.go.

func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options ...interface{}) error {
func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
hooks.DeserializeHooksFromOptions(ctx)

statusEndpoint := ""
for _, option := range options {
switch option := option.(type) {
case StatusAddress:
statusEndpoint = string(option)
default:
return errors.Errorf("unknown type %T, value %v in error call", option, option)
}
}
statusEndpoint := os.Getenv("STATUS_ENDPOINT")

// Pass in the logging endpoint for use w/the default remote logging hook.
ctx = context.WithValue(ctx, loggingEndpointCtxKey, loggingEndpoint)
Expand Down Expand Up @@ -112,18 +102,6 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options
log.Debugf(ctx, "control response channel closed")
}()

// if the runner supports worker status api then expose SDK harness status
if statusEndpoint != "" {
statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint)
if err != nil {
log.Errorf(ctx, "error establishing connection to worker status API: %v", err)
} else {
if err := statusHandler.start(ctx); err == nil {
defer statusHandler.stop(ctx)
}
}
}

sideCache := statecache.SideInputCache{}
sideCache.Init(cacheSize)

Expand All @@ -140,6 +118,19 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options
state: &StateChannelManager{},
cache: &sideCache,
}

// if the runner supports worker status api then expose SDK harness status
if statusEndpoint != "" {
statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint, ctrl.metStore, ctrl.cache)
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Errorf(ctx, "error establishing connection to worker status API: %v", err)
} else {
if err := statusHandler.start(ctx); err == nil {
defer statusHandler.stop(ctx)
}
}
}

// gRPC requires all readers of a stream be the same goroutine, so this goroutine
// is responsible for managing the network data. All it does is pull data from
// the stream, and hand off the message to a goroutine to actually be handled,
Expand Down
2 changes: 1 addition & 1 deletion sdks/go/pkg/beam/core/runtime/harness/init/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func hook() {
// does, and establish the background context here.

ctx := grpcx.WriteWorkerID(context.Background(), *id)
if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint, harness.StatusAddress(*statusEndpoint)); err != nil {
if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint); err != nil {
fmt.Fprintf(os.Stderr, "Worker failed: %v\n", err)
switch ShutdownMode {
case Terminate:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,8 @@ func (c *SideInputCache) evictElement(ctx context.Context) {
}
}
}

// CacheMetrics returns the cache metrics for current side input cache.
func (c *SideInputCache) CacheMetrics() CacheMetrics {
return c.metrics
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
}
45 changes: 40 additions & 5 deletions sdks/go/pkg/beam/core/runtime/harness/worker_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ package harness

import (
"context"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/metrics"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
"io"
"runtime"
"sync"
Expand All @@ -34,14 +37,16 @@ type workerStatusHandler struct {
conn *grpc.ClientConn
shouldShutdown int32
wg sync.WaitGroup
metStore map[instructionID]*metrics.Store //*metrics.Store for active bundles
cache *statecache.SideInputCache
}

func newWorkerStatusHandler(ctx context.Context, endpoint string) (*workerStatusHandler, error) {
func newWorkerStatusHandler(ctx context.Context, endpoint string, metStore map[instructionID]*metrics.Store, cache *statecache.SideInputCache) (*workerStatusHandler, error) {
sconn, err := dial(ctx, endpoint, 60*time.Second)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect: %v\n", endpoint)
}
return &workerStatusHandler{conn: sconn, shouldShutdown: 0}, nil
return &workerStatusHandler{conn: sconn, shouldShutdown: 0, metStore: metStore, cache: cache}, nil
}

func (w *workerStatusHandler) isAlive() bool {
Expand All @@ -65,20 +70,50 @@ func (w *workerStatusHandler) start(ctx context.Context) error {
return nil
}

func memoryUsage() string {
m := runtime.MemStats{}
runtime.ReadMemStats(&m)
return fmt.Sprintf("\n Total Alloc: %v bytes \n Sys: %v bytes \n Mallocs: %v\n Frees: %v\n HeapAlloc: %v bytes", m.TotalAlloc, m.Sys, m.Mallocs, m.Frees, m.HeapAlloc)
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
}

func (w *workerStatusHandler) activeProcessBundleStates() string {
var states string
for bundleID, store := range w.metStore {
execStates := ""
for bundleID, state := range store.StateRegistry() {
execStates += fmt.Sprintf("ID: %v Execution States: %#v,", bundleID, *state)

}
states += fmt.Sprintf("\nBundle ID: %v\nBundle State: %#v\nBundle Execution States: %v\n", bundleID, *store.BundleState(), execStates)
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
}
return states
}

func (w *workerStatusHandler) cacheStats() string {
return fmt.Sprintf("Cache:\n%v", w.cache.CacheMetrics())
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
}

func goroutineDump() string {
buf := make([]byte, 1<<16)
runtime.Stack(buf, true)
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
return string(buf)
}

// reader reads the WorkerStatusRequest from the stream and sends a processed WorkerStatusResponse to
// a response channel.
func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
defer w.wg.Done()
buf := make([]byte, 1<<16)

for w.isAlive() {
req, err := stub.Recv()
if err != nil && err != io.EOF {
log.Debugf(ctx, "exiting workerStatusHandler.Reader(): %v", err)
return
}
log.Debugf(ctx, "RECV-status: %v", req.GetId())
runtime.Stack(buf, true)
response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: string(buf)}

statusInfo := fmt.Sprintf("\n============Memory Usage============\n%s\n============Active Process Bundle States============\n%s\n============Cache Stats============\n%s\n============Goroutine Dump============\n%s\n", memoryUsage(), w.activeProcessBundleStates(), w.cacheStats(), goroutineDump())
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: statusInfo}
if err := stub.Send(response); err != nil && err != io.EOF {
log.Errorf(ctx, "workerStatus.Writer: Failed to respond: %v", err)
}
Expand Down
3 changes: 2 additions & 1 deletion sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package harness
import (
"context"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
"log"
"net"
"testing"
Expand Down Expand Up @@ -75,7 +76,7 @@ func TestSendStatusResponse(t *testing.T) {
t.Fatalf("unable to start test server: %v", err)
}

statusHandler := workerStatusHandler{conn: conn}
statusHandler := workerStatusHandler{conn: conn, cache: &statecache.SideInputCache{}}
if err := statusHandler.start(ctx); err != nil {
t.Fatal(err)
}
Expand Down
7 changes: 2 additions & 5 deletions sdks/go/test/integration/wordcount/wordcount.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,13 @@ package wordcount

import (
"context"
"regexp"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this change. At best, we mostly want to remove the blank line between "strings" and "fmt".

"strings"

"fmt"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
"github.com/apache/beam/sdks/v2/go/pkg/beam/transforms/stats"
"regexp"
"strings"
)

var (
Expand Down Expand Up @@ -81,7 +79,6 @@ func formatFn(w string, c int) string {
// WordCount returns a self-validating wordcount pipeline.
func WordCount(glob, hash string, size int) *beam.Pipeline {
p, s := beam.NewPipelineWithRoot()

in := textio.Read(s, glob)
WordCountFromPCol(s, in, hash, size)
return p
Expand Down