Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: cleanup server logic (backport #15041) #16203

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
* (server) [#16061](https://github.com/cosmos/cosmos-sdk/pull/16061) add comet bootstrap command
* (store) [#16067](https://github.com/cosmos/cosmos-sdk/pull/16067) Add local snapshots management commands.
* (baseapp) [#16193](https://github.com/cosmos/cosmos-sdk/pull/16193) Add `Close` method to `BaseApp` for custom app to cleanup resource in graceful shutdown.
* (server) [#15041](https://github.com/cosmos/cosmos-sdk/pull/15041) Remove unnecessary sleeps from gRPC and API server initiation. The servers will start and accept requests as soon as they're ready.

### Bug Fixes

Expand Down
3 changes: 3 additions & 0 deletions client/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ func TestCLIQueryConn(t *testing.T) {
require.NoError(t, err)
defer n.Cleanup()

_, err = n.WaitForHeight(1)
require.NoError(t, err)

testClient := testdata.NewQueryClient(n.Validators[0].ClientCtx)
res, err := testClient.Echo(context.Background(), &testdata.EchoRequest{Message: "hello"})
require.NoError(t, err)
Expand Down
7 changes: 6 additions & 1 deletion client/grpc/tmservice/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,10 @@ func (s *IntegrationTestSuite) TestValidatorSetByHeight_GRPCGateway() {
}

func (s *IntegrationTestSuite) TestABCIQuery() {
// proof query don't work on height 1
_, err := s.network.WaitForHeight(2)
s.Require().NoError(err)

testCases := []struct {
name string
req *tmservice.ABCIQueryRequest
Expand Down Expand Up @@ -332,9 +336,10 @@ func (s *IntegrationTestSuite) TestABCIQuery() {
s.Require().Error(err)
s.Require().Nil(res)
} else {
fmt.Println("res", res)
s.Require().NoError(err)
s.Require().NotNil(res)
s.Require().Equal(res.Code, tc.expectedCode)
s.Require().Equal(tc.expectedCode, res.Code)
}

if tc.validQuery {
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ require (
github.com/tidwall/btree v1.5.0
golang.org/x/crypto v0.7.0
golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0
golang.org/x/sync v0.1.0
google.golang.org/genproto v0.0.0-20230306155012-7f2fa6fef1f4
google.golang.org/grpc v1.54.0
google.golang.org/protobuf v1.30.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1146,6 +1146,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
56 changes: 41 additions & 15 deletions server/api/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -50,6 +51,7 @@ func CustomGRPCHeaderMatcher(key string) (string, bool) {
switch strings.ToLower(key) {
case grpctypes.GRPCBlockHeightHeader:
return grpctypes.GRPCBlockHeightHeader, true

default:
return runtime.DefaultHeaderMatcher(key)
}
Expand Down Expand Up @@ -86,36 +88,60 @@ func New(clientCtx client.Context, logger log.Logger) *Server {

// Start starts the API server. Internally, the API server leverages Tendermint's
// JSON RPC server. Configuration options are provided via config.APIConfig
// and are delegated to the Tendermint JSON RPC server. The process is
// non-blocking, so an external signal handler must be used.
func (s *Server) Start(cfg config.Config) error {
// and are delegated to the Tendermint JSON RPC server.
//
// Note, this creates a blocking process if the server is started successfully.
// Otherwise, an error is returned. The caller is expected to provide a Context
// that is properly canceled or closed to indicate the server should be stopped.
func (s *Server) Start(ctx context.Context, cfg config.APIConfig) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

s.mtx.Lock()

tmCfg := tmrpcserver.DefaultConfig()
tmCfg.MaxOpenConnections = int(cfg.API.MaxOpenConnections)
tmCfg.ReadTimeout = time.Duration(cfg.API.RPCReadTimeout) * time.Second
tmCfg.WriteTimeout = time.Duration(cfg.API.RPCWriteTimeout) * time.Second
tmCfg.MaxBodyBytes = int64(cfg.API.RPCMaxBodyBytes)
tmCfg.MaxOpenConnections = int(cfg.MaxOpenConnections)
tmCfg.ReadTimeout = time.Duration(cfg.RPCReadTimeout) * time.Second
tmCfg.WriteTimeout = time.Duration(cfg.RPCWriteTimeout) * time.Second
tmCfg.MaxBodyBytes = int64(cfg.RPCMaxBodyBytes)

listener, err := tmrpcserver.Listen(cfg.API.Address, tmCfg)
listener, err := tmrpcserver.Listen(cfg.Address, tmCfg)
if err != nil {
s.mtx.Unlock()
return err
}

s.registerGRPCGatewayRoutes()
s.listener = listener
var h http.Handler = s.Router

s.mtx.Unlock()

if cfg.API.EnableUnsafeCORS {
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
return tmrpcserver.Serve(s.listener, allowAllCORS(h), s.logger, tmCfg)
}
errCh := make(chan error)

// Start the API in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func(enableUnsafeCORS bool) {
s.logger.Info("starting API server...", "address", cfg.Address)

s.logger.Info("starting API server...")
return tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg)
if enableUnsafeCORS {
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
errCh <- tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, tmCfg)
} else {
errCh <- tmrpcserver.Serve(s.listener, s.Router, s.logger, tmCfg)
}
}(cfg.EnableUnsafeCORS)
Comment on lines +121 to +130

Check notice

Code scanning / CodeQL

Spawning a Go routine

Spawning a Go routine may be a possible source of non-determinism

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
select {
case <-ctx.Done():
// The calling process cancelled or closed the provided context, so we must
// gracefully stop the API server.
s.logger.Info("stopping API server...", "address", cfg.Address)
return s.Close()

case err := <-errCh:
s.logger.Error("failed to start API server", "err", err)
return err
}
}

// Close closes the API server.
Expand Down
24 changes: 17 additions & 7 deletions server/grpc/grpc_web.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
package grpc

import (
"context"
"fmt"
"net/http"
"time"

"github.com/improbable-eng/grpc-web/go/grpcweb"
"github.com/tendermint/tendermint/libs/log"
"google.golang.org/grpc"

"github.com/cosmos/cosmos-sdk/server/config"
"github.com/cosmos/cosmos-sdk/server/types"
)

// StartGRPCWeb starts a gRPC-Web server on the given address.
func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, error) {
func StartGRPCWeb(ctx context.Context, logger log.Logger, grpcSrv *grpc.Server, config config.GRPCWebConfig) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we ok breaking the API in a point release?

var options []grpcweb.Option
if config.GRPCWeb.EnableUnsafeCORS {
if config.EnableUnsafeCORS {
options = append(options,
grpcweb.WithOriginFunc(func(origin string) bool {
return true
Expand All @@ -25,22 +26,31 @@ func StartGRPCWeb(grpcSrv *grpc.Server, config config.Config) (*http.Server, err

wrappedServer := grpcweb.WrapServer(grpcSrv, options...)
grpcWebSrv := &http.Server{
Addr: config.GRPCWeb.Address,
Addr: config.Address,
Handler: wrappedServer,
ReadHeaderTimeout: 500 * time.Millisecond,
}

errCh := make(chan error)
go func() {
logger.Info("starting gRPC web server...", "address", config.Address)
if err := grpcWebSrv.ListenAndServe(); err != nil {
errCh <- fmt.Errorf("[grpc] failed to serve: %w", err)
}
}()

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
select {
case <-ctx.Done():
// The calling process cancelled or closed the provided context, so we must
// gracefully stop the gRPC-web server.
logger.Info("stopping gRPC web server...", "address", config.Address)
grpcWebSrv.Close()
return nil

case err := <-errCh:
return nil, err
case <-time.After(types.ServerStartTime): // assume server started successfully
return grpcWebSrv, nil
logger.Error("failed to start gRPC Web server", "err", err)
return err
}
}
49 changes: 35 additions & 14 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package grpc

import (
"context"
"fmt"
"net"
"time"

"github.com/tendermint/tendermint/libs/log"
"google.golang.org/grpc"

"github.com/cosmos/cosmos-sdk/client"
Expand All @@ -16,8 +17,9 @@ import (
sdk "github.com/cosmos/cosmos-sdk/types"
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
// NewGRPCServer returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server. See StartGRPCServer.
func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
maxSendMsgSize := cfg.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize
Expand Down Expand Up @@ -45,39 +47,58 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
for _, m := range clientCtx.TxConfig.SignModeHandler().Modes() {
modes[m.String()] = (int32)(m)
}

return modes
}(),
ChainID: clientCtx.ChainID,
SdkConfig: sdk.GetConfig(),
InterfaceRegistry: clientCtx.InterfaceRegistry,
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to register reflection service: %w", err)
}

// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
gogoreflection.Register(grpcSrv)

return grpcSrv, nil
}

// StartGRPCServer starts the provided gRPC server on the address specified in cfg.
//
// Note, this creates a blocking process if the server is started successfully.
// Otherwise, an error is returned. The caller is expected to provide a Context
// that is properly canceled or closed to indicate the server should be stopped.
func StartGRPCServer(ctx context.Context, logger log.Logger, cfg config.GRPCConfig, grpcSrv *grpc.Server) error {
listener, err := net.Listen("tcp", cfg.Address)
if err != nil {
return nil, err
return fmt.Errorf("failed to listen on address %s: %w", cfg.Address, err)
}

errCh := make(chan error)

// Start the gRPC in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func() {
err = grpcSrv.Serve(listener)
if err != nil {
errCh <- fmt.Errorf("failed to serve: %w", err)
}
logger.Info("starting gRPC server...", "address", cfg.Address)
errCh <- grpcSrv.Serve(listener)
}()

// Start a blocking select to wait for an indication to stop the server or that
// the server failed to start properly.
select {
case err := <-errCh:
return nil, err
case <-ctx.Done():
// The calling process cancelled or closed the provided context, so we must
// gracefully stop the gRPC server.
logger.Info("stopping gRPC server...", "address", cfg.Address)
grpcSrv.GracefulStop()

return nil

case <-time.After(types.ServerStartTime):
// assume server started successfully
return grpcSrv, nil
case err := <-errCh:
logger.Error("failed to start gRPC server", "err", err)
return err
}
}
Loading