diff --git a/CHANGELOG.md b/CHANGELOG.md index 27d224d77100..ac5cb706d4e6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -46,6 +46,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ * (server) [#16061](https://github.com/cosmos/cosmos-sdk/pull/16061) add comet bootstrap command * (store) [#16067](https://github.com/cosmos/cosmos-sdk/pull/16067) Add local snapshots management commands. * (baseapp) [#16193](https://github.com/cosmos/cosmos-sdk/pull/16193) Add `Close` method to `BaseApp` for custom app to cleanup resource in graceful shutdown. +* (server) [#15041](https://github.com/cosmos/cosmos-sdk/pull/15041) Remove unnecessary sleeps from gRPC and API server initiation. The servers will start and accept requests as soon as they're ready. ### Bug Fixes diff --git a/client/context_test.go b/client/context_test.go index 82feeebe4dfe..052b43e97b87 100644 --- a/client/context_test.go +++ b/client/context_test.go @@ -152,6 +152,9 @@ func TestCLIQueryConn(t *testing.T) { require.NoError(t, err) defer n.Cleanup() + _, err = n.WaitForHeight(1) + require.NoError(t, err) + testClient := testdata.NewQueryClient(n.Validators[0].ClientCtx) res, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"}) require.NoError(t, err) diff --git a/client/grpc/tmservice/service_test.go b/client/grpc/tmservice/service_test.go index 226b2feb88c9..9c86a2d7136e 100644 --- a/client/grpc/tmservice/service_test.go +++ b/client/grpc/tmservice/service_test.go @@ -262,6 +262,10 @@ func (s *IntegrationTestSuite) TestValidatorSetByHeight_GRPCGateway() { } func (s *IntegrationTestSuite) TestABCIQuery() { + // proof query don't work on height 1 + _, err := s.network.WaitForHeight(2) + s.Require().NoError(err) + testCases := []struct { name string req *tmservice.ABCIQueryRequest @@ -332,9 +336,10 @@ func (s *IntegrationTestSuite) TestABCIQuery() { s.Require().Error(err) s.Require().Nil(res) } else { + fmt.Println("res", res) s.Require().NoError(err) s.Require().NotNil(res) - s.Require().Equal(res.Code, tc.expectedCode) + s.Require().Equal(tc.expectedCode, res.Code) } if tc.validQuery { diff --git a/go.mod b/go.mod index 9b8fa6be2d93..bcb7abfa59cb 100644 --- a/go.mod +++ b/go.mod @@ -53,6 +53,7 @@ require ( github.com/tidwall/btree v1.5.0 golang.org/x/crypto v0.7.0 golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0 + golang.org/x/sync v0.1.0 google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4 google.golang.org/grpc v1.54.0 google.golang.org/protobuf v1.30.0 diff --git a/go.sum b/go.sum index e190a029afe7..14fba23e7c2e 100644 --- a/go.sum +++ b/go.sum @@ -1146,6 +1146,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/server/api/server.go b/server/api/server.go index 5bc406acfa38..9af0f3b229e0 100644 --- a/server/api/server.go +++ b/server/api/server.go @@ -1,6 +1,7 @@ package api import ( + "context" "fmt" "net" "net/http" @@ -50,6 +51,7 @@ func CustomGRPCHeaderMatcher(key string) (string, bool) { switch strings.ToLower(key) { case grpctypes.GRPCBlockHeightHeader: return grpctypes.GRPCBlockHeightHeader, true + default: return runtime.DefaultHeaderMatcher(key) } @@ -86,18 +88,21 @@ func New(clientCtx client.Context, logger log.Logger) *Server { // Start starts the API server. Internally, the API server leverages Tendermint's // JSON RPC server. Configuration options are provided via config.APIConfig -// and are delegated to the Tendermint JSON RPC server. The process is -// non-blocking, so an external signal handler must be used. -func (s *Server) Start(cfg config.Config) error { +// and are delegated to the Tendermint JSON RPC server. +// +// Note, this creates a blocking process if the server is started successfully. +// Otherwise, an error is returned. The caller is expected to provide a Context +// that is properly canceled or closed to indicate the server should be stopped. +func (s *Server) Start(ctx context.Context, cfg config.APIConfig) error { s.mtx.Lock() tmCfg := tmrpcserver.DefaultConfig() - tmCfg.MaxOpenConnections = int(cfg.API.MaxOpenConnections) - tmCfg.ReadTimeout = time.Duration(cfg.API.RPCReadTimeout) * time.Second - tmCfg.WriteTimeout = time.Duration(cfg.API.RPCWriteTimeout) * time.Second - tmCfg.MaxBodyBytes = int64(cfg.API.RPCMaxBodyBytes) + tmCfg.MaxOpenConnections = int(cfg.MaxOpenConnections) + tmCfg.ReadTimeout = time.Duration(cfg.RPCReadTimeout) * time.Second + tmCfg.WriteTimeout = time.Duration(cfg.RPCWriteTimeout) * time.Second + tmCfg.MaxBodyBytes = int64(cfg.RPCMaxBodyBytes) - listener, err := tmrpcserver.Listen(cfg.API.Address, tmCfg) + listener, err := tmrpcserver.Listen(cfg.Address, tmCfg) if err != nil { s.mtx.Unlock() return err @@ -105,17 +110,38 @@ func (s *Server) Start(cfg config.Config) error { s.registerGRPCGatewayRoutes() s.listener = listener - var h http.Handler = s.Router s.mtx.Unlock() - if cfg.API.EnableUnsafeCORS { - allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"})) - return tmrpcserver.Serve(s.listener, allowAllCORS(h), s.logger, tmCfg) - } + errCh := make(chan error) + + // Start the API in an external goroutine as Serve is blocking and will return + // an error upon failure, which we'll send on the error channel that will be + // consumed by the for block below. + go func(enableUnsafeCORS bool) { + s.logger.Info("starting API server...", "address", cfg.Address) - s.logger.Info("starting API server...") - return tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg) + if enableUnsafeCORS { + allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"})) + errCh <- tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, tmCfg) + } else { + errCh <- tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg) + } + }(cfg.EnableUnsafeCORS) + + // Start a blocking select to wait for an indication to stop the server or that + // the server failed to start properly. + select { + case <-ctx.Done(): + // The calling process cancelled or closed the provided context, so we must + // gracefully stop the API server. + s.logger.Info("stopping API server...", "address", cfg.Address) + return s.Close() + + case err := <-errCh: + s.logger.Error("failed to start API server", "err", err) + return err + } } // Close closes the API server. diff --git a/server/grpc/grpc_web.go b/server/grpc/grpc_web.go index 99040ae26302..0e9a5cf6bef6 100644 --- a/server/grpc/grpc_web.go +++ b/server/grpc/grpc_web.go @@ -1,21 +1,22 @@ package grpc import ( + "context" "fmt" "net/http" "time" "github.com/improbable-eng/grpc-web/go/grpcweb" + "github.com/tendermint/tendermint/libs/log" "google.golang.org/grpc" "github.com/cosmos/cosmos-sdk/server/config" - "github.com/cosmos/cosmos-sdk/server/types" ) // StartGRPCWeb starts a gRPC-Web server on the given address. -func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, error) { +func StartGRPCWeb(ctx context.Context, logger log.Logger, grpcSrv *grpc.Server, config config.GRPCWebConfig) error { var options []grpcweb.Option - if config.GRPCWeb.EnableUnsafeCORS { + if config.EnableUnsafeCORS { options = append(options, grpcweb.WithOriginFunc(func(origin string) bool { return true @@ -25,22 +26,31 @@ func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, err wrappedServer := grpcweb.WrapServer(grpcSrv, options...) grpcWebSrv := &http.Server{ - Addr: config.GRPCWeb.Address, + Addr: config.Address, Handler: wrappedServer, ReadHeaderTimeout: 500 * time.Millisecond, } errCh := make(chan error) go func() { + logger.Info("starting gRPC web server...", "address", config.Address) if err := grpcWebSrv.ListenAndServe(); err != nil { errCh <- fmt.Errorf("[grpc] failed to serve: %w", err) } }() + // Start a blocking select to wait for an indication to stop the server or that + // the server failed to start properly. select { + case <-ctx.Done(): + // The calling process cancelled or closed the provided context, so we must + // gracefully stop the gRPC-web server. + logger.Info("stopping gRPC web server...", "address", config.Address) + grpcWebSrv.Close() + return nil + case err := <-errCh: - return nil, err - case <-time.After(types.ServerStartTime): // assume server started successfully - return grpcWebSrv, nil + logger.Error("failed to start gRPC Web server", "err", err) + return err } } diff --git a/server/grpc/server.go b/server/grpc/server.go index 78a8e1955af5..a04f8f2c782c 100644 --- a/server/grpc/server.go +++ b/server/grpc/server.go @@ -1,10 +1,11 @@ package grpc import ( + "context" "fmt" "net" - "time" + "github.com/tendermint/tendermint/libs/log" "google.golang.org/grpc" "github.com/cosmos/cosmos-sdk/client" @@ -16,8 +17,9 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) -// StartGRPCServer starts a gRPC server on the given address. -func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) { +// NewGRPCServer returns a correctly configured and initialized gRPC server. +// Note, the caller is responsible for starting the server. See StartGRPCServer. +func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) { maxSendMsgSize := cfg.MaxSendMsgSize if maxSendMsgSize == 0 { maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize @@ -45,6 +47,7 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config for _, m := range clientCtx.TxConfig.SignModeHandler().Modes() { modes[m.String()] = (int32)(m) } + return modes }(), ChainID: clientCtx.ChainID, @@ -52,32 +55,50 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config InterfaceRegistry: clientCtx.InterfaceRegistry, }) if err != nil { - return nil, err + return nil, fmt.Errorf("failed to register reflection service: %w", err) } // Reflection allows external clients to see what services and methods // the gRPC server exposes. gogoreflection.Register(grpcSrv) + return grpcSrv, nil +} + +// StartGRPCServer starts the provided gRPC server on the address specified in cfg. +// +// Note, this creates a blocking process if the server is started successfully. +// Otherwise, an error is returned. The caller is expected to provide a Context +// that is properly canceled or closed to indicate the server should be stopped. +func StartGRPCServer(ctx context.Context, logger log.Logger, cfg config.GRPCConfig, grpcSrv *grpc.Server) error { listener, err := net.Listen("tcp", cfg.Address) if err != nil { - return nil, err + return fmt.Errorf("failed to listen on address %s: %w", cfg.Address, err) } errCh := make(chan error) + + // Start the gRPC in an external goroutine as Serve is blocking and will return + // an error upon failure, which we'll send on the error channel that will be + // consumed by the for block below. go func() { - err = grpcSrv.Serve(listener) - if err != nil { - errCh <- fmt.Errorf("failed to serve: %w", err) - } + logger.Info("starting gRPC server...", "address", cfg.Address) + errCh <- grpcSrv.Serve(listener) }() + // Start a blocking select to wait for an indication to stop the server or that + // the server failed to start properly. select { - case err := <-errCh: - return nil, err + case <-ctx.Done(): + // The calling process cancelled or closed the provided context, so we must + // gracefully stop the gRPC server. + logger.Info("stopping gRPC server...", "address", cfg.Address) + grpcSrv.GracefulStop() + + return nil - case <-time.After(types.ServerStartTime): - // assume server started successfully - return grpcSrv, nil + case err := <-errCh: + logger.Error("failed to start gRPC server", "err", err) + return err } } diff --git a/server/start.go b/server/start.go index 2ea63c5fe1a9..3c560d0b6efc 100644 --- a/server/start.go +++ b/server/start.go @@ -3,22 +3,21 @@ package server // DONTCOVER import ( + "context" "fmt" "net" - "net/http" "os" "runtime/pprof" - "time" "github.com/spf13/cobra" "github.com/tendermint/tendermint/abci/server" tcmd "github.com/tendermint/tendermint/cmd/cometbft/commands" - tmos "github.com/tendermint/tendermint/libs/os" "github.com/tendermint/tendermint/node" "github.com/tendermint/tendermint/p2p" pvm "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/proxy" "github.com/tendermint/tendermint/rpc/client/local" + "golang.org/x/sync/errgroup" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -200,25 +199,25 @@ is performed. Note, when enabled, gRPC will also be automatically enabled. return cmd } -func startStandAlone(ctx *Context, appCreator types.AppCreator) error { - addr := ctx.Viper.GetString(flagAddress) - transport := ctx.Viper.GetString(flagTransport) - home := ctx.Viper.GetString(flags.FlagHome) +func startStandAlone(svrCtx *Context, appCreator types.AppCreator) error { + addr := svrCtx.Viper.GetString(flagAddress) + transport := svrCtx.Viper.GetString(flagTransport) + home := svrCtx.Viper.GetString(flags.FlagHome) - db, err := openDB(home, GetAppDBBackend(ctx.Viper)) + db, err := openDB(home, GetAppDBBackend(svrCtx.Viper)) if err != nil { return err } - traceWriterFile := ctx.Viper.GetString(flagTraceStore) + traceWriterFile := svrCtx.Viper.GetString(flagTraceStore) traceWriter, err := openTraceWriter(traceWriterFile) if err != nil { return err } - app := appCreator(ctx.Logger, db, traceWriter, ctx.Viper) + app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper) - config, err := serverconfig.GetConfig(ctx.Viper) + config, err := serverconfig.GetConfig(svrCtx.Viper) if err != nil { return err } @@ -233,43 +232,63 @@ func startStandAlone(ctx *Context, appCreator types.AppCreator) error { return fmt.Errorf("error creating listener: %v", err) } - svr.SetLogger(ctx.Logger.With("module", "abci-server")) + svr.SetLogger(svrCtx.Logger.With("module", "abci-server")) - err = svr.Start() - if err != nil { - tmos.Exit(err.Error()) - } + ctx, cancelFn := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) - defer func() { - if err = svr.Stop(); err != nil { - tmos.Exit(err.Error()) + // listen for quit signals so the calling parent process can gracefully exit + ListenForQuitSignals(cancelFn, svrCtx.Logger) + + g.Go(func() error { + if err := svr.Start(); err != nil { + svrCtx.Logger.Error("failed to start out-of-process ABCI server", "err", err) + return err } if err = app.Close(); err != nil { - tmos.Exit(err.Error()) + svrCtx.Logger.Error("failed to close application", "err", err) + return err } - }() - // Wait for SIGINT or SIGTERM signal - return WaitForQuitSignals() + // Wait for the calling process to be canceled or close the provided context, + // so we can gracefully stop the ABCI server. + <-ctx.Done() + svrCtx.Logger.Info("stopping the ABCI server...") + return svr.Stop() + }) + + return g.Wait() } -func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.AppCreator) error { - cfg := ctx.Config +func startInProcess(svrCtx *Context, clientCtx client.Context, appCreator types.AppCreator) error { + cfg := svrCtx.Config home := cfg.RootDir - db, err := openDB(home, GetAppDBBackend(ctx.Viper)) + db, err := openDB(home, GetAppDBBackend(svrCtx.Viper)) if err != nil { return err } - traceWriterFile := ctx.Viper.GetString(flagTraceStore) + traceWriterFile := svrCtx.Viper.GetString(flagTraceStore) traceWriter, err := openTraceWriter(traceWriterFile) if err != nil { return err } - config, err := serverconfig.GetConfig(ctx.Viper) + // clean up the traceWriter when the server is shutting down + var traceWriterCleanup func() + + // if flagTraceStore is not used then traceWriter is nil + if traceWriter != nil { + traceWriterCleanup = func() { + if err = traceWriter.Close(); err != nil { + svrCtx.Logger.Error("failed to close trace writer", "err", err) + } + } + } + + config, err := serverconfig.GetConfig(svrCtx.Viper) if err != nil { return err } @@ -278,7 +297,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App return err } - app := appCreator(ctx.Logger, db, traceWriter, ctx.Viper) + app := appCreator(svrCtx.Logger, db, traceWriter, svrCtx.Viper) nodeKey, err := p2p.LoadOrGenNodeKey(cfg.NodeKeyFile()) if err != nil { @@ -289,14 +308,14 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App var ( tmNode *node.Node - gRPCOnly = ctx.Viper.GetBool(flagGRPCOnly) + gRPCOnly = svrCtx.Viper.GetBool(flagGRPCOnly) ) if gRPCOnly { - ctx.Logger.Info("starting node in gRPC only mode; Tendermint is disabled") + svrCtx.Logger.Info("starting node in gRPC only mode; Tendermint is disabled") config.GRPC.Enable = true } else { - ctx.Logger.Info("starting node with ABCI Tendermint in-process") + svrCtx.Logger.Info("starting node with ABCI Tendermint in-process") tmNode, err = node.NewNode( cfg, @@ -306,7 +325,7 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App genDocProvider, node.DefaultDBProvider, node.DefaultMetricsProvider(cfg.Instrumentation), - ctx.Logger, + svrCtx.Logger, ) if err != nil { return err @@ -320,8 +339,8 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App // service if API or gRPC is enabled, and avoid doing so in the general // case, because it spawns a new local tendermint RPC client. if (config.API.Enable || config.GRPC.Enable) && tmNode != nil { - // re-assign for making the client available below - // do not use := to avoid shadowing clientCtx + // Re-assign for making the client available below do not use := to avoid + // shadowing the clientCtx variable. clientCtx = clientCtx.WithClient(local.New(tmNode)) app.RegisterTxService(clientCtx) @@ -337,6 +356,12 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App return err } + ctx, cancelFn := context.WithCancel(context.Background()) + g, ctx := errgroup.WithContext(ctx) + + // listen for quit signals so the calling parent process can gracefully exit + ListenForQuitSignals(cancelFn, svrCtx.Logger) + var apiSrv *api.Server if config.API.Enable { genDoc, err := genDocProvider() @@ -379,60 +404,46 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App } clientCtx = clientCtx.WithGRPCClient(grpcClient) - ctx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress) + svrCtx.Logger.Debug("grpc client assigned to client context", "target", grpcAddress) } - apiSrv = api.New(clientCtx, ctx.Logger.With("module", "api-server")) + apiSrv = api.New(clientCtx, svrCtx.Logger.With("module", "api-server")) app.RegisterAPIRoutes(apiSrv, config.API) if config.Telemetry.Enabled { apiSrv.SetTelemetry(metrics) } - errCh := make(chan error) - - go func() { - if err := apiSrv.Start(config); err != nil { - errCh <- err - } - }() - select { - case err := <-errCh: - return err - - case <-time.After(types.ServerStartTime): // assume server started successfully - } + g.Go(func() error { + return apiSrv.Start(ctx, config.API) + }) } - var ( - grpcSrv *grpc.Server - grpcWebSrv *http.Server - ) + var grpcSrv *grpc.Server if config.GRPC.Enable { - grpcSrv, err = servergrpc.StartGRPCServer(clientCtx, app, config.GRPC) + grpcSrv, err = servergrpc.NewGRPCServer(clientCtx, app, config.GRPC) if err != nil { return err } - defer grpcSrv.Stop() + + // Start the gRPC server in a goroutine. Note, the provided ctx will ensure + // that the server is gracefully shut down. + g.Go(func() error { + return servergrpc.StartGRPCServer(ctx, svrCtx.Logger.With("module", "grpc-server"), config.GRPC, grpcSrv) + }) + if config.GRPCWeb.Enable { - grpcWebSrv, err = servergrpc.StartGRPCWeb(grpcSrv, config) - if err != nil { - ctx.Logger.Error("failed to start grpc-web http server: ", err) - return err - } - defer func() { - if err := grpcWebSrv.Close(); err != nil { - ctx.Logger.Error("failed to close grpc-web http server: ", err) - } - }() + g.Go(func() error { + return servergrpc.StartGRPCWeb(ctx, svrCtx.Logger.With("module", "grpc-web"), grpcSrv, config.GRPCWeb) + }) } } - // At this point it is safe to block the process if we're in gRPC only mode as - // we do not need to start Rosetta or handle any Tendermint related processes. + // At this point it is safe to block the process if we're in gRPC-only mode as + // we do not need to handle any CometBFT related processes. if gRPCOnly { // wait for signal capture and gracefully return - return WaitForQuitSignals() + return g.Wait() } var rosettaSrv crgserver.Server @@ -447,14 +458,14 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App minGasPrices, err := sdktypes.ParseDecCoins(config.MinGasPrices) if err != nil { - ctx.Logger.Error("failed to parse minimum-gas-prices: ", err) + svrCtx.Logger.Error("failed to parse minimum-gas-prices: ", err) return err } conf := &rosetta.Config{ Blockchain: config.Rosetta.Blockchain, Network: config.Rosetta.Network, - TendermintRPC: ctx.Config.RPC.ListenAddress, + TendermintRPC: svrCtx.Config.RPC.ListenAddress, GRPCEndpoint: config.GRPC.Address, Addr: config.Rosetta.Address, Retries: config.Rosetta.Retries, @@ -471,36 +482,48 @@ func startInProcess(ctx *Context, clientCtx client.Context, appCreator types.App return err } - errCh := make(chan error) - go func() { - if err := rosettaSrv.Start(); err != nil { - errCh <- err - } - }() + g.Go(func() error { + errCh := make(chan error) + go func() { + svrCtx.Logger.Info("starting rosetta server...", "address", config.Rosetta.Address) + if err := rosettaSrv.Start(); err != nil { + errCh <- err + } + }() - select { - case err := <-errCh: - return err + select { + case <-ctx.Done(): + return nil - case <-time.After(types.ServerStartTime): // assume server started successfully - } + case err := <-errCh: + svrCtx.Logger.Error("failed to start rosetta server", "err", err) + return err + } + }) } + // In case the operator has both gRPC and API servers disabled, there is + // nothing blocking this root process, so we need to block manually, so we'll + // create an empty blocking loop. + g.Go(func() error { + <-ctx.Done() + return nil + }) + + // deferred cleanup function defer func() { if tmNode != nil && tmNode.IsRunning() { _ = tmNode.Stop() _ = app.Close() } - if apiSrv != nil { - _ = apiSrv.Close() + if traceWriterCleanup != nil { + traceWriterCleanup() } - - ctx.Logger.Info("exiting...") }() // wait for signal capture and gracefully return - return WaitForQuitSignals() + return g.Wait() } func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) { @@ -510,39 +533,38 @@ func startTelemetry(cfg serverconfig.Config) (*telemetry.Metrics, error) { return telemetry.New(cfg.Telemetry) } -// wrapCPUProfile runs callback in a goroutine, then wait for quit signals. -func wrapCPUProfile(ctx *Context, callback func() error) error { - if cpuProfile := ctx.Viper.GetString(flagCPUProfile); cpuProfile != "" { +// wrapCPUProfile starts CPU profiling, if enabled, and executes the provided +// callbackFn in a separate goroutine, then will wait for that callback to +// return. +// +// NOTE: We expect the caller to handle graceful shutdown and signal handling. +func wrapCPUProfile(svrCtx *Context, callbackFn func() error) error { + if cpuProfile := svrCtx.Viper.GetString(flagCPUProfile); cpuProfile != "" { f, err := os.Create(cpuProfile) if err != nil { return err } - ctx.Logger.Info("starting CPU profiler", "profile", cpuProfile) + svrCtx.Logger.Info("starting CPU profiler", "profile", cpuProfile) + if err := pprof.StartCPUProfile(f); err != nil { return err } defer func() { - ctx.Logger.Info("stopping CPU profiler", "profile", cpuProfile) + svrCtx.Logger.Info("stopping CPU profiler", "profile", cpuProfile) pprof.StopCPUProfile() + if err := f.Close(); err != nil { - ctx.Logger.Info("failed to close cpu-profile file", "profile", cpuProfile, "err", err.Error()) + svrCtx.Logger.Info("failed to close cpu-profile file", "profile", cpuProfile, "err", err.Error()) } }() } errCh := make(chan error) go func() { - errCh <- callback() + errCh <- callbackFn() }() - select { - case err := <-errCh: - return err - - case <-time.After(types.ServerStartTime): - } - - return WaitForQuitSignals() + return <-errCh } diff --git a/server/types/app.go b/server/types/app.go index d886936f75c5..0ada9ca9a9fb 100644 --- a/server/types/app.go +++ b/server/types/app.go @@ -3,7 +3,6 @@ package types import ( "encoding/json" "io" - "time" "github.com/gogo/protobuf/grpc" "github.com/spf13/cobra" @@ -19,10 +18,6 @@ import ( sdk "github.com/cosmos/cosmos-sdk/types" ) -// ServerStartTime defines the time duration that the server need to stay running after startup -// for the startup be considered successful -const ServerStartTime = 5 * time.Second - type ( // AppOptions defines an interface that is passed into an application // constructor, typically used to set BaseApp options that are either supplied diff --git a/server/util.go b/server/util.go index 019d0132085c..c39be5622a17 100644 --- a/server/util.go +++ b/server/util.go @@ -1,6 +1,7 @@ package server import ( + "context" "errors" "fmt" "io" @@ -355,12 +356,22 @@ func TrapSignal(cleanupFunc func()) { }() } -// WaitForQuitSignals waits for SIGINT and SIGTERM and returns. -func WaitForQuitSignals() ErrorCode { - sigs := make(chan os.Signal, 1) - signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) - sig := <-sigs - return ErrorCode{Code: int(sig.(syscall.Signal)) + 128} +// ListenForQuitSignals listens for SIGINT and SIGTERM. When a signal is received, +// the cleanup function is called, indicating the caller can gracefully exit or +// return. +// +// Note, this performs a non-blocking process so the caller must ensure the +// corresponding context derived from the cancelFn is used correctly. +func ListenForQuitSignals(cancelFn context.CancelFunc, logger tmlog.Logger) { + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigCh + cancelFn() + + logger.Info("caught signal", "signal", sig.String()) + }() } // GetAppDBBackend gets the backend type to use for the application DBs. @@ -407,7 +418,7 @@ func openDB(rootDir string, backendType dbm.BackendType) (dbm.DB, error) { return dbm.NewDB("application", backendType, dataDir) } -func openTraceWriter(traceWriterFile string) (w io.Writer, err error) { +func openTraceWriter(traceWriterFile string) (w io.WriteCloser, err error) { if traceWriterFile == "" { return } diff --git a/testutil/network/network.go b/testutil/network/network.go index e08787435037..57a8f1029581 100644 --- a/testutil/network/network.go +++ b/testutil/network/network.go @@ -6,7 +6,6 @@ import ( "encoding/json" "errors" "fmt" - "net/http" "net/url" "os" "path/filepath" @@ -22,7 +21,7 @@ import ( "github.com/tendermint/tendermint/node" tmclient "github.com/tendermint/tendermint/rpc/client" dbm "github.com/tendermint/tm-db" - "google.golang.org/grpc" + "golang.org/x/sync/errgroup" "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/client" @@ -34,7 +33,6 @@ import ( cryptotypes "github.com/cosmos/cosmos-sdk/crypto/types" pruningtypes "github.com/cosmos/cosmos-sdk/pruning/types" "github.com/cosmos/cosmos-sdk/server" - "github.com/cosmos/cosmos-sdk/server/api" srvconfig "github.com/cosmos/cosmos-sdk/server/config" servertypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/simapp" @@ -164,10 +162,9 @@ type ( ValAddress sdk.ValAddress RPCClient tmclient.Client - tmNode *node.Node - api *api.Server - grpc *grpc.Server - grpcWeb *http.Server + tmNode *node.Node + errGroup *errgroup.Group + cancelFn context.CancelFunc } ) @@ -593,24 +590,21 @@ func (n *Network) Cleanup() { n.Logger.Log("cleaning up test network...") for _, v := range n.Validators { - if v.tmNode != nil && v.tmNode.IsRunning() { - _ = v.tmNode.Stop() - } + // cancel the validator's context which will signal to the gRPC and API + // goroutines that they should gracefully exit. + v.cancelFn() - if v.api != nil { - _ = v.api.Close() + if err := v.errGroup.Wait(); err != nil { + n.Logger.Log("unexpected error waiting for validator gRPC and API processes to exit", "err", err) } - if v.grpc != nil { - v.grpc.Stop() - if v.grpcWeb != nil { - _ = v.grpcWeb.Close() + if v.tmNode != nil && v.tmNode.IsRunning() { + if err := v.tmNode.Stop(); err != nil { + n.Logger.Log("failed to stop validator CometBFT node", "err", err) } } } - // Give a brief pause for things to finish closing in other processes. Hopefully this helps with the address-in-use errors. - // 100ms chosen randomly. time.Sleep(100 * time.Millisecond) if n.Config.CleanupDir { diff --git a/testutil/network/util.go b/testutil/network/util.go index a29705c0fa38..72437399f4c3 100644 --- a/testutil/network/util.go +++ b/testutil/network/util.go @@ -1,10 +1,10 @@ package network import ( + "context" "encoding/json" "os" "path/filepath" - "time" tmos "github.com/tendermint/tendermint/libs/os" "github.com/tendermint/tendermint/node" @@ -14,6 +14,7 @@ import ( "github.com/tendermint/tendermint/rpc/client/local" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" + "golang.org/x/sync/errgroup" "github.com/cosmos/cosmos-sdk/server/api" servergrpc "github.com/cosmos/cosmos-sdk/server/grpc" @@ -78,42 +79,40 @@ func startInProcess(cfg Config, val *Validator) error { } } + ctx := context.Background() + ctx, val.cancelFn = context.WithCancel(ctx) + val.errGroup, ctx = errgroup.WithContext(ctx) + if val.APIAddress != "" { apiSrv := api.New(val.ClientCtx, logger.With("module", "api-server")) app.RegisterAPIRoutes(apiSrv, val.AppConfig.API) - errCh := make(chan error) - - go func() { - if err := apiSrv.Start(*val.AppConfig); err != nil { - errCh <- err - } - }() - - select { - case err := <-errCh: - return err - case <-time.After(srvtypes.ServerStartTime): // assume server started successfully - } - - val.api = apiSrv + val.errGroup.Go(func() error { + return apiSrv.Start(ctx, val.AppConfig.API) + }) } - if val.AppConfig.GRPC.Enable { - grpcSrv, err := servergrpc.StartGRPCServer(val.ClientCtx, app, val.AppConfig.GRPC) + grpcCfg := val.AppConfig.GRPC + if grpcCfg.Enable { + grpcSrv, err := servergrpc.NewGRPCServer(val.ClientCtx, app, grpcCfg) if err != nil { return err } - val.grpc = grpcSrv - - if val.AppConfig.GRPCWeb.Enable { - val.grpcWeb, err = servergrpc.StartGRPCWeb(grpcSrv, *val.AppConfig) - if err != nil { - return err - } + // Start the gRPC server in a goroutine. Note, the provided ctx will ensure + // that the server is gracefully shut down. + val.errGroup.Go(func() error { + return servergrpc.StartGRPCServer(ctx, logger.With("module", "grpc-server"), grpcCfg, grpcSrv) + }) + + grpcWebCfg := val.AppConfig.GRPCWeb + if grpcWebCfg.Enable { + val.errGroup.Go(func() error { + return servergrpc.StartGRPCWeb(ctx, logger.With("module", "grpc-web"), grpcSrv, grpcWebCfg) + }) } } + return nil } diff --git a/x/authz/client/testutil/tx.go b/x/authz/client/testutil/tx.go index 9ca24b00758b..46498ac7c406 100644 --- a/x/authz/client/testutil/tx.go +++ b/x/authz/client/testutil/tx.go @@ -44,6 +44,9 @@ func (s *IntegrationTestSuite) SetupSuite() { s.network, err = network.New(s.T(), s.T().TempDir(), s.cfg) s.Require().NoError(err) + _, err = s.network.WaitForHeight(1) + s.Require().NoError(err) + val := s.network.Validators[0] s.grantee = make([]sdk.AccAddress, 3) diff --git a/x/distribution/client/testutil/grpc_query_suite.go b/x/distribution/client/testutil/grpc_query_suite.go index 956449a39743..5bacb7142492 100644 --- a/x/distribution/client/testutil/grpc_query_suite.go +++ b/x/distribution/client/testutil/grpc_query_suite.go @@ -242,6 +242,9 @@ func (s *GRPCQueryTestSuite) TestQuerySlashesGRPC() { } func (s *GRPCQueryTestSuite) TestQueryDelegatorRewardsGRPC() { + _, err := s.network.WaitForHeight(2) + s.Require().NoError(err) + val := s.network.Validators[0] baseURL := val.APIAddress diff --git a/x/upgrade/client/testutil/suite.go b/x/upgrade/client/testutil/suite.go index d5e21ce47ae6..a4f7774317a1 100644 --- a/x/upgrade/client/testutil/suite.go +++ b/x/upgrade/client/testutil/suite.go @@ -41,6 +41,9 @@ func (s *IntegrationTestSuite) SetupSuite() { var err error s.network, err = network.New(s.T(), s.T().TempDir(), cfg) s.Require().NoError(err) + + _, err = s.network.WaitForHeight(1) + s.Require().NoError(err) } func (s *IntegrationTestSuite) TearDownSuite() {