From 509202c318198491e9b2d93d6e66f0b1d92503d9 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Wed, 22 Feb 2023 11:09:08 -0500 Subject: [PATCH 1/8] refactor: cleanup server logic (backport #15041) --- CHANGELOG.md | 1 + go.mod | 1 + go.sum | 2 + server/api/server.go | 46 +++-- server/grpc/grpc_web.go | 20 ++- server/grpc/server.go | 49 ++++-- server/start.go | 331 ++++++++++++++++-------------------- server/types/app.go | 5 - server/util.go | 25 ++- testutil/network/network.go | 34 ++-- testutil/network/util.go | 51 +++--- 11 files changed, 294 insertions(+), 271 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3ca726fd72a6..cf2bb35811c6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * (gov) [#15979](https://github.com/cosmos/cosmos-sdk/pull/15979) Improve gov error message when failing to convert v1 proposal to v1beta1. * (server) [#16061](https://github.com/cosmos/cosmos-sdk/pull/16061) add comet bootstrap command * (store) [#16067](https://github.com/cosmos/cosmos-sdk/pull/16067) Add local snapshots management commands. +* (server) [#15041](https://github.com/cosmos/cosmos-sdk/pull/15041) Remove unnecessary sleeps from gRPC and API server initiation. The servers will start and accept requests as soon as they're ready. ### Bug Fixes diff --git a/go.mod b/go.mod index 9b8fa6be2d93..bcb7abfa59cb 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/tidwall/btree v1.5.0 golang.org/x/crypto v0.7.0 golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0 + golang.org/x/sync v0.1.0 google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 google.golang.org/grpc v1.54.0 google.golang.org/protobuf v1.30.0 diff --git a/go.sum b/go.sum index e190a029afe7..14fba23e7c2e 100644 --- a/go.sum +++ b/go.sum @@ -1146,6 +1146,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/server/api/server.go b/server/api/server.go index 5bc406acfa38..978ec5f503c0 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -1,6 +1,7 @@ package api import ( + "context" "fmt" "net" "net/http" @@ -50,6 +51,7 @@ func CustomGRPCHeaderMatcher(key string) (string, bool) { switch strings.ToLower(key) { case grpctypes.GRPCBlockHeightHeader: return grpctypes.GRPCBlockHeightHeader, true + default: return runtime.DefaultHeaderMatcher(key) } @@ -86,9 +88,12 @@ func New(clientCtx client.Context, logger log.Logger) *Server { // Start starts the API server. Internally, the API server leverages Tendermint's // JSON RPC server. Configuration options are provided via config.APIConfig -// and are delegated to the Tendermint JSON RPC server. The process is -// non-blocking, so an external signal handler must be used. -func (s *Server) Start(cfg config.Config) error { +// and are delegated to the Tendermint JSON RPC server. +// +// Note, this creates a blocking process if the server is started successfully. +// Otherwise, an error is returned. The caller is expected to provide a Context +// that is properly canceled or closed to indicate the server should be stopped. +func (s *Server) Start(ctx context.Context, cfg config.Config) error { s.mtx.Lock() tmCfg := tmrpcserver.DefaultConfig() @@ -105,17 +110,38 @@ func (s *Server) Start(cfg config.Config) error { s.registerGRPCGatewayRoutes() s.listener = listener - var h http.Handler = s.Router s.mtx.Unlock() - if cfg.API.EnableUnsafeCORS { - allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"})) - return tmrpcserver.Serve(s.listener, allowAllCORS(h), s.logger, tmCfg) - } + errCh := make(chan error) + + // Start the API in an external goroutine as Serve is blocking and will return + // an error upon failure, which we'll send on the error channel that will be + // consumed by the for block below. + go func(enableUnsafeCORS bool) { + s.logger.Info("starting API server...", "address", cfg.API.Address) - s.logger.Info("starting API server...") - return tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg) + if enableUnsafeCORS { + allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"})) + errCh <- tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, tmCfg) + } else { + errCh <- tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg) + } + }(cfg.API.EnableUnsafeCORS) + + // Start a blocking select to wait for an indication to stop the server or that + // the server failed to start properly. + select { + case <-ctx.Done(): + // The calling process cancelled or closed the provided context, so we must + // gracefully stop the API server. + s.logger.Info("stopping API server...", "address", cfg.API.Address) + return s.Close() + + case err := <-errCh: + s.logger.Error("failed to start API server", "err", err) + return err + } } // Close closes the API server. diff --git a/server/grpc/grpc_web.go b/server/grpc/grpc_web.go index 99040ae26302..31a4355165a9 100644 --- a/server/grpc/grpc_web.go +++ b/server/grpc/grpc_web.go @@ -1,19 +1,20 @@ package grpc import ( + "context" "fmt" "net/http" "time" "github.com/improbable-eng/grpc-web/go/grpcweb" + "github.com/tendermint/tendermint/libs/log" "google.golang.org/grpc" "github.com/cosmos/cosmos-sdk/server/config" - "github.com/cosmos/cosmos-sdk/server/types" ) // StartGRPCWeb starts a gRPC-Web server on the given address. -func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, error) { +func StartGRPCWeb(ctx context.Context, logger log.Logger, grpcSrv *grpc.Server, config config.Config) error { var options []grpcweb.Option if config.GRPCWeb.EnableUnsafeCORS { options = append(options, @@ -32,15 +33,24 @@ func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, err errCh := make(chan error) go func() { + logger.Info("starting gRPC web server...", "address", config.GRPCWeb.Address) if err := grpcWebSrv.ListenAndServe(); err != nil { errCh <- fmt.Errorf("[grpc] failed to serve: %w", err) } }() + // Start a blocking select to wait for an indication to stop the server or that + // the server failed to start properly. select { + case <-ctx.Done(): + // The calling process cancelled or closed the provided context, so we must + // gracefully stop the gRPC-web server. + logger.Info("stopping gRPC web server...", "address", config.GRPCWeb.Address) + grpcWebSrv.Close() + return nil + case err := <-errCh: - return nil, err - case <-time.After(types.ServerStartTime): // assume server started successfully - return grpcWebSrv, nil + logger.Error("failed to start gRPC Web server", "err", err) + return err } } diff --git a/server/grpc/server.go b/server/grpc/server.go index 78a8e1955af5..a04f8f2c782c 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -1,10 +1,11 @@ package grpc import ( + "context" "fmt" "net" - "time" + "github.com/tendermint/tendermint/libs/log" "google.golang.org/grpc" "github.com/cosmos/cosmos-sdk/client" @@ -16,8 +17,9 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) -// StartGRPCServer starts a gRPC server on the given address. -func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) { +// NewGRPCServer returns a correctly configured and initialized gRPC server. +// Note, the caller is responsible for starting the server. See StartGRPCServer. +func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) { maxSendMsgSize := cfg.MaxSendMsgSize if maxSendMsgSize == 0 { maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize @@ -45,6 +47,7 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config for _, m := range clientCtx.TxConfig.SignModeHandler().Modes() { modes[m.String()] = (int32)(m) } + return modes }(), ChainID: clientCtx.ChainID, @@ -52,32 +55,50 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config InterfaceRegistry: clientCtx.InterfaceRegistry, }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to register reflection service: %w", err) } // Reflection allows external clients to see what services and methods // the gRPC server exposes. gogoreflection.Register(grpcSrv) + return grpcSrv, nil +} + +// StartGRPCServer starts the provided gRPC server on the address specified in cfg. +// +// Note, this creates a blocking process if the server is started successfully. +// Otherwise, an error is returned. The caller is expected to provide a Context +// that is properly canceled or closed to indicate the server should be stopped. +func StartGRPCServer(ctx context.Context, logger log.Logger, cfg config.GRPCConfig, grpcSrv *grpc.Server) error { listener, err := net.Listen("tcp", cfg.Address) if err != nil { - return nil, err + return fmt.Errorf("failed to listen on address %s: %w", cfg.Address, err) } errCh := make(chan error) + + // Start the gRPC in an external goroutine as Serve is blocking and will return + // an error upon failure, which we'll send on the error channel that will be + // consumed by the for block below. go func() { - err = grpcSrv.Serve(listener) - if err != nil { - errCh <- fmt.Errorf("failed to serve: %w", err) - } + logger.Info("starting gRPC server...", "address", cfg.Address) + errCh <- grpcSrv.Serve(listener) }() + // Start a blocking select to wait for an indication to stop the server or that + // the server failed to start properly. select { - case err := <-errCh: - return nil, err + case <-ctx.Done(): + // The calling process cancelled or closed the provided context, so we must + // gracefully stop the gRPC server. + logger.Info("stopping gRPC server...", "address", cfg.Address) + grpcSrv.GracefulStop() + + return nil - case <-time.After(types.ServerStartTime): - // assume server started successfully - return grpcSrv, nil + case err := <-errCh: + logger.Error("failed to start gRPC server", "err", err) + return err } } diff --git a/server/start.go b/server/start.go index 208bfb8db674..7992e58a9ace 100644 --- a/server/start.go +++ b/server/start.go @@ -3,22 +3,21 @@ package server // DONTCOVER import ( + "context" "fmt" "net" - "net/http" "os" "runtime/pprof" - "time" "github.com/spf13/cobra" "github.com/tendermint/tendermint/abci/server" tcmd "github.com/tendermint/tendermint/cmd/cometbft/commands" - tmos "github.com/tendermint/tendermint/libs/os" "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/p2p" pvm "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/rpc/client/local" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -29,11 +28,8 @@ import ( "github.com/cosmos/cosmos-sdk/server/api" serverconfig "github.com/cosmos/cosmos-sdk/server/config" servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" - "github.com/cosmos/cosmos-sdk/server/rosetta" - crgserver "github.com/cosmos/cosmos-sdk/server/rosetta/lib/server" "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/telemetry" - sdktypes "github.com/cosmos/cosmos-sdk/types" ) const ( @@ -142,17 +138,9 @@ is performed. Note, when enabled, gRPC will also be automatically enabled. }) } - // amino is needed here for backwards compatibility of REST routes - err = wrapCPUProfile(serverCtx, func() error { + return wrapCPUProfile(serverCtx, func() error { return startInProcess(serverCtx, clientCtx, appCreator) }) - errCode, ok := err.(ErrorCode) - if !ok { - return err - } - - serverCtx.Logger.Debug(fmt.Sprintf("received quit signal: %d", errCode.Code)) - return nil }, } @@ -200,31 +188,30 @@ is performed. Note, when enabled, gRPC will also be automatically enabled. return cmd } -func startStandAlone(ctx *Context, appCreator types.AppCreator) error { - addr := ctx.Viper.GetString(flagAddress) - transport := ctx.Viper.GetString(flagTransport) - home := ctx.Viper.GetString(flags.FlagHome) +func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error { + addr := svrCtx.Viper.GetString(flagAddress) + transport := svrCtx.Viper.GetString(flagTransport) + home := svrCtx.Viper.GetString(flags.FlagHome) - db, err := openDB(home, GetAppDBBackend(ctx.Viper)) + db, err := openDB(home, GetAppDBBackend(svrCtx.Viper)) if err != nil { return err } - traceWriterFile := ctx.Viper.GetString(flagTraceStore) + traceWriterFile := svrCtx.Viper.GetString(flagTraceStore) traceWriter, err := openTraceWriter(traceWriterFile) if err != nil { return err } - app := appCreator(ctx.Logger, db, traceWriter, ctx.Viper) + app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper) - config, err := serverconfig.GetConfig(ctx.Viper) + config, err := serverconfig.GetConfig(svrCtx.Viper) if err != nil { return err } - _, err = startTelemetry(config) - if err != nil { + if _, err := startTelemetry(config); err != nil { return err } @@ -233,39 +220,58 @@ func startStandAlone(ctx *Context, appCreator types.AppCreator) error { return fmt.Errorf("error creating listener: %v", err) } - svr.SetLogger(ctx.Logger.With("module", "abci-server")) + svr.SetLogger(svrCtx.Logger.With("module", "abci-server")) - err = svr.Start() - if err != nil { - tmos.Exit(err.Error()) - } + ctx, cancelFn := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) - defer func() { - if err = svr.Stop(); err != nil { - tmos.Exit(err.Error()) + // listen for quit signals so the calling parent process can gracefully exit + ListenForQuitSignals(cancelFn, svrCtx.Logger) + + g.Go(func() error { + if err := svr.Start(); err != nil { + svrCtx.Logger.Error("failed to start out-of-process ABCI server", "err", err) + return err } - }() - // Wait for SIGINT or SIGTERM signal - return WaitForQuitSignals() + // Wait for the calling process to be cancelled or close the provided context, + // so we can gracefully stop the ABCI server. + <-ctx.Done() + svrCtx.Logger.Info("stopping the ABCI server...") + return svr.Stop() + }) + + return g.Wait() } -func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error { - cfg := ctx.Config +func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.AppCreator) error { + cfg := svrCtx.Config home := cfg.RootDir - db, err := openDB(home, GetAppDBBackend(ctx.Viper)) + db, err := openDB(home, GetAppDBBackend(svrCtx.Viper)) if err != nil { return err } - traceWriterFile := ctx.Viper.GetString(flagTraceStore) + traceWriterFile := svrCtx.Viper.GetString(flagTraceStore) traceWriter, err := openTraceWriter(traceWriterFile) if err != nil { return err } - config, err := serverconfig.GetConfig(ctx.Viper) + // clean up the traceWriter when the server is shutting down + var traceWriterCleanup func() + + // if flagTraceStore is not used then traceWriter is nil + if traceWriter != nil { + traceWriterCleanup = func() { + if err = traceWriter.Close(); err != nil { + svrCtx.Logger.Error("failed to close trace writer", "err", err) + } + } + } + + config, err := serverconfig.GetConfig(svrCtx.Viper) if err != nil { return err } @@ -274,7 +280,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App return err } - app := appCreator(ctx.Logger, db, traceWriter, ctx.Viper) + app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper) nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile()) if err != nil { @@ -285,14 +291,14 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App var ( tmNode *node.Node - gRPCOnly = ctx.Viper.GetBool(flagGRPCOnly) + gRPCOnly = svrCtx.Viper.GetBool(flagGRPCOnly) ) if gRPCOnly { - ctx.Logger.Info("starting node in gRPC only mode; Tendermint is disabled") + svrCtx.Logger.Info("starting node in gRPC only mode; Tendermint is disabled") config.GRPC.Enable = true } else { - ctx.Logger.Info("starting node with ABCI Tendermint in-process") + svrCtx.Logger.Info("starting node with ABCI Tendermint in-process") tmNode, err = node.NewNode( cfg, @@ -302,7 +308,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App genDocProvider, node.DefaultDBProvider, node.DefaultMetricsProvider(cfg.Instrumentation), - ctx.Logger, + svrCtx.Logger, ) if err != nil { return err @@ -316,8 +322,8 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App // service if API or gRPC is enabled, and avoid doing so in the general // case, because it spawns a new local tendermint RPC client. if (config.API.Enable || config.GRPC.Enable) && tmNode != nil { - // re-assign for making the client available below - // do not use := to avoid shadowing clientCtx + // Re-assign for making the client available below do not use := to avoid + // shadowing the clientCtx variable. clientCtx = clientCtx.WithClient(local.New(tmNode)) app.RegisterTxService(clientCtx) @@ -333,211 +339,160 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App return err } - var apiSrv *api.Server - if config.API.Enable { - genDoc, err := genDocProvider() - if err != nil { - return err - } - - clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID) - - if config.GRPC.Enable { - _, port, err := net.SplitHostPort(config.GRPC.Address) - if err != nil { - return err - } - - maxSendMsgSize := config.GRPC.MaxSendMsgSize - if maxSendMsgSize == 0 { - maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize - } + var ( + apiSrv *api.Server + grpcSrv *grpc.Server + ) - maxRecvMsgSize := config.GRPC.MaxRecvMsgSize - if maxRecvMsgSize == 0 { - maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize - } + ctx, cancelFn := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) - grpcAddress := fmt.Sprintf("127.0.0.1:%s", port) - - // If grpc is enabled, configure grpc client for grpc gateway. - grpcClient, err := grpc.Dial( - grpcAddress, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions( - grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), - grpc.MaxCallRecvMsgSize(maxRecvMsgSize), - grpc.MaxCallSendMsgSize(maxSendMsgSize), - ), - ) - if err != nil { - return err - } + // listen for quit signals so the calling parent process can gracefully exit + ListenForQuitSignals(cancelFn, svrCtx.Logger) - clientCtx = clientCtx.WithGRPCClient(grpcClient) - ctx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress) + if config.GRPC.Enable { + _, port, err := net.SplitHostPort(config.GRPC.Address) + if err != nil { + return err } - apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server")) - app.RegisterAPIRoutes(apiSrv, config.API) - if config.Telemetry.Enabled { - apiSrv.SetTelemetry(metrics) + maxSendMsgSize := config.GRPC.MaxSendMsgSize + if maxSendMsgSize == 0 { + maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize } - errCh := make(chan error) - - go func() { - if err := apiSrv.Start(config); err != nil { - errCh <- err - } - }() - - select { - case err := <-errCh: - return err - case <-time.After(types.ServerStartTime): // assume server started successfully + maxRecvMsgSize := config.GRPC.MaxRecvMsgSize + if maxRecvMsgSize == 0 { + maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize } - } - - var ( - grpcSrv *grpc.Server - grpcWebSrv *http.Server - ) - if config.GRPC.Enable { - grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC) + grpcAddress := fmt.Sprintf("127.0.0.1:%s", port) + + // if gRPC is enabled, configure gRPC client for gRPC gateway + grpcClient, err := grpc.Dial( + grpcAddress, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), + grpc.MaxCallRecvMsgSize(maxRecvMsgSize), + grpc.MaxCallSendMsgSize(maxSendMsgSize), + ), + ) if err != nil { return err } - defer grpcSrv.Stop() - if config.GRPCWeb.Enable { - grpcWebSrv, err = servergrpc.StartGRPCWeb(grpcSrv, config) - if err != nil { - ctx.Logger.Error("failed to start grpc-web http server: ", err) - return err - } - defer func() { - if err := grpcWebSrv.Close(); err != nil { - ctx.Logger.Error("failed to close grpc-web http server: ", err) - } - }() - } - } - // At this point it is safe to block the process if we're in gRPC only mode as - // we do not need to start Rosetta or handle any Tendermint related processes. - if gRPCOnly { - // wait for signal capture and gracefully return - return WaitForQuitSignals() - } - - var rosettaSrv crgserver.Server - if config.Rosetta.Enable { - offlineMode := config.Rosetta.Offline - - // If GRPC is not enabled rosetta cannot work in online mode, so it works in - // offline mode. - if !config.GRPC.Enable { - offlineMode = true - } + clientCtx = clientCtx.WithGRPCClient(grpcClient) + svrCtx.Logger.Debug("gRPC client assigned to client context", "target", grpcAddress) - minGasPrices, err := sdktypes.ParseDecCoins(config.MinGasPrices) + grpcSrv, err = servergrpc.NewGRPCServer(clientCtx, app, config.GRPC) if err != nil { - ctx.Logger.Error("failed to parse minimum-gas-prices: ", err) return err } - conf := &rosetta.Config{ - Blockchain: config.Rosetta.Blockchain, - Network: config.Rosetta.Network, - TendermintRPC: ctx.Config.RPC.ListenAddress, - GRPCEndpoint: config.GRPC.Address, - Addr: config.Rosetta.Address, - Retries: config.Rosetta.Retries, - Offline: offlineMode, - GasToSuggest: config.Rosetta.GasToSuggest, - EnableFeeSuggestion: config.Rosetta.EnableFeeSuggestion, - GasPrices: minGasPrices.Sort(), - Codec: clientCtx.Codec.(*codec.ProtoCodec), - InterfaceRegistry: clientCtx.InterfaceRegistry, + // Start the gRPC server in a goroutine. Note, the provided ctx will ensure + // that the server is gracefully shut down. + g.Go(func() error { + return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config.GRPC, grpcSrv) + }) + + if config.GRPCWeb.Enable { + g.Go(func() error { + return servergrpc.StartGRPCWeb(ctx, svrCtx.Logger.With("module", "grpc-web"), grpcSrv, config) + }) } + } - rosettaSrv, err = rosetta.ServerFromConfig(conf) + if config.API.Enable { + genDoc, err := genDocProvider() if err != nil { return err } - errCh := make(chan error) - go func() { - if err := rosettaSrv.Start(); err != nil { - errCh <- err - } - }() + clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID) - select { - case err := <-errCh: - return err + apiSrv = api.New(clientCtx, svrCtx.Logger.With("module", "api-server")) + app.RegisterAPIRoutes(apiSrv, config.API) - case <-time.After(types.ServerStartTime): // assume server started successfully + if config.Telemetry.Enabled { + apiSrv.SetTelemetry(metrics) } + + g.Go(func() error { + return apiSrv.Start(ctx, config) + }) + } + + // At this point it is safe to block the process if we're in gRPC-only mode as + // we do not need to handle any CometBFT related processes. + if gRPCOnly { + // wait for signal capture and gracefully return + return g.Wait() } + // In case the operator has both gRPC and API servers disabled, there is + // nothing blocking this root process, so we need to block manually, so we'll + // create an empty blocking loop. + g.Go(func() error { + <-ctx.Done() + return nil + }) + + // deferred cleanup function defer func() { if tmNode != nil && tmNode.IsRunning() { _ = tmNode.Stop() } - if apiSrv != nil { - _ = apiSrv.Close() + if traceWriterCleanup != nil { + traceWriterCleanup() } - - ctx.Logger.Info("exiting...") }() // wait for signal capture and gracefully return - return WaitForQuitSignals() + return g.Wait() } func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) { if !cfg.Telemetry.Enabled { return nil, nil } + return telemetry.New(cfg.Telemetry) } -// wrapCPUProfile runs callback in a goroutine, then wait for quit signals. -func wrapCPUProfile(ctx *Context, callback func() error) error { - if cpuProfile := ctx.Viper.GetString(flagCPUProfile); cpuProfile != "" { +// wrapCPUProfile starts CPU profiling, if enabled, and executes the provided +// callbackFn in a separate goroutine, then will wait for that callback to +// return. +// +// NOTE: We expect the caller to handle graceful shutdown and signal handling. +func wrapCPUProfile(svrCtx *Context, callbackFn func() error) error { + if cpuProfile := svrCtx.Viper.GetString(flagCPUProfile); cpuProfile != "" { f, err := os.Create(cpuProfile) if err != nil { return err } - ctx.Logger.Info("starting CPU profiler", "profile", cpuProfile) + svrCtx.Logger.Info("starting CPU profiler", "profile", cpuProfile) + if err := pprof.StartCPUProfile(f); err != nil { return err } defer func() { - ctx.Logger.Info("stopping CPU profiler", "profile", cpuProfile) + svrCtx.Logger.Info("stopping CPU profiler", "profile", cpuProfile) pprof.StopCPUProfile() + if err := f.Close(); err != nil { - ctx.Logger.Info("failed to close cpu-profile file", "profile", cpuProfile, "err", err.Error()) + svrCtx.Logger.Info("failed to close cpu-profile file", "profile", cpuProfile, "err", err.Error()) } }() } errCh := make(chan error) go func() { - errCh <- callback() + errCh <- callbackFn() }() - select { - case err := <-errCh: - return err - - case <-time.After(types.ServerStartTime): - } - - return WaitForQuitSignals() + return <-errCh } diff --git a/server/types/app.go b/server/types/app.go index 124dcf4c25ad..7911266dd0f1 100644 --- a/server/types/app.go +++ b/server/types/app.go @@ -3,7 +3,6 @@ package types import ( "encoding/json" "io" - "time" "github.com/gogo/protobuf/grpc" "github.com/spf13/cobra" @@ -19,10 +18,6 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) -// ServerStartTime defines the time duration that the server need to stay running after startup -// for the startup be considered successful -const ServerStartTime = 5 * time.Second - type ( // AppOptions defines an interface that is passed into an application // constructor, typically used to set BaseApp options that are either supplied diff --git a/server/util.go b/server/util.go index 019d0132085c..c39be5622a17 100644 --- a/server/util.go +++ b/server/util.go @@ -1,6 +1,7 @@ package server import ( + "context" "errors" "fmt" "io" @@ -355,12 +356,22 @@ func TrapSignal(cleanupFunc func()) { }() } -// WaitForQuitSignals waits for SIGINT and SIGTERM and returns. -func WaitForQuitSignals() ErrorCode { - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - sig := <-sigs - return ErrorCode{Code: int(sig.(syscall.Signal)) + 128} +// ListenForQuitSignals listens for SIGINT and SIGTERM. When a signal is received, +// the cleanup function is called, indicating the caller can gracefully exit or +// return. +// +// Note, this performs a non-blocking process so the caller must ensure the +// corresponding context derived from the cancelFn is used correctly. +func ListenForQuitSignals(cancelFn context.CancelFunc, logger tmlog.Logger) { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigCh + cancelFn() + + logger.Info("caught signal", "signal", sig.String()) + }() } // GetAppDBBackend gets the backend type to use for the application DBs. @@ -407,7 +418,7 @@ func openDB(rootDir string, backendType dbm.BackendType) (dbm.DB, error) { return dbm.NewDB("application", backendType, dataDir) } -func openTraceWriter(traceWriterFile string) (w io.Writer, err error) { +func openTraceWriter(traceWriterFile string) (w io.WriteCloser, err error) { if traceWriterFile == "" { return } diff --git a/testutil/network/network.go b/testutil/network/network.go index e08787435037..985e3806c5a2 100644 --- a/testutil/network/network.go +++ b/testutil/network/network.go @@ -22,6 +22,7 @@ import ( "github.com/tendermint/tendermint/node" tmclient "github.com/tendermint/tendermint/rpc/client" dbm "github.com/tendermint/tm-db" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "github.com/cosmos/cosmos-sdk/baseapp" @@ -164,10 +165,12 @@ type ( ValAddress sdk.ValAddress RPCClient tmclient.Client - tmNode *node.Node - api *api.Server - grpc *grpc.Server - grpcWeb *http.Server + tmNode *node.Node + api *api.Server + grpc *grpc.Server + grpcWeb *http.Server + errGroup *errgroup.Group + cancelFn context.CancelFunc } ) @@ -593,24 +596,25 @@ func (n *Network) Cleanup() { n.Logger.Log("cleaning up test network...") for _, v := range n.Validators { - if v.tmNode != nil && v.tmNode.IsRunning() { - _ = v.tmNode.Stop() - } + // cancel the validator's context which will signal to the gRPC and API + // goroutines that they should gracefully exit. + v.cancelFn() - if v.api != nil { - _ = v.api.Close() + if err := v.errGroup.Wait(); err != nil { + n.Logger.Log("unexpected error waiting for validator gRPC and API processes to exit", "err", err) } - if v.grpc != nil { - v.grpc.Stop() - if v.grpcWeb != nil { - _ = v.grpcWeb.Close() + if v.tmNode != nil && v.tmNode.IsRunning() { + if err := v.tmNode.Stop(); err != nil { + n.Logger.Log("failed to stop validator CometBFT node", "err", err) } } + + if v.grpcWeb != nil { + _ = v.grpcWeb.Close() + } } - // Give a brief pause for things to finish closing in other processes. Hopefully this helps with the address-in-use errors. - // 100ms chosen randomly. time.Sleep(100 * time.Millisecond) if n.Config.CleanupDir { diff --git a/testutil/network/util.go b/testutil/network/util.go index a29705c0fa38..e3d6a62411fa 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -1,10 +1,10 @@ package network import ( + "context" "encoding/json" "os" "path/filepath" - "time" tmos "github.com/tendermint/tendermint/libs/os" "github.com/tendermint/tendermint/node" @@ -14,6 +14,7 @@ import ( "github.com/tendermint/tendermint/rpc/client/local" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" + "golang.org/x/sync/errgroup" "github.com/cosmos/cosmos-sdk/server/api" servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" @@ -78,42 +79,38 @@ func startInProcess(cfg Config, val *Validator) error { } } - if val.APIAddress != "" { - apiSrv := api.New(val.ClientCtx, logger.With("module", "api-server")) - app.RegisterAPIRoutes(apiSrv, val.AppConfig.API) + ctx := context.Background() + ctx, val.cancelFn = context.WithCancel(ctx) + val.errGroup, ctx = errgroup.WithContext(ctx) - errCh := make(chan error) + grpcCfg := val.AppConfig.GRPC - go func() { - if err := apiSrv.Start(*val.AppConfig); err != nil { - errCh <- err - } - }() - - select { - case err := <-errCh: + if grpcCfg.Enable { + grpcSrv, err := servergrpc.NewGRPCServer(val.ClientCtx, app, grpcCfg) + if err != nil { return err - case <-time.After(srvtypes.ServerStartTime): // assume server started successfully } - val.api = apiSrv + // Start the gRPC server in a goroutine. Note, the provided ctx will ensure + // that the server is gracefully shut down. + val.errGroup.Go(func() error { + return servergrpc.StartGRPCServer(ctx, logger.With("module", "grpc-server"), grpcCfg, grpcSrv) + }) + + val.grpc = grpcSrv } - if val.AppConfig.GRPC.Enable { - grpcSrv, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC) - if err != nil { - return err - } + if val.APIAddress != "" { + apiSrv := api.New(val.ClientCtx, logger.With("module", "api-server")) + app.RegisterAPIRoutes(apiSrv, val.AppConfig.API) - val.grpc = grpcSrv + val.errGroup.Go(func() error { + return apiSrv.Start(ctx, *val.AppConfig) + }) - if val.AppConfig.GRPCWeb.Enable { - val.grpcWeb, err = servergrpc.StartGRPCWeb(grpcSrv, *val.AppConfig) - if err != nil { - return err - } - } + val.api = apiSrv } + return nil } From 42ca2d15bac8c5c9cfb51c060b5fd35df70c318a Mon Sep 17 00:00:00 2001 From: HuangYi Date: Thu, 18 May 2023 12:36:59 +0800 Subject: [PATCH 2/8] cleanup for easier review --- server/start.go | 166 +++++++++++++++++++++++++++++++++--------------- 1 file changed, 116 insertions(+), 50 deletions(-) diff --git a/server/start.go b/server/start.go index 7992e58a9ace..f72ee1450f9a 100644 --- a/server/start.go +++ b/server/start.go @@ -28,8 +28,11 @@ import ( "github.com/cosmos/cosmos-sdk/server/api" serverconfig "github.com/cosmos/cosmos-sdk/server/config" servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" + "github.com/cosmos/cosmos-sdk/server/rosetta" + crgserver "github.com/cosmos/cosmos-sdk/server/rosetta/lib/server" "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/telemetry" + sdktypes "github.com/cosmos/cosmos-sdk/types" ) const ( @@ -138,9 +141,17 @@ is performed. Note, when enabled, gRPC will also be automatically enabled. }) } - return wrapCPUProfile(serverCtx, func() error { + // amino is needed here for backwards compatibility of REST routes + err = wrapCPUProfile(serverCtx, func() error { return startInProcess(serverCtx, clientCtx, appCreator) }) + errCode, ok := err.(ErrorCode) + if !ok { + return err + } + + serverCtx.Logger.Debug(fmt.Sprintf("received quit signal: %d", errCode.Code)) + return nil }, } @@ -211,7 +222,8 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error { return err } - if _, err := startTelemetry(config); err != nil { + _, err = startTelemetry(config) + if err != nil { return err } @@ -234,7 +246,7 @@ func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error { return err } - // Wait for the calling process to be cancelled or close the provided context, + // Wait for the calling process to be canceled or close the provided context, // so we can gracefully stop the ABCI server. <-ctx.Done() svrCtx.Logger.Info("stopping the ABCI server...") @@ -339,52 +351,71 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types. return err } - var ( - apiSrv *api.Server - grpcSrv *grpc.Server - ) - ctx, cancelFn := context.WithCancel(context.Background()) g, ctx := errgroup.WithContext(ctx) // listen for quit signals so the calling parent process can gracefully exit ListenForQuitSignals(cancelFn, svrCtx.Logger) - if config.GRPC.Enable { - _, port, err := net.SplitHostPort(config.GRPC.Address) + var apiSrv *api.Server + if config.API.Enable { + genDoc, err := genDocProvider() if err != nil { return err } - maxSendMsgSize := config.GRPC.MaxSendMsgSize - if maxSendMsgSize == 0 { - maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize - } + clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID) + + if config.GRPC.Enable { + _, port, err := net.SplitHostPort(config.GRPC.Address) + if err != nil { + return err + } + + maxSendMsgSize := config.GRPC.MaxSendMsgSize + if maxSendMsgSize == 0 { + maxSendMsgSize = serverconfig.DefaultGRPCMaxSendMsgSize + } - maxRecvMsgSize := config.GRPC.MaxRecvMsgSize - if maxRecvMsgSize == 0 { - maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize + maxRecvMsgSize := config.GRPC.MaxRecvMsgSize + if maxRecvMsgSize == 0 { + maxRecvMsgSize = serverconfig.DefaultGRPCMaxRecvMsgSize + } + + grpcAddress := fmt.Sprintf("127.0.0.1:%s", port) + + // If grpc is enabled, configure grpc client for grpc gateway. + grpcClient, err := grpc.Dial( + grpcAddress, + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithDefaultCallOptions( + grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), + grpc.MaxCallRecvMsgSize(maxRecvMsgSize), + grpc.MaxCallSendMsgSize(maxSendMsgSize), + ), + ) + if err != nil { + return err + } + + clientCtx = clientCtx.WithGRPCClient(grpcClient) + svrCtx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress) } - grpcAddress := fmt.Sprintf("127.0.0.1:%s", port) - - // if gRPC is enabled, configure gRPC client for gRPC gateway - grpcClient, err := grpc.Dial( - grpcAddress, - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithDefaultCallOptions( - grpc.ForceCodec(codec.NewProtoCodec(clientCtx.InterfaceRegistry).GRPCCodec()), - grpc.MaxCallRecvMsgSize(maxRecvMsgSize), - grpc.MaxCallSendMsgSize(maxSendMsgSize), - ), - ) - if err != nil { - return err + apiSrv = api.New(clientCtx, svrCtx.Logger.With("module", "api-server")) + app.RegisterAPIRoutes(apiSrv, config.API) + if config.Telemetry.Enabled { + apiSrv.SetTelemetry(metrics) } - clientCtx = clientCtx.WithGRPCClient(grpcClient) - svrCtx.Logger.Debug("gRPC client assigned to client context", "target", grpcAddress) + g.Go(func() error { + return apiSrv.Start(ctx, config) + }) + } + var grpcSrv *grpc.Server + + if config.GRPC.Enable { grpcSrv, err = servergrpc.NewGRPCServer(clientCtx, app, config.GRPC) if err != nil { return err @@ -403,33 +434,69 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types. } } - if config.API.Enable { - genDoc, err := genDocProvider() + // At this point it is safe to block the process if we're in gRPC-only mode as + // we do not need to handle any CometBFT related processes. + if gRPCOnly { + // wait for signal capture and gracefully return + return g.Wait() + } + + var rosettaSrv crgserver.Server + if config.Rosetta.Enable { + offlineMode := config.Rosetta.Offline + + // If GRPC is not enabled rosetta cannot work in online mode, so it works in + // offline mode. + if !config.GRPC.Enable { + offlineMode = true + } + + minGasPrices, err := sdktypes.ParseDecCoins(config.MinGasPrices) if err != nil { + svrCtx.Logger.Error("failed to parse minimum-gas-prices: ", err) return err } - clientCtx := clientCtx.WithHomeDir(home).WithChainID(genDoc.ChainID) - - apiSrv = api.New(clientCtx, svrCtx.Logger.With("module", "api-server")) - app.RegisterAPIRoutes(apiSrv, config.API) + conf := &rosetta.Config{ + Blockchain: config.Rosetta.Blockchain, + Network: config.Rosetta.Network, + TendermintRPC: svrCtx.Config.RPC.ListenAddress, + GRPCEndpoint: config.GRPC.Address, + Addr: config.Rosetta.Address, + Retries: config.Rosetta.Retries, + Offline: offlineMode, + GasToSuggest: config.Rosetta.GasToSuggest, + EnableFeeSuggestion: config.Rosetta.EnableFeeSuggestion, + GasPrices: minGasPrices.Sort(), + Codec: clientCtx.Codec.(*codec.ProtoCodec), + InterfaceRegistry: clientCtx.InterfaceRegistry, + } - if config.Telemetry.Enabled { - apiSrv.SetTelemetry(metrics) + rosettaSrv, err = rosetta.ServerFromConfig(conf) + if err != nil { + return err } g.Go(func() error { - return apiSrv.Start(ctx, config) + errCh := make(chan error) + go func() { + svrCtx.Logger.Info("starting rosetta server...", "address", config.Rosetta.Address) + if err := rosettaSrv.Start(); err != nil { + errCh <- err + } + }() + + select { + case <-ctx.Done(): + return nil + + case err := <-errCh: + svrCtx.Logger.Error("failed to start rosetta server", "err", err) + return err + } }) } - // At this point it is safe to block the process if we're in gRPC-only mode as - // we do not need to handle any CometBFT related processes. - if gRPCOnly { - // wait for signal capture and gracefully return - return g.Wait() - } - // In case the operator has both gRPC and API servers disabled, there is // nothing blocking this root process, so we need to block manually, so we'll // create an empty blocking loop. @@ -457,7 +524,6 @@ func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) { if !cfg.Telemetry.Enabled { return nil, nil } - return telemetry.New(cfg.Telemetry) } From 1b894c3b813ab6cd05826ab2868261f0baa25bdf Mon Sep 17 00:00:00 2001 From: HuangYi Date: Fri, 19 May 2023 09:45:06 +0800 Subject: [PATCH 3/8] fix grpc-web server in test --- server/grpc/grpc_web.go | 10 +++++----- server/start.go | 2 +- testutil/network/network.go | 10 ---------- testutil/network/util.go | 9 ++++++--- 4 files changed, 12 insertions(+), 19 deletions(-) diff --git a/server/grpc/grpc_web.go b/server/grpc/grpc_web.go index 31a4355165a9..0e9a5cf6bef6 100644 --- a/server/grpc/grpc_web.go +++ b/server/grpc/grpc_web.go @@ -14,9 +14,9 @@ import ( ) // StartGRPCWeb starts a gRPC-Web server on the given address. -func StartGRPCWeb(ctx context.Context, logger log.Logger, grpcSrv *grpc.Server, config config.Config) error { +func StartGRPCWeb(ctx context.Context, logger log.Logger, grpcSrv *grpc.Server, config config.GRPCWebConfig) error { var options []grpcweb.Option - if config.GRPCWeb.EnableUnsafeCORS { + if config.EnableUnsafeCORS { options = append(options, grpcweb.WithOriginFunc(func(origin string) bool { return true @@ -26,14 +26,14 @@ func StartGRPCWeb(ctx context.Context, logger log.Logger, grpcSrv *grpc.Server, wrappedServer := grpcweb.WrapServer(grpcSrv, options...) grpcWebSrv := &http.Server{ - Addr: config.GRPCWeb.Address, + Addr: config.Address, Handler: wrappedServer, ReadHeaderTimeout: 500 * time.Millisecond, } errCh := make(chan error) go func() { - logger.Info("starting gRPC web server...", "address", config.GRPCWeb.Address) + logger.Info("starting gRPC web server...", "address", config.Address) if err := grpcWebSrv.ListenAndServe(); err != nil { errCh <- fmt.Errorf("[grpc] failed to serve: %w", err) } @@ -45,7 +45,7 @@ func StartGRPCWeb(ctx context.Context, logger log.Logger, grpcSrv *grpc.Server, case <-ctx.Done(): // The calling process cancelled or closed the provided context, so we must // gracefully stop the gRPC-web server. - logger.Info("stopping gRPC web server...", "address", config.GRPCWeb.Address) + logger.Info("stopping gRPC web server...", "address", config.Address) grpcWebSrv.Close() return nil diff --git a/server/start.go b/server/start.go index f72ee1450f9a..eb704dda4b6c 100644 --- a/server/start.go +++ b/server/start.go @@ -429,7 +429,7 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types. if config.GRPCWeb.Enable { g.Go(func() error { - return servergrpc.StartGRPCWeb(ctx, svrCtx.Logger.With("module", "grpc-web"), grpcSrv, config) + return servergrpc.StartGRPCWeb(ctx, svrCtx.Logger.With("module", "grpc-web"), grpcSrv, config.GRPCWeb) }) } } diff --git a/testutil/network/network.go b/testutil/network/network.go index 985e3806c5a2..57a8f1029581 100644 --- a/testutil/network/network.go +++ b/testutil/network/network.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "net/http" "net/url" "os" "path/filepath" @@ -23,7 +22,6 @@ import ( tmclient "github.com/tendermint/tendermint/rpc/client" dbm "github.com/tendermint/tm-db" "golang.org/x/sync/errgroup" - "google.golang.org/grpc" "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client" @@ -35,7 +33,6 @@ import ( cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" pruningtypes "github.com/cosmos/cosmos-sdk/pruning/types" "github.com/cosmos/cosmos-sdk/server" - "github.com/cosmos/cosmos-sdk/server/api" srvconfig "github.com/cosmos/cosmos-sdk/server/config" servertypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/simapp" @@ -166,9 +163,6 @@ type ( RPCClient tmclient.Client tmNode *node.Node - api *api.Server - grpc *grpc.Server - grpcWeb *http.Server errGroup *errgroup.Group cancelFn context.CancelFunc } @@ -609,10 +603,6 @@ func (n *Network) Cleanup() { n.Logger.Log("failed to stop validator CometBFT node", "err", err) } } - - if v.grpcWeb != nil { - _ = v.grpcWeb.Close() - } } time.Sleep(100 * time.Millisecond) diff --git a/testutil/network/util.go b/testutil/network/util.go index e3d6a62411fa..058046fd0d46 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -97,7 +97,12 @@ func startInProcess(cfg Config, val *Validator) error { return servergrpc.StartGRPCServer(ctx, logger.With("module", "grpc-server"), grpcCfg, grpcSrv) }) - val.grpc = grpcSrv + grpcWebCfg := val.AppConfig.GRPCWeb + if grpcWebCfg.Enable { + val.errGroup.Go(func() error { + return servergrpc.StartGRPCWeb(ctx, logger.With("module", "grpc-web"), grpcSrv, grpcWebCfg) + }) + } } if val.APIAddress != "" { @@ -107,8 +112,6 @@ func startInProcess(cfg Config, val *Validator) error { val.errGroup.Go(func() error { return apiSrv.Start(ctx, *val.AppConfig) }) - - val.api = apiSrv } return nil From 13731f64cd070d3f16ba9e57e15d3bb6d308c526 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Fri, 19 May 2023 10:26:46 +0800 Subject: [PATCH 4/8] cleanup --- client/grpc/tmservice/service_test.go | 2 +- server/api/server.go | 18 +++++++++--------- server/start.go | 2 +- testutil/network/util.go | 19 +++++++++---------- 4 files changed, 20 insertions(+), 21 deletions(-) diff --git a/client/grpc/tmservice/service_test.go b/client/grpc/tmservice/service_test.go index 226b2feb88c9..3d3d296c2f50 100644 --- a/client/grpc/tmservice/service_test.go +++ b/client/grpc/tmservice/service_test.go @@ -334,7 +334,7 @@ func (s *IntegrationTestSuite) TestABCIQuery() { } else { s.Require().NoError(err) s.Require().NotNil(res) - s.Require().Equal(res.Code, tc.expectedCode) + s.Require().Equal(tc.expectedCode, res.Code) } if tc.validQuery { diff --git a/server/api/server.go b/server/api/server.go index 978ec5f503c0..9af0f3b229e0 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -93,16 +93,16 @@ func New(clientCtx client.Context, logger log.Logger) *Server { // Note, this creates a blocking process if the server is started successfully. // Otherwise, an error is returned. The caller is expected to provide a Context // that is properly canceled or closed to indicate the server should be stopped. -func (s *Server) Start(ctx context.Context, cfg config.Config) error { +func (s *Server) Start(ctx context.Context, cfg config.APIConfig) error { s.mtx.Lock() tmCfg := tmrpcserver.DefaultConfig() - tmCfg.MaxOpenConnections = int(cfg.API.MaxOpenConnections) - tmCfg.ReadTimeout = time.Duration(cfg.API.RPCReadTimeout) * time.Second - tmCfg.WriteTimeout = time.Duration(cfg.API.RPCWriteTimeout) * time.Second - tmCfg.MaxBodyBytes = int64(cfg.API.RPCMaxBodyBytes) + tmCfg.MaxOpenConnections = int(cfg.MaxOpenConnections) + tmCfg.ReadTimeout = time.Duration(cfg.RPCReadTimeout) * time.Second + tmCfg.WriteTimeout = time.Duration(cfg.RPCWriteTimeout) * time.Second + tmCfg.MaxBodyBytes = int64(cfg.RPCMaxBodyBytes) - listener, err := tmrpcserver.Listen(cfg.API.Address, tmCfg) + listener, err := tmrpcserver.Listen(cfg.Address, tmCfg) if err != nil { s.mtx.Unlock() return err @@ -119,7 +119,7 @@ func (s *Server) Start(ctx context.Context, cfg config.Config) error { // an error upon failure, which we'll send on the error channel that will be // consumed by the for block below. go func(enableUnsafeCORS bool) { - s.logger.Info("starting API server...", "address", cfg.API.Address) + s.logger.Info("starting API server...", "address", cfg.Address) if enableUnsafeCORS { allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"})) @@ -127,7 +127,7 @@ func (s *Server) Start(ctx context.Context, cfg config.Config) error { } else { errCh <- tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg) } - }(cfg.API.EnableUnsafeCORS) + }(cfg.EnableUnsafeCORS) // Start a blocking select to wait for an indication to stop the server or that // the server failed to start properly. @@ -135,7 +135,7 @@ func (s *Server) Start(ctx context.Context, cfg config.Config) error { case <-ctx.Done(): // The calling process cancelled or closed the provided context, so we must // gracefully stop the API server. - s.logger.Info("stopping API server...", "address", cfg.API.Address) + s.logger.Info("stopping API server...", "address", cfg.Address) return s.Close() case err := <-errCh: diff --git a/server/start.go b/server/start.go index eb704dda4b6c..c2cd01374c2c 100644 --- a/server/start.go +++ b/server/start.go @@ -409,7 +409,7 @@ func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types. } g.Go(func() error { - return apiSrv.Start(ctx, config) + return apiSrv.Start(ctx, config.API) }) } diff --git a/testutil/network/util.go b/testutil/network/util.go index 058046fd0d46..72437399f4c3 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -83,8 +83,16 @@ func startInProcess(cfg Config, val *Validator) error { ctx, val.cancelFn = context.WithCancel(ctx) val.errGroup, ctx = errgroup.WithContext(ctx) - grpcCfg := val.AppConfig.GRPC + if val.APIAddress != "" { + apiSrv := api.New(val.ClientCtx, logger.With("module", "api-server")) + app.RegisterAPIRoutes(apiSrv, val.AppConfig.API) + val.errGroup.Go(func() error { + return apiSrv.Start(ctx, val.AppConfig.API) + }) + } + + grpcCfg := val.AppConfig.GRPC if grpcCfg.Enable { grpcSrv, err := servergrpc.NewGRPCServer(val.ClientCtx, app, grpcCfg) if err != nil { @@ -105,15 +113,6 @@ func startInProcess(cfg Config, val *Validator) error { } } - if val.APIAddress != "" { - apiSrv := api.New(val.ClientCtx, logger.With("module", "api-server")) - app.RegisterAPIRoutes(apiSrv, val.AppConfig.API) - - val.errGroup.Go(func() error { - return apiSrv.Start(ctx, *val.AppConfig) - }) - } - return nil } From 58080eeedaab32f3edf8c3b379c13f9b927dfe7d Mon Sep 17 00:00:00 2001 From: HuangYi Date: Fri, 19 May 2023 11:57:21 +0800 Subject: [PATCH 5/8] fix TestCLIQueryConn --- client/context_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client/context_test.go b/client/context_test.go index 82feeebe4dfe..052b43e97b87 100644 --- a/client/context_test.go +++ b/client/context_test.go @@ -152,6 +152,9 @@ func TestCLIQueryConn(t *testing.T) { require.NoError(t, err) defer n.Cleanup() + _, err = n.WaitForHeight(1) + require.NoError(t, err) + testClient := testdata.NewQueryClient(n.Validators[0].ClientCtx) res, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) require.NoError(t, err) From 86f3c874cabdb56c615554c56b3e0b77991ce5b1 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Fri, 19 May 2023 12:05:58 +0800 Subject: [PATCH 6/8] fix grpc test --- x/distribution/client/testutil/grpc_query_suite.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/x/distribution/client/testutil/grpc_query_suite.go b/x/distribution/client/testutil/grpc_query_suite.go index 956449a39743..5bacb7142492 100644 --- a/x/distribution/client/testutil/grpc_query_suite.go +++ b/x/distribution/client/testutil/grpc_query_suite.go @@ -242,6 +242,9 @@ func (s *GRPCQueryTestSuite) TestQuerySlashesGRPC() { } func (s *GRPCQueryTestSuite) TestQueryDelegatorRewardsGRPC() { + _, err := s.network.WaitForHeight(2) + s.Require().NoError(err) + val := s.network.Validators[0] baseURL := val.APIAddress From 7b009281424d47cd3688008fd9c689a659d32017 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Fri, 19 May 2023 12:11:23 +0800 Subject: [PATCH 7/8] fix test --- client/grpc/tmservice/service_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/client/grpc/tmservice/service_test.go b/client/grpc/tmservice/service_test.go index 3d3d296c2f50..9c86a2d7136e 100644 --- a/client/grpc/tmservice/service_test.go +++ b/client/grpc/tmservice/service_test.go @@ -262,6 +262,10 @@ func (s *IntegrationTestSuite) TestValidatorSetByHeight_GRPCGateway() { } func (s *IntegrationTestSuite) TestABCIQuery() { + // proof query don't work on height 1 + _, err := s.network.WaitForHeight(2) + s.Require().NoError(err) + testCases := []struct { name string req *tmservice.ABCIQueryRequest @@ -332,6 +336,7 @@ func (s *IntegrationTestSuite) TestABCIQuery() { s.Require().Error(err) s.Require().Nil(res) } else { + fmt.Println("res", res) s.Require().NoError(err) s.Require().NotNil(res) s.Require().Equal(tc.expectedCode, res.Code) From 31c5010a2dd9f42fba1355d412f7665fb2e573b4 Mon Sep 17 00:00:00 2001 From: HuangYi Date: Fri, 19 May 2023 12:50:08 +0800 Subject: [PATCH 8/8] add missing WaitForHeight --- x/authz/client/testutil/tx.go | 3 +++ x/upgrade/client/testutil/suite.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/x/authz/client/testutil/tx.go b/x/authz/client/testutil/tx.go index 9ca24b00758b..46498ac7c406 100644 --- a/x/authz/client/testutil/tx.go +++ b/x/authz/client/testutil/tx.go @@ -44,6 +44,9 @@ func (s *IntegrationTestSuite) SetupSuite() { s.network, err = network.New(s.T(), s.T().TempDir(), s.cfg) s.Require().NoError(err) + _, err = s.network.WaitForHeight(1) + s.Require().NoError(err) + val := s.network.Validators[0] s.grantee = make([]sdk.AccAddress, 3) diff --git a/x/upgrade/client/testutil/suite.go b/x/upgrade/client/testutil/suite.go index d5e21ce47ae6..a4f7774317a1 100644 --- a/x/upgrade/client/testutil/suite.go +++ b/x/upgrade/client/testutil/suite.go @@ -41,6 +41,9 @@ func (s *IntegrationTestSuite) SetupSuite() { var err error s.network, err = network.New(s.T(), s.T().TempDir(), cfg) s.Require().NoError(err) + + _, err = s.network.WaitForHeight(1) + s.Require().NoError(err) } func (s *IntegrationTestSuite) TearDownSuite() {