Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Go SDK] Add more info to Worker Status API #21776

Merged
merged 7 commits into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/go/container/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func main() {
"--options=" + options,
}
if info.GetStatusEndpoint() != nil {
args = append(args, "--status_endpoint="+info.GetStatusEndpoint().GetUrl())
os.Setenv("STATUS_ENDPOINT", info.GetStatusEndpoint().GetUrl())
}

if len(info.GetRunnerCapabilities()) > 0 {
Expand Down
50 changes: 50 additions & 0 deletions sdks/go/pkg/beam/core/metrics/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ package metrics

import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
)

// Implementation note: We avoid depending on the FnAPI protos here
Expand Down Expand Up @@ -156,6 +159,22 @@ type ptCounterSet struct {

type bundleProcState int

// String returns the upper-case label of a bundle processing state, so that
// bundleProcState satisfies fmt.Stringer. Any value other than the four known
// states produces a fixed placeholder message.
func (b bundleProcState) String() string {
	if b == StartBundle {
		return "START_BUNDLE"
	}
	if b == ProcessBundle {
		return "PROCESS_BUNDLE"
	}
	if b == FinishBundle {
		return "FINISH_BUNDLE"
	}
	if b == TotalBundle {
		return "TOTAL_BUNDLE"
	}
	return "unknown process bundle state!"
}

const (
// StartBundle indicates starting state of a bundle
StartBundle bundleProcState = 0
Expand All @@ -174,12 +193,22 @@ type ExecutionState struct {
TotalTime time.Duration
}

// String renders the state, the processing flag and the total time as a
// multi-line, tab-indented report, so that ExecutionState satisfies
// fmt.Stringer.
func (e ExecutionState) String() string {
	const layout = "Execution State:\n\t State: %s\n\t IsProcessing: %v\n\t Total time: %v\n"
	return fmt.Sprintf(layout, e.State, e.IsProcessing, e.TotalTime)
}

// BundleState stores information about a PTransform for execution time metrics.
type BundleState struct {
// pid is reported as the "PTransform ID" by BundleState.String.
pid string
// currentState is reported as the "Current state" by BundleState.String.
currentState bundleProcState
}

// String renders the PTransform ID and the current bundle state as a
// multi-line, tab-indented report, so that BundleState satisfies fmt.Stringer.
// The output has no trailing newline.
func (b BundleState) String() string {
	const layout = "Bundle State:\n\t PTransform ID: %s\n\t Current state: %s"
	return fmt.Sprintf(layout, b.pid, b.currentState)
}

// currentStateVal exports the current state of a bundle wrt PTransform.
type currentStateVal struct {
pid string
Expand Down Expand Up @@ -218,3 +247,24 @@ func (b *Store) storeMetric(pid string, n name, m userMetric) {
}
b.store[l] = m
}

// BundleState returns the string form of the store's current bundle state.
//
// The pointer stored in b.bundleState is fetched with an atomic load, the
// BundleState it points to is copied, and the copy is formatted.
func (b *Store) BundleState() string {
	// Reinterpret the address of the field so atomic.LoadPointer can read it.
	slot := (*unsafe.Pointer)(unsafe.Pointer(&b.bundleState))
	current := (*BundleState)(atomic.LoadPointer(slot))
	// Copy before formatting so the report is built from a single snapshot.
	snapshot := *current
	return snapshot.String()
}

// StateRegistry returns a human-readable dump of the state registry, which
// holds the execution states recorded per bundle ID.
//
// The output starts with a banner line, followed by one "Bundle ID" line per
// registry entry and the first four execution states of that entry, one per
// line. The registry is read while holding b.mu. Entries appear in whatever
// order ranging over b.stateRegistry yields them.
func (b *Store) StateRegistry() string {
	b.mu.Lock()
	defer b.mu.Unlock()

	var builder strings.Builder
	builder.WriteString("\n | All Bundle Process States | \n")
	for bundleID, state := range b.stateRegistry {
		// Format straight into the builder instead of allocating an
		// intermediate string with fmt.Sprintf for every line.
		fmt.Fprintf(&builder, "\tBundle ID: %s\n", bundleID)
		// NOTE(review): the four slots presumably correspond to StartBundle,
		// ProcessBundle, FinishBundle and TotalBundle; confirm against the
		// declaration of stateRegistry.
		for i := 0; i < 4; i++ {
			fmt.Fprintf(&builder, "\t%s\n", state[i])
		}
	}
	return builder.String()
}
52 changes: 26 additions & 26 deletions sdks/go/pkg/beam/core/runtime/harness/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,6 @@ import (
"google.golang.org/protobuf/types/known/durationpb"
)

// StatusAddress is a type of status endpoint address as an optional argument to harness.Main().
type StatusAddress string

// URNMonitoringInfoShortID is a URN indicating support for short monitoring info IDs.
const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"

Expand All @@ -50,22 +47,14 @@ const URNMonitoringInfoShortID = "beam:protocol:monitoring_info_short_ids:v1"
// Main is the main entrypoint for the Go harness. It runs at "runtime" -- not
// "pipeline-construction time" -- on each worker. It is a FnAPI client and
// ultimately responsible for correctly executing user code.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add documentation around the "expected" environment variables. TBH, I don't mind the options approach for passing values into Main, but the fetching from the env vars would then happen in init.go.

func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options ...interface{}) error {
func Main(ctx context.Context, loggingEndpoint, controlEndpoint string) error {
hooks.DeserializeHooksFromOptions(ctx)

statusEndpoint := ""
for _, option := range options {
switch option := option.(type) {
case StatusAddress:
statusEndpoint = string(option)
default:
return errors.Errorf("unknown type %T, value %v in error call", option, option)
}
}

// Extract environment variables. These are optional runner supported capabilities.
// Expected env variables:
// RUNNER_CAPABILITIES : list of runner supported capability urn.
// STATUS_ENDPOINT : Endpoint to connect to status server used for worker status reporting.
statusEndpoint := os.Getenv("STATUS_ENDPOINT")
runnerCapabilities := strings.Split(os.Getenv("RUNNER_CAPABILITIES"), " ")
rcMap := make(map[string]bool)
if len(runnerCapabilities) > 0 {
Expand Down Expand Up @@ -128,18 +117,6 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options
log.Debugf(ctx, "control response channel closed")
}()

// if the runner supports worker status api then expose SDK harness status
if statusEndpoint != "" {
statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint)
if err != nil {
log.Errorf(ctx, "error establishing connection to worker status API: %v", err)
} else {
if err := statusHandler.start(ctx); err == nil {
defer statusHandler.stop(ctx)
}
}
}

sideCache := statecache.SideInputCache{}
sideCache.Init(cacheSize)

Expand All @@ -157,6 +134,19 @@ func Main(ctx context.Context, loggingEndpoint, controlEndpoint string, options
cache: &sideCache,
runnerCapabilities: rcMap,
}

// if the runner supports worker status api then expose SDK harness status
if statusEndpoint != "" {
statusHandler, err := newWorkerStatusHandler(ctx, statusEndpoint, ctrl.cache, func(statusInfo *strings.Builder) { ctrl.metStoreToString(statusInfo) })
if err != nil {
log.Errorf(ctx, "error establishing connection to worker status API: %v", err)
} else {
if err := statusHandler.start(ctx); err == nil {
defer statusHandler.stop(ctx)
}
}
}

// gRPC requires all readers of a stream be the same goroutine, so this goroutine
// is responsible for managing the network data. All it does is pull data from
// the stream, and hand off the message to a goroutine to actually be handled,
Expand Down Expand Up @@ -296,6 +286,16 @@ type control struct {
runnerCapabilities map[string]bool
}

// metStoreToString appends a report of every entry in c.metStore to
// statusInfo. For each entry it writes the bundle ID on its own line, then
// the tab-prefixed results of the store's BundleState and StateRegistry
// methods.
//
// c.mu is held for the whole traversal. Entries are written in whatever order
// ranging over c.metStore yields them.
func (c *control) metStoreToString(statusInfo *strings.Builder) {
	c.mu.Lock()
	defer c.mu.Unlock()

	for bundleID, store := range c.metStore {
		// Format straight into the builder instead of building a temporary
		// string with fmt.Sprintf for every fragment.
		fmt.Fprintf(statusInfo, "Bundle ID: %v\n", bundleID)
		fmt.Fprintf(statusInfo, "\t%s", store.BundleState())
		fmt.Fprintf(statusInfo, "\t%s", store.StateRegistry())
	}
}

func (c *control) getOrCreatePlan(bdID bundleDescriptorID) (*exec.Plan, error) {
c.mu.Lock()
plans, ok := c.plans[bdID]
Expand Down
3 changes: 1 addition & 2 deletions sdks/go/pkg/beam/core/runtime/harness/init/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@ var (
id = flag.String("id", "", "Local identifier (required in worker mode).")
loggingEndpoint = flag.String("logging_endpoint", "", "Local logging gRPC endpoint (required in worker mode).")
controlEndpoint = flag.String("control_endpoint", "", "Local control gRPC endpoint (required in worker mode).")
statusEndpoint = flag.String("status_endpoint", "", "Local status gRPC endpoint (optional in worker mode).")
//lint:ignore U1000 semiPersistDir flag is passed in through the boot container, will need to be removed later
semiPersistDir = flag.String("semi_persist_dir", "/tmp", "Local semi-persistent directory (optional in worker mode).")
options = flag.String("options", "", "JSON-encoded pipeline options (required in worker mode).")
Expand Down Expand Up @@ -108,7 +107,7 @@ func hook() {
// does, and establish the background context here.

ctx := grpcx.WriteWorkerID(context.Background(), *id)
if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint, harness.StatusAddress(*statusEndpoint)); err != nil {
if err := harness.Main(ctx, *loggingEndpoint, *controlEndpoint); err != nil {
fmt.Fprintf(os.Stderr, "Worker failed: %v\n", err)
switch ShutdownMode {
case Terminate:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,3 +272,10 @@ func (c *SideInputCache) evictElement(ctx context.Context) {
}
}
}

// CacheMetrics returns the cache metrics for the current side input cache.
// The metrics field is read while holding c.mu.
func (c *SideInputCache) CacheMetrics() CacheMetrics {
c.mu.Lock()
defer c.mu.Unlock()
return c.metrics
riteshghorse marked this conversation as resolved.
Show resolved Hide resolved
}
66 changes: 58 additions & 8 deletions sdks/go/pkg/beam/core/runtime/harness/worker_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,13 @@ package harness

import (
"context"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
"io"
"runtime"
"runtime/debug"
"runtime/pprof"
"strings"
"sync"
"sync/atomic"
"time"
Expand All @@ -31,17 +36,19 @@ import (

// workerStatusHandler stores the communication information of WorkerStatus API.
type workerStatusHandler struct {
conn *grpc.ClientConn
shouldShutdown int32
wg sync.WaitGroup
conn *grpc.ClientConn
shouldShutdown int32
wg sync.WaitGroup
cache *statecache.SideInputCache
metStoreToString func(*strings.Builder)
}

func newWorkerStatusHandler(ctx context.Context, endpoint string) (*workerStatusHandler, error) {
func newWorkerStatusHandler(ctx context.Context, endpoint string, cache *statecache.SideInputCache, metStoreToString func(*strings.Builder)) (*workerStatusHandler, error) {
sconn, err := dial(ctx, endpoint, 60*time.Second)
if err != nil {
return nil, errors.Wrapf(err, "failed to connect: %v\n", endpoint)
}
return &workerStatusHandler{conn: sconn, shouldShutdown: 0}, nil
return &workerStatusHandler{conn: sconn, shouldShutdown: 0, cache: cache, metStoreToString: metStoreToString}, nil
}

func (w *workerStatusHandler) isAlive() bool {
Expand All @@ -65,20 +72,63 @@ func (w *workerStatusHandler) start(ctx context.Context) error {
return nil
}

// memoryUsage appends a "Memory Usage" section to statusInfo, built from a
// fresh runtime.MemStats snapshot.
//
// Sizes are shifted right by 20 bits before printing, so they are reported in
// whole units of 2^20 bytes (labelled "MB") and truncated. The section has:
//   - heap: HeapInuse / HeapAlloc / TotalAlloc / HeapSys
//   - stack: StackInuse / StackSys
//   - GCCPUFraction as a percentage
//   - LastGC converted from nanoseconds since the Unix epoch to a time.Time
//   - NextGC, the target heap size of the next GC cycle
func memoryUsage(statusInfo *strings.Builder) {
	statusInfo.WriteString("\n============Memory Usage============\n")

	var m runtime.MemStats
	runtime.ReadMemStats(&m)

	// Format straight into the builder; wrapping fmt.Sprintf in WriteString
	// would allocate a throw-away string for every line.
	fmt.Fprintf(statusInfo, "heap in-use-spans/allocated/total/max = %d/%d/%d/%d MB\n", m.HeapInuse>>20, m.HeapAlloc>>20, m.TotalAlloc>>20, m.HeapSys>>20)
	fmt.Fprintf(statusInfo, "stack in-use-spans/max = %d/%d MB\n", m.StackInuse>>20, m.StackSys>>20)
	fmt.Fprintf(statusInfo, "GC-CPU percentage = %.2f %%\n", m.GCCPUFraction*100)
	fmt.Fprintf(statusInfo, "Last GC time: %v\n", time.Unix(0, int64(m.LastGC)))
	fmt.Fprintf(statusInfo, "Next GC: %v MB\n", m.NextGC>>20)
}

// activeProcessBundleStates appends the "Active Process Bundle States" section
// to statusInfo: a banner, then whatever the handler's metStoreToString
// callback writes.
func (w *workerStatusHandler) activeProcessBundleStates(statusInfo *strings.Builder) {
	const banner = "\n============Active Process Bundle States============\n"
	statusInfo.WriteString(banner)
	w.metStoreToString(statusInfo)
}

// cacheStats appends the "Cache Stats" section to statusInfo: a banner, then
// the value returned by w.cache.CacheMetrics() printed with %+v under a
// "State Cache:" heading.
func (w *workerStatusHandler) cacheStats(statusInfo *strings.Builder) {
	statusInfo.WriteString("\n============Cache Stats============\n")
	// Format straight into the builder rather than through a temporary string.
	fmt.Fprintf(statusInfo, "State Cache:\n%+v\n", w.cache.CacheMetrics())
}

// goroutineDump appends the "Goroutine Dump" section to statusInfo: a banner,
// then the "goroutine" pprof profile written with debug level 1. If the
// profile lookup returns nil, only the banner is written.
func goroutineDump(statusInfo *strings.Builder) {
	statusInfo.WriteString("\n============Goroutine Dump============\n")

	goroutines := pprof.Lookup("goroutine")
	if goroutines == nil {
		return
	}
	goroutines.WriteTo(statusInfo, 1)
}

// buildInfo appends the "Build Info" section to statusInfo: a banner, then
// the string form of the build information embedded in the running binary.
// If debug.ReadBuildInfo reports that none is available, only the banner is
// written.
func buildInfo(statusInfo *strings.Builder) {
	statusInfo.WriteString("\n============Build Info============\n")

	info, ok := debug.ReadBuildInfo()
	if !ok {
		return
	}
	statusInfo.WriteString(info.String())
}

// reader reads the WorkerStatusRequest from the stream and sends a processed WorkerStatusResponse to
// a response channel.
func (w *workerStatusHandler) reader(ctx context.Context, stub fnpb.BeamFnWorkerStatus_WorkerStatusClient) {
defer w.wg.Done()
buf := make([]byte, 1<<16)

for w.isAlive() {
req, err := stub.Recv()
if err != nil && err != io.EOF {
log.Debugf(ctx, "exiting workerStatusHandler.Reader(): %v", err)
return
}
log.Debugf(ctx, "RECV-status: %v", req.GetId())
runtime.Stack(buf, true)
response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: string(buf)}

statusInfo := &strings.Builder{}
memoryUsage(statusInfo)
w.activeProcessBundleStates(statusInfo)
w.cacheStats(statusInfo)
goroutineDump(statusInfo)
buildInfo(statusInfo)

response := &fnpb.WorkerStatusResponse{Id: req.GetId(), StatusInfo: statusInfo.String()}
if err := stub.Send(response); err != nil && err != io.EOF {
log.Errorf(ctx, "workerStatus.Writer: Failed to respond: %v", err)
}
Expand Down
6 changes: 5 additions & 1 deletion sdks/go/pkg/beam/core/runtime/harness/worker_status_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package harness
import (
"context"
"fmt"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/runtime/harness/statecache"
"log"
"net"
"strings"
"testing"

fnpb "github.com/apache/beam/sdks/v2/go/pkg/beam/model/fnexecution_v1"
Expand Down Expand Up @@ -75,7 +77,9 @@ func TestSendStatusResponse(t *testing.T) {
t.Fatalf("unable to start test server: %v", err)
}

statusHandler := workerStatusHandler{conn: conn}
statusHandler := workerStatusHandler{conn: conn, cache: &statecache.SideInputCache{}, metStoreToString: func(builder *strings.Builder) {
builder.WriteString("metStore metadata")
}}
if err := statusHandler.start(ctx); err != nil {
t.Fatal(err)
}
Expand Down
3 changes: 1 addition & 2 deletions sdks/go/test/integration/wordcount/wordcount.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ package wordcount

import (
"context"
"fmt"
"regexp"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove this change. At best, we mostly want to remove the blank line between "strings" and "fmt".

"strings"

"fmt"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
"github.com/apache/beam/sdks/v2/go/pkg/beam/testing/passert"
Expand Down