Skip to content

Commit

Permalink
[CLOB-930] Add ctx.Done() support to shutdown, make start-up wait tim…
Browse files Browse the repository at this point in the history
…e configurable, and add support for domain sockets for gRPC server and gRPC web server.

ctx.Done() support and was added in 0.50 with cosmos#15041
server start up time was removed in 0.50 with cosmos#15041
  • Loading branch information
lcwik committed Oct 25, 2023
1 parent b95c66d commit bef8a05
Show file tree
Hide file tree
Showing 6 changed files with 112 additions and 64 deletions.
22 changes: 19 additions & 3 deletions server/grpc/grpc_web.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package grpc

import (
"fmt"
"net"
"net/http"
"strings"
"time"

"github.com/improbable-eng/grpc-web/go/grpcweb"
Expand All @@ -23,24 +25,38 @@ func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, err
)
}

var proto, addr string
parts := strings.SplitN(config.GRPCWeb.Address, "://", 2)
// Default to using 'tcp' to maintain backwards compatibility with configurations that don't specify
// the network to use.
if len(parts) != 2 {
proto = "tcp"
addr = config.GRPCWeb.Address
} else {
proto, addr = parts[0], parts[1]
}
listener, err := net.Listen(proto, addr)
if err != nil {
return nil, err
}

wrappedServer := grpcweb.WrapServer(grpcSrv, options...)
grpcWebSrv := &http.Server{
Addr: config.GRPCWeb.Address,
Handler: wrappedServer,
ReadHeaderTimeout: 500 * time.Millisecond,
}

errCh := make(chan error)
go func() {
if err := grpcWebSrv.ListenAndServe(); err != nil {
if err := grpcWebSrv.Serve(listener); err != nil {
errCh <- fmt.Errorf("[grpc] failed to serve: %w", err)
}
}()

select {
case err := <-errCh:
return nil, err
case <-time.After(types.ServerStartTime): // assume server started successfully
case <-time.After(time.Duration(types.ServerStartTime.Load())): // assume server started successfully
return grpcWebSrv, nil
}
}
25 changes: 18 additions & 7 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package grpc
import (
"fmt"
"net"
"strings"
"time"

"google.golang.org/grpc"
Expand All @@ -18,7 +19,7 @@ import (
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, net.Addr, error) {
maxSendMsgSize := cfg.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize
Expand Down Expand Up @@ -53,16 +54,26 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
InterfaceRegistry: clientCtx.InterfaceRegistry,
})
if err != nil {
return nil, err
return nil, nil, err
}

// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
gogoreflection.Register(grpcSrv)

listener, err := net.Listen("tcp", cfg.Address)
var proto, addr string
parts := strings.SplitN(cfg.Address, "://", 2)
// Default to using 'tcp' to maintain backwards compatibility with configurations that don't specify
// the network to use.
if len(parts) != 2 {
proto = "tcp"
addr = cfg.Address
} else {
proto, addr = parts[0], parts[1]
}
listener, err := net.Listen(proto, addr)
if err != nil {
return nil, err
return nil, nil, err
}

errCh := make(chan error)
Expand All @@ -75,10 +86,10 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config

select {
case err := <-errCh:
return nil, err
return nil, nil, err

case <-time.After(types.ServerStartTime):
case <-time.After(time.Duration(types.ServerStartTime.Load())):
// assume server started successfully
return grpcSrv, nil
return grpcSrv, listener.Addr(), nil
}
}
107 changes: 59 additions & 48 deletions server/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package server
// DONTCOVER

