From 16c3b7df7f78df4061703be769832de44216bfc1 Mon Sep 17 00:00:00 2001 From: Doug Fawley Date: Tue, 14 Mar 2023 14:01:16 -0700 Subject: [PATCH] examples: add example for ORCA load reporting (#6114) --- examples/examples_test.sh | 4 + examples/features/orca/README.md | 48 ++++++++ examples/features/orca/client/main.go | 153 ++++++++++++++++++++++++++ examples/features/orca/server/main.go | 92 ++++++++++++++++ examples/go.mod | 2 +- internal/internal.go | 3 + orca/service.go | 6 +- 7 files changed, 305 insertions(+), 3 deletions(-) create mode 100644 examples/features/orca/README.md create mode 100644 examples/features/orca/client/main.go create mode 100644 examples/features/orca/server/main.go diff --git a/examples/examples_test.sh b/examples/examples_test.sh index 812a46556bf0..9ae49d37c5e9 100755 --- a/examples/examples_test.sh +++ b/examples/examples_test.sh @@ -64,6 +64,7 @@ EXAMPLES=( "features/metadata_interceptor" "features/multiplex" "features/name_resolving" + "features/orca" "features/retry" "features/unix_abstract" ) @@ -75,6 +76,7 @@ declare -A SERVER_ARGS=( declare -A CLIENT_ARGS=( ["features/unix_abstract"]="-addr $UNIX_ADDR" + ["features/orca"]="-test=true" ["default"]="-addr localhost:$SERVER_PORT" ) @@ -114,6 +116,7 @@ declare -A EXPECTED_SERVER_OUTPUT=( ["features/metadata_interceptor"]="key1 from metadata: " ["features/multiplex"]=":50051" ["features/name_resolving"]="serving on localhost:50051" + ["features/orca"]="Server listening" ["features/retry"]="request succeeded count: 4" ["features/unix_abstract"]="serving on @abstract-unix-socket" ) @@ -134,6 +137,7 @@ declare -A EXPECTED_CLIENT_OUTPUT=( ["features/metadata_interceptor"]="BidiStreaming Echo: hello world" ["features/multiplex"]="Greeting: Hello multiplex" ["features/name_resolving"]="calling helloworld.Greeter/SayHello to \"example:///resolver.example.grpc.io\"" + ["features/orca"]="Per-call load report received: map\[db_queries:10\]" ["features/retry"]="UnaryEcho reply: message:\"Try and Success\"" ["features/unix_abstract"]="calling echo.Echo/UnaryEcho to unix-abstract:abstract-unix-socket" ) diff --git a/examples/features/orca/README.md b/examples/features/orca/README.md new file mode 100644 index 000000000000..ef99aa255ba5 --- /dev/null +++ b/examples/features/orca/README.md @@ -0,0 +1,48 @@ +# ORCA Load Reporting + +ORCA is a protocol for reporting load between servers and clients. This +example shows how to implement this from both the client and server side. For +more details, please see [gRFC +A51](https://github.com/grpc/proposal/blob/master/A51-custom-backend-metrics.md) + +## Try it + +``` +go run server/main.go +``` + +``` +go run client/main.go +``` + +## Explanation + +gRPC ORCA support provides two different ways to report load data to clients +from servers: out-of-band and per-RPC. Out-of-band metrics are reported +regularly at some interval on a stream, while per-RPC metrics are reported +along with the trailers at the end of a call. Both of these mechanisms are +optional and work independently. + +The full ORCA API documentation is available here: +https://pkg.go.dev/google.golang.org/grpc/orca + +### Out-of-band Metrics + +The server registers an ORCA service that is used for out-of-band metrics. It +does this by using `orca.Register()` and then setting metrics on the returned +`orca.Service` using its methods. + +The client receives out-of-band metrics via the LB policy. It receives +callbacks to a listener by registering the listener on a `SubConn` via +`orca.RegisterOOBListener`. + +### Per-RPC Metrics + +The server is set up to report query cost metrics in its RPC handler. For +per-RPC metrics to be reported, the gRPC server must be created with the +`orca.CallMetricsServerOption()` option, and metrics are set by calling methods +on the returned `orca.CallMetricRecorder` from +`orca.CallMetricRecorderFromContext()`. + +The client performs one RPC per second. Per-RPC metrics are available for each +call via the `Done()` callback returned from the LB policy's picker. diff --git a/examples/features/orca/client/main.go b/examples/features/orca/client/main.go new file mode 100644 index 000000000000..f295cfd3866a --- /dev/null +++ b/examples/features/orca/client/main.go @@ -0,0 +1,153 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Binary client is an example client. +package main + +import ( + "context" + "flag" + "fmt" + "log" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/orca" + + v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" + pb "google.golang.org/grpc/examples/features/proto/echo" +) + +var addr = flag.String("addr", "localhost:50051", "the address to connect to") +var test = flag.Bool("test", false, "if set, only 1 RPC is performed before exiting") + +func main() { + flag.Parse() + + // Set up a connection to the server. Configure to use our custom LB + // policy which will receive all the ORCA load reports. + conn, err := grpc.Dial(*addr, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultServiceConfig(`{"loadBalancingConfig": [{"orca_example":{}}]}`), + ) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + defer conn.Close() + + c := pb.NewEchoClient(conn) + + // Perform RPCs once per second. + ticker := time.NewTicker(time.Second) + for range ticker.C { + func() { + // Use an anonymous function to ensure context cancelation via defer. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + if _, err := c.UnaryEcho(ctx, &pb.EchoRequest{Message: "test echo message"}); err != nil { + log.Fatalf("Error from UnaryEcho call: %v", err) + } + }() + if *test { + return + } + } + +} + +// Register an ORCA load balancing policy to receive per-call metrics and +// out-of-band metrics. +func init() { + balancer.Register(orcaLBBuilder{}) +} + +type orcaLBBuilder struct{} + +func (orcaLBBuilder) Name() string { return "orca_example" } +func (orcaLBBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer { + return &orcaLB{cc: cc} +} + +// orcaLB is an incomplete LB policy designed to show basic ORCA load reporting +// functionality. It collects per-call metrics in the `Done` callback returned +// by its picker, and it collects out-of-band metrics by registering a listener +// when its SubConn is created. It does not follow general LB policy best +// practices and makes assumptions about the simple test environment it is +// designed to run within. +type orcaLB struct { + cc balancer.ClientConn +} + +func (o *orcaLB) UpdateClientConnState(ccs balancer.ClientConnState) error { + // We assume only one update, ever, containing exactly one address, given + // the use of the "passthrough" (default) name resolver. + + addrs := ccs.ResolverState.Addresses + if len(addrs) != 1 { + return fmt.Errorf("orcaLB: expected 1 address; received: %v", addrs) + } + + // Create one SubConn for the address and connect it. + sc, err := o.cc.NewSubConn(addrs, balancer.NewSubConnOptions{}) + if err != nil { + return fmt.Errorf("orcaLB: error creating SubConn: %v", err) + } + sc.Connect() + + // Register a simple ORCA OOB listener on the SubConn. We request a 1 + // second report interval, but in this example the server indicated the + // minimum interval it will allow is 3 seconds, so reports will only be + // sent that often. + orca.RegisterOOBListener(sc, orcaLis{}, orca.OOBListenerOptions{ReportInterval: time.Second}) + + return nil +} + +func (o *orcaLB) ResolverError(error) {} + +func (o *orcaLB) UpdateSubConnState(sc balancer.SubConn, scs balancer.SubConnState) { + if scs.ConnectivityState == connectivity.Ready { + o.cc.UpdateState(balancer.State{ConnectivityState: connectivity.Ready, Picker: &picker{sc}}) + } +} + +func (o *orcaLB) Close() {} + +type picker struct { + sc balancer.SubConn +} + +func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { + return balancer.PickResult{ + SubConn: p.sc, + Done: func(di balancer.DoneInfo) { + fmt.Println("Per-call load report received:", di.ServerLoad.(*v3orcapb.OrcaLoadReport).GetRequestCost()) + }, + }, nil +} + +// orcaLis is the out-of-band load report listener that we pass to +// orca.RegisterOOBListener to receive periodic load report information. +type orcaLis struct{} + +func (orcaLis) OnLoadReport(lr *v3orcapb.OrcaLoadReport) { + fmt.Println("Out-of-band load report received:", lr) +} diff --git a/examples/features/orca/server/main.go b/examples/features/orca/server/main.go new file mode 100644 index 000000000000..5d4bdb163a17 --- /dev/null +++ b/examples/features/orca/server/main.go @@ -0,0 +1,92 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Binary server is an example server. +package main + +import ( + "context" + "flag" + "fmt" + "log" + "net" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/internal" + "google.golang.org/grpc/orca" + "google.golang.org/grpc/status" + + pb "google.golang.org/grpc/examples/features/proto/echo" +) + +var port = flag.Int("port", 50051, "the port to serve on") + +type server struct { + pb.UnimplementedEchoServer +} + +func (s *server) UnaryEcho(ctx context.Context, in *pb.EchoRequest) (*pb.EchoResponse, error) { + // Report a sample cost for this query. + cmr := orca.CallMetricRecorderFromContext(ctx) + if cmr == nil { + return nil, status.Errorf(codes.Internal, "unable to retrieve call metric recorder (missing ORCA ServerOption?)") + } + cmr.SetRequestCost("db_queries", 10) + + return &pb.EchoResponse{Message: in.Message}, nil +} + +func main() { + flag.Parse() + + lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port)) + if err != nil { + log.Fatalf("Failed to listen: %v", err) + } + fmt.Printf("Server listening at %v\n", lis.Addr()) + + // Create the gRPC server with the orca.CallMetricsServerOption() option, + // which will enable per-call metric recording. + s := grpc.NewServer(orca.CallMetricsServerOption()) + pb.RegisterEchoServer(s, &server{}) + + // Register the orca service for out-of-band metric reporting, and set the + // minimum reporting interval to 3 seconds. Note that, by default, the + // minimum interval must be at least 30 seconds, but 3 seconds is set via + // an internal-only option for illustration purposes only. + opts := orca.ServiceOptions{MinReportingInterval: 3 * time.Second} + internal.ORCAAllowAnyMinReportingInterval.(func(so *orca.ServiceOptions))(&opts) + orcaSvc, err := orca.Register(s, opts) + if err != nil { + log.Fatalf("Failed to register ORCA service: %v", err) + } + + // Simulate CPU utilization reporting. + go func() { + for { + orcaSvc.SetCPUUtilization(.5) + time.Sleep(2 * time.Second) + orcaSvc.SetCPUUtilization(.9) + time.Sleep(2 * time.Second) + } + }() + + s.Serve(lis) +} diff --git a/examples/go.mod b/examples/go.mod index eec23182551e..e4fd33eb6ba1 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -3,6 +3,7 @@ module google.golang.org/grpc/examples go 1.17 require ( + github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b github.com/golang/protobuf v1.5.2 golang.org/x/oauth2 v0.4.0 google.golang.org/genproto v0.0.0-20230110181048-76db0878b65f @@ -16,7 +17,6 @@ require ( github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cncf/udpa/go v0.0.0-20220112060539-c52dc94e7fbe // indirect - github.com/cncf/xds/go v0.0.0-20230105202645-06c439db220b // indirect github.com/envoyproxy/go-control-plane v0.10.3 // indirect github.com/envoyproxy/protoc-gen-validate v0.9.1 // indirect golang.org/x/net v0.8.0 // indirect diff --git a/internal/internal.go b/internal/internal.go index cd68fb3bb929..836b6a3b3e78 100644 --- a/internal/internal.go +++ b/internal/internal.go @@ -137,6 +137,9 @@ var ( // // TODO: Remove this function once the RBAC env var is removed. UnregisterRBACHTTPFilterForTesting func() + + // ORCAAllowAnyMinReportingInterval is for examples/orca use ONLY. + ORCAAllowAnyMinReportingInterval interface{} // func(so *orca.ServiceOptions) ) // HealthChecker defines the signature of the client-side LB channel health checking function. diff --git a/orca/service.go b/orca/service.go index 7816dcc1eca1..9400ae0c7e64 100644 --- a/orca/service.go +++ b/orca/service.go @@ -24,7 +24,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/orca/internal" + "google.golang.org/grpc/internal" + ointernal "google.golang.org/grpc/orca/internal" "google.golang.org/grpc/status" v3orcapb "github.com/cncf/xds/go/xds/data/orca/v3" @@ -33,9 +34,10 @@ import ( ) func init() { - internal.AllowAnyMinReportingInterval = func(so *ServiceOptions) { + ointernal.AllowAnyMinReportingInterval = func(so *ServiceOptions) { so.allowAnyMinReportingInterval = true } + internal.ORCAAllowAnyMinReportingInterval = ointernal.AllowAnyMinReportingInterval } // minReportingInterval is the absolute minimum value supported for