diff --git a/db/migrations/state/0011.sql b/db/migrations/state/0011.sql new file mode 100644 index 0000000000..e4294cdf20 --- /dev/null +++ b/db/migrations/state/0011.sql @@ -0,0 +1,21 @@ +-- +migrate Up +CREATE INDEX IF NOT EXISTS l2block_created_at_idx ON state.l2block (created_at); + +CREATE INDEX IF NOT EXISTS log_log_index_idx ON state.log (log_index); +CREATE INDEX IF NOT EXISTS log_topic0_idx ON state.log (topic0); +CREATE INDEX IF NOT EXISTS log_topic1_idx ON state.log (topic1); +CREATE INDEX IF NOT EXISTS log_topic2_idx ON state.log (topic2); +CREATE INDEX IF NOT EXISTS log_topic3_idx ON state.log (topic3); + +ALTER TABLE state.transaction ADD COLUMN egp_log JSONB; + +-- +migrate Down +DROP INDEX IF EXISTS state.l2block_created_at_idx; + +DROP INDEX IF EXISTS state.log_log_index_idx; +DROP INDEX IF EXISTS state.log_topic0_idx; +DROP INDEX IF EXISTS state.log_topic1_idx; +DROP INDEX IF EXISTS state.log_topic2_idx; +DROP INDEX IF EXISTS state.log_topic3_idx; + +ALTER TABLE state.transaction DROP COLUMN egp_log; \ No newline at end of file diff --git a/db/migrations/state/0011_test.go b/db/migrations/state/0011_test.go new file mode 100644 index 0000000000..3c245e7d31 --- /dev/null +++ b/db/migrations/state/0011_test.go @@ -0,0 +1,73 @@ +package migrations_test + +import ( + "database/sql" + "testing" + + "github.com/stretchr/testify/assert" +) + +// this migration changes length of the token name +type migrationTest0011 struct{} + +func (m migrationTest0011) InsertData(db *sql.DB) error { + return nil +} + +func (m migrationTest0011) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) { + indexes := []string{ + "l2block_created_at_idx", + "log_log_index_idx", + "log_topic0_idx", + "log_topic1_idx", + "log_topic2_idx", + "log_topic3_idx", + } + // Check indexes adding + for _, idx := range indexes { + // getIndex + const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;` + row := db.QueryRow(getIndex, idx) + var result int + assert.NoError(t, row.Scan(&result)) + assert.Equal(t, 1, result) + } + + // Check column egp_log exists in state.transactions table + const getFinalDeviationColumn = `SELECT count(*) FROM information_schema.columns WHERE table_name='transaction' and column_name='egp_log'` + row := db.QueryRow(getFinalDeviationColumn) + var result int + assert.NoError(t, row.Scan(&result)) + assert.Equal(t, 1, result) +} + +func (m migrationTest0011) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) { + indexes := []string{ + "l2block_created_at_idx", + "log_log_index_idx", + "log_topic0_idx", + "log_topic1_idx", + "log_topic2_idx", + "log_topic3_idx", + } + // Check indexes removing + for _, idx := range indexes { + // getIndex + const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;` + row := db.QueryRow(getIndex, idx) + var result int + assert.NoError(t, row.Scan(&result)) + assert.Equal(t, 0, result) + } + + // Check column egp_log doesn't exists in state.transactions table + const getFinalDeviationColumn = `SELECT count(*) FROM information_schema.columns WHERE table_name='transaction' and column_name='egp_log'` + row := db.QueryRow(getFinalDeviationColumn) + var result int + assert.NoError(t, row.Scan(&result)) + assert.Equal(t, 0, result) +} + +func TestMigration0011(t *testing.T) { + runMigrationTest(t, 11, migrationTest0011{}) +} diff --git a/jsonrpc/endpoints_debug.go b/jsonrpc/endpoints_debug.go index 350d7bb14b..a76cbadbca 100644 --- a/jsonrpc/endpoints_debug.go +++ b/jsonrpc/endpoints_debug.go @@ -218,7 +218,7 @@ func (d *DebugEndpoints) TraceBatchByNumber(httpRequest *http.Request, number ty buffer := make(chan byte, bufferSize) - mu := sync.Mutex{} + mu := &sync.Mutex{} wg := sync.WaitGroup{} wg.Add(len(receipts)) responses := make([]traceResponse, 0, len(receipts)) diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index e8d46f8014..94839b3443 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -9,7 +9,6 @@ import ( "net/http" "strings" "sync" - "sync/atomic" "time" "github.com/0xPolygonHermez/zkevm-node/hex" @@ -21,7 +20,6 @@ import ( "github.com/0xPolygonHermez/zkevm-node/state/runtime" "github.com/ethereum/go-ethereum/common" ethTypes "github.com/ethereum/go-ethereum/core/types" - "github.com/gorilla/websocket" "github.com/jackc/pgx/v4" ) @@ -793,7 +791,7 @@ func (e *EthEndpoints) NewBlockFilter() (interface{}, types.Error) { } // internal -func (e *EthEndpoints) newBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) { +func (e *EthEndpoints) newBlockFilter(wsConn *concurrentWsConn) (interface{}, types.Error) { id, err := e.storage.NewBlockFilter(wsConn) if err != nil { return RPCErrorResponse(types.DefaultErrorCode, "failed to create new block filter", err) @@ -810,7 +808,7 @@ func (e *EthEndpoints) NewFilter(filter LogFilter) (interface{}, types.Error) { } // internal -func (e *EthEndpoints) newFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (interface{}, types.Error) { +func (e *EthEndpoints) newFilter(wsConn *concurrentWsConn, filter LogFilter) (interface{}, types.Error) { id, err := e.storage.NewLogFilter(wsConn, filter) if errors.Is(err, ErrFilterInvalidPayload) { return RPCErrorResponse(types.InvalidParamsErrorCode, err.Error(), nil) @@ -829,7 +827,7 @@ func (e *EthEndpoints) NewPendingTransactionFilter() (interface{}, types.Error) } // internal -func (e *EthEndpoints) newPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) { +func (e *EthEndpoints) newPendingTransactionFilter(wsConn *concurrentWsConn) (interface{}, types.Error) { return nil, types.NewRPCError(types.DefaultErrorCode, "not supported yet") // id, err := e.storage.NewPendingTransactionFilter(wsConn) // if err != nil { @@ -991,7 +989,7 @@ func (e *EthEndpoints) updateFilterLastPoll(filterID string) types.Error { // The node will return a subscription id. // For each event that matches the subscription a notification with relevant // data is sent together with the subscription id. -func (e *EthEndpoints) Subscribe(wsConn *atomic.Pointer[websocket.Conn], name string, logFilter *LogFilter) (interface{}, types.Error) { +func (e *EthEndpoints) Subscribe(wsConn *concurrentWsConn, name string, logFilter *LogFilter) (interface{}, types.Error) { switch name { case "newHeads": return e.newBlockFilter(wsConn) @@ -1011,13 +1009,13 @@ func (e *EthEndpoints) Subscribe(wsConn *atomic.Pointer[websocket.Conn], name st } // Unsubscribe uninstalls the filter based on the provided filterID -func (e *EthEndpoints) Unsubscribe(wsConn *websocket.Conn, filterID string) (interface{}, types.Error) { +func (e *EthEndpoints) Unsubscribe(wsConn *concurrentWsConn, filterID string) (interface{}, types.Error) { return e.UninstallFilter(filterID) } // uninstallFilterByWSConn uninstalls the filters connected to the // provided web socket connection -func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error { +func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *concurrentWsConn) error { return e.storage.UninstallFilterByWSConn(wsConn) } @@ -1054,7 +1052,6 @@ func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2Block } for _, filter := range blockFilters { filter.EnqueueSubscriptionDataToBeSent(data) - go filter.SendEnqueuedSubscriptionData() } } log.Debugf("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start)) @@ -1071,9 +1068,16 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE filterParameters := filter.Parameters.(LogFilter) bn := types.BlockNumber(event.Block.NumberU64()) - // if from and to blocks are nil, set it to the current block to make - // the query faster - if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil { + if filterParameters.BlockHash != nil { + // if the filter block hash is set, we check if the block is the + // one with the expected hash, otherwise we ignore the filter + bh := *filterParameters.BlockHash + if bh.String() != event.Block.Hash().String() { + continue + } + } else if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil { + // in case the block hash is nil and also from and to blocks are nil, set it + // to the current block to make the query faster filterParameters.FromBlock = &bn filterParameters.ToBlock = &bn } else { @@ -1086,12 +1090,15 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE log.Errorf(rpcErr.Error(), filter.ID, err) continue } - if fromBlock > event.Block.NumberU64() { + // if the block number is smaller than the fromBlock value + // this means this block is out of the block range for this + // filter, so we skip it + if event.Block.NumberU64() < fromBlock { continue } // otherwise set the from block to a fixed number // to avoid querying it again in the next step - fixedFromBlock := types.BlockNumber(fromBlock) + fixedFromBlock := types.BlockNumber(event.Block.NumberU64()) filterParameters.FromBlock = &fixedFromBlock } @@ -1104,12 +1111,15 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE log.Errorf(rpcErr.Error(), filter.ID, err) continue } - if toBlock > event.Block.NumberU64() { + // if the block number is greater than the toBlock value + // this means this block is out of the block range for this + // filter, so we skip it + if event.Block.NumberU64() > toBlock { continue } // otherwise set the to block to a fixed number // to avoid querying it again in the next step - fixedToBlock := types.BlockNumber(toBlock) + fixedToBlock := types.BlockNumber(event.Block.NumberU64()) filterParameters.ToBlock = &fixedToBlock } } @@ -1130,7 +1140,6 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE log.Errorf("failed to marshal ethLog response to subscription: %v", err) } filter.EnqueueSubscriptionDataToBeSent(data) - go filter.SendEnqueuedSubscriptionData() } } } diff --git a/jsonrpc/endpoints_eth_test.go b/jsonrpc/endpoints_eth_test.go index e9ad2096c3..9f91ad5a36 100644 --- a/jsonrpc/endpoints_eth_test.go +++ b/jsonrpc/endpoints_eth_test.go @@ -7,7 +7,6 @@ import ( "fmt" "math/big" "strings" - "sync/atomic" "testing" "time" @@ -24,7 +23,6 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/trie" - "github.com/gorilla/websocket" "github.com/jackc/pgx/v4" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" @@ -3558,7 +3556,7 @@ func TestNewFilter(t *testing.T) { ExpectedError: nil, SetupMocks: func(m *mocksWrapper, tc testCase) { m.Storage. - On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). + On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})). Return("1", nil). Once() }, @@ -3572,7 +3570,7 @@ func TestNewFilter(t *testing.T) { ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new log filter"), SetupMocks: func(m *mocksWrapper, tc testCase) { m.Storage. - On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). + On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})). Return("", errors.New("failed to add new filter")). Once() }, @@ -3587,7 +3585,7 @@ func TestNewFilter(t *testing.T) { ExpectedError: types.NewRPCError(types.InvalidParamsErrorCode, "invalid argument 0: cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other"), SetupMocks: func(m *mocksWrapper, tc testCase) { m.Storage. - On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). + On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})). Once(). Return("", ErrFilterInvalidPayload). Once() @@ -3639,7 +3637,7 @@ func TestNewBlockFilter(t *testing.T) { ExpectedError: nil, SetupMocks: func(m *mocksWrapper, tc testCase) { m.Storage. - On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). + On("NewBlockFilter", mock.IsType(&concurrentWsConn{})). Return("1", nil). Once() }, @@ -3650,7 +3648,7 @@ func TestNewBlockFilter(t *testing.T) { ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new block filter"), SetupMocks: func(m *mocksWrapper, tc testCase) { m.Storage. - On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). + On("NewBlockFilter", mock.IsType(&concurrentWsConn{})). Return("", errors.New("failed to add new block filter")). Once() }, @@ -4717,7 +4715,7 @@ func TestSubscribeNewHeads(t *testing.T) { Name: "Subscribe to new heads Successfully", SetupMocks: func(m *mocksWrapper, tc testCase) { m.Storage. - On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). + On("NewBlockFilter", mock.IsType(&concurrentWsConn{})). Return("0x1", nil). Once() }, @@ -4727,7 +4725,7 @@ func TestSubscribeNewHeads(t *testing.T) { ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new block filter"), SetupMocks: func(m *mocksWrapper, tc testCase) { m.Storage. - On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})). + On("NewBlockFilter", mock.IsType(&concurrentWsConn{})). Return("", fmt.Errorf("failed to add filter to storage")). Once() }, @@ -4785,7 +4783,7 @@ func TestSubscribeNewLogs(t *testing.T) { }, SetupMocks: func(m *mocksWrapper, tc testCase) { m.Storage. - On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). + On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})). Return("0x1", nil). Once() }, @@ -4800,7 +4798,7 @@ func TestSubscribeNewLogs(t *testing.T) { }, SetupMocks: func(m *mocksWrapper, tc testCase) { m.Storage. - On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})). + On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})). Return("", fmt.Errorf("failed to add filter to storage")). Once() }, diff --git a/jsonrpc/handler.go b/jsonrpc/handler.go index ce57c8e98a..3edb3180e2 100644 --- a/jsonrpc/handler.go +++ b/jsonrpc/handler.go @@ -6,12 +6,10 @@ import ( "net/http" "reflect" "strings" - "sync/atomic" "unicode" "github.com/0xPolygonHermez/zkevm-node/jsonrpc/types" "github.com/0xPolygonHermez/zkevm-node/log" - "github.com/gorilla/websocket" ) const ( @@ -36,7 +34,7 @@ func (f *funcData) numParams() int { type handleRequest struct { types.Request - wsConn *atomic.Pointer[websocket.Conn] + wsConn *concurrentWsConn HttpRequest *http.Request } @@ -93,7 +91,7 @@ func (h *Handler) Handle(req handleRequest) types.Response { firstFuncParamIsWebSocketConn := false firstFuncParamIsHttpRequest := false if funcHasMoreThanOneInputParams { - firstFuncParamIsWebSocketConn = fd.reqt[1].AssignableTo(reflect.TypeOf(&atomic.Pointer[websocket.Conn]{})) + firstFuncParamIsWebSocketConn = fd.reqt[1].AssignableTo(reflect.TypeOf(&concurrentWsConn{})) firstFuncParamIsHttpRequest = fd.reqt[1].AssignableTo(reflect.TypeOf(&http.Request{})) } if requestHasWebSocketConn && firstFuncParamIsWebSocketConn { @@ -143,7 +141,7 @@ func (h *Handler) Handle(req handleRequest) types.Response { } // HandleWs handle websocket requests -func (h *Handler) HandleWs(reqBody []byte, wsConn *atomic.Pointer[websocket.Conn], httpReq *http.Request) ([]byte, error) { +func (h *Handler) HandleWs(reqBody []byte, wsConn *concurrentWsConn, httpReq *http.Request) ([]byte, error) { log.Debugf("WS message received: %v", string(reqBody)) var req types.Request if err := json.Unmarshal(reqBody, &req); err != nil { @@ -160,7 +158,7 @@ func (h *Handler) HandleWs(reqBody []byte, wsConn *atomic.Pointer[websocket.Conn } // RemoveFilterByWsConn uninstalls the filter attached to this websocket connection -func (h *Handler) RemoveFilterByWsConn(wsConn *atomic.Pointer[websocket.Conn]) { +func (h *Handler) RemoveFilterByWsConn(wsConn *concurrentWsConn) { service, ok := h.serviceMap[APIEth] if !ok { return diff --git a/jsonrpc/interfaces.go b/jsonrpc/interfaces.go index 18d5249f80..fd6de2538f 100644 --- a/jsonrpc/interfaces.go +++ b/jsonrpc/interfaces.go @@ -1,20 +1,14 @@ package jsonrpc -import ( - "sync/atomic" - - "github.com/gorilla/websocket" -) - // storageInterface json rpc internal storage to persist data type storageInterface interface { GetAllBlockFiltersWithWSConn() ([]*Filter, error) GetAllLogFiltersWithWSConn() ([]*Filter, error) GetFilter(filterID string) (*Filter, error) - NewBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) - NewLogFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (string, error) - NewPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) + NewBlockFilter(wsConn *concurrentWsConn) (string, error) + NewLogFilter(wsConn *concurrentWsConn, filter LogFilter) (string, error) + NewPendingTransactionFilter(wsConn *concurrentWsConn) (string, error) UninstallFilter(filterID string) error - UninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error + UninstallFilterByWSConn(wsConn *concurrentWsConn) error UpdateFilterLastPoll(filterID string) error } diff --git a/jsonrpc/mock_storage.go b/jsonrpc/mock_storage.go index 57cff709ee..4ee466de93 100644 --- a/jsonrpc/mock_storage.go +++ b/jsonrpc/mock_storage.go @@ -3,9 +3,6 @@ package jsonrpc import ( - atomic "sync/atomic" - - websocket "github.com/gorilla/websocket" mock "github.com/stretchr/testify/mock" ) @@ -93,21 +90,21 @@ func (_m *storageMock) GetFilter(filterID string) (*Filter, error) { } // NewBlockFilter provides a mock function with given fields: wsConn -func (_m *storageMock) NewBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) { +func (_m *storageMock) NewBlockFilter(wsConn *concurrentWsConn) (string, error) { ret := _m.Called(wsConn) var r0 string var r1 error - if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) (string, error)); ok { + if rf, ok := ret.Get(0).(func(*concurrentWsConn) (string, error)); ok { return rf(wsConn) } - if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) string); ok { + if rf, ok := ret.Get(0).(func(*concurrentWsConn) string); ok { r0 = rf(wsConn) } else { r0 = ret.Get(0).(string) } - if rf, ok := ret.Get(1).(func(*atomic.Pointer[websocket.Conn]) error); ok { + if rf, ok := ret.Get(1).(func(*concurrentWsConn) error); ok { r1 = rf(wsConn) } else { r1 = ret.Error(1) @@ -117,21 +114,21 @@ func (_m *storageMock) NewBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (s } // NewLogFilter provides a mock function with given fields: wsConn, filter -func (_m *storageMock) NewLogFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (string, error) { +func (_m *storageMock) NewLogFilter(wsConn *concurrentWsConn, filter LogFilter) (string, error) { ret := _m.Called(wsConn, filter) var r0 string var r1 error - if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn], LogFilter) (string, error)); ok { + if rf, ok := ret.Get(0).(func(*concurrentWsConn, LogFilter) (string, error)); ok { return rf(wsConn, filter) } - if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn], LogFilter) string); ok { + if rf, ok := ret.Get(0).(func(*concurrentWsConn, LogFilter) string); ok { r0 = rf(wsConn, filter) } else { r0 = ret.Get(0).(string) } - if rf, ok := ret.Get(1).(func(*atomic.Pointer[websocket.Conn], LogFilter) error); ok { + if rf, ok := ret.Get(1).(func(*concurrentWsConn, LogFilter) error); ok { r1 = rf(wsConn, filter) } else { r1 = ret.Error(1) @@ -141,21 +138,21 @@ func (_m *storageMock) NewLogFilter(wsConn *atomic.Pointer[websocket.Conn], filt } // NewPendingTransactionFilter provides a mock function with given fields: wsConn -func (_m *storageMock) NewPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) { +func (_m *storageMock) NewPendingTransactionFilter(wsConn *concurrentWsConn) (string, error) { ret := _m.Called(wsConn) var r0 string var r1 error - if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) (string, error)); ok { + if rf, ok := ret.Get(0).(func(*concurrentWsConn) (string, error)); ok { return rf(wsConn) } - if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) string); ok { + if rf, ok := ret.Get(0).(func(*concurrentWsConn) string); ok { r0 = rf(wsConn) } else { r0 = ret.Get(0).(string) } - if rf, ok := ret.Get(1).(func(*atomic.Pointer[websocket.Conn]) error); ok { + if rf, ok := ret.Get(1).(func(*concurrentWsConn) error); ok { r1 = rf(wsConn) } else { r1 = ret.Error(1) @@ -179,11 +176,11 @@ func (_m *storageMock) UninstallFilter(filterID string) error { } // UninstallFilterByWSConn provides a mock function with given fields: wsConn -func (_m *storageMock) UninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error { +func (_m *storageMock) UninstallFilterByWSConn(wsConn *concurrentWsConn) error { ret := _m.Called(wsConn) var r0 error - if rf, ok := ret.Get(0).(func(*atomic.Pointer[websocket.Conn]) error); ok { + if rf, ok := ret.Get(0).(func(*concurrentWsConn) error); ok { r0 = rf(wsConn) } else { r0 = ret.Error(0) diff --git a/jsonrpc/query.go b/jsonrpc/query.go index 5730949ea2..f47e34a9ef 100644 --- a/jsonrpc/query.go +++ b/jsonrpc/query.go @@ -4,7 +4,6 @@ import ( "encoding/json" "fmt" "sync" - "sync/atomic" "time" "github.com/0xPolygonHermez/zkevm-node/hex" @@ -31,40 +30,41 @@ type Filter struct { Type FilterType Parameters interface{} LastPoll time.Time - WsConn *atomic.Pointer[websocket.Conn] + WsConn *concurrentWsConn - wsDataQueue state.Queue[[]byte] - mutex sync.Mutex - isSending bool + wsQueue *state.Queue[[]byte] + wsQueueSignal *sync.Cond } // EnqueueSubscriptionDataToBeSent enqueues subscription data to be sent // via web sockets connection func (f *Filter) EnqueueSubscriptionDataToBeSent(data []byte) { - f.wsDataQueue.Push(data) + f.wsQueue.Push(data) + f.wsQueueSignal.Broadcast() } // SendEnqueuedSubscriptionData consumes all the enqueued subscription data // and sends it via web sockets connection. func (f *Filter) SendEnqueuedSubscriptionData() { - if f.isSending { - return - } - - f.mutex.Lock() - defer f.mutex.Unlock() - f.isSending = true for { - d, err := f.wsDataQueue.Pop() - if err == state.ErrQueueEmpty { - break - } else if err != nil { - log.Errorf("failed to pop subscription data from queue to be sent via web sockets to filter %v, %s", f.ID, err.Error()) - break + // wait for a signal that a new item was + // added to the queue + log.Debugf("waiting subscription data signal") + f.wsQueueSignal.L.Lock() + f.wsQueueSignal.Wait() + f.wsQueueSignal.L.Unlock() + log.Debugf("subscription data signal received, sending enqueued data") + for { + d, err := f.wsQueue.Pop() + if err == state.ErrQueueEmpty { + break + } else if err != nil { + log.Errorf("failed to pop subscription data from queue to be sent via web sockets to filter %v, %s", f.ID, err.Error()) + break + } + f.sendSubscriptionResponse(d) } - f.sendSubscriptionResponse(d) } - f.isSending = false } // sendSubscriptionResponse send data as subscription response via @@ -87,7 +87,7 @@ func (f *Filter) sendSubscriptionResponse(data []byte) { return } - err = f.WsConn.Load().WriteMessage(websocket.TextMessage, message) + err = f.WsConn.WriteMessage(websocket.TextMessage, message) if err != nil { log.Errorf(fmt.Sprintf(errMessage, f.ID, err.Error())) return diff --git a/jsonrpc/server.go b/jsonrpc/server.go index df50fd0b4b..99d27e8718 100644 --- a/jsonrpc/server.go +++ b/jsonrpc/server.go @@ -10,7 +10,6 @@ import ( "net" "net/http" "sync" - "sync/atomic" "time" "github.com/0xPolygonHermez/zkevm-node/jsonrpc/metrics" @@ -46,7 +45,7 @@ type Server struct { wsSrv *http.Server wsUpgrader websocket.Upgrader - connCounterMutex sync.Mutex + connCounterMutex *sync.Mutex httpConnCounter int64 wsConnCounter int64 } @@ -77,9 +76,10 @@ func NewServer( } srv := &Server{ - config: cfg, - handler: handler, - chainID: chainID, + config: cfg, + handler: handler, + chainID: chainID, + connCounterMutex: &sync.Mutex{}, } return srv } @@ -347,12 +347,11 @@ func (s *Server) handleWs(w http.ResponseWriter, req *http.Request) { return } - wsConn := new(atomic.Pointer[websocket.Conn]) - wsConn.Store(innerWsConn) + wsConn := newConcurrentWsConn(innerWsConn) // Defer WS closure - defer func(wsConn *atomic.Pointer[websocket.Conn]) { - err = wsConn.Load().Close() + defer func(wsConn *concurrentWsConn) { + err = wsConn.Close() if err != nil { log.Error(fmt.Sprintf("Unable to gracefully close WS connection, %s", err.Error())) } @@ -369,7 +368,7 @@ func (s *Server) handleWs(w http.ResponseWriter, req *http.Request) { }() log.Info("Websocket connection established") for { - msgType, message, err := wsConn.Load().ReadMessage() + msgType, message, err := wsConn.ReadMessage() if err != nil { if websocket.IsCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure, websocket.CloseAbnormalClosure) { log.Info("Closing WS connection gracefully") @@ -387,9 +386,9 @@ func (s *Server) handleWs(w http.ResponseWriter, req *http.Request) { resp, err := s.handler.HandleWs(message, wsConn, req) if err != nil { log.Error(fmt.Sprintf("Unable to handle WS request, %s", err.Error())) - _ = wsConn.Load().WriteMessage(msgType, []byte(fmt.Sprintf("WS Handle error: %s", err.Error()))) + _ = wsConn.WriteMessage(msgType, []byte(fmt.Sprintf("WS Handle error: %s", err.Error()))) } else { - _ = wsConn.Load().WriteMessage(msgType, resp) + _ = wsConn.WriteMessage(msgType, resp) } } } @@ -397,37 +396,35 @@ func (s *Server) handleWs(w http.ResponseWriter, req *http.Request) { func (s *Server) increaseHttpConnCounter() { s.connCounterMutex.Lock() - atomic.AddInt64(&s.httpConnCounter, 1) + s.httpConnCounter++ s.logConnCounters() s.connCounterMutex.Unlock() } func (s *Server) decreaseHttpConnCounter() { s.connCounterMutex.Lock() - atomic.AddInt64(&s.httpConnCounter, -1) + s.httpConnCounter-- s.logConnCounters() s.connCounterMutex.Unlock() } func (s *Server) increaseWsConnCounter() { s.connCounterMutex.Lock() - atomic.AddInt64(&s.wsConnCounter, 1) + s.wsConnCounter++ s.logConnCounters() s.connCounterMutex.Unlock() } func (s *Server) decreaseWsConnCounter() { s.connCounterMutex.Lock() - atomic.AddInt64(&s.wsConnCounter, -1) + s.wsConnCounter-- s.logConnCounters() s.connCounterMutex.Unlock() } func (s *Server) logConnCounters() { - httpConnCounter := atomic.LoadInt64(&s.httpConnCounter) - wsConnCounter := atomic.LoadInt64(&s.wsConnCounter) - totalConnCounter := httpConnCounter + wsConnCounter - log.Debugf("[ HTTP conns: %v | WS conns: %v | Total conns: %v ]", httpConnCounter, wsConnCounter, totalConnCounter) + totalConnCounter := s.httpConnCounter + s.wsConnCounter + log.Infof("[ HTTP conns: %v | WS conns: %v | Total conns: %v ]", s.httpConnCounter, s.wsConnCounter, totalConnCounter) } func handleError(w http.ResponseWriter, err error) { diff --git a/jsonrpc/storage.go b/jsonrpc/storage.go index 8838c6fe08..fda57d1e6f 100644 --- a/jsonrpc/storage.go +++ b/jsonrpc/storage.go @@ -4,12 +4,11 @@ import ( "errors" "fmt" "sync" - "sync/atomic" "time" "github.com/0xPolygonHermez/zkevm-node/hex" + "github.com/0xPolygonHermez/zkevm-node/state" "github.com/google/uuid" - "github.com/gorilla/websocket" ) // ErrNotFound represent a not found error. @@ -32,7 +31,7 @@ func NewStorage() *Storage { } // NewLogFilter persists a new log filter -func (s *Storage) NewLogFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (string, error) { +func (s *Storage) NewLogFilter(wsConn *concurrentWsConn, filter LogFilter) (string, error) { shouldFilterByBlockHash := filter.BlockHash != nil shouldFilterByBlockRange := filter.FromBlock != nil || filter.ToBlock != nil @@ -44,29 +43,35 @@ func (s *Storage) NewLogFilter(wsConn *atomic.Pointer[websocket.Conn], filter Lo } // NewBlockFilter persists a new block log filter -func (s *Storage) NewBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) { +func (s *Storage) NewBlockFilter(wsConn *concurrentWsConn) (string, error) { return s.createFilter(FilterTypeBlock, nil, wsConn) } // NewPendingTransactionFilter persists a new pending transaction filter -func (s *Storage) NewPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error) { +func (s *Storage) NewPendingTransactionFilter(wsConn *concurrentWsConn) (string, error) { return s.createFilter(FilterTypePendingTx, nil, wsConn) } // create persists the filter to the memory and provides the filter id -func (s *Storage) createFilter(t FilterType, parameters interface{}, wsConn *atomic.Pointer[websocket.Conn]) (string, error) { +func (s *Storage) createFilter(t FilterType, parameters interface{}, wsConn *concurrentWsConn) (string, error) { lastPoll := time.Now().UTC() id, err := s.generateFilterID() if err != nil { return "", fmt.Errorf("failed to generate filter ID: %w", err) } - s.filters.Store(id, &Filter{ - ID: id, - Type: t, - Parameters: parameters, - LastPoll: lastPoll, - WsConn: wsConn, - }) + f := &Filter{ + ID: id, + Type: t, + Parameters: parameters, + LastPoll: lastPoll, + WsConn: wsConn, + wsQueue: state.NewQueue[[]byte](), + wsQueueSignal: sync.NewCond(&sync.Mutex{}), + } + + go state.InfiniteSafeRun(f.SendEnqueuedSubscriptionData, fmt.Sprintf("failed to send enqueued subscription data to filter %v", id), time.Second) + + s.filters.Store(id, f) return id, nil } @@ -155,7 +160,7 @@ func (s *Storage) UninstallFilter(filterID string) error { } // UninstallFilterByWSConn deletes all filters connected to the provided web socket connection -func (s *Storage) UninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error { +func (s *Storage) UninstallFilterByWSConn(wsConn *concurrentWsConn) error { filterIDsToDelete := []string{} s.filters.Range(func(key, value any) bool { id := key.(string) diff --git a/jsonrpc/wsconn.go b/jsonrpc/wsconn.go new file mode 100644 index 0000000000..3f26257026 --- /dev/null +++ b/jsonrpc/wsconn.go @@ -0,0 +1,41 @@ +package jsonrpc + +import ( + "sync" + + "github.com/gorilla/websocket" +) + +// concurrentWsConn is a wrapped web socket connection +// that provide methods to deal with concurrency +type concurrentWsConn struct { + wsConn *websocket.Conn + mutex *sync.Mutex +} + +// NewConcurrentWsConn creates a new instance of concurrentWsConn +func newConcurrentWsConn(wsConn *websocket.Conn) *concurrentWsConn { + return &concurrentWsConn{ + wsConn: wsConn, + mutex: &sync.Mutex{}, + } +} + +// ReadMessage reads a message from the inner web socket connection +func (c *concurrentWsConn) ReadMessage() (messageType int, p []byte, err error) { + return c.wsConn.ReadMessage() +} + +// WriteMessage writes a message to the inner web socket connection +func (c *concurrentWsConn) WriteMessage(messageType int, data []byte) error { + c.mutex.Lock() + defer c.mutex.Unlock() + return c.wsConn.WriteMessage(messageType, data) +} + +// Close closes the inner web socket connection +func (c *concurrentWsConn) Close() error { + c.mutex.Lock() + defer c.mutex.Unlock() + return c.wsConn.Close() +} diff --git a/state/infinite.go b/state/infinite.go new file mode 100644 index 0000000000..0f170fe53b --- /dev/null +++ b/state/infinite.go @@ -0,0 +1,27 @@ +package state + +import ( + "time" + + "github.com/0xPolygonHermez/zkevm-node/log" +) + +// InfiniteSafeRun executes a function and in case it fails, +// runs the function again infinitely +func InfiniteSafeRun(fn func(), errorMessage string, restartInterval time.Duration) { + for { + SafeRun(fn, errorMessage) + time.Sleep(restartInterval) + } +} + +// SafeRun executes a function with a deferred recover +// to avoid to panic. +func SafeRun(fn func(), errorMessage string) { + defer func() { + if r := recover(); r != nil { + log.Errorf(errorMessage, r) + } + }() + fn() +} diff --git a/state/l2block.go b/state/l2block.go index 0979504817..f76eed208a 100644 --- a/state/l2block.go +++ b/state/l2block.go @@ -3,7 +3,6 @@ package state import ( "context" "errors" - "math/big" "sync" "time" @@ -29,15 +28,8 @@ type NewL2BlockEvent struct { // filter subscription but can be used by any other component that // needs to react to a new L2 block added to the state. func (s *State) StartToMonitorNewL2Blocks() { - lastL2Block, err := s.GetLastL2Block(context.Background(), nil) - if errors.Is(err, ErrStateNotSynchronized) { - lastL2Block = types.NewBlockWithHeader(&types.Header{Number: big.NewInt(0)}) - } else if err != nil { - log.Fatalf("failed to load the last l2 block: %v", err) - } - s.lastL2BlockSeen.Store(lastL2Block) - go s.monitorNewL2Blocks() - go s.handleEvents() + go InfiniteSafeRun(s.monitorNewL2Blocks, "fail to monitor new l2 blocks: %v:", time.Second) + go InfiniteSafeRun(s.handleEvents, "fail to handle events: %v", time.Second) } // RegisterNewL2BlockEventHandler add the provided handler to the list of handlers @@ -52,13 +44,21 @@ func (s *State) monitorNewL2Blocks() { time.Sleep(newL2BlocksCheckInterval) } + lastL2BlockNumber, err := s.GetLastL2BlockNumber(context.Background(), nil) + if errors.Is(err, ErrStateNotSynchronized) { + lastL2BlockNumber = 0 + } else if err != nil { + log.Fatalf("failed to load the last l2 block: %v", err) + } + lastL2BlockNumberSeen := lastL2BlockNumber + for { if len(s.newL2BlockEventHandlers) == 0 { waitNextCycle() continue } - lastL2Block, err := s.GetLastL2Block(context.Background(), nil) + lastL2BlockNumber, err := s.GetLastL2BlockNumber(context.Background(), nil) if errors.Is(err, ErrStateNotSynchronized) { waitNextCycle() continue @@ -68,16 +68,14 @@ func (s *State) monitorNewL2Blocks() { continue } - lastL2BlockSeen := s.lastL2BlockSeen.Load() - // not updates until now - if lastL2Block == nil || lastL2BlockSeen.NumberU64() >= lastL2Block.NumberU64() { + if lastL2BlockNumber == 0 || lastL2BlockNumberSeen >= lastL2BlockNumber { waitNextCycle() continue } - fromBlockNumber := lastL2BlockSeen.NumberU64() + uint64(1) - toBlockNumber := lastL2Block.NumberU64() + fromBlockNumber := lastL2BlockNumberSeen + uint64(1) + toBlockNumber := lastL2BlockNumber log.Debugf("[monitorNewL2Blocks] new l2 block detected from block %v to %v", fromBlockNumber, toBlockNumber) for bn := fromBlockNumber; bn <= toBlockNumber; bn++ { @@ -91,7 +89,7 @@ func (s *State) monitorNewL2Blocks() { s.newL2BlockEvents <- NewL2BlockEvent{ Block: *block, } - s.lastL2BlockSeen.Store(block) + lastL2BlockNumberSeen = block.NumberU64() log.Debugf("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %v to be sent", block.NumberU64(), time.Since(start)) log.Infof("new l2 block detected: number %v, hash %v", block.NumberU64(), block.Hash().String()) } diff --git a/state/queue.go b/state/queue.go index b91f11d4c7..7e78254bfd 100644 --- a/state/queue.go +++ b/state/queue.go @@ -12,13 +12,14 @@ var ErrQueueEmpty = fmt.Errorf("queue is empty") // Queue is a generic queue implementation that implements FIFO type Queue[T any] struct { items []T - mutex sync.Mutex + mutex *sync.Mutex } // NewQueue creates a new instance of queue and initializes it func NewQueue[T any]() *Queue[T] { return &Queue[T]{ items: make([]T, 0), + mutex: &sync.Mutex{}, } } diff --git a/state/state.go b/state/state.go index 47177459a1..46213f30c0 100644 --- a/state/state.go +++ b/state/state.go @@ -4,14 +4,12 @@ import ( "context" "math/big" "sync" - "sync/atomic" "github.com/0xPolygonHermez/zkevm-node/event" "github.com/0xPolygonHermez/zkevm-node/merkletree" "github.com/0xPolygonHermez/zkevm-node/state/metrics" "github.com/0xPolygonHermez/zkevm-node/state/runtime/executor" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/jackc/pgx/v4" "google.golang.org/protobuf/types/known/emptypb" ) @@ -33,7 +31,6 @@ type State struct { tree *merkletree.StateTree eventLog *event.EventLog - lastL2BlockSeen atomic.Pointer[types.Block] newL2BlockEvents chan NewL2BlockEvent newL2BlockEventHandlers []NewL2BlockEventHandler }