import (
"context"
"errors"
"fmt"
"net"
Expand Down Expand Up @@ -141,14 +142,14 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
withTM, _ := cmd.Flags().GetBool(flagWithTendermint)
if !withTM {
serverCtx.Logger.Info("starting ABCI without Tendermint")
return wrapCPUProfile(serverCtx, func() error {
return startStandAlone(serverCtx, appCreator)
return wrapCPUProfile(cmd.Context(), serverCtx, func() error {
return startStandAlone(cmd.Context(), serverCtx, appCreator)
})
}

// amino is needed here for backwards compatibility of REST routes
err = wrapCPUProfile(serverCtx, func() error {
return startInProcess(serverCtx, clientCtx, appCreator)
err = wrapCPUProfile(cmd.Context(), serverCtx, func() error {
return startInProcess(cmd.Context(), serverCtx, clientCtx, appCreator)
})
errCode, ok := err.(ErrorCode)
if !ok {
Expand Down Expand Up @@ -206,7 +207,7 @@ is performed. Note, when enabled, gRPC will also be automatically enabled.
return cmd
}

func startStandAlone(ctx *Context, appCreator types.AppCreator) error {
func startStandAlone(parentCtx context.Context, ctx *Context, appCreator types.AppCreator) error {
addr := ctx.Viper.GetString(flagAddress)
transport := ctx.Viper.GetString(flagTransport)
home := ctx.Viper.GetString(flags.FlagHome)
Expand Down Expand Up @@ -260,10 +261,10 @@ func startStandAlone(ctx *Context, appCreator types.AppCreator) error {
}()

// Wait for SIGINT or SIGTERM signal
return WaitForQuitSignals()
return WaitForQuitSignals(parentCtx)
}

func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error {
func startInProcess(parentCtx context.Context, ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error {
cfg := ctx.Config
home := cfg.RootDir

Expand Down Expand Up @@ -354,6 +355,32 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
return err
}

var (
grpcSrv *grpc.Server
grpcSrvAddr net.Addr
grpcWebSrv *http.Server
)

if config.GRPC.Enable {
grpcSrv, grpcSrvAddr, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC)
if err != nil {
return err
}
defer grpcSrv.Stop()
if config.GRPCWeb.Enable {
grpcWebSrv, err = servergrpc.StartGRPCWeb(grpcSrv, config)
if err != nil {
ctx.Logger.Error("failed to start grpc-web http server: ", err)
return err
}
defer func() {
if err := grpcWebSrv.Close(); err != nil {
ctx.Logger.Error("failed to close grpc-web http server: ", err)
}
}()
}
}

var apiSrv *api.Server
if config.API.Enable {
genDoc, err := genDocProvider()
Expand All @@ -364,11 +391,6 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID)

if config.GRPC.Enable {
_, port, err := net.SplitHostPort(config.GRPC.Address)
if err != nil {
return err
}

maxSendMsgSize := config.GRPC.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize
Expand All @@ -379,11 +401,10 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize
}

grpcAddress := fmt.Sprintf("127.0.0.1:%s", port)

grpcSrvAddrString := fmt.Sprintf("%s://%s", grpcSrvAddr.Network(), grpcSrvAddr.String())
// If grpc is enabled, configure grpc client for grpc gateway.
grpcClient, err := grpc.Dial(
grpcAddress,
grpcSrvAddrString,
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithDefaultCallOptions(
grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()),
Expand All @@ -396,7 +417,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
}

clientCtx = clientCtx.WithGRPCClient(grpcClient)
ctx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress)
ctx.Logger.Debug("grpc client assigned to client context", "target", grpcSrvAddrString)
}

apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server"))
Expand All @@ -416,40 +437,30 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
case err := <-errCh:
return err

case <-time.After(types.ServerStartTime): // assume server started successfully
}
}

var (
grpcSrv *grpc.Server
grpcWebSrv *http.Server
)

if config.GRPC.Enable {
grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC)
if err != nil {
return err
}
defer grpcSrv.Stop()
if config.GRPCWeb.Enable {
grpcWebSrv, err = servergrpc.StartGRPCWeb(grpcSrv, config)
if err != nil {
ctx.Logger.Error("failed to start grpc-web http server: ", err)
return err
}
defer func() {
if err := grpcWebSrv.Close(); err != nil {
ctx.Logger.Error("failed to close grpc-web http server: ", err)
}
}()
case <-time.After(time.Duration(types.ServerStartTime.Load())): // assume server started successfully
}
}

// At this point it is safe to block the process if we're in gRPC only mode as
// we do not need to start Rosetta or handle any Tendermint related processes.
if gRPCOnly {
// Fix application shutdown
defer func() {
_ = app.Close()

if traceWriterCleanup != nil {
traceWriterCleanup()
}

if apiSrv != nil {
_ = apiSrv.Close()
}

ctx.Logger.Info("exiting...")
}()

// wait for signal capture and gracefully return
return WaitForQuitSignals()
return WaitForQuitSignals(parentCtx)
}

var rosettaSrv crgserver.Server
Expand Down Expand Up @@ -498,7 +509,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
case err := <-errCh:
return err

case <-time.After(types.ServerStartTime): // assume server started successfully
case <-time.After(time.Duration(types.ServerStartTime.Load())): // assume server started successfully
}
}

