From 2af514ab0e52bfd5e801d723decba93bca1c5a64 Mon Sep 17 00:00:00 2001 From: tclemos Date: Wed, 4 Oct 2023 16:22:39 -0300 Subject: [PATCH 01/10] WIP: WS subscriptions --- jsonrpc/endpoints_eth.go | 6 ++++++ jsonrpc/mocks/mock_state.go | 10 +++++----- jsonrpc/server.go | 5 ++++- jsonrpc/server_test.go | 2 +- jsonrpc/types/interfaces.go | 2 +- state/l2block.go | 31 +++++++++++++++++++++++-------- state/pgstatestorage.go | 5 ++++- state/state.go | 4 +++- 8 files changed, 47 insertions(+), 18 deletions(-) diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index 9139a98bf4..75b28746fc 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -8,6 +8,7 @@ import ( "math/big" "net/http" "strings" + "time" "github.com/0xPolygonHermez/zkevm-node/hex" "github.com/0xPolygonHermez/zkevm-node/jsonrpc/client" @@ -1048,6 +1049,8 @@ func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *websocket.Conn) error { // onNewL2Block is triggered when the state triggers the event for a new l2 block func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) { + log.Debugf("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64()) + start := time.Now() blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn() if err != nil { log.Errorf("failed to get all block filters with web sockets connections: %v", err) @@ -1061,7 +1064,9 @@ func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) { } } } + log.Debugf("[onNewL2Block] new l2 block event for block %v took %vms to send all the messages for block filters", event.Block.NumberU64(), time.Since(start).Milliseconds()) + start = time.Now() logFilters, err := e.storage.GetAllLogFiltersWithWSConn() if err != nil { log.Errorf("failed to get all log filters with web sockets connections: %v", err) @@ -1081,6 +1086,7 @@ func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) { } } } + log.Debugf("[onNewL2Block] new l2 block event for block %v took %vms to send all the messages for log filters", event.Block.NumberU64(), time.Since(start).Milliseconds()) } func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data interface{}) { diff --git a/jsonrpc/mocks/mock_state.go b/jsonrpc/mocks/mock_state.go index 73ed569a26..7bda9d1a38 100644 --- a/jsonrpc/mocks/mock_state.go +++ b/jsonrpc/mocks/mock_state.go @@ -976,11 +976,6 @@ func (_m *StateMock) IsL2BlockVirtualized(ctx context.Context, blockNumber uint6 return r0, r1 } -// PrepareWebSocket provides a mock function with given fields: -func (_m *StateMock) PrepareWebSocket() { - _m.Called() -} - // ProcessUnsignedTransaction provides a mock function with given fields: ctx, tx, senderAddress, l2BlockNumber, noZKEVMCounters, dbTx func (_m *StateMock) ProcessUnsignedTransaction(ctx context.Context, tx *coretypes.Transaction, senderAddress common.Address, l2BlockNumber *uint64, noZKEVMCounters bool, dbTx pgx.Tx) (*runtime.ExecutionResult, error) { ret := _m.Called(ctx, tx, senderAddress, l2BlockNumber, noZKEVMCounters, dbTx) @@ -1012,6 +1007,11 @@ func (_m *StateMock) RegisterNewL2BlockEventHandler(h state.NewL2BlockEventHandl _m.Called(h) } +// StartToMonitorNewL2Blocks provides a mock function with given fields: +func (_m *StateMock) StartToMonitorNewL2Blocks() { + _m.Called() +} + type mockConstructorTestingTNewStateMock interface { mock.TestingT Cleanup(func()) diff --git a/jsonrpc/server.go b/jsonrpc/server.go index f3459b062d..926f748d24 100644 --- a/jsonrpc/server.go +++ b/jsonrpc/server.go @@ -75,7 +75,10 @@ func NewServer( storage storageInterface, services []Service, ) *Server { - s.PrepareWebSocket() + if cfg.WebSockets.Enabled { + s.StartToMonitorNewL2Blocks() + } + handler := newJSONRpcHandler() for _, service := range services { diff --git a/jsonrpc/server_test.go b/jsonrpc/server_test.go index 4cce938e20..7a9f78b987 100644 --- a/jsonrpc/server_test.go +++ b/jsonrpc/server_test.go @@ -59,7 +59,7 @@ func newMockedServer(t *testing.T, cfg Config) (*mockedServer, *mocksWrapper, *e var newL2BlockEventHandler state.NewL2BlockEventHandler = func(e state.NewL2BlockEvent) {} st.On("RegisterNewL2BlockEventHandler", mock.IsType(newL2BlockEventHandler)).Once() - st.On("PrepareWebSocket").Once() + st.On("StartToMonitorNewL2Blocks").Once() services := []Service{} if _, ok := apis[APIEth]; ok { diff --git a/jsonrpc/types/interfaces.go b/jsonrpc/types/interfaces.go index 6ce14137a1..ccc411e219 100644 --- a/jsonrpc/types/interfaces.go +++ b/jsonrpc/types/interfaces.go @@ -26,7 +26,7 @@ type PoolInterface interface { // StateInterface gathers the methods required to interact with the state. type StateInterface interface { - PrepareWebSocket() + StartToMonitorNewL2Blocks() BeginStateTransaction(ctx context.Context) (pgx.Tx, error) DebugTransaction(ctx context.Context, transactionHash common.Hash, traceConfig state.TraceConfig, dbTx pgx.Tx) (*runtime.ExecutionResult, error) EstimateGas(transaction *types.Transaction, senderAddress common.Address, l2BlockNumber *uint64, dbTx pgx.Tx) (uint64, []byte, error) diff --git a/state/l2block.go b/state/l2block.go index a4d4824ab3..e4d4141c40 100644 --- a/state/l2block.go +++ b/state/l2block.go @@ -21,8 +21,12 @@ type NewL2BlockEvent struct { Block types.Block } -// PrepareWebSocket allows the RPC to prepare ws -func (s *State) PrepareWebSocket() { +// StartToMonitorNewL2Blocks starts 2 go routines that will +// monitor new blocks and execute handlers registered to be executed +// when a new l2 block is detected. This is used by the RPC WebSocket +// filter subscription but can be used by any other component that +// needs to react to a new L2 block added to the state. +func (s *State) StartToMonitorNewL2Blocks() { lastL2Block, err := s.GetLastL2Block(context.Background(), nil) if errors.Is(err, ErrStateNotSynchronized) { lastL2Block = types.NewBlockWithHeader(&types.Header{Number: big.NewInt(0)}) @@ -43,6 +47,7 @@ func (s *State) RegisterNewL2BlockEventHandler(h NewL2BlockEventHandler) { func (s *State) handleEvents() { for newL2BlockEvent := range s.newL2BlockEvents { + log.Debugf("[handleEvents] new l2 block event detected for block: %v", newL2BlockEvent.Block.NumberU64()) if len(s.newL2BlockEventHandlers) == 0 { continue } @@ -50,15 +55,18 @@ func (s *State) handleEvents() { wg := sync.WaitGroup{} for _, handler := range s.newL2BlockEventHandlers { wg.Add(1) - go func(h NewL2BlockEventHandler) { + go func(h NewL2BlockEventHandler, e NewL2BlockEvent) { defer func() { wg.Done() if r := recover(); r != nil { log.Errorf("failed and recovered in NewL2BlockEventHandler: %v", r) } }() - h(newL2BlockEvent) - }(handler) + log.Debugf("[handleEvents] triggering new l2 block event handler for block: %v", e.Block.NumberU64()) + start := time.Now() + h(e) + log.Debugf("[handleEvents] new l2 block event handler for block %v took %vms to be executed", e.Block.NumberU64(), time.Since(start).Milliseconds()) + }(handler, newL2BlockEvent) } wg.Wait() } @@ -91,17 +99,24 @@ func (s *State) monitorNewL2Blocks() { continue } - for bn := s.lastL2BlockSeen.NumberU64() + uint64(1); bn <= lastL2Block.NumberU64(); bn++ { + fromBlockNumber := s.lastL2BlockSeen.NumberU64() + uint64(1) + toBlockNumber := lastL2Block.NumberU64() + log.Debugf("[monitorNewL2Blocks] new l2 block detected from block %v to %v", fromBlockNumber, toBlockNumber) + + for bn := fromBlockNumber; bn <= toBlockNumber; bn++ { block, err := s.GetL2BlockByNumber(context.Background(), bn, nil) if err != nil { - log.Errorf("failed to l2 block while monitoring new blocks: %v", err) + log.Errorf("failed to get l2 block while monitoring new blocks: %v", err) break } + log.Debugf("[monitorNewL2Blocks] sending NewL2BlockEvent for block %v", block.NumberU64()) + start := time.Now() s.newL2BlockEvents <- NewL2BlockEvent{ Block: *block, } - log.Infof("new l2 blocks detected, Number %v, Hash %v", block.NumberU64(), block.Hash().String()) + log.Debugf("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %vms to be sent", block.NumberU64(), time.Since(start).Milliseconds()) + log.Infof("new l2 block detected: number %v, hash %v", block.NumberU64(), block.Hash().String()) s.lastL2BlockSeen = *block } diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go index a97c0c9e7e..10888b2b20 100644 --- a/state/pgstatestorage.go +++ b/state/pgstatestorage.go @@ -9,6 +9,7 @@ import ( "time" "github.com/0xPolygonHermez/zkevm-node/hex" + "github.com/0xPolygonHermez/zkevm-node/log" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/jackc/pgx/v4" @@ -1460,6 +1461,8 @@ func scanLogs(rows pgx.Rows) ([]*types.Log, error) { // AddL2Block adds a new L2 block to the State Store func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2Block *types.Block, receipts []*types.Receipt, effectivePercentage uint8, dbTx pgx.Tx) error { + log.Debugf("[AddL2Block] adding l2 block: %v", l2Block.NumberU64()) + start := time.Now() e := p.getExecQuerier(dbTx) const addTransactionSQL = "INSERT INTO state.transaction (hash, encoded, decoded, l2_block_num, effective_percentage) VALUES($1, $2, $3, $4, $5)" @@ -1523,7 +1526,7 @@ func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2 } } } - + log.Debugf("[AddL2Block] l2 block %v took %vms to be added", l2Block.NumberU64(), time.Since(start).Milliseconds()) return nil } diff --git a/state/state.go b/state/state.go index 487c19f726..46f8940f64 100644 --- a/state/state.go +++ b/state/state.go @@ -15,6 +15,8 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) +const newL2BlockEventBufferSize = 1000 + var ( // ZeroHash is the hash 0x0000000000000000000000000000000000000000000000000000000000000000 ZeroHash = common.Hash{} @@ -48,7 +50,7 @@ func NewState(cfg Config, storage *PostgresStorage, executorClient executor.Exec executorClient: executorClient, tree: stateTree, eventLog: eventLog, - newL2BlockEvents: make(chan NewL2BlockEvent), + newL2BlockEvents: make(chan NewL2BlockEvent, newL2BlockEventBufferSize), newL2BlockEventHandlers: []NewL2BlockEventHandler{}, } From d3f756e03605acf599831618718fb3cc242f6898 Mon Sep 17 00:00:00 2001 From: tclemos Date: Mon, 9 Oct 2023 13:28:25 -0300 Subject: [PATCH 02/10] WS subscription improvements --- jsonrpc/endpoints_eth.go | 102 ++++++++++++++++++++++++++++++++------- state/l2block.go | 11 +++-- state/state.go | 5 +- 3 files changed, 93 insertions(+), 25 deletions(-) diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index 75b28746fc..e6b7490220 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -8,6 +8,7 @@ import ( "math/big" "net/http" "strings" + "sync" "time" "github.com/0xPolygonHermez/zkevm-node/hex" @@ -905,7 +906,6 @@ func (e *EthEndpoints) tryToAddTxToPool(input, ip string) (interface{}, types.Er if err != nil { return RPCErrorResponse(types.InvalidParamsErrorCode, "invalid tx input", err, false) } - log.Infof("adding TX to the pool: %v", tx.Hash().Hex()) if err := e.pool.AddTx(context.Background(), *tx, ip); err != nil { // it's not needed to log the error here, because we check and log if needed @@ -1050,68 +1050,134 @@ func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *websocket.Conn) error { // onNewL2Block is triggered when the state triggers the event for a new l2 block func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) { log.Debugf("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64()) + wg := sync.WaitGroup{} + wg.Add(2) + go e.notifyNewHeads(&wg, event) + go e.notifyNewLogs(&wg, event) + wg.Wait() +} + +func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2BlockEvent) { + defer wg.Done() start := time.Now() blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn() if err != nil { log.Errorf("failed to get all block filters with web sockets connections: %v", err) } else { + b, err := types.NewBlock(&event.Block, nil, false, false) + if err != nil { + log.Errorf("failed to build block response to subscription: %v", err) + return + } + data, err := json.Marshal(b) + if err != nil { + log.Errorf("failed to marshal block response to subscription: %v", err) + } for _, filter := range blockFilters { - b, err := types.NewBlock(&event.Block, nil, false, false) - if err != nil { - log.Errorf("failed to build block response to subscription: %v", err) - } else { - e.sendSubscriptionResponse(filter, b) - } + e.sendSubscriptionResponse(filter, data) } } - log.Debugf("[onNewL2Block] new l2 block event for block %v took %vms to send all the messages for block filters", event.Block.NumberU64(), time.Since(start).Milliseconds()) + log.Debugf("[notifyNewHeads] new l2 block event for block %v took %vms to send all the messages for block filters", event.Block.NumberU64(), time.Since(start).Milliseconds()) +} - start = time.Now() +func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockEvent) { + defer wg.Done() + start := time.Now() logFilters, err := e.storage.GetAllLogFiltersWithWSConn() if err != nil { log.Errorf("failed to get all log filters with web sockets connections: %v", err) } else { for _, filter := range logFilters { - changes, err := e.GetFilterChanges(filter.ID) + filterParameters := filter.Parameters.(LogFilter) + bn := types.BlockNumber(event.Block.NumberU64()) + + // if from and to blocks are new, set it to the current block to make + // the query faster + if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil { + filterParameters.FromBlock = &bn + filterParameters.ToBlock = &bn + } else { + // if the filter has a fromBlock value set + // and the event block number is smaller than the + // from block, skip this filter + if filterParameters.FromBlock != nil { + fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) + if rpcErr != nil { + log.Errorf(rpcErr.Error(), filter.ID, err) + continue + } + if fromBlock > event.Block.NumberU64() { + continue + } + // otherwise set the from block to a fixed number + // to avoid querying it again in the next step + fixedFromBlock := types.BlockNumber(fromBlock) + filterParameters.FromBlock = &fixedFromBlock + } + + // if the filter has a toBlock value set + // and the event block number is greater than the + // to block, skip this filter + if filterParameters.ToBlock != nil { + toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) + if rpcErr != nil { + log.Errorf(rpcErr.Error(), filter.ID, err) + continue + } + if toBlock > event.Block.NumberU64() { + continue + } + // otherwise set the to block to a fixed number + // to avoid querying it again in the next step + fixedToBlock := types.BlockNumber(toBlock) + filterParameters.ToBlock = &fixedToBlock + } + } + + // get new logs for this specific filter + changes, err := e.internalGetLogs(context.Background(), nil, filterParameters) if err != nil { log.Errorf("failed to get filters changes for filter %v with web sockets connections: %v", filter.ID, err) continue } + // if there are new logs for the filter, send it if changes != nil { ethLogs := changes.([]types.Log) for _, ethLog := range ethLogs { - e.sendSubscriptionResponse(filter, ethLog) + data, err := json.Marshal(ethLog) + if err != nil { + log.Errorf("failed to marshal ethLog response to subscription: %v", err) + } + e.sendSubscriptionResponse(filter, data) } } } } - log.Debugf("[onNewL2Block] new l2 block event for block %v took %vms to send all the messages for log filters", event.Block.NumberU64(), time.Since(start).Milliseconds()) + log.Debugf("[notifyNewLogs] new l2 block event for block %v took %vms to send all the messages for log filters", event.Block.NumberU64(), time.Since(start).Milliseconds()) } -func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data interface{}) { +func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data []byte) { const errMessage = "Unable to write WS message to filter %v, %s" - result, err := json.Marshal(data) - if err != nil { - log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error())) - } res := types.SubscriptionResponse{ JSONRPC: "2.0", Method: "eth_subscription", Params: types.SubscriptionResponseParams{ Subscription: filter.ID, - Result: result, + Result: data, }, } message, err := json.Marshal(res) if err != nil { log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error())) + return } err = filter.WsConn.WriteMessage(websocket.TextMessage, message) if err != nil { log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error())) + return } log.Debugf("WS message sent: %v", string(message)) } diff --git a/state/l2block.go b/state/l2block.go index e4d4141c40..f2ed7ba3d1 100644 --- a/state/l2block.go +++ b/state/l2block.go @@ -33,7 +33,7 @@ func (s *State) StartToMonitorNewL2Blocks() { } else if err != nil { log.Fatalf("failed to load the last l2 block: %v", err) } - s.lastL2BlockSeen = *lastL2Block + s.lastL2BlockSeen.Store(lastL2Block) go s.monitorNewL2Blocks() go s.handleEvents() } @@ -93,13 +93,15 @@ func (s *State) monitorNewL2Blocks() { continue } + lastL2BlockSeen := s.lastL2BlockSeen.Load() + // not updates until now - if lastL2Block == nil || s.lastL2BlockSeen.NumberU64() >= lastL2Block.NumberU64() { + if lastL2Block == nil || lastL2BlockSeen.NumberU64() >= lastL2Block.NumberU64() { waitNextCycle() continue } - fromBlockNumber := s.lastL2BlockSeen.NumberU64() + uint64(1) + fromBlockNumber := lastL2BlockSeen.NumberU64() + uint64(1) toBlockNumber := lastL2Block.NumberU64() log.Debugf("[monitorNewL2Blocks] new l2 block detected from block %v to %v", fromBlockNumber, toBlockNumber) @@ -109,7 +111,6 @@ func (s *State) monitorNewL2Blocks() { log.Errorf("failed to get l2 block while monitoring new blocks: %v", err) break } - log.Debugf("[monitorNewL2Blocks] sending NewL2BlockEvent for block %v", block.NumberU64()) start := time.Now() s.newL2BlockEvents <- NewL2BlockEvent{ @@ -117,7 +118,7 @@ func (s *State) monitorNewL2Blocks() { } log.Debugf("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %vms to be sent", block.NumberU64(), time.Since(start).Milliseconds()) log.Infof("new l2 block detected: number %v, hash %v", block.NumberU64(), block.Hash().String()) - s.lastL2BlockSeen = *block + s.lastL2BlockSeen.Store(block) } // interval to check for new l2 blocks diff --git a/state/state.go b/state/state.go index 46f8940f64..4ae621ddae 100644 --- a/state/state.go +++ b/state/state.go @@ -4,6 +4,7 @@ import ( "context" "math/big" "sync" + "sync/atomic" "github.com/0xPolygonHermez/zkevm-node/event" "github.com/0xPolygonHermez/zkevm-node/merkletree" @@ -32,7 +33,7 @@ type State struct { tree *merkletree.StateTree eventLog *event.EventLog - lastL2BlockSeen types.Block + lastL2BlockSeen atomic.Pointer[types.Block] newL2BlockEvents chan NewL2BlockEvent newL2BlockEventHandlers []NewL2BlockEventHandler } @@ -50,7 +51,7 @@ func NewState(cfg Config, storage *PostgresStorage, executorClient executor.Exec executorClient: executorClient, tree: stateTree, eventLog: eventLog, - newL2BlockEvents: make(chan NewL2BlockEvent, newL2BlockEventBufferSize), + newL2BlockEvents: make(chan NewL2BlockEvent), newL2BlockEventHandlers: []NewL2BlockEventHandler{}, } From 2b5cb2db7f310c96330c8cb03421061349f71329 Mon Sep 17 00:00:00 2001 From: tclemos Date: Mon, 9 Oct 2023 13:36:24 -0300 Subject: [PATCH 03/10] make new l2 block event channel buffered --- state/state.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/state/state.go b/state/state.go index 4ae621ddae..47177459a1 100644 --- a/state/state.go +++ b/state/state.go @@ -16,7 +16,7 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) -const newL2BlockEventBufferSize = 1000 +const newL2BlockEventBufferSize = 500 var ( // ZeroHash is the hash 0x0000000000000000000000000000000000000000000000000000000000000000 @@ -51,7 +51,7 @@ func NewState(cfg Config, storage *PostgresStorage, executorClient executor.Exec executorClient: executorClient, tree: stateTree, eventLog: eventLog, - newL2BlockEvents: make(chan NewL2BlockEvent), + newL2BlockEvents: make(chan NewL2BlockEvent, newL2BlockEventBufferSize), newL2BlockEventHandlers: []NewL2BlockEventHandler{}, } From d6472eeff8b10ae11400a644fe1a0d4c40168d65 Mon Sep 17 00:00:00 2001 From: tclemos Date: Mon, 9 Oct 2023 14:25:25 -0300 Subject: [PATCH 04/10] fix linter error --- jsonrpc/endpoints_eth.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index 294f1d2135..a12dab1187 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -1084,9 +1084,13 @@ func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *websocket.Conn) error { func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) { log.Debugf("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64()) wg := sync.WaitGroup{} - wg.Add(2) + + wg.Add(1) go e.notifyNewHeads(&wg, event) + + wg.Add(1) go e.notifyNewLogs(&wg, event) + wg.Wait() } From 788d15df3be9e2f2fd78302b2728ec1c75c91165 Mon Sep 17 00:00:00 2001 From: tclemos Date: Mon, 9 Oct 2023 14:28:30 -0300 Subject: [PATCH 05/10] allow test to set executor and mt URIs via env var, required to run tests with remote servers --- test/docker-compose.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/test/docker-compose.yml b/test/docker-compose.yml index dd7488a20f..a00598dd0e 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -56,6 +56,8 @@ services: environment: - ZKEVM_NODE_STATE_DB_HOST=zkevm-state-db - ZKEVM_NODE_POOL_DB_HOST=zkevm-pool-db + - ZKEVM_NODE_MTCLIENT_URI=${ZKEVM_NODE_MTCLIENT_URI} + - ZKEVM_NODE_EXECUTOR_URI=${ZKEVM_NODE_EXECUTOR_URI} volumes: - ./config/test.node.config.toml:/app/config.toml - ./config/test.genesis.config.json:/app/genesis.json @@ -72,6 +74,8 @@ services: - ZKEVM_NODE_STATE_DB_HOST=zkevm-state-db - ZKEVM_NODE_POOL_DB_HOST=zkevm-pool-db - ZKEVM_NODE_SEQUENCER_SENDER_ADDRESS=0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266 + - ZKEVM_NODE_MTCLIENT_URI=${ZKEVM_NODE_MTCLIENT_URI} + - ZKEVM_NODE_EXECUTOR_URI=${ZKEVM_NODE_EXECUTOR_URI} volumes: - ./sequencer.keystore:/pk/sequencer.keystore - ./config/test.node.config.toml:/app/config.toml @@ -91,6 +95,8 @@ services: environment: - ZKEVM_NODE_STATE_DB_HOST=zkevm-state-db - ZKEVM_NODE_POOL_DB_HOST=zkevm-pool-db + - ZKEVM_NODE_MTCLIENT_URI=${ZKEVM_NODE_MTCLIENT_URI} + - ZKEVM_NODE_EXECUTOR_URI=${ZKEVM_NODE_EXECUTOR_URI} volumes: - ./config/test.node.config.toml:/app/config.toml - ./config/test.genesis.config.json:/app/genesis.json @@ -123,6 +129,8 @@ services: - 9095:9091 # needed if metrics enabled environment: - ZKEVM_NODE_STATE_DB_HOST=zkevm-state-db + - ZKEVM_NODE_MTCLIENT_URI=${ZKEVM_NODE_MTCLIENT_URI} + - ZKEVM_NODE_EXECUTOR_URI=${ZKEVM_NODE_EXECUTOR_URI} volumes: - ./config/test.node.config.toml:/app/config.toml - ./config/test.genesis.config.json:/app/genesis.json From bf4c2f0c317ac3812f08c7d062c523fd6921d665 Mon Sep 17 00:00:00 2001 From: tclemos Date: Mon, 9 Oct 2023 15:54:09 -0300 Subject: [PATCH 06/10] fix jRPC unit tests and default log instance race condition --- jsonrpc/server_test.go | 6 ++++++ log/log.go | 14 ++++++++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/jsonrpc/server_test.go b/jsonrpc/server_test.go index 3f21f4ee43..601171925d 100644 --- a/jsonrpc/server_test.go +++ b/jsonrpc/server_test.go @@ -154,6 +154,12 @@ func getSequencerDefaultConfig() Config { BatchRequestsEnabled: true, MaxLogsCount: 10000, MaxLogsBlockRange: 10000, + WebSockets: WebSocketsConfig{ + Enabled: true, + Host: "0.0.0.0", + Port: 9133, + ReadLimit: 0, + }, } return cfg } diff --git a/log/log.go b/log/log.go index fcc8e479ba..7c93468769 100644 --- a/log/log.go +++ b/log/log.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "strings" + "sync/atomic" "github.com/0xPolygonHermez/zkevm-node" "github.com/hermeznetwork/tracerr" @@ -27,11 +28,12 @@ type Logger struct { } // root logger -var log *Logger +var log atomic.Pointer[Logger] func getDefaultLog() *Logger { - if log != nil { - return log + l := log.Load() + if l != nil { + return l } // default level: debug zapLogger, _, err := NewLogger(Config{ @@ -42,8 +44,8 @@ func getDefaultLog() *Logger { if err != nil { panic(err) } - log = &Logger{x: zapLogger} - return log + log.Store(&Logger{x: zapLogger}) + return log.Load() } // Init the logger with defined level. outputs defines the outputs where the @@ -56,7 +58,7 @@ func Init(cfg Config) { if err != nil { panic(err) } - log = &Logger{x: zapLogger} + log.Store(&Logger{x: zapLogger}) } // NewLogger creates the logger with defined level. outputs defines the outputs where the From d37008eaed5b37af0b60b128f0c79a35402d99ba Mon Sep 17 00:00:00 2001 From: tclemos Date: Tue, 10 Oct 2023 19:48:01 -0300 Subject: [PATCH 07/10] fix wsConn race condition; add unit tests to eth_subscribe --- jsonrpc/endpoints_eth.go | 13 +-- jsonrpc/endpoints_eth_test.go | 177 ++++++++++++++++++++++++++++++++++ jsonrpc/handler.go | 8 +- jsonrpc/interfaces.go | 10 +- jsonrpc/mock_storage.go | 30 +++--- jsonrpc/query.go | 3 +- jsonrpc/server.go | 33 +++---- jsonrpc/server_test.go | 25 +++-- jsonrpc/storage.go | 11 ++- 9 files changed, 252 insertions(+), 58 deletions(-) diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index a12dab1187..02b7294e28 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -9,6 +9,7 @@ import ( "net/http" "strings" "sync" + "sync/atomic" "time" "github.com/0xPolygonHermez/zkevm-node/hex" @@ -824,7 +825,7 @@ func (e *EthEndpoints) NewBlockFilter() (interface{}, types.Error) { } // internal -func (e *EthEndpoints) newBlockFilter(wsConn *websocket.Conn) (interface{}, types.Error) { +func (e *EthEndpoints) newBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) { id, err := e.storage.NewBlockFilter(wsConn) if err != nil { return RPCErrorResponse(types.DefaultErrorCode, "failed to create new block filter", err, true) @@ -843,7 +844,7 @@ func (e *EthEndpoints) NewFilter(filter LogFilter) (interface{}, types.Error) { } // internal -func (e *EthEndpoints) newFilter(ctx context.Context, wsConn *websocket.Conn, filter LogFilter, dbTx pgx.Tx) (interface{}, types.Error) { +func (e *EthEndpoints) newFilter(ctx context.Context, wsConn *atomic.Pointer[websocket.Conn], filter LogFilter, dbTx pgx.Tx) (interface{}, types.Error) { shouldFilterByBlockRange := filter.FromBlock != nil || filter.ToBlock != nil if shouldFilterByBlockRange { @@ -885,7 +886,7 @@ func (e *EthEndpoints) NewPendingTransactionFilter() (interface{}, types.Error) } // internal -func (e *EthEndpoints) newPendingTransactionFilter(wsConn *websocket.Conn) (interface{}, types.Error) { +func (e *EthEndpoints) newPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) { return nil, types.NewRPCError(types.DefaultErrorCode, "not supported yet") // id, err := e.storage.NewPendingTransactionFilter(wsConn) // if err != nil { @@ -1048,7 +1049,7 @@ func (e *EthEndpoints) updateFilterLastPoll(filterID string) types.Error { // The node will return a subscription id. // For each event that matches the subscription a notification with relevant // data is sent together with the subscription id. -func (e *EthEndpoints) Subscribe(wsConn *websocket.Conn, name string, logFilter *LogFilter) (interface{}, types.Error) { +func (e *EthEndpoints) Subscribe(wsConn *atomic.Pointer[websocket.Conn], name string, logFilter *LogFilter) (interface{}, types.Error) { switch name { case "newHeads": return e.newBlockFilter(wsConn) @@ -1076,7 +1077,7 @@ func (e *EthEndpoints) Unsubscribe(wsConn *websocket.Conn, filterID string) (int // uninstallFilterByWSConn uninstalls the filters connected to the // provided web socket connection -func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *websocket.Conn) error { +func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error { return e.storage.UninstallFilterByWSConn(wsConn) } @@ -1220,7 +1221,7 @@ func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data []byte) { return } - err = filter.WsConn.WriteMessage(websocket.TextMessage, message) + err = filter.WsConn.Load().WriteMessage(websocket.TextMessage, message) if err != nil { log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error())) return diff --git a/jsonrpc/endpoints_eth_test.go b/jsonrpc/endpoints_eth_test.go index 54d6292484..2fb38a3563 100644 --- a/jsonrpc/endpoints_eth_test.go +++ b/jsonrpc/endpoints_eth_test.go @@ -4,8 +4,10 @@ import ( "context" "encoding/json" "errors" + "fmt" "math/big" "strings" + "sync/atomic" "testing" "time" @@ -4885,3 +4887,178 @@ func TestGetFilterChanges(t *testing.T) { }) } } + +func TestSubscribeNewHeads(t *testing.T) { + s, m, _ := newSequencerMockedServer(t) + defer s.Stop() + + type testCase struct { + Name string + Channel chan *ethTypes.Header + ExpectedError interface{} + SetupMocks func(m *mocksWrapper, tc testCase) + } + + testCases := []testCase{ + { + Name: "Subscribe to new heads Successfully", + SetupMocks: func(m *mocksWrapper, tc testCase) { + m.Storage. + On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). + Return("0x1", nil). + Once() + }, + }, + { + Name: "Subscribe fails to add filter to storage", + ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new block filter"), + SetupMocks: func(m *mocksWrapper, tc testCase) { + m.Storage. + On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). + Return("", fmt.Errorf("failed to add filter to storage")). + Once() + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + tc := testCase + tc.SetupMocks(m, tc) + + c := s.GetWSClient() + + ctx := context.Background() + newHeadsChannel := make(chan *ethTypes.Header, 100) + sub, err := c.SubscribeNewHead(ctx, newHeadsChannel) + + if sub != nil { + assert.NotNil(t, sub) + } + + if err != nil || tc.ExpectedError != nil { + if expectedErr, ok := tc.ExpectedError.(*types.RPCError); ok { + rpcErr := err.(rpc.Error) + assert.Equal(t, expectedErr.ErrorCode(), rpcErr.ErrorCode()) + assert.Equal(t, expectedErr.Error(), rpcErr.Error()) + } else { + assert.Equal(t, tc.ExpectedError, err) + } + } + }) + } +} + +func TestSubscribeNewLogs(t *testing.T) { + s, m, _ := newSequencerMockedServer(t) + defer s.Stop() + + type testCase struct { + Name string + Filter ethereum.FilterQuery + Channel chan *ethTypes.Log + ExpectedError interface{} + Prepare func(t *testing.T, tc *testCase) + SetupMocks func(m *mocksWrapper, tc testCase) + } + + testCases := []testCase{ + { + Name: "Subscribe to new logs by block hash successfully", + Prepare: func(t *testing.T, tc *testCase) { + tc.Filter = ethereum.FilterQuery{ + BlockHash: &blockHash, + } + }, + SetupMocks: func(m *mocksWrapper, tc testCase) { + m.DbTx. + On("Commit", context.Background()). + Return(nil). + Once() + + m.State. + On("BeginStateTransaction", context.Background()). + Return(m.DbTx, nil). + Once() + + m.Storage. + On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). + Return("0x1", nil). + Once() + }, + }, + { + Name: "Subscribe to new logs fails to add new filter to storage", + ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new log filter"), + Prepare: func(t *testing.T, tc *testCase) { + tc.Filter = ethereum.FilterQuery{ + BlockHash: &blockHash, + } + }, + SetupMocks: func(m *mocksWrapper, tc testCase) { + m.DbTx. + On("Rollback", context.Background()). + Return(nil). + Once() + + m.State. + On("BeginStateTransaction", context.Background()). + Return(m.DbTx, nil). + Once() + + m.Storage. + On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). + Return("", fmt.Errorf("failed to add filter to storage")). + Once() + }, + }, + { + Name: "Subscribe to new logs fails due to max block range limit exceeded", + ExpectedError: types.NewRPCError(types.InvalidParamsErrorCode, "logs are limited to a 10000 block range"), + Prepare: func(t *testing.T, tc *testCase) { + tc.Filter = ethereum.FilterQuery{ + FromBlock: big.NewInt(1), ToBlock: big.NewInt(10002), + } + }, + SetupMocks: func(m *mocksWrapper, tc testCase) { + m.DbTx. + On("Rollback", context.Background()). + Return(nil). + Once() + + m.State. + On("BeginStateTransaction", context.Background()). + Return(m.DbTx, nil). + Once() + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + tc := testCase + tc.Prepare(t, &tc) + tc.SetupMocks(m, tc) + + c := s.GetWSClient() + + ctx := context.Background() + newLogs := make(chan ethTypes.Log, 100) + sub, err := c.SubscribeFilterLogs(ctx, tc.Filter, newLogs) + + if sub != nil { + assert.NotNil(t, sub) + } + + if err != nil || tc.ExpectedError != nil { + if expectedErr, ok := tc.ExpectedError.(*types.RPCError); ok { + rpcErr := err.(rpc.Error) + assert.Equal(t, expectedErr.ErrorCode(), rpcErr.ErrorCode()) + assert.Equal(t, expectedErr.Error(), rpcErr.Error()) + } else { + assert.Equal(t, tc.ExpectedError, err) + } + } + }) + } +} diff --git a/jsonrpc/handler.go b/jsonrpc/handler.go index 68b839e58a..a38ee475cf 100644 --- a/jsonrpc/handler.go +++ b/jsonrpc/handler.go @@ -36,7 +36,7 @@ func (f *funcData) numParams() int { type handleRequest struct { types.Request - wsConn *websocket.Conn + wsConn *atomic.Pointer[websocket.Conn] HttpRequest *http.Request } @@ -101,7 +101,7 @@ func (h *Handler) Handle(req handleRequest) types.Response { firstFuncParamIsWebSocketConn := false firstFuncParamIsHttpRequest := false if funcHasMoreThanOneInputParams { - firstFuncParamIsWebSocketConn = fd.reqt[1].AssignableTo(reflect.TypeOf(&websocket.Conn{})) + firstFuncParamIsWebSocketConn = fd.reqt[1].AssignableTo(reflect.TypeOf(&atomic.Pointer[websocket.Conn]{})) firstFuncParamIsHttpRequest = fd.reqt[1].AssignableTo(reflect.TypeOf(&http.Request{})) } if requestHasWebSocketConn && firstFuncParamIsWebSocketConn { @@ -151,7 +151,7 @@ func (h *Handler) Handle(req handleRequest) types.Response { } // HandleWs handle websocket requests -func (h *Handler) HandleWs(reqBody []byte, wsConn *websocket.Conn, httpReq *http.Request) ([]byte, error) { +func (h *Handler) HandleWs(reqBody []byte, wsConn *atomic.Pointer[websocket.Conn], httpReq *http.Request) ([]byte, error) { log.Debugf("WS message received: %v", string(reqBody)) var req types.Request if err := json.Unmarshal(reqBody, &req); err != nil { @@ -168,7 +168,7 @@ func (h *Handler) HandleWs(reqBody []byte, wsConn *websocket.Conn, httpReq *http } // RemoveFilterByWsConn uninstalls the filter attached to this websocket connection -func (h *Handler) RemoveFilterByWsConn(wsConn *websocket.Conn) { +func (h *Handler) RemoveFilterByWsConn(wsConn *atomic.Pointer[websocket.Conn]) { service, ok := h.serviceMap[APIEth] if !ok { return diff --git a/jsonrpc/interfaces.go b/jsonrpc/interfaces.go index f1fce40123..18d5249f80 100644 --- a/jsonrpc/interfaces.go +++ b/jsonrpc/interfaces.go @@ -1,6 +1,8 @@ package jsonrpc import ( + "sync/atomic" + "github.com/gorilla/websocket" ) @@ -9,10 +11,10 @@ type storageInterface interface { GetAllBlockFiltersWithWSConn() ([]*Filter, error) GetAllLogFiltersWithWSConn() ([]*Filter, error) GetFilter(filterID string) (*Filter, error) - NewBlockFilter(wsConn *websocket.Conn) (string, error) - NewLogFilter(wsConn *websocket.Conn, filter LogFilter) (string, error) - NewPendingTransactionFilter(wsConn *websocket.Conn) (string, error) + NewBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) + NewLogFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (string, error) + NewPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) UninstallFilter(filterID string) error - UninstallFilterByWSConn(wsConn *websocket.Conn) error + UninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error UpdateFilterLastPoll(filterID string) error } diff --git a/jsonrpc/mock_storage.go b/jsonrpc/mock_storage.go index 50e19658e8..57cff709ee 100644 --- a/jsonrpc/mock_storage.go +++ b/jsonrpc/mock_storage.go @@ -3,6 +3,8 @@ package jsonrpc import ( + atomic "sync/atomic" + websocket "github.com/gorilla/websocket" mock "github.com/stretchr/testify/mock" ) @@ -91,21 +93,21 @@ func (_m *storageMock) GetFilter(filterID string) (*Filter, error) { } // NewBlockFilter provides a mock function with given fields: wsConn -func (_m *storageMock) NewBlockFilter(wsConn *websocket.Conn) (string, error) { +func (_m *storageMock) NewBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) { ret := _m.Called(wsConn) var r0 string var r1 error - if rf, ok := ret.Get(0).(func(*websocket.Conn) (string, error)); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) (string, error)); ok { return rf(wsConn) } - if rf, ok := ret.Get(0).(func(*websocket.Conn) string); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) string); ok { r0 = rf(wsConn) } else { r0 = ret.Get(0).(string) } - if rf, ok := ret.Get(1).(func(*websocket.Conn) error); ok { + if rf, ok := ret.Get(1).(func(*atomic.Pointer[websocket.Conn]) error); ok { r1 = rf(wsConn) } else { r1 = ret.Error(1) @@ -115,21 +117,21 @@ func (_m *storageMock) NewBlockFilter(wsConn *websocket.Conn) (string, error) { } // NewLogFilter provides a mock function with given fields: wsConn, filter -func (_m *storageMock) NewLogFilter(wsConn *websocket.Conn, filter LogFilter) (string, error) { +func (_m *storageMock) NewLogFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (string, error) { ret := _m.Called(wsConn, filter) var r0 string var r1 error - if rf, ok := ret.Get(0).(func(*websocket.Conn, LogFilter) (string, error)); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn], LogFilter) (string, error)); ok { return rf(wsConn, filter) } - if rf, ok := ret.Get(0).(func(*websocket.Conn, LogFilter) string); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn], LogFilter) string); ok { r0 = rf(wsConn, filter) } else { r0 = ret.Get(0).(string) } - if rf, ok := ret.Get(1).(func(*websocket.Conn, LogFilter) error); ok { + if rf, ok := ret.Get(1).(func(*atomic.Pointer[websocket.Conn], LogFilter) error); ok { r1 = rf(wsConn, filter) } else { r1 = ret.Error(1) @@ -139,21 +141,21 @@ func (_m *storageMock) NewLogFilter(wsConn *websocket.Conn, filter LogFilter) (s } // NewPendingTransactionFilter provides a mock function with given fields: wsConn -func (_m *storageMock) NewPendingTransactionFilter(wsConn *websocket.Conn) (string, error) { +func (_m *storageMock) NewPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) { ret := _m.Called(wsConn) var r0 string var r1 error - if rf, ok := ret.Get(0).(func(*websocket.Conn) (string, error)); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) (string, error)); ok { return rf(wsConn) } - if rf, ok := ret.Get(0).(func(*websocket.Conn) string); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) string); ok { r0 = rf(wsConn) } else { r0 = ret.Get(0).(string) } - if rf, ok := ret.Get(1).(func(*websocket.Conn) error); ok { + if rf, ok := ret.Get(1).(func(*atomic.Pointer[websocket.Conn]) error); ok { r1 = rf(wsConn) } else { r1 = ret.Error(1) @@ -177,11 +179,11 @@ func (_m *storageMock) UninstallFilter(filterID string) error { } // UninstallFilterByWSConn provides a mock function with given fields: wsConn -func (_m *storageMock) UninstallFilterByWSConn(wsConn *websocket.Conn) error { +func (_m *storageMock) UninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error { ret := _m.Called(wsConn) var r0 error - if rf, ok := ret.Get(0).(func(*websocket.Conn) error); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) error); ok { r0 = rf(wsConn) } else { r0 = ret.Error(0) diff --git a/jsonrpc/query.go b/jsonrpc/query.go index 2cc375dd36..83f888d8a0 100644 --- a/jsonrpc/query.go +++ b/jsonrpc/query.go @@ -3,6 +3,7 @@ package jsonrpc import ( "encoding/json" "fmt" + "sync/atomic" "time" "github.com/0xPolygonHermez/zkevm-node/hex" @@ -26,7 +27,7 @@ type Filter struct { Type FilterType Parameters interface{} LastPoll time.Time - WsConn *websocket.Conn + WsConn *atomic.Pointer[websocket.Conn] } // FilterType express the type of the filter, block, logs, pending transactions diff --git a/jsonrpc/server.go b/jsonrpc/server.go index 29ce507733..7830e930fa 100644 --- a/jsonrpc/server.go +++ b/jsonrpc/server.go @@ -10,7 +10,7 @@ import ( "mime" "net" "net/http" - "sync" + "sync/atomic" "syscall" "time" @@ -379,28 +379,29 @@ func (s *Server) handleWs(w http.ResponseWriter, req *http.Request) { s.wsUpgrader.CheckOrigin = func(r *http.Request) bool { return true } // Upgrade the connection to a WS one - wsConn, err := s.wsUpgrader.Upgrade(w, req, nil) + innerWsConn, err := s.wsUpgrader.Upgrade(w, req, nil) if err != nil { log.Error(fmt.Sprintf("Unable to upgrade to a WS connection, %s", err.Error())) return } + wsConn := new(atomic.Pointer[websocket.Conn]) + wsConn.Store(innerWsConn) // Set read limit - wsConn.SetReadLimit(s.config.WebSockets.ReadLimit) + wsConn.Load().SetReadLimit(s.config.WebSockets.ReadLimit) // Defer WS closure - defer func(ws *websocket.Conn) { - err = ws.Close() + defer func(wsConn *atomic.Pointer[websocket.Conn]) { + err = wsConn.Load().Close() if err != nil { log.Error(fmt.Sprintf("Unable to gracefully close WS connection, %s", err.Error())) } }(wsConn) log.Info("Websocket connection established") - var mu sync.Mutex for { - msgType, message, err := wsConn.ReadMessage() + msgType, message, err := wsConn.Load().ReadMessage() if err != nil { if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) { log.Info("Closing WS connection gracefully") @@ -417,17 +418,13 @@ func (s *Server) handleWs(w http.ResponseWriter, req *http.Request) { } if msgType == websocket.TextMessage || msgType == websocket.BinaryMessage { - go func() { - mu.Lock() - defer mu.Unlock() - resp, err := s.handler.HandleWs(message, wsConn, req) - if err != nil { - log.Error(fmt.Sprintf("Unable to handle WS request, %s", err.Error())) - _ = wsConn.WriteMessage(msgType, []byte(fmt.Sprintf("WS Handle error: %s", err.Error()))) - } else { - _ = wsConn.WriteMessage(msgType, resp) - } - }() + resp, err := s.handler.HandleWs(message, wsConn, req) + if err != nil { + log.Error(fmt.Sprintf("Unable to handle WS request, %s", err.Error())) + _ = wsConn.Load().WriteMessage(msgType, []byte(fmt.Sprintf("WS Handle error: %s", err.Error()))) + } else { + _ = wsConn.Load().WriteMessage(msgType, resp) + } } } } diff --git a/jsonrpc/server_test.go b/jsonrpc/server_test.go index 601171925d..2bb5e1d72f 100644 --- a/jsonrpc/server_test.go +++ b/jsonrpc/server_test.go @@ -31,9 +31,10 @@ const ( ) type mockedServer struct { - Config Config - Server *Server - ServerURL string + Config Config + Server *Server + ServerURL string + ServerWebSocketsURL string } type mocksWrapper struct { @@ -128,10 +129,13 @@ func newMockedServer(t *testing.T, cfg Config) (*mockedServer, *mocksWrapper, *e ethClient, err := ethclient.Dial(serverURL) require.NoError(t, err) + serverWebSocketsURL := fmt.Sprintf("ws://%s:%d", cfg.WebSockets.Host, cfg.WebSockets.Port) + msv := &mockedServer{ - Config: cfg, - Server: server, - ServerURL: serverURL, + Config: cfg, + Server: server, + ServerURL: serverURL, + ServerWebSocketsURL: serverWebSocketsURL, } mks := &mocksWrapper{ @@ -185,6 +189,15 @@ func newNonSequencerMockedServer(t *testing.T, sequencerNodeURI string) (*mocked return newMockedServer(t, cfg) } +func (s *mockedServer) GetWSClient() *ethclient.Client { + ethClient, err := ethclient.Dial(s.ServerWebSocketsURL) + if err != nil { + panic(err) + } + + return ethClient +} + func (s *mockedServer) Stop() { err := s.Server.Stop() if err != nil { diff --git a/jsonrpc/storage.go b/jsonrpc/storage.go index ada5f32ffe..8838c6fe08 100644 --- a/jsonrpc/storage.go +++ b/jsonrpc/storage.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/0xPolygonHermez/zkevm-node/hex" @@ -31,7 +32,7 @@ func NewStorage() *Storage { } // NewLogFilter persists a new log filter -func (s *Storage) NewLogFilter(wsConn *websocket.Conn, filter LogFilter) (string, error) { +func (s *Storage) NewLogFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (string, error) { shouldFilterByBlockHash := filter.BlockHash != nil shouldFilterByBlockRange := filter.FromBlock != nil || filter.ToBlock != nil @@ -43,17 +44,17 @@ func (s *Storage) NewLogFilter(wsConn *websocket.Conn, filter LogFilter) (string } // NewBlockFilter persists a new block log filter -func (s *Storage) NewBlockFilter(wsConn *websocket.Conn) (string, error) { +func (s *Storage) NewBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) { return s.createFilter(FilterTypeBlock, nil, wsConn) } // NewPendingTransactionFilter persists a new pending transaction filter -func (s *Storage) NewPendingTransactionFilter(wsConn *websocket.Conn) (string, error) { +func (s *Storage) NewPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) { return s.createFilter(FilterTypePendingTx, nil, wsConn) } // create persists the filter to the memory and provides the filter id -func (s *Storage) createFilter(t FilterType, parameters interface{}, wsConn *websocket.Conn) (string, error) { +func (s *Storage) createFilter(t FilterType, parameters interface{}, wsConn *atomic.Pointer[websocket.Conn]) (string, error) { lastPoll := time.Now().UTC() id, err := s.generateFilterID() if err != nil { @@ -154,7 +155,7 @@ func (s *Storage) UninstallFilter(filterID string) error { } // UninstallFilterByWSConn deletes all filters connected to the provided web socket connection -func (s *Storage) UninstallFilterByWSConn(wsConn *websocket.Conn) error { +func (s *Storage) UninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error { filterIDsToDelete := []string{} s.filters.Range(func(key, value any) bool { id := key.(string) From 01f23c9da04f4894e881403d0fce31e84c242b32 Mon Sep 17 00:00:00 2001 From: tclemos Date: Tue, 10 Oct 2023 20:36:29 -0300 Subject: [PATCH 08/10] fix unit tests --- jsonrpc/endpoints_eth_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/jsonrpc/endpoints_eth_test.go b/jsonrpc/endpoints_eth_test.go index 2fb38a3563..d64b29a7ed 100644 --- a/jsonrpc/endpoints_eth_test.go +++ b/jsonrpc/endpoints_eth_test.go @@ -3630,7 +3630,7 @@ func TestNewFilter(t *testing.T) { Once() m.Storage. - On("NewLogFilter", mock.IsType(&websocket.Conn{}), mock.IsType(LogFilter{})). + On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). Return("1", nil). Once() }, @@ -3654,7 +3654,7 @@ func TestNewFilter(t *testing.T) { Once() m.Storage. - On("NewLogFilter", mock.IsType(&websocket.Conn{}), mock.IsType(LogFilter{})). + On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). Return("1", nil). Once() }, @@ -3717,7 +3717,7 @@ func TestNewFilter(t *testing.T) { Return(m.DbTx, nil). Once() m.Storage. - On("NewLogFilter", mock.IsType(&websocket.Conn{}), mock.IsType(LogFilter{})). + On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). Return("", errors.New("failed to add new filter")). Once() }, @@ -3768,7 +3768,7 @@ func TestNewBlockFilter(t *testing.T) { ExpectedError: nil, SetupMocks: func(m *mocksWrapper, tc testCase) { m.Storage. - On("NewBlockFilter", mock.IsType(&websocket.Conn{})). + On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). Return("1", nil). Once() }, @@ -3779,7 +3779,7 @@ func TestNewBlockFilter(t *testing.T) { ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new block filter"), SetupMocks: func(m *mocksWrapper, tc testCase) { m.Storage. - On("NewBlockFilter", mock.IsType(&websocket.Conn{})). + On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). Return("", errors.New("failed to add new block filter")). Once() }, @@ -3830,7 +3830,7 @@ func TestNewPendingTransactionFilter(t *testing.T) { // ExpectedError: nil, // SetupMocks: func(m *mocks, tc testCase) { // m.Storage. - // On("NewPendingTransactionFilter", mock.IsType(&websocket.Conn{})). + // On("NewPendingTransactionFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). // Return("1", nil). // Once() // }, @@ -3841,7 +3841,7 @@ func TestNewPendingTransactionFilter(t *testing.T) { // ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new pending transaction filter"), // SetupMocks: func(m *mocks, tc testCase) { // m.Storage. - // On("NewPendingTransactionFilter", mock.IsType(&websocket.Conn{})). + // On("NewPendingTransactionFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). // Return("", errors.New("failed to add new pending transaction filter")). // Once() // }, From d0980667d19205357c78891e81b45eb687429d27 Mon Sep 17 00:00:00 2001 From: tclemos Date: Wed, 11 Oct 2023 08:37:22 -0300 Subject: [PATCH 09/10] add missing return --- jsonrpc/endpoints_eth.go | 1 + 1 file changed, 1 insertion(+) diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index 02b7294e28..1903307ba2 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -1110,6 +1110,7 @@ func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2Block data, err := json.Marshal(b) if err != nil { log.Errorf("failed to marshal block response to subscription: %v", err) + return } for _, filter := range blockFilters { e.sendSubscriptionResponse(filter, data) From 43dcd698e34249af0e573a1adcd394b33a609e65 Mon Sep 17 00:00:00 2001 From: tclemos Date: Wed, 11 Oct 2023 17:20:19 -0300 Subject: [PATCH 10/10] refactor connection counter log --- jsonrpc/handler.go | 8 ------- jsonrpc/server.go | 54 +++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 53 insertions(+), 9 deletions(-) diff --git a/jsonrpc/handler.go b/jsonrpc/handler.go index a38ee475cf..ce57c8e98a 100644 --- a/jsonrpc/handler.go +++ b/jsonrpc/handler.go @@ -73,18 +73,10 @@ func newJSONRpcHandler() *Handler { return handler } -var connectionCounter int64 = 0 - // Handle is the function that knows which and how a function should // be executed when a JSON RPC request is received func (h *Handler) Handle(req handleRequest) types.Response { log := log.WithFields("method", req.Method, "requestId", req.ID) - atomic.AddInt64(&connectionCounter, 1) - defer func() { - atomic.AddInt64(&connectionCounter, -1) - log.Debugf("Current open connections %d", atomic.LoadInt64(&connectionCounter)) - }() - log.Debugf("Current open connections %d", atomic.LoadInt64(&connectionCounter)) log.Debugf("request params %v", string(req.Params)) service, fd, err := h.getFnHandler(req.Request) diff --git a/jsonrpc/server.go b/jsonrpc/server.go index 7830e930fa..008b34943d 100644 --- a/jsonrpc/server.go +++ b/jsonrpc/server.go @@ -10,6 +10,7 @@ import ( "mime" "net" "net/http" + "sync" "sync/atomic" "syscall" "time" @@ -51,6 +52,10 @@ type Server struct { srv *http.Server wsSrv *http.Server wsUpgrader websocket.Upgrader + + connCounterMutex sync.Mutex + httpConnCounter int64 + wsConnCounter int64 } // Service defines a struct that will provide public methods to be exposed @@ -240,6 +245,9 @@ func (s *Server) handle(w http.ResponseWriter, req *http.Request) { return } + s.increaseHttpConnCounter() + defer s.decreaseHttpConnCounter() + start := time.Now() w.Header().Set("Content-Type", contentType) w.Header().Set("Access-Control-Allow-Origin", "*") @@ -382,9 +390,9 @@ func (s *Server) handleWs(w http.ResponseWriter, req *http.Request) { innerWsConn, err := s.wsUpgrader.Upgrade(w, req, nil) if err != nil { log.Error(fmt.Sprintf("Unable to upgrade to a WS connection, %s", err.Error())) - return } + wsConn := new(atomic.Pointer[websocket.Conn]) wsConn.Store(innerWsConn) @@ -399,6 +407,15 @@ func (s *Server) handleWs(w http.ResponseWriter, req *http.Request) { } }(wsConn) + s.increaseWsConnCounter() + defer s.decreaseWsConnCounter() + + // recover + defer func() { + if err := recover(); err != nil { + log.Error(err) + } + }() log.Info("Websocket connection established") for { msgType, message, err := wsConn.Load().ReadMessage() @@ -429,6 +446,41 @@ func (s *Server) handleWs(w http.ResponseWriter, req *http.Request) { } } +func (s *Server) increaseHttpConnCounter() { + s.connCounterMutex.Lock() + atomic.AddInt64(&s.httpConnCounter, 1) + s.logConnCounters() + s.connCounterMutex.Unlock() +} + +func (s *Server) decreaseHttpConnCounter() { + s.connCounterMutex.Lock() + atomic.AddInt64(&s.httpConnCounter, -1) + s.logConnCounters() + s.connCounterMutex.Unlock() +} + +func (s *Server) increaseWsConnCounter() { + s.connCounterMutex.Lock() + atomic.AddInt64(&s.wsConnCounter, 1) + s.logConnCounters() + s.connCounterMutex.Unlock() +} + +func (s *Server) decreaseWsConnCounter() { + s.connCounterMutex.Lock() + atomic.AddInt64(&s.wsConnCounter, -1) + s.logConnCounters() + s.connCounterMutex.Unlock() +} + +func (s *Server) logConnCounters() { + httpConnCounter := atomic.LoadInt64(&s.httpConnCounter) + wsConnCounter := atomic.LoadInt64(&s.wsConnCounter) + totalConnCounter := httpConnCounter + wsConnCounter + log.Debugf("[ HTTP conns: %v | WS conns: %v | Total conns: %v ]", httpConnCounter, wsConnCounter, totalConnCounter) +} + func handleInvalidRequest(w http.ResponseWriter, err error, code int) { defer metrics.RequestHandled(metrics.RequestHandledLabelInvalid) log.Infof("Invalid Request: %v", err.Error())