Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: cleanup server logic #15041

Merged
merged 19 commits into from
Feb 22, 2023
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ Ref: https://keepachangelog.com/en/1.0.0/

### Improvements

* (server) [#15041](https://github.com/cosmos/cosmos-sdk/pull/15041) Remove unnecessary sleeps from gRPC and API server initiation. The servers will start and accept requests as soon as they're ready.
* (cli) [#14659](https://github.com/cosmos/cosmos-sdk/pull/14659) Added ability to query blocks by either height/hash `simd q block --type=height|hash <height|hash>`.
* (store) [#14410](https://github.com/cosmos/cosmos-sdk/pull/14410) `rootmulti.Store.loadVersion` has validation to check if all the module stores' height is correct, it will error if any module store has incorrect height.
* (x/evidence) [#14757](https://github.com/cosmos/cosmos-sdk/pull/14757) Evidence messages do not need to implement a `.Type()` anymore.
Expand Down Expand Up @@ -177,6 +178,11 @@ Ref: https://keepachangelog.com/en/1.0.0/

### API Breaking Changes

* (server) [#15041](https://github.com/cosmos/cosmos-sdk/pull/15041) Refactor how gRPC and API servers are started to remove unnecessary sleeps:
* Remove `ServerStartTime` constant.
* Rename `WaitForQuitSignals` to `ListenForQuitSignals`. Note, this function is no longer blocking. Thus the caller is expected to provide a `context.CancelFunc` which indicates that when a signal is caught, that any spawned processes can gracefully exit.
* `api.Server#Start` now accepts a `context.Context`. The caller is responsible for ensuring that the context is canceled such that the API server can gracefully exit. The caller does not need to stop the server.
* To start the gRPC server you must first create the server via `NewGRPCServer`, after which you can start the gRPC server via `StartGRPCServer` which accepts a `context.Context`. The caller is responsible for ensuring that the context is canceled such that the gRPC server can gracefully exit. The caller does not need to stop the server.
* (testutil) [#14991](https://github.com/cosmos/cosmos-sdk/pull/14991) The `testutil/testdata_pulsar` package has moved to `testutil/testdata/testpb`.
* (simapp) [#14977](https://github.com/cosmos/cosmos-sdk/pull/14977) Move simulation helpers functions (`AppStateFn` and `AppStateRandomizedFn`) to `testutil/sims`. These takes an extra genesisState argument which is the default state of the app.
* (x/gov) [#14720](https://github.com/cosmos/cosmos-sdk/pull/14720) Add an expedited field in the gov v1 proposal and `MsgNewMsgProposal`.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ require (
github.com/magiconair/properties v1.8.7
github.com/manifoldco/promptui v0.9.0
github.com/mattn/go-isatty v0.0.17
github.com/neilotoole/errgroup v0.1.6
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.14.0
github.com/prometheus/common v0.39.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,8 @@ github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzE
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/neilotoole/errgroup v0.1.6 h1:PODGqPXdT5BC/zCYIMoTrwV+ujKcW+gBXM6Ye9Ve3R8=
github.com/neilotoole/errgroup v0.1.6/go.mod h1:Q2nLGf+594h0CLBs/Mbg6qOr7GtqDK7C2S41udRnToE=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
Expand Down
54 changes: 41 additions & 13 deletions server/api/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package api

import (
"context"
"fmt"
"net"
"net/http"
Expand Down Expand Up @@ -29,11 +30,10 @@ type Server struct {
Router *mux.Router
GRPCGatewayRouter *runtime.ServeMux
ClientCtx client.Context
GRPCSrv *grpc.Server
logger log.Logger
metrics *telemetry.Metrics

GRPCSrv *grpc.Server

logger log.Logger
metrics *telemetry.Metrics
// Start() is blocking and generally called from a separate goroutine.
// Close() can be called asynchronously and access shared memory
// via the listener. Therefore, we sync access to Start and Close with
Expand All @@ -51,6 +51,7 @@ func CustomGRPCHeaderMatcher(key string) (string, bool) {
switch strings.ToLower(key) {
case grpctypes.GRPCBlockHeightHeader:
return grpctypes.GRPCBlockHeightHeader, true

default:
return runtime.DefaultHeaderMatcher(key)
}
Expand Down Expand Up @@ -88,9 +89,12 @@ func New(clientCtx client.Context, logger log.Logger, grpcSrv *grpc.Server) *Ser

// Start starts the API server. Internally, the API server leverages CometBFT's
// JSON RPC server. Configuration options are provided via config.APIConfig
// and are delegated to the CometBFT JSON RPC server. The process is
// non-blocking, so an external signal handler must be used.
func (s *Server) Start(cfg config.Config) error {
// and are delegated to the CometBFT JSON RPC server.
//
// Note, this creates a blocking process if the server is started successfully.
// Otherwise, an error is returned. The caller is expected to provide a Context
// that is properly canceled or closed to indicate the server should be stopped.
func (s *Server) Start(ctx context.Context, cfg config.Config) error {
s.mtx.Lock()

cmtCfg := tmrpcserver.DefaultConfig()
Expand Down Expand Up @@ -134,13 +138,37 @@ func (s *Server) Start(cfg config.Config) error {
// register grpc-gateway routes (after grpc-web server as the first match is used)
s.Router.PathPrefix("/").Handler(s.GRPCGatewayRouter)

s.logger.Info("starting API server...")
if cfg.API.EnableUnsafeCORS {
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
return tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, cmtCfg)
}
errCh := make(chan error, 1)

// Start the API in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func(enableUnsafeCORS bool) {
s.logger.Info("starting API server...", "address", cfg.API.Address)

return tmrpcserver.Serve(s.listener, s.Router, s.logger, cmtCfg)
if enableUnsafeCORS {
allowAllCORS := handlers.CORS(handlers.AllowedHeaders([]string{"Content-Type"}))
errCh <- tmrpcserver.Serve(s.listener, allowAllCORS(s.Router), s.logger, cmtCfg)
} else {
errCh <- tmrpcserver.Serve(s.listener, s.Router, s.logger, cmtCfg)
}
}(cfg.API.EnableUnsafeCORS)
Comment on lines +146 to +155

Check notice

Code scanning / CodeQL

Spawning a Go routine

Spawning a Go routine may be a possible source of non-determinism

// Start a blocking loop to wait for an indication to stop the server or that
// the server failed to start properly.
for {
select {
case <-ctx.Done():
// The calling process cancelled or closed the provided context, so we must
// gracefully stop the API server.
s.logger.Info("stopping API server...", "address", cfg.API.Address)
return s.Close()

case err := <-errCh:
s.logger.Error("failed to start API server", "err", err)
return err
}
}
}
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved

// Close closes the API server.
Expand Down
55 changes: 39 additions & 16 deletions server/grpc/server.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
package grpc

import (
"context"
"fmt"
"net"
"time"

"cosmossdk.io/log"
"google.golang.org/grpc"

"github.com/cosmos/cosmos-sdk/client"
Expand All @@ -17,8 +18,9 @@ import (
_ "github.com/cosmos/cosmos-sdk/types/tx/amino" // Import amino.proto file for reflection
)

// StartGRPCServer starts a gRPC server on the given address.
func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
// NewGRPCServer returns a correctly configured and initialized gRPC server.
// Note, the caller is responsible for starting the server. See StartGRPCServer.
func NewGRPCServer(clientCtx client.Context, app types.Application, cfg config.GRPCConfig) (*grpc.Server, error) {
maxSendMsgSize := cfg.MaxSendMsgSize
if maxSendMsgSize == 0 {
maxSendMsgSize = config.DefaultGRPCMaxSendMsgSize
Expand Down Expand Up @@ -46,39 +48,60 @@ func StartGRPCServer(clientCtx client.Context, app types.Application, cfg config
for _, m := range clientCtx.TxConfig.SignModeHandler().Modes() {
modes[m.String()] = (int32)(m)
}

return modes
}(),
ChainID: clientCtx.ChainID,
SdkConfig: sdk.GetConfig(),
InterfaceRegistry: clientCtx.InterfaceRegistry,
})
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to register reflection service: %w", err)
}

// Reflection allows external clients to see what services and methods
// the gRPC server exposes.
gogoreflection.Register(grpcSrv)

return grpcSrv, nil
}

// StartGRPCServer starts the provided gRPC server on the address specified in cfg.
//
// Note, this creates a blocking process if the server is started successfully.
// Otherwise, an error is returned. The caller is expected to provide a Context
// that is properly canceled or closed to indicate the server should be stopped.
func StartGRPCServer(ctx context.Context, logger log.Logger, cfg config.GRPCConfig, grpcSrv *grpc.Server) error {
listener, err := net.Listen("tcp", cfg.Address)
if err != nil {
return nil, err
return fmt.Errorf("failed to listen on address %s: %w", cfg.Address, err)
}

errCh := make(chan error)
errCh := make(chan error, 1)
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved

// Start the gRPC in an external goroutine as Serve is blocking and will return
// an error upon failure, which we'll send on the error channel that will be
// consumed by the for block below.
go func() {
err = grpcSrv.Serve(listener)
if err != nil {
errCh <- fmt.Errorf("failed to serve: %w", err)
}
logger.Info("starting gRPC server...", "address", cfg.Address)
errCh <- grpcSrv.Serve(listener)
}()

select {
case err := <-errCh:
return nil, err
// Start a blocking loop to wait for an indication to stop the server or that
// the server failed to start properly.
for {
select {
case <-ctx.Done():
// The calling process cancelled or closed the provided context, so we must
// gracefully stop the gRPC server.
logger.Info("stopping gRPC server...", "address", cfg.Address)
grpcSrv.GracefulStop()
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved

return nil

case <-time.After(types.ServerStartTime):
// assume server started successfully
return grpcSrv, nil
case err := <-errCh:
logger.Error("failed to start gRPC server", "err", err)
return err
}
alexanderbez marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading