diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index 988cafd5af..1903307ba2 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -8,6 +8,9 @@ import ( "math/big" "net/http" "strings" + "sync" + "sync/atomic" + "time" "github.com/0xPolygonHermez/zkevm-node/hex" "github.com/0xPolygonHermez/zkevm-node/jsonrpc/client" @@ -822,7 +825,7 @@ func (e *EthEndpoints) NewBlockFilter() (interface{}, types.Error) { } // internal -func (e *EthEndpoints) newBlockFilter(wsConn *websocket.Conn) (interface{}, types.Error) { +func (e *EthEndpoints) newBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) { id, err := e.storage.NewBlockFilter(wsConn) if err != nil { return RPCErrorResponse(types.DefaultErrorCode, "failed to create new block filter", err, true) @@ -841,7 +844,7 @@ func (e *EthEndpoints) NewFilter(filter LogFilter) (interface{}, types.Error) { } // internal -func (e *EthEndpoints) newFilter(ctx context.Context, wsConn *websocket.Conn, filter LogFilter, dbTx pgx.Tx) (interface{}, types.Error) { +func (e *EthEndpoints) newFilter(ctx context.Context, wsConn *atomic.Pointer[websocket.Conn], filter LogFilter, dbTx pgx.Tx) (interface{}, types.Error) { shouldFilterByBlockRange := filter.FromBlock != nil || filter.ToBlock != nil if shouldFilterByBlockRange { @@ -883,7 +886,7 @@ func (e *EthEndpoints) NewPendingTransactionFilter() (interface{}, types.Error) } // internal -func (e *EthEndpoints) newPendingTransactionFilter(wsConn *websocket.Conn) (interface{}, types.Error) { +func (e *EthEndpoints) newPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) { return nil, types.NewRPCError(types.DefaultErrorCode, "not supported yet") // id, err := e.storage.NewPendingTransactionFilter(wsConn) // if err != nil { @@ -935,7 +938,6 @@ func (e *EthEndpoints) tryToAddTxToPool(input, ip string) (interface{}, types.Er if err != nil { return RPCErrorResponse(types.InvalidParamsErrorCode, "invalid tx input", err, false) } - log.Infof("adding TX to the pool: %v", tx.Hash().Hex()) if err := e.pool.AddTx(context.Background(), *tx, ip); err != nil { // it's not needed to log the error here, because we check and log if needed @@ -1047,7 +1049,7 @@ func (e *EthEndpoints) updateFilterLastPoll(filterID string) types.Error { // The node will return a subscription id. // For each event that matches the subscription a notification with relevant // data is sent together with the subscription id. -func (e *EthEndpoints) Subscribe(wsConn *websocket.Conn, name string, logFilter *LogFilter) (interface{}, types.Error) { +func (e *EthEndpoints) Subscribe(wsConn *atomic.Pointer[websocket.Conn], name string, logFilter *LogFilter) (interface{}, types.Error) { switch name { case "newHeads": return e.newBlockFilter(wsConn) @@ -1075,43 +1077,104 @@ func (e *EthEndpoints) Unsubscribe(wsConn *websocket.Conn, filterID string) (int // uninstallFilterByWSConn uninstalls the filters connected to the // provided web socket connection -func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *websocket.Conn) error { +func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error { return e.storage.UninstallFilterByWSConn(wsConn) } // onNewL2Block is triggered when the state triggers the event for a new l2 block func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) { - e.processBlockFilters(event) - e.processLogFilters(event) + log.Debugf("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64()) + wg := sync.WaitGroup{} + + wg.Add(1) + go e.notifyNewHeads(&wg, event) + + wg.Add(1) + go e.notifyNewLogs(&wg, event) + + wg.Wait() } -// processBlockFilters answer filters subscribed for block updates when a new l2 block event -// is detected -func (e *EthEndpoints) processBlockFilters(event state.NewL2BlockEvent) { +func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2BlockEvent) { + defer wg.Done() + start := time.Now() blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn() if err != nil { log.Errorf("failed to get all block filters with web sockets connections: %v", err) } else { + b, err := types.NewBlock(&event.Block, nil, false, false) + if err != nil { + log.Errorf("failed to build block response to subscription: %v", err) + return + } + data, err := json.Marshal(b) + if err != nil { + log.Errorf("failed to marshal block response to subscription: %v", err) + return + } for _, filter := range blockFilters { - b, err := types.NewBlock(&event.Block, nil, false, false) - if err != nil { - log.Errorf("failed to build block response to subscription: %v", err) - } else { - e.sendSubscriptionResponse(filter, b) - } + e.sendSubscriptionResponse(filter, data) } } + log.Debugf("[notifyNewHeads] new l2 block event for block %v took %vms to send all the messages for block filters", event.Block.NumberU64(), time.Since(start).Milliseconds()) } -// processLogFilters answer filters subscribed for log updates when a new l2 block event -// is detected -func (e *EthEndpoints) processLogFilters(event state.NewL2BlockEvent) { +func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockEvent) { + defer wg.Done() + start := time.Now() logFilters, err := e.storage.GetAllLogFiltersWithWSConn() if err != nil { log.Errorf("failed to get all log filters with web sockets connections: %v", err) } else { for _, filter := range logFilters { - changes, err := e.GetFilterChanges(filter.ID) + filterParameters := filter.Parameters.(LogFilter) + bn := types.BlockNumber(event.Block.NumberU64()) + + // if from and to blocks are new, set it to the current block to make + // the query faster + if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil { + filterParameters.FromBlock = &bn + filterParameters.ToBlock = &bn + } else { + // if the filter has a fromBlock value set + // and the event block number is smaller than the + // from block, skip this filter + if filterParameters.FromBlock != nil { + fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) + if rpcErr != nil { + log.Errorf(rpcErr.Error(), filter.ID, err) + continue + } + if fromBlock > event.Block.NumberU64() { + continue + } + // otherwise set the from block to a fixed number + // to avoid querying it again in the next step + fixedFromBlock := types.BlockNumber(fromBlock) + filterParameters.FromBlock = &fixedFromBlock + } + + // if the filter has a toBlock value set + // and the event block number is greater than the + // to block, skip this filter + if filterParameters.ToBlock != nil { + toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) + if rpcErr != nil { + log.Errorf(rpcErr.Error(), filter.ID, err) + continue + } + if toBlock > event.Block.NumberU64() { + continue + } + // otherwise set the to block to a fixed number + // to avoid querying it again in the next step + fixedToBlock := types.BlockNumber(toBlock) + filterParameters.ToBlock = &fixedToBlock + } + } + + // get new logs for this specific filter + changes, err := e.internalGetLogs(context.Background(), nil, filterParameters) if errors.Is(err, state.ErrMaxLogsCountLimitExceeded) { log.Infof("failed to get filters changes for filter %v, the filter seems to be returning more results than allowed and was removed: %v", filter.ID, err) err := e.storage.UninstallFilter(filter.ID) @@ -1126,39 +1189,43 @@ func (e *EthEndpoints) processLogFilters(event state.NewL2BlockEvent) { continue } + // if there are new logs for the filter, send it if changes != nil { ethLogs := changes.([]types.Log) for _, ethLog := range ethLogs { - e.sendSubscriptionResponse(filter, ethLog) + data, err := json.Marshal(ethLog) + if err != nil { + log.Errorf("failed to marshal ethLog response to subscription: %v", err) + } + e.sendSubscriptionResponse(filter, data) } } } } + log.Debugf("[notifyNewLogs] new l2 block event for block %v took %vms to send all the messages for log filters", event.Block.NumberU64(), time.Since(start).Milliseconds()) } -func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data interface{}) { +func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data []byte) { const errMessage = "Unable to write WS message to filter %v, %s" - result, err := json.Marshal(data) - if err != nil { - log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error())) - } res := types.SubscriptionResponse{ JSONRPC: "2.0", Method: "eth_subscription", Params: types.SubscriptionResponseParams{ Subscription: filter.ID, - Result: result, + Result: data, }, } message, err := json.Marshal(res) if err != nil { log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error())) + return } - err = filter.WsConn.WriteMessage(websocket.TextMessage, message) + err = filter.WsConn.Load().WriteMessage(websocket.TextMessage, message) if err != nil { log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error())) + return } log.Debugf("WS message sent: %v", string(message)) } diff --git a/jsonrpc/endpoints_eth_test.go b/jsonrpc/endpoints_eth_test.go index 54d6292484..d64b29a7ed 100644 --- a/jsonrpc/endpoints_eth_test.go +++ b/jsonrpc/endpoints_eth_test.go @@ -4,8 +4,10 @@ import ( "context" "encoding/json" "errors" + "fmt" "math/big" "strings" + "sync/atomic" "testing" "time" @@ -3628,7 +3630,7 @@ func TestNewFilter(t *testing.T) { Once() m.Storage. - On("NewLogFilter", mock.IsType(&websocket.Conn{}), mock.IsType(LogFilter{})). + On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). Return("1", nil). Once() }, @@ -3652,7 +3654,7 @@ func TestNewFilter(t *testing.T) { Once() m.Storage. - On("NewLogFilter", mock.IsType(&websocket.Conn{}), mock.IsType(LogFilter{})). + On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). Return("1", nil). Once() }, @@ -3715,7 +3717,7 @@ func TestNewFilter(t *testing.T) { Return(m.DbTx, nil). Once() m.Storage. - On("NewLogFilter", mock.IsType(&websocket.Conn{}), mock.IsType(LogFilter{})). + On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). Return("", errors.New("failed to add new filter")). Once() }, @@ -3766,7 +3768,7 @@ func TestNewBlockFilter(t *testing.T) { ExpectedError: nil, SetupMocks: func(m *mocksWrapper, tc testCase) { m.Storage. - On("NewBlockFilter", mock.IsType(&websocket.Conn{})). + On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). Return("1", nil). Once() }, @@ -3777,7 +3779,7 @@ func TestNewBlockFilter(t *testing.T) { ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new block filter"), SetupMocks: func(m *mocksWrapper, tc testCase) { m.Storage. - On("NewBlockFilter", mock.IsType(&websocket.Conn{})). + On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). Return("", errors.New("failed to add new block filter")). Once() }, @@ -3828,7 +3830,7 @@ func TestNewPendingTransactionFilter(t *testing.T) { // ExpectedError: nil, // SetupMocks: func(m *mocks, tc testCase) { // m.Storage. - // On("NewPendingTransactionFilter", mock.IsType(&websocket.Conn{})). + // On("NewPendingTransactionFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). // Return("1", nil). // Once() // }, @@ -3839,7 +3841,7 @@ func TestNewPendingTransactionFilter(t *testing.T) { // ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new pending transaction filter"), // SetupMocks: func(m *mocks, tc testCase) { // m.Storage. - // On("NewPendingTransactionFilter", mock.IsType(&websocket.Conn{})). + // On("NewPendingTransactionFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). // Return("", errors.New("failed to add new pending transaction filter")). // Once() // }, @@ -4885,3 +4887,178 @@ func TestGetFilterChanges(t *testing.T) { }) } } + +func TestSubscribeNewHeads(t *testing.T) { + s, m, _ := newSequencerMockedServer(t) + defer s.Stop() + + type testCase struct { + Name string + Channel chan *ethTypes.Header + ExpectedError interface{} + SetupMocks func(m *mocksWrapper, tc testCase) + } + + testCases := []testCase{ + { + Name: "Subscribe to new heads Successfully", + SetupMocks: func(m *mocksWrapper, tc testCase) { + m.Storage. + On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). + Return("0x1", nil). + Once() + }, + }, + { + Name: "Subscribe fails to add filter to storage", + ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new block filter"), + SetupMocks: func(m *mocksWrapper, tc testCase) { + m.Storage. + On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). + Return("", fmt.Errorf("failed to add filter to storage")). + Once() + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + tc := testCase + tc.SetupMocks(m, tc) + + c := s.GetWSClient() + + ctx := context.Background() + newHeadsChannel := make(chan *ethTypes.Header, 100) + sub, err := c.SubscribeNewHead(ctx, newHeadsChannel) + + if sub != nil { + assert.NotNil(t, sub) + } + + if err != nil || tc.ExpectedError != nil { + if expectedErr, ok := tc.ExpectedError.(*types.RPCError); ok { + rpcErr := err.(rpc.Error) + assert.Equal(t, expectedErr.ErrorCode(), rpcErr.ErrorCode()) + assert.Equal(t, expectedErr.Error(), rpcErr.Error()) + } else { + assert.Equal(t, tc.ExpectedError, err) + } + } + }) + } +} + +func TestSubscribeNewLogs(t *testing.T) { + s, m, _ := newSequencerMockedServer(t) + defer s.Stop() + + type testCase struct { + Name string + Filter ethereum.FilterQuery + Channel chan *ethTypes.Log + ExpectedError interface{} + Prepare func(t *testing.T, tc *testCase) + SetupMocks func(m *mocksWrapper, tc testCase) + } + + testCases := []testCase{ + { + Name: "Subscribe to new logs by block hash successfully", + Prepare: func(t *testing.T, tc *testCase) { + tc.Filter = ethereum.FilterQuery{ + BlockHash: &blockHash, + } + }, + SetupMocks: func(m *mocksWrapper, tc testCase) { + m.DbTx. + On("Commit", context.Background()). + Return(nil). + Once() + + m.State. + On("BeginStateTransaction", context.Background()). + Return(m.DbTx, nil). + Once() + + m.Storage. + On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). + Return("0x1", nil). + Once() + }, + }, + { + Name: "Subscribe to new logs fails to add new filter to storage", + ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new log filter"), + Prepare: func(t *testing.T, tc *testCase) { + tc.Filter = ethereum.FilterQuery{ + BlockHash: &blockHash, + } + }, + SetupMocks: func(m *mocksWrapper, tc testCase) { + m.DbTx. + On("Rollback", context.Background()). + Return(nil). + Once() + + m.State. + On("BeginStateTransaction", context.Background()). + Return(m.DbTx, nil). + Once() + + m.Storage. + On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). + Return("", fmt.Errorf("failed to add filter to storage")). + Once() + }, + }, + { + Name: "Subscribe to new logs fails due to max block range limit exceeded", + ExpectedError: types.NewRPCError(types.InvalidParamsErrorCode, "logs are limited to a 10000 block range"), + Prepare: func(t *testing.T, tc *testCase) { + tc.Filter = ethereum.FilterQuery{ + FromBlock: big.NewInt(1), ToBlock: big.NewInt(10002), + } + }, + SetupMocks: func(m *mocksWrapper, tc testCase) { + m.DbTx. + On("Rollback", context.Background()). + Return(nil). + Once() + + m.State. + On("BeginStateTransaction", context.Background()). + Return(m.DbTx, nil). + Once() + }, + }, + } + + for _, testCase := range testCases { + t.Run(testCase.Name, func(t *testing.T) { + tc := testCase + tc.Prepare(t, &tc) + tc.SetupMocks(m, tc) + + c := s.GetWSClient() + + ctx := context.Background() + newLogs := make(chan ethTypes.Log, 100) + sub, err := c.SubscribeFilterLogs(ctx, tc.Filter, newLogs) + + if sub != nil { + assert.NotNil(t, sub) + } + + if err != nil || tc.ExpectedError != nil { + if expectedErr, ok := tc.ExpectedError.(*types.RPCError); ok { + rpcErr := err.(rpc.Error) + assert.Equal(t, expectedErr.ErrorCode(), rpcErr.ErrorCode()) + assert.Equal(t, expectedErr.Error(), rpcErr.Error()) + } else { + assert.Equal(t, tc.ExpectedError, err) + } + } + }) + } +} diff --git a/jsonrpc/handler.go b/jsonrpc/handler.go index 68b839e58a..ce57c8e98a 100644 --- a/jsonrpc/handler.go +++ b/jsonrpc/handler.go @@ -36,7 +36,7 @@ func (f *funcData) numParams() int { type handleRequest struct { types.Request - wsConn *websocket.Conn + wsConn *atomic.Pointer[websocket.Conn] HttpRequest *http.Request } @@ -73,18 +73,10 @@ func newJSONRpcHandler() *Handler { return handler } -var connectionCounter int64 = 0 - // Handle is the function that knows which and how a function should // be executed when a JSON RPC request is received func (h *Handler) Handle(req handleRequest) types.Response { log := log.WithFields("method", req.Method, "requestId", req.ID) - atomic.AddInt64(&connectionCounter, 1) - defer func() { - atomic.AddInt64(&connectionCounter, -1) - log.Debugf("Current open connections %d", atomic.LoadInt64(&connectionCounter)) - }() - log.Debugf("Current open connections %d", atomic.LoadInt64(&connectionCounter)) log.Debugf("request params %v", string(req.Params)) service, fd, err := h.getFnHandler(req.Request) @@ -101,7 +93,7 @@ func (h *Handler) Handle(req handleRequest) types.Response { firstFuncParamIsWebSocketConn := false firstFuncParamIsHttpRequest := false if funcHasMoreThanOneInputParams { - firstFuncParamIsWebSocketConn = fd.reqt[1].AssignableTo(reflect.TypeOf(&websocket.Conn{})) + firstFuncParamIsWebSocketConn = fd.reqt[1].AssignableTo(reflect.TypeOf(&atomic.Pointer[websocket.Conn]{})) firstFuncParamIsHttpRequest = fd.reqt[1].AssignableTo(reflect.TypeOf(&http.Request{})) } if requestHasWebSocketConn && firstFuncParamIsWebSocketConn { @@ -151,7 +143,7 @@ func (h *Handler) Handle(req handleRequest) types.Response { } // HandleWs handle websocket requests -func (h *Handler) HandleWs(reqBody []byte, wsConn *websocket.Conn, httpReq *http.Request) ([]byte, error) { +func (h *Handler) HandleWs(reqBody []byte, wsConn *atomic.Pointer[websocket.Conn], httpReq *http.Request) ([]byte, error) { log.Debugf("WS message received: %v", string(reqBody)) var req types.Request if err := json.Unmarshal(reqBody, &req); err != nil { @@ -168,7 +160,7 @@ func (h *Handler) HandleWs(reqBody []byte, wsConn *websocket.Conn, httpReq *http } // RemoveFilterByWsConn uninstalls the filter attached to this websocket connection -func (h *Handler) RemoveFilterByWsConn(wsConn *websocket.Conn) { +func (h *Handler) RemoveFilterByWsConn(wsConn *atomic.Pointer[websocket.Conn]) { service, ok := h.serviceMap[APIEth] if !ok { return diff --git a/jsonrpc/interfaces.go b/jsonrpc/interfaces.go index f1fce40123..18d5249f80 100644 --- a/jsonrpc/interfaces.go +++ b/jsonrpc/interfaces.go @@ -1,6 +1,8 @@ package jsonrpc import ( + "sync/atomic" + "github.com/gorilla/websocket" ) @@ -9,10 +11,10 @@ type storageInterface interface { GetAllBlockFiltersWithWSConn() ([]*Filter, error) GetAllLogFiltersWithWSConn() ([]*Filter, error) GetFilter(filterID string) (*Filter, error) - NewBlockFilter(wsConn *websocket.Conn) (string, error) - NewLogFilter(wsConn *websocket.Conn, filter LogFilter) (string, error) - NewPendingTransactionFilter(wsConn *websocket.Conn) (string, error) + NewBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) + NewLogFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (string, error) + NewPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) UninstallFilter(filterID string) error - UninstallFilterByWSConn(wsConn *websocket.Conn) error + UninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error UpdateFilterLastPoll(filterID string) error } diff --git a/jsonrpc/mock_storage.go b/jsonrpc/mock_storage.go index 50e19658e8..57cff709ee 100644 --- a/jsonrpc/mock_storage.go +++ b/jsonrpc/mock_storage.go @@ -3,6 +3,8 @@ package jsonrpc import ( + atomic "sync/atomic" + websocket "github.com/gorilla/websocket" mock "github.com/stretchr/testify/mock" ) @@ -91,21 +93,21 @@ func (_m *storageMock) GetFilter(filterID string) (*Filter, error) { } // NewBlockFilter provides a mock function with given fields: wsConn -func (_m *storageMock) NewBlockFilter(wsConn *websocket.Conn) (string, error) { +func (_m *storageMock) NewBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) { ret := _m.Called(wsConn) var r0 string var r1 error - if rf, ok := ret.Get(0).(func(*websocket.Conn) (string, error)); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) (string, error)); ok { return rf(wsConn) } - if rf, ok := ret.Get(0).(func(*websocket.Conn) string); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) string); ok { r0 = rf(wsConn) } else { r0 = ret.Get(0).(string) } - if rf, ok := ret.Get(1).(func(*websocket.Conn) error); ok { + if rf, ok := ret.Get(1).(func(*atomic.Pointer[websocket.Conn]) error); ok { r1 = rf(wsConn) } else { r1 = ret.Error(1) @@ -115,21 +117,21 @@ func (_m *storageMock) NewBlockFilter(wsConn *websocket.Conn) (string, error) { } // NewLogFilter provides a mock function with given fields: wsConn, filter -func (_m *storageMock) NewLogFilter(wsConn *websocket.Conn, filter LogFilter) (string, error) { +func (_m *storageMock) NewLogFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (string, error) { ret := _m.Called(wsConn, filter) var r0 string var r1 error - if rf, ok := ret.Get(0).(func(*websocket.Conn, LogFilter) (string, error)); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn], LogFilter) (string, error)); ok { return rf(wsConn, filter) } - if rf, ok := ret.Get(0).(func(*websocket.Conn, LogFilter) string); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn], LogFilter) string); ok { r0 = rf(wsConn, filter) } else { r0 = ret.Get(0).(string) } - if rf, ok := ret.Get(1).(func(*websocket.Conn, LogFilter) error); ok { + if rf, ok := ret.Get(1).(func(*atomic.Pointer[websocket.Conn], LogFilter) error); ok { r1 = rf(wsConn, filter) } else { r1 = ret.Error(1) @@ -139,21 +141,21 @@ func (_m *storageMock) NewLogFilter(wsConn *websocket.Conn, filter LogFilter) (s } // NewPendingTransactionFilter provides a mock function with given fields: wsConn -func (_m *storageMock) NewPendingTransactionFilter(wsConn *websocket.Conn) (string, error) { +func (_m *storageMock) NewPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) { ret := _m.Called(wsConn) var r0 string var r1 error - if rf, ok := ret.Get(0).(func(*websocket.Conn) (string, error)); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) (string, error)); ok { return rf(wsConn) } - if rf, ok := ret.Get(0).(func(*websocket.Conn) string); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) string); ok { r0 = rf(wsConn) } else { r0 = ret.Get(0).(string) } - if rf, ok := ret.Get(1).(func(*websocket.Conn) error); ok { + if rf, ok := ret.Get(1).(func(*atomic.Pointer[websocket.Conn]) error); ok { r1 = rf(wsConn) } else { r1 = ret.Error(1) @@ -177,11 +179,11 @@ func (_m *storageMock) UninstallFilter(filterID string) error { } // UninstallFilterByWSConn provides a mock function with given fields: wsConn -func (_m *storageMock) UninstallFilterByWSConn(wsConn *websocket.Conn) error { +func (_m *storageMock) UninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error { ret := _m.Called(wsConn) var r0 error - if rf, ok := ret.Get(0).(func(*websocket.Conn) error); ok { + if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) error); ok { r0 = rf(wsConn) } else { r0 = ret.Error(0) diff --git a/jsonrpc/mocks/mock_state.go b/jsonrpc/mocks/mock_state.go index 43c776a40a..3d79cf5409 100644 --- a/jsonrpc/mocks/mock_state.go +++ b/jsonrpc/mocks/mock_state.go @@ -976,11 +976,6 @@ func (_m *StateMock) IsL2BlockVirtualized(ctx context.Context, blockNumber uint6 return r0, r1 } -// PrepareWebSocket provides a mock function with given fields: -func (_m *StateMock) PrepareWebSocket() { - _m.Called() -} - // ProcessUnsignedTransaction provides a mock function with given fields: ctx, tx, senderAddress, l2BlockNumber, noZKEVMCounters, dbTx func (_m *StateMock) ProcessUnsignedTransaction(ctx context.Context, tx *coretypes.Transaction, senderAddress common.Address, l2BlockNumber *uint64, noZKEVMCounters bool, dbTx pgx.Tx) (*runtime.ExecutionResult, error) { ret := _m.Called(ctx, tx, senderAddress, l2BlockNumber, noZKEVMCounters, dbTx) @@ -1012,6 +1007,11 @@ func (_m *StateMock) RegisterNewL2BlockEventHandler(h state.NewL2BlockEventHandl _m.Called(h) } +// StartToMonitorNewL2Blocks provides a mock function with given fields: +func (_m *StateMock) StartToMonitorNewL2Blocks() { + _m.Called() +} + // NewStateMock creates a new instance of StateMock. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func NewStateMock(t interface { diff --git a/jsonrpc/query.go b/jsonrpc/query.go index 2cc375dd36..83f888d8a0 100644 --- a/jsonrpc/query.go +++ b/jsonrpc/query.go @@ -3,6 +3,7 @@ package jsonrpc import ( "encoding/json" "fmt" + "sync/atomic" "time" "github.com/0xPolygonHermez/zkevm-node/hex" @@ -26,7 +27,7 @@ type Filter struct { Type FilterType Parameters interface{} LastPoll time.Time - WsConn *websocket.Conn + WsConn *atomic.Pointer[websocket.Conn] } // FilterType express the type of the filter, block, logs, pending transactions diff --git a/jsonrpc/server.go b/jsonrpc/server.go index bd6da338b7..008b34943d 100644 --- a/jsonrpc/server.go +++ b/jsonrpc/server.go @@ -11,6 +11,7 @@ import ( "net" "net/http" "sync" + "sync/atomic" "syscall" "time" @@ -51,6 +52,10 @@ type Server struct { srv *http.Server wsSrv *http.Server wsUpgrader websocket.Upgrader + + connCounterMutex sync.Mutex + httpConnCounter int64 + wsConnCounter int64 } // Service defines a struct that will provide public methods to be exposed @@ -75,7 +80,10 @@ func NewServer( storage storageInterface, services []Service, ) *Server { - s.PrepareWebSocket() + if cfg.WebSockets.Enabled { + s.StartToMonitorNewL2Blocks() + } + handler := newJSONRpcHandler() for _, service := range services { @@ -237,6 +245,9 @@ func (s *Server) handle(w http.ResponseWriter, req *http.Request) { return } + s.increaseHttpConnCounter() + defer s.decreaseHttpConnCounter() + start := time.Now() w.Header().Set("Content-Type", contentType) w.Header().Set("Access-Control-Allow-Origin", "*") @@ -376,28 +387,38 @@ func (s *Server) handleWs(w http.ResponseWriter, req *http.Request) { s.wsUpgrader.CheckOrigin = func(r *http.Request) bool { return true } // Upgrade the connection to a WS one - wsConn, err := s.wsUpgrader.Upgrade(w, req, nil) + innerWsConn, err := s.wsUpgrader.Upgrade(w, req, nil) if err != nil { log.Error(fmt.Sprintf("Unable to upgrade to a WS connection, %s", err.Error())) - return } + wsConn := new(atomic.Pointer[websocket.Conn]) + wsConn.Store(innerWsConn) + // Set read limit - wsConn.SetReadLimit(s.config.WebSockets.ReadLimit) + wsConn.Load().SetReadLimit(s.config.WebSockets.ReadLimit) // Defer WS closure - defer func(ws *websocket.Conn) { - err = ws.Close() + defer func(wsConn *atomic.Pointer[websocket.Conn]) { + err = wsConn.Load().Close() if err != nil { log.Error(fmt.Sprintf("Unable to gracefully close WS connection, %s", err.Error())) } }(wsConn) + s.increaseWsConnCounter() + defer s.decreaseWsConnCounter() + + // recover + defer func() { + if err := recover(); err != nil { + log.Error(err) + } + }() log.Info("Websocket connection established") - var mu sync.Mutex for { - msgType, message, err := wsConn.ReadMessage() + msgType, message, err := wsConn.Load().ReadMessage() if err != nil { if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) { log.Info("Closing WS connection gracefully") @@ -414,21 +435,52 @@ func (s *Server) handleWs(w http.ResponseWriter, req *http.Request) { } if msgType == websocket.TextMessage || msgType == websocket.BinaryMessage { - go func() { - mu.Lock() - defer mu.Unlock() - resp, err := s.handler.HandleWs(message, wsConn, req) - if err != nil { - log.Error(fmt.Sprintf("Unable to handle WS request, %s", err.Error())) - _ = wsConn.WriteMessage(msgType, []byte(fmt.Sprintf("WS Handle error: %s", err.Error()))) - } else { - _ = wsConn.WriteMessage(msgType, resp) - } - }() + resp, err := s.handler.HandleWs(message, wsConn, req) + if err != nil { + log.Error(fmt.Sprintf("Unable to handle WS request, %s", err.Error())) + _ = wsConn.Load().WriteMessage(msgType, []byte(fmt.Sprintf("WS Handle error: %s", err.Error()))) + } else { + _ = wsConn.Load().WriteMessage(msgType, resp) + } } } } +func (s *Server) increaseHttpConnCounter() { + s.connCounterMutex.Lock() + atomic.AddInt64(&s.httpConnCounter, 1) + s.logConnCounters() + s.connCounterMutex.Unlock() +} + +func (s *Server) decreaseHttpConnCounter() { + s.connCounterMutex.Lock() + atomic.AddInt64(&s.httpConnCounter, -1) + s.logConnCounters() + s.connCounterMutex.Unlock() +} + +func (s *Server) increaseWsConnCounter() { + s.connCounterMutex.Lock() + atomic.AddInt64(&s.wsConnCounter, 1) + s.logConnCounters() + s.connCounterMutex.Unlock() +} + +func (s *Server) decreaseWsConnCounter() { + s.connCounterMutex.Lock() + atomic.AddInt64(&s.wsConnCounter, -1) + s.logConnCounters() + s.connCounterMutex.Unlock() +} + +func (s *Server) logConnCounters() { + httpConnCounter := atomic.LoadInt64(&s.httpConnCounter) + wsConnCounter := atomic.LoadInt64(&s.wsConnCounter) + totalConnCounter := httpConnCounter + wsConnCounter + log.Debugf("[ HTTP conns: %v | WS conns: %v | Total conns: %v ]", httpConnCounter, wsConnCounter, totalConnCounter) +} + func handleInvalidRequest(w http.ResponseWriter, err error, code int) { defer metrics.RequestHandled(metrics.RequestHandledLabelInvalid) log.Infof("Invalid Request: %v", err.Error()) diff --git a/jsonrpc/server_test.go b/jsonrpc/server_test.go index 447844787d..2bb5e1d72f 100644 --- a/jsonrpc/server_test.go +++ b/jsonrpc/server_test.go @@ -31,9 +31,10 @@ const ( ) type mockedServer struct { - Config Config - Server *Server - ServerURL string + Config Config + Server *Server + ServerURL string + ServerWebSocketsURL string } type mocksWrapper struct { @@ -61,7 +62,7 @@ func newMockedServer(t *testing.T, cfg Config) (*mockedServer, *mocksWrapper, *e var newL2BlockEventHandler state.NewL2BlockEventHandler = func(e state.NewL2BlockEvent) {} st.On("RegisterNewL2BlockEventHandler", mock.IsType(newL2BlockEventHandler)).Once() - st.On("PrepareWebSocket").Once() + st.On("StartToMonitorNewL2Blocks").Once() services := []Service{} if _, ok := apis[APIEth]; ok { @@ -128,10 +129,13 @@ func newMockedServer(t *testing.T, cfg Config) (*mockedServer, *mocksWrapper, *e ethClient, err := ethclient.Dial(serverURL) require.NoError(t, err) + serverWebSocketsURL := fmt.Sprintf("ws://%s:%d", cfg.WebSockets.Host, cfg.WebSockets.Port) + msv := &mockedServer{ - Config: cfg, - Server: server, - ServerURL: serverURL, + Config: cfg, + Server: server, + ServerURL: serverURL, + ServerWebSocketsURL: serverWebSocketsURL, } mks := &mocksWrapper{ @@ -154,6 +158,12 @@ func getSequencerDefaultConfig() Config { BatchRequestsEnabled: true, MaxLogsCount: 10000, MaxLogsBlockRange: 10000, + WebSockets: WebSocketsConfig{ + Enabled: true, + Host: "0.0.0.0", + Port: 9133, + ReadLimit: 0, + }, } return cfg } @@ -179,6 +189,15 @@ func newNonSequencerMockedServer(t *testing.T, sequencerNodeURI string) (*mocked return newMockedServer(t, cfg) } +func (s *mockedServer) GetWSClient() *ethclient.Client { + ethClient, err := ethclient.Dial(s.ServerWebSocketsURL) + if err != nil { + panic(err) + } + + return ethClient +} + func (s *mockedServer) Stop() { err := s.Server.Stop() if err != nil { diff --git a/jsonrpc/storage.go b/jsonrpc/storage.go index ada5f32ffe..8838c6fe08 100644 --- a/jsonrpc/storage.go +++ b/jsonrpc/storage.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "sync" + "sync/atomic" "time" "github.com/0xPolygonHermez/zkevm-node/hex" @@ -31,7 +32,7 @@ func NewStorage() *Storage { } // NewLogFilter persists a new log filter -func (s *Storage) NewLogFilter(wsConn *websocket.Conn, filter LogFilter) (string, error) { +func (s *Storage) NewLogFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (string, error) { shouldFilterByBlockHash := filter.BlockHash != nil shouldFilterByBlockRange := filter.FromBlock != nil || filter.ToBlock != nil @@ -43,17 +44,17 @@ func (s *Storage) NewLogFilter(wsConn *websocket.Conn, filter LogFilter) (string } // NewBlockFilter persists a new block log filter -func (s *Storage) NewBlockFilter(wsConn *websocket.Conn) (string, error) { +func (s *Storage) NewBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) { return s.createFilter(FilterTypeBlock, nil, wsConn) } // NewPendingTransactionFilter persists a new pending transaction filter -func (s *Storage) NewPendingTransactionFilter(wsConn *websocket.Conn) (string, error) { +func (s *Storage) NewPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) { return s.createFilter(FilterTypePendingTx, nil, wsConn) } // create persists the filter to the memory and provides the filter id -func (s *Storage) createFilter(t FilterType, parameters interface{}, wsConn *websocket.Conn) (string, error) { +func (s *Storage) createFilter(t FilterType, parameters interface{}, wsConn *atomic.Pointer[websocket.Conn]) (string, error) { lastPoll := time.Now().UTC() id, err := s.generateFilterID() if err != nil { @@ -154,7 +155,7 @@ func (s *Storage) UninstallFilter(filterID string) error { } // UninstallFilterByWSConn deletes all filters connected to the provided web socket connection -func (s *Storage) UninstallFilterByWSConn(wsConn *websocket.Conn) error { +func (s *Storage) UninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error { filterIDsToDelete := []string{} s.filters.Range(func(key, value any) bool { id := key.(string) diff --git a/jsonrpc/types/interfaces.go b/jsonrpc/types/interfaces.go index 6ce14137a1..ccc411e219 100644 --- a/jsonrpc/types/interfaces.go +++ b/jsonrpc/types/interfaces.go @@ -26,7 +26,7 @@ type PoolInterface interface { // StateInterface gathers the methods required to interact with the state. type StateInterface interface { - PrepareWebSocket() + StartToMonitorNewL2Blocks() BeginStateTransaction(ctx context.Context) (pgx.Tx, error) DebugTransaction(ctx context.Context, transactionHash common.Hash, traceConfig state.TraceConfig, dbTx pgx.Tx) (*runtime.ExecutionResult, error) EstimateGas(transaction *types.Transaction, senderAddress common.Address, l2BlockNumber *uint64, dbTx pgx.Tx) (uint64, []byte, error) diff --git a/log/log.go b/log/log.go index fcc8e479ba..7c93468769 100644 --- a/log/log.go +++ b/log/log.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "strings" + "sync/atomic" "github.com/0xPolygonHermez/zkevm-node" "github.com/hermeznetwork/tracerr" @@ -27,11 +28,12 @@ type Logger struct { } // root logger -var log *Logger +var log atomic.Pointer[Logger] func getDefaultLog() *Logger { - if log != nil { - return log + l := log.Load() + if l != nil { + return l } // default level: debug zapLogger, _, err := NewLogger(Config{ @@ -42,8 +44,8 @@ func getDefaultLog() *Logger { if err != nil { panic(err) } - log = &Logger{x: zapLogger} - return log + log.Store(&Logger{x: zapLogger}) + return log.Load() } // Init the logger with defined level. outputs defines the outputs where the @@ -56,7 +58,7 @@ func Init(cfg Config) { if err != nil { panic(err) } - log = &Logger{x: zapLogger} + log.Store(&Logger{x: zapLogger}) } // NewLogger creates the logger with defined level. outputs defines the outputs where the diff --git a/state/l2block.go b/state/l2block.go index a4d4824ab3..f2ed7ba3d1 100644 --- a/state/l2block.go +++ b/state/l2block.go @@ -21,15 +21,19 @@ type NewL2BlockEvent struct { Block types.Block } -// PrepareWebSocket allows the RPC to prepare ws -func (s *State) PrepareWebSocket() { +// StartToMonitorNewL2Blocks starts 2 go routines that will +// monitor new blocks and execute handlers registered to be executed +// when a new l2 block is detected. This is used by the RPC WebSocket +// filter subscription but can be used by any other component that +// needs to react to a new L2 block added to the state. +func (s *State) StartToMonitorNewL2Blocks() { lastL2Block, err := s.GetLastL2Block(context.Background(), nil) if errors.Is(err, ErrStateNotSynchronized) { lastL2Block = types.NewBlockWithHeader(&types.Header{Number: big.NewInt(0)}) } else if err != nil { log.Fatalf("failed to load the last l2 block: %v", err) } - s.lastL2BlockSeen = *lastL2Block + s.lastL2BlockSeen.Store(lastL2Block) go s.monitorNewL2Blocks() go s.handleEvents() } @@ -43,6 +47,7 @@ func (s *State) RegisterNewL2BlockEventHandler(h NewL2BlockEventHandler) { func (s *State) handleEvents() { for newL2BlockEvent := range s.newL2BlockEvents { + log.Debugf("[handleEvents] new l2 block event detected for block: %v", newL2BlockEvent.Block.NumberU64()) if len(s.newL2BlockEventHandlers) == 0 { continue } @@ -50,15 +55,18 @@ func (s *State) handleEvents() { wg := sync.WaitGroup{} for _, handler := range s.newL2BlockEventHandlers { wg.Add(1) - go func(h NewL2BlockEventHandler) { + go func(h NewL2BlockEventHandler, e NewL2BlockEvent) { defer func() { wg.Done() if r := recover(); r != nil { log.Errorf("failed and recovered in NewL2BlockEventHandler: %v", r) } }() - h(newL2BlockEvent) - }(handler) + log.Debugf("[handleEvents] triggering new l2 block event handler for block: %v", e.Block.NumberU64()) + start := time.Now() + h(e) + log.Debugf("[handleEvents] new l2 block event handler for block %v took %vms to be executed", e.Block.NumberU64(), time.Since(start).Milliseconds()) + }(handler, newL2BlockEvent) } wg.Wait() } @@ -85,24 +93,32 @@ func (s *State) monitorNewL2Blocks() { continue } + lastL2BlockSeen := s.lastL2BlockSeen.Load() + // not updates until now - if lastL2Block == nil || s.lastL2BlockSeen.NumberU64() >= lastL2Block.NumberU64() { + if lastL2Block == nil || lastL2BlockSeen.NumberU64() >= lastL2Block.NumberU64() { waitNextCycle() continue } - for bn := s.lastL2BlockSeen.NumberU64() + uint64(1); bn <= lastL2Block.NumberU64(); bn++ { + fromBlockNumber := lastL2BlockSeen.NumberU64() + uint64(1) + toBlockNumber := lastL2Block.NumberU64() + log.Debugf("[monitorNewL2Blocks] new l2 block detected from block %v to %v", fromBlockNumber, toBlockNumber) + + for bn := fromBlockNumber; bn <= toBlockNumber; bn++ { block, err := s.GetL2BlockByNumber(context.Background(), bn, nil) if err != nil { - log.Errorf("failed to l2 block while monitoring new blocks: %v", err) + log.Errorf("failed to get l2 block while monitoring new blocks: %v", err) break } - + log.Debugf("[monitorNewL2Blocks] sending NewL2BlockEvent for block %v", block.NumberU64()) + start := time.Now() s.newL2BlockEvents <- NewL2BlockEvent{ Block: *block, } - log.Infof("new l2 blocks detected, Number %v, Hash %v", block.NumberU64(), block.Hash().String()) - s.lastL2BlockSeen = *block + log.Debugf("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %vms to be sent", block.NumberU64(), time.Since(start).Milliseconds()) + log.Infof("new l2 block detected: number %v, hash %v", block.NumberU64(), block.Hash().String()) + s.lastL2BlockSeen.Store(block) } // interval to check for new l2 blocks diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go index 1c394a8c71..cbb1602873 100644 --- a/state/pgstatestorage.go +++ b/state/pgstatestorage.go @@ -9,6 +9,7 @@ import ( "time" "github.com/0xPolygonHermez/zkevm-node/hex" + "github.com/0xPolygonHermez/zkevm-node/log" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" "github.com/jackc/pgx/v4" @@ -1461,6 +1462,8 @@ func scanLogs(rows pgx.Rows) ([]*types.Log, error) { // AddL2Block adds a new L2 block to the State Store func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2Block *types.Block, receipts []*types.Receipt, effectivePercentage uint8, dbTx pgx.Tx) error { + log.Debugf("[AddL2Block] adding l2 block: %v", l2Block.NumberU64()) + start := time.Now() e := p.getExecQuerier(dbTx) const addTransactionSQL = "INSERT INTO state.transaction (hash, encoded, decoded, l2_block_num, effective_percentage) VALUES($1, $2, $3, $4, $5)" @@ -1524,7 +1527,7 @@ func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2 } } } - + log.Debugf("[AddL2Block] l2 block %v took %vms to be added", l2Block.NumberU64(), time.Since(start).Milliseconds()) return nil } diff --git a/state/state.go b/state/state.go index 487c19f726..47177459a1 100644 --- a/state/state.go +++ b/state/state.go @@ -4,6 +4,7 @@ import ( "context" "math/big" "sync" + "sync/atomic" "github.com/0xPolygonHermez/zkevm-node/event" "github.com/0xPolygonHermez/zkevm-node/merkletree" @@ -15,6 +16,8 @@ import ( "google.golang.org/protobuf/types/known/emptypb" ) +const newL2BlockEventBufferSize = 500 + var ( // ZeroHash is the hash 0x0000000000000000000000000000000000000000000000000000000000000000 ZeroHash = common.Hash{} @@ -30,7 +33,7 @@ type State struct { tree *merkletree.StateTree eventLog *event.EventLog - lastL2BlockSeen types.Block + lastL2BlockSeen atomic.Pointer[types.Block] newL2BlockEvents chan NewL2BlockEvent newL2BlockEventHandlers []NewL2BlockEventHandler } @@ -48,7 +51,7 @@ func NewState(cfg Config, storage *PostgresStorage, executorClient executor.Exec executorClient: executorClient, tree: stateTree, eventLog: eventLog, - newL2BlockEvents: make(chan NewL2BlockEvent), + newL2BlockEvents: make(chan NewL2BlockEvent, newL2BlockEventBufferSize), newL2BlockEventHandlers: []NewL2BlockEventHandler{}, } diff --git a/test/docker-compose.yml b/test/docker-compose.yml index dd7488a20f..a00598dd0e 100644 --- a/test/docker-compose.yml +++ b/test/docker-compose.yml @@ -56,6 +56,8 @@ services: environment: - ZKEVM_NODE_STATE_DB_HOST=zkevm-state-db - ZKEVM_NODE_POOL_DB_HOST=zkevm-pool-db + - ZKEVM_NODE_MTCLIENT_URI=${ZKEVM_NODE_MTCLIENT_URI} + - ZKEVM_NODE_EXECUTOR_URI=${ZKEVM_NODE_EXECUTOR_URI} volumes: - ./config/test.node.config.toml:/app/config.toml - ./config/test.genesis.config.json:/app/genesis.json @@ -72,6 +74,8 @@ services: - ZKEVM_NODE_STATE_DB_HOST=zkevm-state-db - ZKEVM_NODE_POOL_DB_HOST=zkevm-pool-db - ZKEVM_NODE_SEQUENCER_SENDER_ADDRESS=0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266 + - ZKEVM_NODE_MTCLIENT_URI=${ZKEVM_NODE_MTCLIENT_URI} + - ZKEVM_NODE_EXECUTOR_URI=${ZKEVM_NODE_EXECUTOR_URI} volumes: - ./sequencer.keystore:/pk/sequencer.keystore - ./config/test.node.config.toml:/app/config.toml @@ -91,6 +95,8 @@ services: environment: - ZKEVM_NODE_STATE_DB_HOST=zkevm-state-db - ZKEVM_NODE_POOL_DB_HOST=zkevm-pool-db + - ZKEVM_NODE_MTCLIENT_URI=${ZKEVM_NODE_MTCLIENT_URI} + - ZKEVM_NODE_EXECUTOR_URI=${ZKEVM_NODE_EXECUTOR_URI} volumes: - ./config/test.node.config.toml:/app/config.toml - ./config/test.genesis.config.json:/app/genesis.json @@ -123,6 +129,8 @@ services: - 9095:9091 # needed if metrics enabled environment: - ZKEVM_NODE_STATE_DB_HOST=zkevm-state-db + - ZKEVM_NODE_MTCLIENT_URI=${ZKEVM_NODE_MTCLIENT_URI} + - ZKEVM_NODE_EXECUTOR_URI=${ZKEVM_NODE_EXECUTOR_URI} volumes: - ./config/test.node.config.toml:/app/config.toml - ./config/test.genesis.config.json:/app/genesis.json