Expand All @@ -520,7 +531,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App
}()

// wait for signal capture and gracefully return
return WaitForQuitSignals()
return WaitForQuitSignals(parentCtx)
}

func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
Expand All @@ -531,7 +542,7 @@ func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) {
}

// wrapCPUProfile runs callback in a goroutine, then wait for quit signals.
func wrapCPUProfile(ctx *Context, callback func() error) error {
func wrapCPUProfile(parentCtx context.Context, ctx *Context, callback func() error) error {
if cpuProfile := ctx.Viper.GetString(flagCPUProfile); cpuProfile != "" {
f, err := os.Create(cpuProfile)
if err != nil {
Expand Down Expand Up @@ -561,8 +572,8 @@ func wrapCPUProfile(ctx *Context, callback func() error) error {
case err := <-errCh:
return err

case <-time.After(types.ServerStartTime):
case <-time.After(time.Duration(types.ServerStartTime.Load())):
}

return WaitForQuitSignals()
return WaitForQuitSignals(parentCtx)
}
7 changes: 6 additions & 1 deletion server/types/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package types
import (
"encoding/json"
"io"
"sync/atomic"
"time"

dbm "github.com/cometbft/cometbft-db"
Expand All @@ -22,7 +23,11 @@ import (

// ServerStartTime defines the time duration that the server need to stay running after startup
// for the startup be considered successful
const ServerStartTime = 5 * time.Second
var ServerStartTime = atomic.Int64{}

func init() {
ServerStartTime.Add(int64(5 * time.Second))
}

type (
// AppOptions defines an interface that is passed into an application
Expand Down
11 changes: 8 additions & 3 deletions server/util.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -381,11 +382,15 @@ func TrapSignal(cleanupFunc func()) {
}

// WaitForQuitSignals waits for SIGINT and SIGTERM and returns.
func WaitForQuitSignals() ErrorCode {
func WaitForQuitSignals(ctx context.Context) error {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigs
return ErrorCode{Code: int(sig.(syscall.Signal)) + 128}
select {
case sig := <-sigs:
return ErrorCode{Code: int(sig.(syscall.Signal)) + 128}
case <-ctx.Done():
return nil
}
}

// GetAppDBBackend gets the backend type to use for the application DBs.
Expand Down
4 changes: 2 additions & 2 deletions testutil/network/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@ func startInProcess(cfg Config, val *Validator) error {
select {
case err := <-errCh:
return err
case <-time.After(srvtypes.ServerStartTime): // assume server started successfully
case <-time.After(time.Duration(srvtypes.ServerStartTime.Load())): // assume server started successfully
}

val.api = apiSrv
}

if val.AppConfig.GRPC.Enable {
grpcSrv, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC)
grpcSrv, _, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC)
if err != nil {
return err
}
Expand Down

0 comments on commit bef8a05

Please sign in to comment.