From f9ae4553046560ead0debdcfe0d2678eebf9e00d Mon Sep 17 00:00:00 2001 From: tclemos Date: Wed, 15 Nov 2023 15:41:31 -0300 Subject: [PATCH 1/9] refactoring RPC filter storage --- db/migrations/state/0011_test.go | 73 ------------------ db/migrations/state/util_test.go | 117 +++++++++++++++++++++++++++++ jsonrpc/storage.go | 124 ++++++++++++++++++++----------- 3 files changed, 197 insertions(+), 117 deletions(-) delete mode 100644 db/migrations/state/0011_test.go create mode 100644 db/migrations/state/util_test.go diff --git a/db/migrations/state/0011_test.go b/db/migrations/state/0011_test.go deleted file mode 100644 index 3c245e7d31..0000000000 --- a/db/migrations/state/0011_test.go +++ /dev/null @@ -1,73 +0,0 @@ -package migrations_test - -import ( - "database/sql" - "testing" - - "github.com/stretchr/testify/assert" -) - -// this migration changes length of the token name -type migrationTest0011 struct{} - -func (m migrationTest0011) InsertData(db *sql.DB) error { - return nil -} - -func (m migrationTest0011) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) { - indexes := []string{ - "l2block_created_at_idx", - "log_log_index_idx", - "log_topic0_idx", - "log_topic1_idx", - "log_topic2_idx", - "log_topic3_idx", - } - // Check indexes adding - for _, idx := range indexes { - // getIndex - const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;` - row := db.QueryRow(getIndex, idx) - var result int - assert.NoError(t, row.Scan(&result)) - assert.Equal(t, 1, result) - } - - // Check column egp_log exists in state.transactions table - const getFinalDeviationColumn = `SELECT count(*) FROM information_schema.columns WHERE table_name='transaction' and column_name='egp_log'` - row := db.QueryRow(getFinalDeviationColumn) - var result int - assert.NoError(t, row.Scan(&result)) - assert.Equal(t, 1, result) -} - -func (m migrationTest0011) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) { - indexes := []string{ - "l2block_created_at_idx", - "log_log_index_idx", - "log_topic0_idx", - "log_topic1_idx", - "log_topic2_idx", - "log_topic3_idx", - } - // Check indexes removing - for _, idx := range indexes { - // getIndex - const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;` - row := db.QueryRow(getIndex, idx) - var result int - assert.NoError(t, row.Scan(&result)) - assert.Equal(t, 0, result) - } - - // Check column egp_log doesn't exists in state.transactions table - const getFinalDeviationColumn = `SELECT count(*) FROM information_schema.columns WHERE table_name='transaction' and column_name='egp_log'` - row := db.QueryRow(getFinalDeviationColumn) - var result int - assert.NoError(t, row.Scan(&result)) - assert.Equal(t, 0, result) -} - -func TestMigration0011(t *testing.T) { - runMigrationTest(t, 11, migrationTest0011{}) -} diff --git a/db/migrations/state/util_test.go b/db/migrations/state/util_test.go new file mode 100644 index 0000000000..022ff869cc --- /dev/null +++ b/db/migrations/state/util_test.go @@ -0,0 +1,117 @@ +package migrations_test + +import ( + "database/sql" + "fmt" + "testing" + + "github.com/0xPolygonHermez/zkevm-node/db" + "github.com/0xPolygonHermez/zkevm-node/log" + "github.com/0xPolygonHermez/zkevm-node/test/dbutils" + "github.com/gobuffalo/packr/v2" + "github.com/jackc/pgx/v4" + "github.com/jackc/pgx/v4/stdlib" + migrate "github.com/rubenv/sql-migrate" + "github.com/stretchr/testify/require" +) + +/* + Considerations tricks and tips for migration file testing: + - Functionality of the DB is tested by the rest of the packages, 
migration tests only have to check persistence across migrations (both UP and DOWN)
+	- It's recommended to use real data (from testnet/mainnet), modifying NULL fields to check that those are migrated properly
+	- It's recommended to use a SQL tool (such as DBeaver) that generates insert queries from existing rows
+	- Any new migration file can be tested using the existing `migrationTester` interface. Check `0002_test.go` for an example
+*/
+
+func init() {
+	log.Init(log.Config{
+		Level:   "debug",
+		Outputs: []string{"stderr"},
+	})
+}
+
+type migrationTester interface {
+	// InsertData inserts data into the tables affected by the migration that is being tested;
+	// data will be inserted with the schema as it was prior to the migration being tested
+	InsertData(*sql.DB) error
+	// RunAssertsAfterMigrationUp is called after running the migration that is being tested
+	// and should assert that the data inserted by InsertData is persisted properly
+	RunAssertsAfterMigrationUp(*testing.T, *sql.DB)
+	// RunAssertsAfterMigrationDown is called after reverting the migration that is being tested
+	// and should assert that the data inserted by InsertData is persisted properly
+	RunAssertsAfterMigrationDown(*testing.T, *sql.DB)
+}
+
+var (
+	stateDBCfg      = dbutils.NewStateConfigFromEnv()
+	packrMigrations = map[string]*packr.Box{
+		db.StateMigrationName: packr.New(db.StateMigrationName, "./migrations/state"),
+		db.PoolMigrationName:  packr.New(db.PoolMigrationName, "./migrations/pool"),
+	}
+)
+
+func runMigrationTest(t *testing.T, migrationNumber int, miter migrationTester) {
+	// Initialize an empty DB
+	d, err := initCleanSQLDB()
+	require.NoError(t, err)
+	require.NoError(t, runMigrationsDown(d, 0, db.StateMigrationName))
+	// Run migrations up to the one being tested
+	require.NoError(t, runMigrationsUp(d, migrationNumber-1, db.StateMigrationName))
+	// Insert data into the table(s) affected by the migration
+	require.NoError(t, miter.InsertData(d))
+	// Run the migration that is being tested
+	require.NoError(t, runMigrationsUp(d, 1, db.StateMigrationName))
+	// Check that data is persisted properly after migration up
+	miter.RunAssertsAfterMigrationUp(t, d)
+	// Revert the migration being tested
+	require.NoError(t, runMigrationsDown(d, 1, db.StateMigrationName))
+	// Check that data is persisted properly after migration down
+	miter.RunAssertsAfterMigrationDown(t, d)
+}
+
+func initCleanSQLDB() (*sql.DB, error) {
+	// run migrations
+	if err := db.RunMigrationsDown(stateDBCfg, db.StateMigrationName); err != nil {
+		return nil, err
+	}
+	c, err := pgx.ParseConfig(fmt.Sprintf("postgres://%s:%s@%s:%s/%s", stateDBCfg.User, stateDBCfg.Password, stateDBCfg.Host, stateDBCfg.Port, stateDBCfg.Name))
+	if err != nil {
+		return nil, err
+	}
+	sqlDB := stdlib.OpenDB(*c)
+	return sqlDB, nil
+}
+
+func runMigrationsUp(d *sql.DB, n int, packrName string) error {
+	box, ok := packrMigrations[packrName]
+	if !ok {
+		return fmt.Errorf("packr box not found with name: %v", packrName)
+	}
+
+	var migrations = &migrate.PackrMigrationSource{Box: box}
+	nMigrations, err := migrate.ExecMax(d, "postgres", migrations, migrate.Up, n)
+	if err != nil {
+		return err
+	}
+	if nMigrations != n {
+		return fmt.Errorf("Unexpected amount of migrations: expected: %d, actual: %d", n, nMigrations)
+	}
+	return nil
+}
+
+func runMigrationsDown(d *sql.DB, n int, packrName string) error {
+	box, ok := packrMigrations[packrName]
+	if !ok {
+		return fmt.Errorf("packr box not found 
with name: %v", packrName) + } + + var migrations = &migrate.PackrMigrationSource{Box: box} + nMigrations, err := migrate.ExecMax(d, "postgres", migrations, migrate.Down, n) + if err != nil { + return err + } + if nMigrations != n { + return fmt.Errorf("Unexpected amount of migrations: expected: %d, actual: %d", n, nMigrations) + } + return nil +} diff --git a/jsonrpc/storage.go b/jsonrpc/storage.go index a1381bc619..2e644c3cb6 100644 --- a/jsonrpc/storage.go +++ b/jsonrpc/storage.go @@ -20,13 +20,23 @@ var ErrFilterInvalidPayload = errors.New("invalid argument 0: cannot specify bot // Storage uses memory to store the data // related to the json rpc server type Storage struct { - filters sync.Map + allFilters map[string]*Filter + allFiltersWithWSConn map[*concurrentWsConn]*Filter + blockFiltersWithWSConn map[string]*Filter + logFiltersWithWSConn map[string]*Filter + pendingTxFiltersWithWSConn map[string]*Filter + mutex *sync.Mutex } // NewStorage creates and initializes an instance of Storage func NewStorage() *Storage { return &Storage{ - filters: sync.Map{}, + allFilters: make(map[string]*Filter), + allFiltersWithWSConn: make(map[*concurrentWsConn]*Filter), + blockFiltersWithWSConn: make(map[string]*Filter), + logFiltersWithWSConn: make(map[string]*Filter), + pendingTxFiltersWithWSConn: make(map[string]*Filter), + mutex: &sync.Mutex{}, } } @@ -56,6 +66,10 @@ func (s *Storage) createFilter(t FilterType, parameters interface{}, wsConn *con if err != nil { return "", fmt.Errorf("failed to generate filter ID: %w", err) } + + s.mutex.Lock() + defer s.mutex.Unlock() + f := &Filter{ ID: id, Type: t, @@ -68,8 +82,17 @@ func (s *Storage) createFilter(t FilterType, parameters interface{}, wsConn *con go state.InfiniteSafeRun(f.SendEnqueuedSubscriptionData, fmt.Sprintf("failed to send enqueued subscription data to filter %v", id), time.Second) - s.filters.Store(id, f) - + s.allFilters[id] = f + if f.WsConn != nil { + s.allFiltersWithWSConn[f.WsConn] = f + if t == FilterTypeBlock { + s.blockFiltersWithWSConn[id] = f + } else if t == FilterTypeLog { + s.logFiltersWithWSConn[id] = f + } else if t == FilterTypePendingTx { + s.pendingTxFiltersWithWSConn[id] = f + } + } return id, nil } @@ -91,86 +114,99 @@ func (s *Storage) generateFilterID() (string, error) { // GetAllBlockFiltersWithWSConn returns an array with all filter that have // a web socket connection and are filtering by new blocks func (s *Storage) GetAllBlockFiltersWithWSConn() ([]*Filter, error) { - filtersWithWSConn := []*Filter{} - s.filters.Range(func(key, value any) bool { - filter := value.(*Filter) - if filter.WsConn == nil || filter.Type != FilterTypeBlock { - return true - } + s.mutex.Lock() + defer s.mutex.Unlock() + filters := []*Filter{} + for _, filter := range s.blockFiltersWithWSConn { f := filter - filtersWithWSConn = append(filtersWithWSConn, f) - return true - }) - - return filtersWithWSConn, nil + filters = append(filters, f) + } + return filters, nil } // GetAllLogFiltersWithWSConn returns an array with all filter that have // a web socket connection and are filtering by new logs func (s *Storage) GetAllLogFiltersWithWSConn() ([]*Filter, error) { - filtersWithWSConn := []*Filter{} - s.filters.Range(func(key, value any) bool { - filter := value.(*Filter) - if filter.WsConn == nil || filter.Type != FilterTypeLog { - return true - } + s.mutex.Lock() + defer s.mutex.Unlock() + filters := []*Filter{} + for _, filter := range s.logFiltersWithWSConn { f := filter - filtersWithWSConn = append(filtersWithWSConn, f) - return true - 
}) - - return filtersWithWSConn, nil + filters = append(filters, f) + } + return filters, nil } // GetFilter gets a filter by its id func (s *Storage) GetFilter(filterID string) (*Filter, error) { - filter, found := s.filters.Load(filterID) + s.mutex.Lock() + defer s.mutex.Unlock() + + filter, found := s.allFilters[filterID] if !found { return nil, ErrNotFound } - return filter.(*Filter), nil + return filter, nil } // UpdateFilterLastPoll updates the last poll to now func (s *Storage) UpdateFilterLastPoll(filterID string) error { - filterValue, found := s.filters.Load(filterID) + s.mutex.Lock() + defer s.mutex.Unlock() + + filter, found := s.allFilters[filterID] if !found { return ErrNotFound } - filter := filterValue.(*Filter) filter.LastPoll = time.Now().UTC() - s.filters.Store(filterID, filter) + s.allFilters[filterID] = filter return nil } // UninstallFilter deletes a filter by its id func (s *Storage) UninstallFilter(filterID string) error { - _, found := s.filters.Load(filterID) + s.mutex.Lock() + defer s.mutex.Unlock() + + filter, found := s.allFilters[filterID] if !found { return ErrNotFound } - s.filters.Delete(filterID) + + s.deleteFilter(filter) return nil } // UninstallFilterByWSConn deletes all filters connected to the provided web socket connection func (s *Storage) UninstallFilterByWSConn(wsConn *concurrentWsConn) error { - filterIDsToDelete := []string{} - s.filters.Range(func(key, value any) bool { - id := key.(string) - filter := value.(*Filter) - if filter.WsConn == wsConn { - filterIDsToDelete = append(filterIDsToDelete, id) - } - return true - }) + s.mutex.Lock() + defer s.mutex.Unlock() - for _, filterID := range filterIDsToDelete { - s.filters.Delete(filterID) + filter, found := s.allFiltersWithWSConn[wsConn] + if !found { + return ErrNotFound } + s.deleteFilter(filter) return nil } + +// deleteFilter deletes a filter from all the maps +func (s *Storage) deleteFilter(filter *Filter) { + if filter.Type == FilterTypeBlock { + delete(s.blockFiltersWithWSConn, filter.ID) + } else if filter.Type == FilterTypeLog { + delete(s.logFiltersWithWSConn, filter.ID) + } else if filter.Type == FilterTypePendingTx { + delete(s.pendingTxFiltersWithWSConn, filter.ID) + } + + if filter.WsConn != nil { + delete(s.allFiltersWithWSConn, filter.WsConn) + } + + delete(s.allFilters, filter.ID) +} From 491060f34d85a754569d4d8ada25e4c712cbde57 Mon Sep 17 00:00:00 2001 From: tclemos Date: Wed, 15 Nov 2023 16:36:40 -0300 Subject: [PATCH 2/9] expose mutex and log filters from storage to allow direct access avoiding the copy --- jsonrpc/endpoints_eth.go | 181 ++++++++++++++++++--------------------- jsonrpc/interfaces.go | 6 +- jsonrpc/mock_storage.go | 54 +++++------- jsonrpc/storage.go | 34 +++----- 4 files changed, 124 insertions(+), 151 deletions(-) diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index 3835c33a89..72bc0f8398 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -1071,23 +1071,22 @@ func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) { func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2BlockEvent) { defer wg.Done() start := time.Now() - blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn() + + b, err := types.NewBlock(&event.Block, nil, false, false) if err != nil { - log.Errorf("failed to get all block filters with web sockets connections: %v", err) - } else { - b, err := types.NewBlock(&event.Block, nil, false, false) - if err != nil { - log.Errorf("failed to build block response to subscription: 
%v", err) - return - } - data, err := json.Marshal(b) - if err != nil { - log.Errorf("failed to marshal block response to subscription: %v", err) - return - } - for _, filter := range blockFilters { - filter.EnqueueSubscriptionDataToBeSent(data) - } + log.Errorf("failed to build block response to subscription: %v", err) + return + } + data, err := json.Marshal(b) + if err != nil { + log.Errorf("failed to marshal block response to subscription: %v", err) + return + } + e.storage.Lock() + defer e.storage.Unlock() + filters := e.storage.GetAllBlockFiltersWithWSConn() + for _, filter := range filters { + filter.EnqueueSubscriptionDataToBeSent(data) } log.Debugf("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start)) } @@ -1095,96 +1094,86 @@ func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2Block func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockEvent) { defer wg.Done() start := time.Now() - logFilters, err := e.storage.GetAllLogFiltersWithWSConn() - if err != nil { - log.Errorf("failed to get all log filters with web sockets connections: %v", err) - } else { - for _, filter := range logFilters { - filterParameters := filter.Parameters.(LogFilter) - bn := types.BlockNumber(event.Block.NumberU64()) - if filterParameters.BlockHash != nil { - // if the filter block hash is set, we check if the block is the - // one with the expected hash, otherwise we ignore the filter - bh := *filterParameters.BlockHash - if bh.String() != event.Block.Hash().String() { + e.storage.Lock() + defer e.storage.Unlock() + filters := e.storage.GetAllLogFiltersWithWSConn() + for _, filter := range filters { + filterParameters := filter.Parameters.(LogFilter) + bn := types.BlockNumber(event.Block.NumberU64()) + + if filterParameters.BlockHash != nil { + // if the filter block hash is set, we check if the block is the + // one with the expected hash, otherwise we ignore the filter + bh := *filterParameters.BlockHash + if bh.String() != event.Block.Hash().String() { + continue + } + } else if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil { + // in case the block hash is nil and also from and to blocks are nil, set it + // to the current block to make the query faster + filterParameters.FromBlock = &bn + filterParameters.ToBlock = &bn + } else { + // if the filter has a fromBlock value set + // and the event block number is smaller than the + // from block, skip this filter + if filterParameters.FromBlock != nil { + fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) + if rpcErr != nil { + log.Errorf("failed to get numeric block number for FromBlock field for filter %v: %v", filter.ID, rpcErr) continue } - } else if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil { - // in case the block hash is nil and also from and to blocks are nil, set it - // to the current block to make the query faster - filterParameters.FromBlock = &bn - filterParameters.ToBlock = &bn - } else { - // if the filter has a fromBlock value set - // and the event block number is smaller than the - // from block, skip this filter - if filterParameters.FromBlock != nil { - fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) - if rpcErr != nil { - log.Errorf(rpcErr.Error(), filter.ID, err) - continue - } - // if the block number is smaller 
than the fromBlock value - // this means this block is out of the block range for this - // filter, so we skip it - if event.Block.NumberU64() < fromBlock { - continue - } - // otherwise set the from block to a fixed number - // to avoid querying it again in the next step - fixedFromBlock := types.BlockNumber(event.Block.NumberU64()) - filterParameters.FromBlock = &fixedFromBlock - } - - // if the filter has a toBlock value set - // and the event block number is greater than the - // to block, skip this filter - if filterParameters.ToBlock != nil { - toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) - if rpcErr != nil { - log.Errorf(rpcErr.Error(), filter.ID, err) - continue - } - // if the block number is greater than the toBlock value - // this means this block is out of the block range for this - // filter, so we skip it - if event.Block.NumberU64() > toBlock { - continue - } - // otherwise set the to block to a fixed number - // to avoid querying it again in the next step - fixedToBlock := types.BlockNumber(event.Block.NumberU64()) - filterParameters.ToBlock = &fixedToBlock + // if the block number is smaller than the fromBlock value + // this means this block is out of the block range for this + // filter, so we skip it + if event.Block.NumberU64() < fromBlock { + continue } + // otherwise set the from block to a fixed number + // to avoid querying it again in the next step + fixedFromBlock := types.BlockNumber(event.Block.NumberU64()) + filterParameters.FromBlock = &fixedFromBlock } - // get new logs for this specific filter - changes, err := e.internalGetLogs(context.Background(), nil, filterParameters) - if errors.Is(err, state.ErrMaxLogsCountLimitExceeded) { - log.Infof("failed to get filters changes for filter %v, the filter seems to be returning more results than allowed and was removed: %v", filter.ID, err) - err := e.storage.UninstallFilter(filter.ID) - if !errors.Is(err, ErrNotFound) && err != nil { - log.Errorf("failed to automatically uninstall filter %v: %v", filter.ID, err) - } else { - log.Infof("Filter %v automatically uninstalled", filter.ID) + // if the filter has a toBlock value set + // and the event block number is greater than the + // to block, skip this filter + if filterParameters.ToBlock != nil { + toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) + if rpcErr != nil { + log.Errorf("failed to get numeric block number for ToBlock field for filter %v: %v", filter.ID, rpcErr) + continue } - continue - } else if err != nil { - log.Errorf("failed to get filters changes for filter %v with web sockets connections: %v", filter.ID, err) - continue + // if the block number is greater than the toBlock value + // this means this block is out of the block range for this + // filter, so we skip it + if event.Block.NumberU64() > toBlock { + continue + } + // otherwise set the to block to a fixed number + // to avoid querying it again in the next step + fixedToBlock := types.BlockNumber(event.Block.NumberU64()) + filterParameters.ToBlock = &fixedToBlock } + } - // if there are new logs for the filter, send it - if changes != nil { - ethLogs := changes.([]types.Log) - for _, ethLog := range ethLogs { - data, err := json.Marshal(ethLog) - if err != nil { - log.Errorf("failed to marshal ethLog response to subscription: %v", err) - } - filter.EnqueueSubscriptionDataToBeSent(data) + // get new logs for this specific filter + changes, err := 
e.internalGetLogs(context.Background(), nil, filterParameters) + if err != nil { + log.Errorf("failed to get filters changes for filter %v with web sockets connections: %v", filter.ID, err) + continue + } + + // if there are new logs for the filter, send it + if changes != nil { + ethLogs := changes.([]types.Log) + for _, ethLog := range ethLogs { + data, err := json.Marshal(ethLog) + if err != nil { + log.Errorf("failed to marshal ethLog response to subscription: %v", err) } + filter.EnqueueSubscriptionDataToBeSent(data) } } } diff --git a/jsonrpc/interfaces.go b/jsonrpc/interfaces.go index fd6de2538f..36344ff88c 100644 --- a/jsonrpc/interfaces.go +++ b/jsonrpc/interfaces.go @@ -2,8 +2,10 @@ package jsonrpc // storageInterface json rpc internal storage to persist data type storageInterface interface { - GetAllBlockFiltersWithWSConn() ([]*Filter, error) - GetAllLogFiltersWithWSConn() ([]*Filter, error) + Lock() + Unlock() + GetAllBlockFiltersWithWSConn() map[string]*Filter + GetAllLogFiltersWithWSConn() map[string]*Filter GetFilter(filterID string) (*Filter, error) NewBlockFilter(wsConn *concurrentWsConn) (string, error) NewLogFilter(wsConn *concurrentWsConn, filter LogFilter) (string, error) diff --git a/jsonrpc/mock_storage.go b/jsonrpc/mock_storage.go index 4ee466de93..f9d7e404cf 100644 --- a/jsonrpc/mock_storage.go +++ b/jsonrpc/mock_storage.go @@ -2,9 +2,7 @@ package jsonrpc -import ( - mock "github.com/stretchr/testify/mock" -) +import mock "github.com/stretchr/testify/mock" // storageMock is an autogenerated mock type for the storageInterface type type storageMock struct { @@ -12,55 +10,35 @@ type storageMock struct { } // GetAllBlockFiltersWithWSConn provides a mock function with given fields: -func (_m *storageMock) GetAllBlockFiltersWithWSConn() ([]*Filter, error) { +func (_m *storageMock) GetAllBlockFiltersWithWSConn() map[string]*Filter { ret := _m.Called() - var r0 []*Filter - var r1 error - if rf, ok := ret.Get(0).(func() ([]*Filter, error)); ok { - return rf() - } - if rf, ok := ret.Get(0).(func() []*Filter); ok { + var r0 map[string]*Filter + if rf, ok := ret.Get(0).(func() map[string]*Filter); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]*Filter) + r0 = ret.Get(0).(map[string]*Filter) } } - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() - } else { - r1 = ret.Error(1) - } - - return r0, r1 + return r0 } // GetAllLogFiltersWithWSConn provides a mock function with given fields: -func (_m *storageMock) GetAllLogFiltersWithWSConn() ([]*Filter, error) { +func (_m *storageMock) GetAllLogFiltersWithWSConn() map[string]*Filter { ret := _m.Called() - var r0 []*Filter - var r1 error - if rf, ok := ret.Get(0).(func() ([]*Filter, error)); ok { - return rf() - } - if rf, ok := ret.Get(0).(func() []*Filter); ok { + var r0 map[string]*Filter + if rf, ok := ret.Get(0).(func() map[string]*Filter); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]*Filter) + r0 = ret.Get(0).(map[string]*Filter) } } - if rf, ok := ret.Get(1).(func() error); ok { - r1 = rf() - } else { - r1 = ret.Error(1) - } - - return r0, r1 + return r0 } // GetFilter provides a mock function with given fields: filterID @@ -89,6 +67,11 @@ func (_m *storageMock) GetFilter(filterID string) (*Filter, error) { return r0, r1 } +// Lock provides a mock function with given fields: +func (_m *storageMock) Lock() { + _m.Called() +} + // NewBlockFilter provides a mock function with given fields: wsConn func (_m *storageMock) NewBlockFilter(wsConn *concurrentWsConn) (string, error) { 
ret := _m.Called(wsConn) @@ -189,6 +172,11 @@ func (_m *storageMock) UninstallFilterByWSConn(wsConn *concurrentWsConn) error { return r0 } +// Unlock provides a mock function with given fields: +func (_m *storageMock) Unlock() { + _m.Called() +} + // UpdateFilterLastPoll provides a mock function with given fields: filterID func (_m *storageMock) UpdateFilterLastPoll(filterID string) error { ret := _m.Called(filterID) diff --git a/jsonrpc/storage.go b/jsonrpc/storage.go index 2e644c3cb6..97984ea184 100644 --- a/jsonrpc/storage.go +++ b/jsonrpc/storage.go @@ -111,32 +111,26 @@ func (s *Storage) generateFilterID() (string, error) { return id, nil } -// GetAllBlockFiltersWithWSConn returns an array with all filter that have -// a web socket connection and are filtering by new blocks -func (s *Storage) GetAllBlockFiltersWithWSConn() ([]*Filter, error) { +// Lock locks the internal mutex +func (s *Storage) Lock() { s.mutex.Lock() - defer s.mutex.Unlock() +} - filters := []*Filter{} - for _, filter := range s.blockFiltersWithWSConn { - f := filter - filters = append(filters, f) - } - return filters, nil +// Unlock unlocks the internal mutex +func (s *Storage) Unlock() { + s.mutex.Unlock() +} + +// GetAllBlockFiltersWithWSConn returns an array with all filter that have +// a web socket connection and are filtering by new blocks +func (s *Storage) GetAllBlockFiltersWithWSConn() map[string]*Filter { + return s.blockFiltersWithWSConn } // GetAllLogFiltersWithWSConn returns an array with all filter that have // a web socket connection and are filtering by new logs -func (s *Storage) GetAllLogFiltersWithWSConn() ([]*Filter, error) { - s.mutex.Lock() - defer s.mutex.Unlock() - - filters := []*Filter{} - for _, filter := range s.logFiltersWithWSConn { - f := filter - filters = append(filters, f) - } - return filters, nil +func (s *Storage) GetAllLogFiltersWithWSConn() map[string]*Filter { + return s.logFiltersWithWSConn } // GetFilter gets a filter by its id From 8b3244d7730247607f05981e1011eef5c7d3083e Mon Sep 17 00:00:00 2001 From: tclemos Date: Wed, 15 Nov 2023 17:31:43 -0300 Subject: [PATCH 3/9] fix filter storage when deleting filters by wsConn --- jsonrpc/endpoints_eth.go | 4 ++-- jsonrpc/query.go | 2 +- jsonrpc/storage.go | 22 ++++++++++++++++------ state/l2block.go | 10 +++++----- state/pgstatestorage.go | 4 ++-- 5 files changed, 26 insertions(+), 16 deletions(-) diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index 72bc0f8398..8628cf731f 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -1088,7 +1088,7 @@ func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2Block for _, filter := range filters { filter.EnqueueSubscriptionDataToBeSent(data) } - log.Debugf("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start)) + log.Infof("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start)) } func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockEvent) { @@ -1177,5 +1177,5 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE } } } - log.Debugf("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", event.Block.NumberU64(), time.Since(start)) + log.Infof("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", 
event.Block.NumberU64(), time.Since(start)) } diff --git a/jsonrpc/query.go b/jsonrpc/query.go index 02b355fce7..57cf626bcc 100644 --- a/jsonrpc/query.go +++ b/jsonrpc/query.go @@ -94,7 +94,7 @@ func (f *Filter) sendSubscriptionResponse(data []byte) { return } log.Debugf("WS message sent: %v", string(message)) - log.Debugf("[SendSubscriptionResponse] took %v", time.Since(start)) + log.Infof("[SendSubscriptionResponse] took %v", time.Since(start)) } // FilterType express the type of the filter, block, logs, pending transactions diff --git a/jsonrpc/storage.go b/jsonrpc/storage.go index 97984ea184..52359fe38b 100644 --- a/jsonrpc/storage.go +++ b/jsonrpc/storage.go @@ -21,7 +21,7 @@ var ErrFilterInvalidPayload = errors.New("invalid argument 0: cannot specify bot // related to the json rpc server type Storage struct { allFilters map[string]*Filter - allFiltersWithWSConn map[*concurrentWsConn]*Filter + allFiltersWithWSConn map[*concurrentWsConn]map[string]*Filter blockFiltersWithWSConn map[string]*Filter logFiltersWithWSConn map[string]*Filter pendingTxFiltersWithWSConn map[string]*Filter @@ -32,7 +32,7 @@ type Storage struct { func NewStorage() *Storage { return &Storage{ allFilters: make(map[string]*Filter), - allFiltersWithWSConn: make(map[*concurrentWsConn]*Filter), + allFiltersWithWSConn: make(map[*concurrentWsConn]map[string]*Filter), blockFiltersWithWSConn: make(map[string]*Filter), logFiltersWithWSConn: make(map[string]*Filter), pendingTxFiltersWithWSConn: make(map[string]*Filter), @@ -84,7 +84,11 @@ func (s *Storage) createFilter(t FilterType, parameters interface{}, wsConn *con s.allFilters[id] = f if f.WsConn != nil { - s.allFiltersWithWSConn[f.WsConn] = f + if _, found := s.allFiltersWithWSConn[f.WsConn]; !found { + s.allFiltersWithWSConn[f.WsConn] = make(map[string]*Filter) + } + + s.allFiltersWithWSConn[f.WsConn][id] = f if t == FilterTypeBlock { s.blockFiltersWithWSConn[id] = f } else if t == FilterTypeLog { @@ -179,12 +183,15 @@ func (s *Storage) UninstallFilterByWSConn(wsConn *concurrentWsConn) error { s.mutex.Lock() defer s.mutex.Unlock() - filter, found := s.allFiltersWithWSConn[wsConn] + filters, found := s.allFiltersWithWSConn[wsConn] if !found { return ErrNotFound } - s.deleteFilter(filter) + for _, filter := range filters { + s.deleteFilter(filter) + } + return nil } @@ -199,7 +206,10 @@ func (s *Storage) deleteFilter(filter *Filter) { } if filter.WsConn != nil { - delete(s.allFiltersWithWSConn, filter.WsConn) + delete(s.allFiltersWithWSConn[filter.WsConn], filter.ID) + if len(s.allFiltersWithWSConn[filter.WsConn]) == 0 { + delete(s.allFiltersWithWSConn, filter.WsConn) + } } delete(s.allFilters, filter.ID) diff --git a/state/l2block.go b/state/l2block.go index f76eed208a..8e4be4f917 100644 --- a/state/l2block.go +++ b/state/l2block.go @@ -76,7 +76,7 @@ func (s *State) monitorNewL2Blocks() { fromBlockNumber := lastL2BlockNumberSeen + uint64(1) toBlockNumber := lastL2BlockNumber - log.Debugf("[monitorNewL2Blocks] new l2 block detected from block %v to %v", fromBlockNumber, toBlockNumber) + log.Infof("[monitorNewL2Blocks] new l2 block detected from block %v to %v", fromBlockNumber, toBlockNumber) for bn := fromBlockNumber; bn <= toBlockNumber; bn++ { block, err := s.GetL2BlockByNumber(context.Background(), bn, nil) @@ -90,7 +90,7 @@ func (s *State) monitorNewL2Blocks() { Block: *block, } lastL2BlockNumberSeen = block.NumberU64() - log.Debugf("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %v to be sent", block.NumberU64(), time.Since(start)) + 
log.Infof("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %v to be sent", block.NumberU64(), time.Since(start)) log.Infof("new l2 block detected: number %v, hash %v", block.NumberU64(), block.Hash().String()) } @@ -101,7 +101,7 @@ func (s *State) monitorNewL2Blocks() { func (s *State) handleEvents() { for newL2BlockEvent := range s.newL2BlockEvents { - log.Debugf("[handleEvents] new l2 block event detected for block: %v", newL2BlockEvent.Block.NumberU64()) + log.Infof("[handleEvents] new l2 block event detected for block: %v", newL2BlockEvent.Block.NumberU64()) if len(s.newL2BlockEventHandlers) == 0 { continue } @@ -116,10 +116,10 @@ func (s *State) handleEvents() { log.Errorf("failed and recovered in NewL2BlockEventHandler: %v", r) } }() - log.Debugf("[handleEvents] triggering new l2 block event handler for block: %v", e.Block.NumberU64()) + log.Infof("[handleEvents] triggering new l2 block event handler for block: %v", e.Block.NumberU64()) start := time.Now() h(e) - log.Debugf("[handleEvents] new l2 block event handler for block %v took %v to be executed", e.Block.NumberU64(), time.Since(start)) + log.Infof("[handleEvents] new l2 block event handler for block %v took %v to be executed", e.Block.NumberU64(), time.Since(start)) }(handler, newL2BlockEvent) } wg.Wait() diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go index 9bf866c3c9..ad83b380a6 100644 --- a/state/pgstatestorage.go +++ b/state/pgstatestorage.go @@ -1487,7 +1487,7 @@ func (p *PostgresStorage) GetTransactionEGPLogByHash(ctx context.Context, transa // AddL2Block adds a new L2 block to the State Store func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2Block *types.Block, receipts []*types.Receipt, txsEGPData []StoreTxEGPData, dbTx pgx.Tx) error { - log.Debugf("[AddL2Block] adding l2 block: %v", l2Block.NumberU64()) + log.Infof("[AddL2Block] adding l2 block: %v", l2Block.NumberU64()) start := time.Now() e := p.getExecQuerier(dbTx) @@ -1562,7 +1562,7 @@ func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2 } } } - log.Debugf("[AddL2Block] l2 block %v took %v to be added", l2Block.NumberU64(), time.Since(start)) + log.Infof("[AddL2Block] l2 block %v took %v to be added", l2Block.NumberU64(), time.Since(start)) return nil } From 86d5f5c5c6aea0dd4c24a30c59275cce2bd03c61 Mon Sep 17 00:00:00 2001 From: tclemos Date: Wed, 15 Nov 2023 17:56:57 -0300 Subject: [PATCH 4/9] fix UninstallFilterByWSConn when the wsConn has no filters installed --- jsonrpc/storage.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/jsonrpc/storage.go b/jsonrpc/storage.go index 52359fe38b..33ac4e04af 100644 --- a/jsonrpc/storage.go +++ b/jsonrpc/storage.go @@ -185,7 +185,7 @@ func (s *Storage) UninstallFilterByWSConn(wsConn *concurrentWsConn) error { filters, found := s.allFiltersWithWSConn[wsConn] if !found { - return ErrNotFound + return nil } for _, filter := range filters { From ab14469c05ae98f46e978876f71b1f132e8d16ec Mon Sep 17 00:00:00 2001 From: tclemos Date: Wed, 15 Nov 2023 19:15:01 -0300 Subject: [PATCH 5/9] more logs; return filter copy; more concurrency --- jsonrpc/endpoints_eth.go | 176 +++++++++++++++++++++++---------------- jsonrpc/interfaces.go | 6 +- jsonrpc/mock_storage.go | 26 ++---- jsonrpc/storage.go | 83 ++++++++++++------ 4 files changed, 169 insertions(+), 122 deletions(-) diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index 8628cf731f..03e7dc1308 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -1056,7 
+1056,8 @@ func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *concurrentWsConn) error { // onNewL2Block is triggered when the state triggers the event for a new l2 block func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) { - log.Debugf("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64()) + log.Infof("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64()) + start := time.Now() wg := sync.WaitGroup{} wg.Add(1) @@ -1066,6 +1067,7 @@ func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) { go e.notifyNewLogs(&wg, event) wg.Wait() + log.Infof("[onNewL2Block] new l2 block %v took %v to send the messages to all ws connections", event.Block.NumberU64(), time.Since(start)) } func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2BlockEvent) { @@ -1082,12 +1084,22 @@ func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2Block log.Errorf("failed to marshal block response to subscription: %v", err) return } - e.storage.Lock() - defer e.storage.Unlock() + filters := e.storage.GetAllBlockFiltersWithWSConn() + log.Infof("[notifyNewHeads] took %v to get block filters with ws connections", time.Since(start)) + enqueueWg := sync.WaitGroup{} for _, filter := range filters { - filter.EnqueueSubscriptionDataToBeSent(data) + f := filter + enqueueWg.Add(1) + go func(f *Filter) { + defer enqueueWg.Done() + start := time.Now() + f.EnqueueSubscriptionDataToBeSent(data) + log.Infof("[notifyNewHeads] took %v to enqueue new l2 block messages", time.Since(start)) + }(f) } + enqueueWg.Wait() + log.Infof("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start)) } @@ -1095,87 +1107,103 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE defer wg.Done() start := time.Now() - e.storage.Lock() - defer e.storage.Unlock() filters := e.storage.GetAllLogFiltersWithWSConn() + log.Infof("[notifyNewLogs] took %v to get log filters with ws connections", time.Since(start)) + enqueueWg := sync.WaitGroup{} for _, filter := range filters { - filterParameters := filter.Parameters.(LogFilter) - bn := types.BlockNumber(event.Block.NumberU64()) - - if filterParameters.BlockHash != nil { - // if the filter block hash is set, we check if the block is the - // one with the expected hash, otherwise we ignore the filter - bh := *filterParameters.BlockHash - if bh.String() != event.Block.Hash().String() { - continue - } - } else if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil { - // in case the block hash is nil and also from and to blocks are nil, set it - // to the current block to make the query faster - filterParameters.FromBlock = &bn - filterParameters.ToBlock = &bn - } else { - // if the filter has a fromBlock value set - // and the event block number is smaller than the - // from block, skip this filter - if filterParameters.FromBlock != nil { - fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) - if rpcErr != nil { - log.Errorf("failed to get numeric block number for FromBlock field for filter %v: %v", filter.ID, rpcErr) - continue + f := filter + enqueueWg.Add(1) + go func(f *Filter) { + defer enqueueWg.Done() + start := time.Now() + + filterParameters := f.Parameters.(LogFilter) + bn := types.BlockNumber(event.Block.NumberU64()) + + if filterParameters.BlockHash != nil { + // if the filter block hash 
is set, we check if the block is the + // one with the expected hash, otherwise we ignore the filter + bh := *filterParameters.BlockHash + if bh.String() != event.Block.Hash().String() { + return } - // if the block number is smaller than the fromBlock value - // this means this block is out of the block range for this - // filter, so we skip it - if event.Block.NumberU64() < fromBlock { - continue + } else if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil { + // in case the block hash is nil and also from and to blocks are nil, set it + // to the current block to make the query faster + filterParameters.FromBlock = &bn + filterParameters.ToBlock = &bn + } else { + // if the filter has a fromBlock value set + // and the event block number is smaller than the + // from block, skip this filter + if filterParameters.FromBlock != nil { + fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) + if rpcErr != nil { + log.Errorf("failed to get numeric block number for FromBlock field for filter %v: %v", f.ID, rpcErr) + return + } + // if the block number is smaller than the fromBlock value + // this means this block is out of the block range for this + // filter, so we skip it + if event.Block.NumberU64() < fromBlock { + return + } + // otherwise set the from block to a fixed number + // to avoid querying it again in the next step + fixedFromBlock := types.BlockNumber(event.Block.NumberU64()) + filterParameters.FromBlock = &fixedFromBlock } - // otherwise set the from block to a fixed number - // to avoid querying it again in the next step - fixedFromBlock := types.BlockNumber(event.Block.NumberU64()) - filterParameters.FromBlock = &fixedFromBlock - } - // if the filter has a toBlock value set - // and the event block number is greater than the - // to block, skip this filter - if filterParameters.ToBlock != nil { - toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) - if rpcErr != nil { - log.Errorf("failed to get numeric block number for ToBlock field for filter %v: %v", filter.ID, rpcErr) - continue + // if the filter has a toBlock value set + // and the event block number is greater than the + // to block, skip this filter + if filterParameters.ToBlock != nil { + toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) + if rpcErr != nil { + log.Errorf("failed to get numeric block number for ToBlock field for filter %v: %v", f.ID, rpcErr) + return + } + // if the block number is greater than the toBlock value + // this means this block is out of the block range for this + // filter, so we skip it + if event.Block.NumberU64() > toBlock { + return + } + // otherwise set the to block to a fixed number + // to avoid querying it again in the next step + fixedToBlock := types.BlockNumber(event.Block.NumberU64()) + filterParameters.ToBlock = &fixedToBlock } - // if the block number is greater than the toBlock value - // this means this block is out of the block range for this - // filter, so we skip it - if event.Block.NumberU64() > toBlock { - continue - } - // otherwise set the to block to a fixed number - // to avoid querying it again in the next step - fixedToBlock := types.BlockNumber(event.Block.NumberU64()) - filterParameters.ToBlock = &fixedToBlock } - } - // get new logs for this specific filter - changes, err := e.internalGetLogs(context.Background(), nil, filterParameters) - if err != nil 
{ - log.Errorf("failed to get filters changes for filter %v with web sockets connections: %v", filter.ID, err) - continue - } + log.Infof("[notifyNewLogs] took %v to prepare filter parameters", time.Since(start)) + start = time.Now() - // if there are new logs for the filter, send it - if changes != nil { - ethLogs := changes.([]types.Log) - for _, ethLog := range ethLogs { - data, err := json.Marshal(ethLog) - if err != nil { - log.Errorf("failed to marshal ethLog response to subscription: %v", err) + // get new logs for this specific filter + changes, err := e.internalGetLogs(context.Background(), nil, filterParameters) + if err != nil { + log.Errorf("failed to get filters changes for filter %v with web sockets connections: %v", f.ID, err) + return + } + + log.Infof("[notifyNewLogs] took %v to run internalGetLogs", time.Since(start)) + start = time.Now() + + // if there are new logs for the filter, send it + if changes != nil { + ethLogs := changes.([]types.Log) + for _, ethLog := range ethLogs { + data, err := json.Marshal(ethLog) + if err != nil { + log.Errorf("failed to marshal ethLog response to subscription: %v", err) + } + f.EnqueueSubscriptionDataToBeSent(data) } - filter.EnqueueSubscriptionDataToBeSent(data) } - } + + log.Infof("[notifyNewLogs] took %v to enqueue log messages", time.Since(start)) + }(f) } + enqueueWg.Wait() log.Infof("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", event.Block.NumberU64(), time.Since(start)) } diff --git a/jsonrpc/interfaces.go b/jsonrpc/interfaces.go index 36344ff88c..acfec7205b 100644 --- a/jsonrpc/interfaces.go +++ b/jsonrpc/interfaces.go @@ -2,10 +2,8 @@ package jsonrpc // storageInterface json rpc internal storage to persist data type storageInterface interface { - Lock() - Unlock() - GetAllBlockFiltersWithWSConn() map[string]*Filter - GetAllLogFiltersWithWSConn() map[string]*Filter + GetAllBlockFiltersWithWSConn() []*Filter + GetAllLogFiltersWithWSConn() []*Filter GetFilter(filterID string) (*Filter, error) NewBlockFilter(wsConn *concurrentWsConn) (string, error) NewLogFilter(wsConn *concurrentWsConn, filter LogFilter) (string, error) diff --git a/jsonrpc/mock_storage.go b/jsonrpc/mock_storage.go index f9d7e404cf..e32d1205b9 100644 --- a/jsonrpc/mock_storage.go +++ b/jsonrpc/mock_storage.go @@ -10,15 +10,15 @@ type storageMock struct { } // GetAllBlockFiltersWithWSConn provides a mock function with given fields: -func (_m *storageMock) GetAllBlockFiltersWithWSConn() map[string]*Filter { +func (_m *storageMock) GetAllBlockFiltersWithWSConn() []*Filter { ret := _m.Called() - var r0 map[string]*Filter - if rf, ok := ret.Get(0).(func() map[string]*Filter); ok { + var r0 []*Filter + if rf, ok := ret.Get(0).(func() []*Filter); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string]*Filter) + r0 = ret.Get(0).([]*Filter) } } @@ -26,15 +26,15 @@ func (_m *storageMock) GetAllBlockFiltersWithWSConn() map[string]*Filter { } // GetAllLogFiltersWithWSConn provides a mock function with given fields: -func (_m *storageMock) GetAllLogFiltersWithWSConn() map[string]*Filter { +func (_m *storageMock) GetAllLogFiltersWithWSConn() []*Filter { ret := _m.Called() - var r0 map[string]*Filter - if rf, ok := ret.Get(0).(func() map[string]*Filter); ok { + var r0 []*Filter + if rf, ok := ret.Get(0).(func() []*Filter); ok { r0 = rf() } else { if ret.Get(0) != nil { - r0 = ret.Get(0).(map[string]*Filter) + r0 = ret.Get(0).([]*Filter) } } @@ -67,11 +67,6 @@ func (_m *storageMock) GetFilter(filterID 
string) (*Filter, error) { return r0, r1 } -// Lock provides a mock function with given fields: -func (_m *storageMock) Lock() { - _m.Called() -} - // NewBlockFilter provides a mock function with given fields: wsConn func (_m *storageMock) NewBlockFilter(wsConn *concurrentWsConn) (string, error) { ret := _m.Called(wsConn) @@ -172,11 +167,6 @@ func (_m *storageMock) UninstallFilterByWSConn(wsConn *concurrentWsConn) error { return r0 } -// Unlock provides a mock function with given fields: -func (_m *storageMock) Unlock() { - _m.Called() -} - // UpdateFilterLastPoll provides a mock function with given fields: filterID func (_m *storageMock) UpdateFilterLastPoll(filterID string) error { ret := _m.Called(filterID) diff --git a/jsonrpc/storage.go b/jsonrpc/storage.go index 33ac4e04af..c9f0dc1619 100644 --- a/jsonrpc/storage.go +++ b/jsonrpc/storage.go @@ -25,7 +25,10 @@ type Storage struct { blockFiltersWithWSConn map[string]*Filter logFiltersWithWSConn map[string]*Filter pendingTxFiltersWithWSConn map[string]*Filter - mutex *sync.Mutex + + blockMutex *sync.Mutex + logMutex *sync.Mutex + pendingTxMutex *sync.Mutex } // NewStorage creates and initializes an instance of Storage @@ -36,7 +39,9 @@ func NewStorage() *Storage { blockFiltersWithWSConn: make(map[string]*Filter), logFiltersWithWSConn: make(map[string]*Filter), pendingTxFiltersWithWSConn: make(map[string]*Filter), - mutex: &sync.Mutex{}, + blockMutex: &sync.Mutex{}, + logMutex: &sync.Mutex{}, + pendingTxMutex: &sync.Mutex{}, } } @@ -67,8 +72,12 @@ func (s *Storage) createFilter(t FilterType, parameters interface{}, wsConn *con return "", fmt.Errorf("failed to generate filter ID: %w", err) } - s.mutex.Lock() - defer s.mutex.Unlock() + s.blockMutex.Lock() + s.logMutex.Lock() + s.pendingTxMutex.Lock() + defer s.blockMutex.Unlock() + defer s.logMutex.Unlock() + defer s.pendingTxMutex.Unlock() f := &Filter{ ID: id, @@ -115,32 +124,42 @@ func (s *Storage) generateFilterID() (string, error) { return id, nil } -// Lock locks the internal mutex -func (s *Storage) Lock() { - s.mutex.Lock() -} - -// Unlock unlocks the internal mutex -func (s *Storage) Unlock() { - s.mutex.Unlock() -} - // GetAllBlockFiltersWithWSConn returns an array with all filter that have // a web socket connection and are filtering by new blocks -func (s *Storage) GetAllBlockFiltersWithWSConn() map[string]*Filter { - return s.blockFiltersWithWSConn +func (s *Storage) GetAllBlockFiltersWithWSConn() []*Filter { + s.blockMutex.Lock() + defer s.blockMutex.Unlock() + + filters := []*Filter{} + for _, filter := range s.blockFiltersWithWSConn { + f := filter + filters = append(filters, f) + } + return filters } // GetAllLogFiltersWithWSConn returns an array with all filter that have // a web socket connection and are filtering by new logs -func (s *Storage) GetAllLogFiltersWithWSConn() map[string]*Filter { - return s.logFiltersWithWSConn +func (s *Storage) GetAllLogFiltersWithWSConn() []*Filter { + s.logMutex.Lock() + defer s.logMutex.Unlock() + + filters := []*Filter{} + for _, filter := range s.logFiltersWithWSConn { + f := filter + filters = append(filters, f) + } + return filters } // GetFilter gets a filter by its id func (s *Storage) GetFilter(filterID string) (*Filter, error) { - s.mutex.Lock() - defer s.mutex.Unlock() + s.blockMutex.Lock() + s.logMutex.Lock() + s.pendingTxMutex.Lock() + defer s.blockMutex.Unlock() + defer s.logMutex.Unlock() + defer s.pendingTxMutex.Unlock() filter, found := s.allFilters[filterID] if !found { @@ -152,8 +171,12 @@ func (s *Storage) 
GetFilter(filterID string) (*Filter, error) { // UpdateFilterLastPoll updates the last poll to now func (s *Storage) UpdateFilterLastPoll(filterID string) error { - s.mutex.Lock() - defer s.mutex.Unlock() + s.blockMutex.Lock() + s.logMutex.Lock() + s.pendingTxMutex.Lock() + defer s.blockMutex.Unlock() + defer s.logMutex.Unlock() + defer s.pendingTxMutex.Unlock() filter, found := s.allFilters[filterID] if !found { @@ -166,8 +189,12 @@ func (s *Storage) UpdateFilterLastPoll(filterID string) error { // UninstallFilter deletes a filter by its id func (s *Storage) UninstallFilter(filterID string) error { - s.mutex.Lock() - defer s.mutex.Unlock() + s.blockMutex.Lock() + s.logMutex.Lock() + s.pendingTxMutex.Lock() + defer s.blockMutex.Unlock() + defer s.logMutex.Unlock() + defer s.pendingTxMutex.Unlock() filter, found := s.allFilters[filterID] if !found { @@ -180,8 +207,12 @@ func (s *Storage) UninstallFilter(filterID string) error { // UninstallFilterByWSConn deletes all filters connected to the provided web socket connection func (s *Storage) UninstallFilterByWSConn(wsConn *concurrentWsConn) error { - s.mutex.Lock() - defer s.mutex.Unlock() + s.blockMutex.Lock() + s.logMutex.Lock() + s.pendingTxMutex.Lock() + defer s.blockMutex.Unlock() + defer s.logMutex.Unlock() + defer s.pendingTxMutex.Unlock() filters, found := s.allFiltersWithWSConn[wsConn] if !found { From 1f51eb802531fb12a6b390be3ef3116b97dd8830 Mon Sep 17 00:00:00 2001 From: tclemos Date: Wed, 15 Nov 2023 22:57:59 -0300 Subject: [PATCH 6/9] refactoring the mechanism to notify new logs --- jsonrpc/endpoints_eth.go | 95 +++++++++++++++++++++++++------ jsonrpc/endpoints_eth_test.go | 104 ++++++++++++++++++++++++++++++++++ state/l2block.go | 8 +++ state/pgstatestorage.go | 19 +++++++ 4 files changed, 208 insertions(+), 18 deletions(-) diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index 03e7dc1308..92515e2dd6 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -28,6 +28,9 @@ const ( // to communicate with the state for eth_EstimateGas and eth_Call when // the From field is not specified because it is optional DefaultSenderAddress = "0x1111111111111111111111111111111111111111" + + // maxTopics is the max number of topics a log can have + maxTopics = 4 ) // EthEndpoints contains implementations for the "eth" RPC endpoints @@ -1180,25 +1183,13 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE start = time.Now() // get new logs for this specific filter - changes, err := e.internalGetLogs(context.Background(), nil, filterParameters) - if err != nil { - log.Errorf("failed to get filters changes for filter %v with web sockets connections: %v", f.ID, err) - return - } - - log.Infof("[notifyNewLogs] took %v to run internalGetLogs", time.Since(start)) - start = time.Now() - - // if there are new logs for the filter, send it - if changes != nil { - ethLogs := changes.([]types.Log) - for _, ethLog := range ethLogs { - data, err := json.Marshal(ethLog) - if err != nil { - log.Errorf("failed to marshal ethLog response to subscription: %v", err) - } - f.EnqueueSubscriptionDataToBeSent(data) + logs := filterLogs(event.Logs, filterParameters) + for _, l := range logs { + data, err := json.Marshal(l) + if err != nil { + log.Errorf("failed to marshal ethLog response to subscription: %v", err) } + f.EnqueueSubscriptionDataToBeSent(data) } log.Infof("[notifyNewLogs] took %v to enqueue log messages", time.Since(start)) @@ -1207,3 +1198,71 @@ func (e *EthEndpoints) notifyNewLogs(wg 
*sync.WaitGroup, event state.NewL2BlockE enqueueWg.Wait() log.Infof("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", event.Block.NumberU64(), time.Since(start)) } + +// filterLogs will filter the provided logsToFilter accordingly to the filters provided +func filterLogs(logsToFilter []*ethTypes.Log, filter LogFilter) []types.Log { + logs := make([]types.Log, 0) + for _, l := range logsToFilter { + // check address filter + if len(filter.Addresses) > 0 { + // if the log address doesn't match any address in the filter, skip this log + if !contains(filter.Addresses, l.Address) { + continue + } + } + + // check topics + match := true + if len(filter.Topics) > 0 { + out: + // check all topics + for i := 0; i < maxTopics; i++ { + // check if the filter contains information + // to filter this topic position + checkTopic := len(filter.Topics) > i + if !checkTopic { + // if we shouldn't check this topic, we can assume + // no more topics needs to be checked, because there + // will be no more topic filters, so we can break out + break out + } + + // check if the topic filter allows any topic + acceptAnyTopic := len(filter.Topics[i]) == 0 + if acceptAnyTopic { + // since any topic is allowed, we continue to the next topic filters + continue + } + + // check if the log has the required topic set + logHasTopic := len(l.Topics) > i + if !logHasTopic { + // if the log doesn't have the required topic set, skip this log + match = false + break out + } + + // check if the any topic in the filter matches the log topic + if !contains(filter.Topics[i], l.Topics[i]) { + match = false + // if the log topic doesn't match any topic in the filter, skip this log + break out + } + } + } + if match { + logs = append(logs, types.NewLog(*l)) + } + } + return logs +} + +// contains check if the item can be found in the items +func contains[T comparable](items []T, itemsToFind T) bool { + for _, item := range items { + if item == itemsToFind { + return true + } + } + return false +} diff --git a/jsonrpc/endpoints_eth_test.go b/jsonrpc/endpoints_eth_test.go index 4ab2d80890..382cee5998 100644 --- a/jsonrpc/endpoints_eth_test.go +++ b/jsonrpc/endpoints_eth_test.go @@ -5054,3 +5054,107 @@ func TestSubscribeNewLogs(t *testing.T) { }) } } + +func TestFilterLogs(t *testing.T) { + logs := []*ethTypes.Log{{ + Address: common.HexToAddress("0x1"), + Topics: []common.Hash{ + common.HexToHash("0xA"), + common.HexToHash("0xB"), + }, + }} + + // empty filter + filteredLogs := filterLogs(logs, LogFilter{}) + assert.Equal(t, 1, len(filteredLogs)) + + // filtered by the log address + filteredLogs = filterLogs(logs, LogFilter{Addresses: []common.Address{ + common.HexToAddress("0x1"), + }}) + assert.Equal(t, 1, len(filteredLogs)) + + // filtered by the log address and another random address + filteredLogs = filterLogs(logs, LogFilter{Addresses: []common.Address{ + common.HexToAddress("0x1"), + common.HexToAddress("0x2"), + }}) + assert.Equal(t, 1, len(filteredLogs)) + + // filtered by unknown address + filteredLogs = filterLogs(logs, LogFilter{Addresses: []common.Address{ + common.HexToAddress("0x2"), + }}) + assert.Equal(t, 0, len(filteredLogs)) + + // filtered by topic0 + filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + {common.HexToHash("0xA")}, + }}) + assert.Equal(t, 1, len(filteredLogs)) + + // filtered by topic0 but allows any topic1 + filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + {common.HexToHash("0xA")}, + {}, + }}) + assert.Equal(t, 1, 
len(filteredLogs)) + + // filtered by any topic0 but forces topic1 + filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + {}, + {common.HexToHash("0xB")}, + }}) + assert.Equal(t, 1, len(filteredLogs)) + + // filtered by forcing topic0 and topic1 + filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + {common.HexToHash("0xA")}, + {common.HexToHash("0xB")}, + }}) + assert.Equal(t, 1, len(filteredLogs)) + + // filtered by forcing topic0 and topic1 to be any of the values + filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + {common.HexToHash("0xA"), common.HexToHash("0xB")}, + {common.HexToHash("0xA"), common.HexToHash("0xB")}, + }}) + assert.Equal(t, 1, len(filteredLogs)) + + // filtered by forcing topic0 and topic1 to wrong values + filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + {common.HexToHash("0xB")}, + {common.HexToHash("0xA")}, + }}) + assert.Equal(t, 0, len(filteredLogs)) + + // filtered by forcing topic0 to wrong value + filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + {common.HexToHash("0xB")}, + }}) + assert.Equal(t, 0, len(filteredLogs)) + + // filtered by accepting any topic0 by forcing topic1 to wrong value + filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + {}, + {common.HexToHash("0xA")}, + }}) + assert.Equal(t, 0, len(filteredLogs)) + + // filtered by accepting any topic0 and topic1 but forcing topic2 that doesn't exist + filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + {}, + {}, + {common.HexToHash("0xA")}, + }}) + assert.Equal(t, 0, len(filteredLogs)) +} + +func TestContains(t *testing.T) { + items := []int{1, 2, 3} + assert.Equal(t, false, contains(items, 0)) + assert.Equal(t, true, contains(items, 1)) + assert.Equal(t, true, contains(items, 2)) + assert.Equal(t, true, contains(items, 3)) + assert.Equal(t, false, contains(items, 4)) +} diff --git a/state/l2block.go b/state/l2block.go index 8e4be4f917..ed2205c2a6 100644 --- a/state/l2block.go +++ b/state/l2block.go @@ -20,6 +20,7 @@ type NewL2BlockEventHandler func(e NewL2BlockEvent) // when a new l2 block is detected with data related to this new l2 block. 
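The topic rules implemented by the filterLogs helper above are easy to misread: topic positions are ANDed together, the hashes listed within a single position are ORed, and an empty position accepts any topic. A minimal usage sketch, which would live inside the jsonrpc package next to filterLogs and TestFilterLogs; every identifier below (exampleTopicMatching, tokenAddr, sigA, sigB, recipient) is hypothetical and not part of the patch:

	// exampleTopicMatching is an illustrative sketch only. It keeps logs that were
	// emitted by tokenAddr, whose topic0 is sigA or sigB (values within one
	// position are ORed), with any topic1 (an empty position is a wildcard),
	// and whose topic2 is exactly recipient (positions are ANDed together).
	func exampleTopicMatching(blockLogs []*ethTypes.Log) []types.Log {
		tokenAddr := common.HexToAddress("0x1")
		sigA := common.HexToHash("0xA")
		sigB := common.HexToHash("0xB")
		recipient := common.HexToHash("0xC")
		filter := LogFilter{
			Addresses: []common.Address{tokenAddr},
			Topics: [][]common.Hash{
				{sigA, sigB}, // topic0: either signature
				{},           // topic1: wildcard
				{recipient},  // topic2: exact match
			},
		}
		return filterLogs(blockLogs, filter)
	}

A log with topics [sigA, anything, recipient] is kept, while a log with only [sigA, anything] is dropped, because the third filter position requires a topic2 to be present.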
diff --git a/state/l2block.go b/state/l2block.go index 8e4be4f917..ed2205c2a6 100644 --- a/state/l2block.go +++ b/state/l2block.go @@ -20,6 +20,7 @@ type NewL2BlockEventHandler func(e NewL2BlockEvent) // when a new l2 block is detected with data related to this new l2 block. type NewL2BlockEvent struct { Block types.Block + Logs []*types.Log } // StartToMonitorNewL2Blocks starts 2 go routines that will @@ -84,10 +85,17 @@ func (s *State) monitorNewL2Blocks() { log.Errorf("failed to get l2 block while monitoring new blocks: %v", err) break } + logs, err := s.GetLogsByBlockNumber(context.Background(), bn, nil) + if err != nil { + log.Errorf("failed to get l2 block logs while monitoring new blocks: %v", err) + break + } + log.Debugf("[monitorNewL2Blocks] sending NewL2BlockEvent for block %v", block.NumberU64()) start := time.Now() s.newL2BlockEvents <- NewL2BlockEvent{ Block: *block, + Logs: logs, } lastL2BlockNumberSeen = block.NumberU64() log.Infof("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %v to be sent", block.NumberU64(), time.Since(start)) diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go index ad83b380a6..f3a8fff2e9 100644 --- a/state/pgstatestorage.go +++ b/state/pgstatestorage.go @@ -1968,6 +1968,25 @@ func (p *PostgresStorage) IsL2BlockVirtualized(ctx context.Context, blockNumber return isVirtualized, nil } +// GetLogsByBlockNumber gets all the logs from a specific block ordered by log index +func (p *PostgresStorage) GetLogsByBlockNumber(ctx context.Context, blockNumber uint64, dbTx pgx.Tx) ([]*types.Log, error) { + const query = ` + SELECT t.l2_block_num, b.block_hash, l.tx_hash, l.log_index, l.address, l.data, l.topic0, l.topic1, l.topic2, l.topic3 + FROM state.log l + INNER JOIN state.transaction t ON t.hash = l.tx_hash + INNER JOIN state.l2block b ON b.block_num = t.l2_block_num + WHERE b.block_num = $1 + ORDER BY l.log_index ASC` + + q := p.getExecQuerier(dbTx) + rows, err := q.Query(ctx, query, blockNumber) + if err != nil { + return nil, err + } + + return scanLogs(rows) +} + // GetLogs returns the logs that match the filter func (p *PostgresStorage) GetLogs(ctx context.Context, fromBlock uint64, toBlock uint64, addresses []common.Address, topics [][]common.Hash, blockHash *common.Hash, since *time.Time, dbTx pgx.Tx) ([]*types.Log, error) { // query parts From d103663779bd553455c4c2023c204d09b2ee7560 Mon Sep 17 00:00:00 2001 From: tclemos Date: Thu, 16 Nov 2023 01:24:59 -0300 Subject: [PATCH 7/9] replace WS notification concurrency, limiting the number of workers to avoid congestion --- jsonrpc/endpoints_eth.go | 219 ++++++++++++++++++++-------------- jsonrpc/endpoints_eth_test.go | 74 ++++++++---- 2 files changed, 181 insertions(+), 112 deletions(-) diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index 92515e2dd6..2074c68a15 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -1090,18 +1090,16 @@ func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2Block filters := e.storage.GetAllBlockFiltersWithWSConn() log.Infof("[notifyNewHeads] took %v to get block filters with ws connections", time.Since(start)) - enqueueWg := sync.WaitGroup{} - for _, filter := range filters { - f := filter - enqueueWg.Add(1) - go func(f *Filter) { - defer enqueueWg.Done() + + const maxWorkers = 16 + parallelize(maxWorkers, filters, func(worker int, filters []*Filter) { + for _, filter := range filters { + f := filter start := time.Now() f.EnqueueSubscriptionDataToBeSent(data) log.Infof("[notifyNewHeads] took %v to enqueue new l2 block messages", time.Since(start)) - }(f) - } - enqueueWg.Wait() + } + }) log.Infof("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start)) }
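The parallelize helper that both notification paths now rely on (defined at the end of this patch) splits the filter slice into contiguous chunks. A worked sketch of its arithmetic, assuming a hypothetical load of 100 filters and maxWorkers = 16:

	workersCount := 16 // min(maxWorkers, len(items))
	jobSize := 100 / 16 // 6
	if 100%16 > 0 {
		jobSize++ // 7 items per worker
	}
	// workers 0..13 take 7 filters each (98 total), worker 14 takes the
	// remaining 2, and worker 15 would start past the end, so it is skipped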
@@ -1112,78 +1110,23 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE filters := e.storage.GetAllLogFiltersWithWSConn() log.Infof("[notifyNewLogs] took %v to get log filters with ws connections", time.Since(start)) - enqueueWg := sync.WaitGroup{} - for _, filter := range filters { - f := filter - enqueueWg.Add(1) - go func(f *Filter) { - defer enqueueWg.Done() - start := time.Now() - - filterParameters := f.Parameters.(LogFilter) - bn := types.BlockNumber(event.Block.NumberU64()) - if filterParameters.BlockHash != nil { - // if the filter block hash is set, we check if the block is the - // one with the expected hash, otherwise we ignore the filter - bh := *filterParameters.BlockHash - if bh.String() != event.Block.Hash().String() { - return - } - } else if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil { - // in case the block hash is nil and also from and to blocks are nil, set it - // to the current block to make the query faster - filterParameters.FromBlock = &bn - filterParameters.ToBlock = &bn - } else { - // if the filter has a fromBlock value set - // and the event block number is smaller than the - // from block, skip this filter - if filterParameters.FromBlock != nil { - fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) - if rpcErr != nil { - log.Errorf("failed to get numeric block number for FromBlock field for filter %v: %v", f.ID, rpcErr) - return - } - // if the block number is smaller than the fromBlock value - // this means this block is out of the block range for this - // filter, so we skip it - if event.Block.NumberU64() < fromBlock { - return - } - // otherwise set the from block to a fixed number - // to avoid querying it again in the next step - fixedFromBlock := types.BlockNumber(event.Block.NumberU64()) - filterParameters.FromBlock = &fixedFromBlock - } - - // if the filter has a toBlock value set - // and the event block number is greater than the - // to block, skip this filter - if filterParameters.ToBlock != nil { - toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) - if rpcErr != nil { - log.Errorf("failed to get numeric block number for ToBlock field for filter %v: %v", f.ID, rpcErr) - return - } - // if the block number is greater than the toBlock value - // this means this block is out of the block range for this - // filter, so we skip it - if event.Block.NumberU64() > toBlock { - return - } - // otherwise set the to block to a fixed number - // to avoid querying it again in the next step - fixedToBlock := types.BlockNumber(event.Block.NumberU64()) - filterParameters.ToBlock = &fixedToBlock - } + const maxWorkers = 16 + parallelize(maxWorkers, filters, func(worker int, filters []*Filter) { + for _, filter := range filters { + f := filter + start := time.Now() + if e.shouldSkipLogFilter(event, filter) { + continue + } + log.Infof("[notifyNewLogs] took %v to check if should skip log filter", time.Since(start)) - log.Infof("[notifyNewLogs] took %v to prepare filter parameters", time.Since(start)) start = time.Now() - // get new logs for this specific filter - logs := filterLogs(event.Logs, filterParameters) + logs := filterLogs(event.Logs, filter) + log.Infof("[notifyNewLogs] took %v to filter logs", time.Since(start)) + + start = time.Now() for _, l := range logs { data, err := json.Marshal(l) if err != nil { @@ -1191,35 +1134,87 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE } f.EnqueueSubscriptionDataToBeSent(data) } - log.Infof("[notifyNewLogs] took %v to enqueue log messages", time.Since(start)) - }(f) - } - enqueueWg.Wait() + } + }) + log.Infof("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", event.Block.NumberU64(), time.Since(start)) } +// shouldSkipLogFilter checks if the log filter can be skipped while notifying new logs. +// it checks the log filter information against the block in the event to decide if the +// information in the event is required by the filter or can be ignored to save resources. +func (e *EthEndpoints) shouldSkipLogFilter(event state.NewL2BlockEvent, filter *Filter) bool { + logFilter := filter.Parameters.(LogFilter) + + if logFilter.BlockHash != nil { + // if the filter block hash is set, we check if the block is the + // one with the expected hash, otherwise we ignore the filter + bh := *logFilter.BlockHash + if bh.String() != event.Block.Hash().String() { + return true + } + } else { + // if the filter has a fromBlock value set + // and the event block number is smaller than the + // from block, skip this filter + if logFilter.FromBlock != nil { + fromBlock, rpcErr := logFilter.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) + if rpcErr != nil { + log.Errorf("failed to get numeric block number for FromBlock field for filter %v: %v", filter.ID, rpcErr) + return true + } + // if the block number is smaller than the fromBlock value + // this means this block is out of the block range for this + // filter, so we skip it + if event.Block.NumberU64() < fromBlock { + return true + } + } + + // if the filter has a toBlock value set + // and the event block number is greater than the + // to block, skip this filter + if logFilter.ToBlock != nil { + toBlock, rpcErr := logFilter.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) + if rpcErr != nil { + log.Errorf("failed to get numeric block number for ToBlock field for filter %v: %v", filter.ID, rpcErr) + return true + } + // if the block number is greater than the toBlock value + // this means this block is out of the block range for this + // filter, so we skip it + if event.Block.NumberU64() > toBlock { + return true + } + } + } + return false +}
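As a quick sanity check of the range logic above (block numbers hypothetical): a filter resolved to FromBlock=100 and ToBlock=200 skips events for blocks 99 and 201 but processes block 150, while a BlockHash filter only processes the block with the exact matching hash. The numeric part boils down to:

	// mirrors the two numeric comparisons in shouldSkipLogFilter,
	// assuming the bounds were already resolved to concrete numbers
	func outOfRange(blockNumber, fromBlock, toBlock uint64) bool {
		return blockNumber < fromBlock || blockNumber > toBlock
	}
	// outOfRange(99, 100, 200) == true, outOfRange(150, 100, 200) == false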
// filterLogs will filter the provided logsToFilter according to the filter provided -func filterLogs(logsToFilter []*ethTypes.Log, filter LogFilter) []types.Log { +func filterLogs(logsToFilter []*ethTypes.Log, filter *Filter) []types.Log { + logFilter := filter.Parameters.(LogFilter) + logs := make([]types.Log, 0) for _, l := range logsToFilter { // check address filter - if len(filter.Addresses) > 0 { + if len(logFilter.Addresses) > 0 { // if the log address doesn't match any address in the filter, skip this log - if !contains(filter.Addresses, l.Address) { + if !contains(logFilter.Addresses, l.Address) { continue } } // check topics match := true - if len(filter.Topics) > 0 { + if len(logFilter.Topics) > 0 { out: // check all topics for i := 0; i < maxTopics; i++ { // check if the filter contains information // to filter this topic position - checkTopic := len(filter.Topics) > i + checkTopic := len(logFilter.Topics) > i if !checkTopic { // if we shouldn't check this topic, we can assume @@ -1228,7 +1223,7 @@ func filterLogs(logsToFilter []*ethTypes.Log, filter LogFilter) []types.Log { } // check if the topic filter allows any topic - acceptAnyTopic := len(filter.Topics[i]) == 0 + acceptAnyTopic := len(logFilter.Topics[i]) == 0 if acceptAnyTopic { // since any topic is allowed, we continue to the next topic filters continue @@ -1243,7 +1238,7 @@ func filterLogs(logsToFilter []*ethTypes.Log, filter LogFilter) []types.Log { } // check if any topic in the filter matches the log topic - if !contains(filter.Topics[i], l.Topics[i]) { + if !contains(logFilter.Topics[i], l.Topics[i]) { match = false // if the log topic doesn't match any topic in the filter, skip this log break out @@ -1266,3 +1261,53 @@ func contains[T comparable](items []T, itemToFind T) bool { } return false } + +// parallelize splits the items across workers according +// to the max number of workers and the number of items, +// allowing fn to be executed concurrently for different +// chunks of items. +func parallelize[T any](maxWorkers int, items []T, fn func(worker int, items []T)) { + if len(items) == 0 { + return + } + + var workersCount = maxWorkers + if workersCount > len(items) { + workersCount = len(items) + } + + var jobSize = len(items) / workersCount + var rest = len(items) % workersCount + if rest > 0 { + jobSize++ + } + + wg := sync.WaitGroup{} + for worker := 0; worker < workersCount; worker++ { + rangeStart := worker * jobSize + rangeEnd := ((worker + 1) * jobSize) + + if rangeStart > len(items) { + continue + } + + if rangeEnd > len(items) { + rangeEnd = len(items) + } + + jobItems := items[rangeStart:rangeEnd] + + wg.Add(1) + go func(worker int, filteredItems []T, fn func(worker int, items []T)) { + defer func() { + wg.Done() + err := recover() + if err != nil { + log.Errorf("panic in parallelize worker %v: %v", worker, err) + } + }() + fn(worker, filteredItems) + }(worker, jobItems, fn) + } + wg.Wait() +}
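A minimal usage sketch of parallelize (hypothetical workload; any slice type works thanks to the type parameter):

	ids := []uint64{1, 2, 3, 4, 5}
	parallelize(2, ids, func(worker int, chunk []uint64) {
		for _, id := range chunk {
			log.Debugf("worker %d handling id %d", worker, id)
		}
	})
	// with 5 items and 2 workers, jobSize is 5/2 rounded up = 3:
	// worker 0 gets ids 1-3 and worker 1 gets ids 4-5; the test added
	// below pins down the same chunking behaviour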
diff --git a/jsonrpc/endpoints_eth_test.go b/jsonrpc/endpoints_eth_test.go index 382cee5998..ab796c193b 100644 --- a/jsonrpc/endpoints_eth_test.go +++ b/jsonrpc/endpoints_eth_test.go @@ -7,6 +7,7 @@ import ( "fmt" "math/big" "strings" + "sync" "testing" "time" @@ -5065,88 +5066,88 @@ func TestFilterLogs(t *testing.T) { }} // empty filter - filteredLogs := filterLogs(logs, LogFilter{}) + filteredLogs := filterLogs(logs, &Filter{Parameters: LogFilter{}}) assert.Equal(t, 1, len(filteredLogs)) // filtered by the log address - filteredLogs = filterLogs(logs, LogFilter{Addresses: []common.Address{ + filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Addresses: []common.Address{ common.HexToAddress("0x1"), - }}) + }}}) assert.Equal(t, 1, len(filteredLogs)) // filtered by the log address and another random address - filteredLogs = filterLogs(logs, LogFilter{Addresses: []common.Address{ + filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Addresses: []common.Address{ common.HexToAddress("0x1"), common.HexToAddress("0x2"), - }}) + }}}) assert.Equal(t, 1, len(filteredLogs)) // filtered by unknown address - filteredLogs = filterLogs(logs, LogFilter{Addresses: []common.Address{ + filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Addresses: []common.Address{ common.HexToAddress("0x2"), - }}) + }}}) assert.Equal(t, 0, len(filteredLogs)) // filtered by topic0 - filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{ {common.HexToHash("0xA")}, - }}) + }}}) assert.Equal(t, 1, len(filteredLogs)) // filtered by topic0 but allows any topic1 - filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{ {common.HexToHash("0xA")}, {}, - }}) + }}}) assert.Equal(t, 1, len(filteredLogs)) // filtered by any topic0 but forces topic1 - filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{ {}, {common.HexToHash("0xB")}, - }}) + }}}) assert.Equal(t, 1, len(filteredLogs)) // filtered by forcing topic0 and topic1 - filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{ {common.HexToHash("0xA")}, {common.HexToHash("0xB")}, - }}) + }}}) assert.Equal(t, 1, len(filteredLogs)) // filtered by forcing topic0 and topic1 to be any of the values - filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{ {common.HexToHash("0xA"), common.HexToHash("0xB")}, {common.HexToHash("0xA"), common.HexToHash("0xB")}, - }}) + }}}) assert.Equal(t, 1, len(filteredLogs)) // filtered by forcing topic0 and topic1 to wrong values - filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{ {common.HexToHash("0xB")}, {common.HexToHash("0xA")}, - }}) + }}}) assert.Equal(t, 0, len(filteredLogs)) // filtered by forcing topic0 to wrong value - filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{ {common.HexToHash("0xB")}, - }}) + }}}) assert.Equal(t, 0, len(filteredLogs)) // filtered by accepting any topic0 but forcing topic1 to wrong value - filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{ {}, {common.HexToHash("0xA")}, - }}) + }}}) assert.Equal(t, 0, len(filteredLogs)) // filtered by accepting any topic0 and topic1 but forcing topic2 that doesn't exist - filteredLogs = filterLogs(logs, LogFilter{Topics: [][]common.Hash{ + filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{ {}, {}, {common.HexToHash("0xA")}, - }}) + }}}) assert.Equal(t, 0, len(filteredLogs)) } @@ -5158,3 +5159,26 @@ func TestContains(t *testing.T) { assert.Equal(t, true, contains(items, 3)) assert.Equal(t, false, contains(items, 4)) } + +func TestParallelize(t *testing.T) { + items := []int{ + 1, 2, 3, 4, 5, 6, 7, 8, 9, + 10, 11, 12, 13, 14, 15, 16, + } + + results := map[int][]int{} + mu := &sync.Mutex{} + + parallelize(7, items, func(worker int, items []int) { + mu.Lock() + results[worker] = items + mu.Unlock() + }) + + assert.ElementsMatch(t, []int{1, 2, 3}, results[0]) + assert.ElementsMatch(t, []int{4, 5, 6}, results[1]) + assert.ElementsMatch(t, []int{7, 8, 9}, results[2]) + assert.ElementsMatch(t, []int{10, 11, 12}, results[3]) + assert.ElementsMatch(t, []int{13, 14, 15}, results[4]) + assert.ElementsMatch(t, []int{16}, results[5]) +} From c8f43bf0016127cec784b59945bda8014e802c02 Mon Sep 17 00:00:00 2001 From: tclemos Date: Thu, 16 Nov 2023 14:19:00 -0300 Subject: [PATCH 8/9] increase maxWorkers from 16 to 32 --- jsonrpc/endpoints_eth.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index 2074c68a15..faa0de47fe 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -1091,7 +1091,7 @@ func (e *EthEndpoints)
notifyNewHeads(wg *sync.WaitGroup, event state.NewL2Block filters := e.storage.GetAllBlockFiltersWithWSConn() log.Infof("[notifyNewHeads] took %v to get block filters with ws connections", time.Since(start)) - const maxWorkers = 16 + const maxWorkers = 32 parallelize(maxWorkers, filters, func(worker int, filters []*Filter) { for _, filter := range filters { f := filter @@ -1111,7 +1111,7 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE filters := e.storage.GetAllLogFiltersWithWSConn() log.Infof("[notifyNewLogs] took %v to get log filters with ws connections", time.Since(start)) - const maxWorkers = 16 + const maxWorkers = 32 parallelize(maxWorkers, filters, func(worker int, filters []*Filter) { for _, filter := range filters { f := filter From e0f360c1486dd85553c1b5f04906d4ba8e651faa Mon Sep 17 00:00:00 2001 From: tclemos Date: Thu, 16 Nov 2023 14:36:58 -0300 Subject: [PATCH 9/9] revert unnecessary migration changes --- db/migrations/state/0011_test.go | 73 +++++++++++++++++++ db/migrations/state/util_test.go | 117 ------------------------------- 2 files changed, 73 insertions(+), 117 deletions(-) create mode 100644 db/migrations/state/0011_test.go delete mode 100644 db/migrations/state/util_test.go diff --git a/db/migrations/state/0011_test.go b/db/migrations/state/0011_test.go new file mode 100644 index 0000000000..3c245e7d31 --- /dev/null +++ b/db/migrations/state/0011_test.go @@ -0,0 +1,73 @@ +package migrations_test + +import ( + "database/sql" + "testing" + + "github.com/stretchr/testify/assert" +) + +// this migration changes length of the token name +type migrationTest0011 struct{} + +func (m migrationTest0011) InsertData(db *sql.DB) error { + return nil +} + +func (m migrationTest0011) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) { + indexes := []string{ + "l2block_created_at_idx", + "log_log_index_idx", + "log_topic0_idx", + "log_topic1_idx", + "log_topic2_idx", + "log_topic3_idx", + } + // Check indexes adding + for _, idx := range indexes { + // getIndex + const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;` + row := db.QueryRow(getIndex, idx) + var result int + assert.NoError(t, row.Scan(&result)) + assert.Equal(t, 1, result) + } + + // Check column egp_log exists in state.transactions table + const getFinalDeviationColumn = `SELECT count(*) FROM information_schema.columns WHERE table_name='transaction' and column_name='egp_log'` + row := db.QueryRow(getFinalDeviationColumn) + var result int + assert.NoError(t, row.Scan(&result)) + assert.Equal(t, 1, result) +} + +func (m migrationTest0011) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) { + indexes := []string{ + "l2block_created_at_idx", + "log_log_index_idx", + "log_topic0_idx", + "log_topic1_idx", + "log_topic2_idx", + "log_topic3_idx", + } + // Check indexes removing + for _, idx := range indexes { + // getIndex + const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;` + row := db.QueryRow(getIndex, idx) + var result int + assert.NoError(t, row.Scan(&result)) + assert.Equal(t, 0, result) + } + + // Check column egp_log doesn't exists in state.transactions table + const getFinalDeviationColumn = `SELECT count(*) FROM information_schema.columns WHERE table_name='transaction' and column_name='egp_log'` + row := db.QueryRow(getFinalDeviationColumn) + var result int + assert.NoError(t, row.Scan(&result)) + assert.Equal(t, 0, result) +} + +func TestMigration0011(t *testing.T) { + runMigrationTest(t, 11, migrationTest0011{}) +} diff --git 
a/db/migrations/state/util_test.go b/db/migrations/state/util_test.go deleted file mode 100644 index 022ff869cc..0000000000 --- a/db/migrations/state/util_test.go +++ /dev/null @@ -1,117 +0,0 @@ -package migrations_test - -import ( - "database/sql" - "fmt" - "testing" - - "github.com/0xPolygonHermez/zkevm-node/db" - "github.com/0xPolygonHermez/zkevm-node/log" - "github.com/0xPolygonHermez/zkevm-node/test/dbutils" - "github.com/gobuffalo/packr/v2" - "github.com/jackc/pgx/v4" - "github.com/jackc/pgx/v4/stdlib" - migrate "github.com/rubenv/sql-migrate" - "github.com/stretchr/testify/require" -) - -/* - Considerations tricks and tips for migration file testing: - - Functionality of the DB is tested by the rest of the packages, migration tests only have to check persistence across migrations (both UP and DOWN) - - It's recommended to use real data (from testnet/mainnet), but modifying NULL fields to check that those are migrated properly - - It's recommended to use some SQL tool (such as DBeaver) that generates insert queries from existing rows - - Any new migration file could be tested using the existing `migrationTester` interface. Check `0002_test.go` for an example -*/ - -func init() { - log.Init(log.Config{ - Level: "debug", - Outputs: []string{"stderr"}, - }) -} - -type migrationTester interface { - // InsertData used to insert data in the affected tables of the migration that is being tested - // data will be inserted with the schema as it was previous the migration that is being tested - InsertData(*sql.DB) error - // RunAssertsAfterMigrationUp this function will be called after running the migration is being tested - // and should assert that the data inserted in the function InsertData is persisted properly - RunAssertsAfterMigrationUp(*testing.T, *sql.DB) - // RunAssertsAfterMigrationDown this function will be called after reverting the migration that is being tested - // and should assert that the data inserted in the function InsertData is persisted properly - RunAssertsAfterMigrationDown(*testing.T, *sql.DB) -} - -var ( - stateDBCfg = dbutils.NewStateConfigFromEnv() - packrMigrations = map[string]*packr.Box{ - db.StateMigrationName: packr.New(db.StateMigrationName, "./migrations/state"), - db.PoolMigrationName: packr.New(db.PoolMigrationName, "./migrations/pool"), - } -) - -func runMigrationTest(t *testing.T, migrationNumber int, miter migrationTester) { - // Initialize an empty DB - d, err := initCleanSQLDB() - require.NoError(t, err) - require.NoError(t, runMigrationsDown(d, 0, db.StateMigrationName)) - // Run migrations until migration to test - require.NoError(t, runMigrationsUp(d, migrationNumber-1, db.StateMigrationName)) - // Insert data into table(s) affected by migration - require.NoError(t, miter.InsertData(d)) - // Run migration that is being tested - require.NoError(t, runMigrationsUp(d, 1, db.StateMigrationName)) - // Check that data is persisted properly after migration up - miter.RunAssertsAfterMigrationUp(t, d) - // Revert migration to test - require.NoError(t, runMigrationsDown(d, 1, db.StateMigrationName)) - // Check that data is persisted properly after migration down - miter.RunAssertsAfterMigrationDown(t, d) -} - -func initCleanSQLDB() (*sql.DB, error) { - // run migrations - if err := db.RunMigrationsDown(stateDBCfg, db.StateMigrationName); err != nil { - return nil, err - } - c, err := pgx.ParseConfig(fmt.Sprintf("postgres://%s:%s@%s:%s/%s", stateDBCfg.User, stateDBCfg.Password, stateDBCfg.Host, stateDBCfg.Port, stateDBCfg.Name)) - if err != nil { - return 
nil, err - } - sqlDB := stdlib.OpenDB(*c) - return sqlDB, nil -} - -func runMigrationsUp(d *sql.DB, n int, packrName string) error { - box, ok := packrMigrations[packrName] - if !ok { - return fmt.Errorf("packr box not found with name: %v", packrName) - } - - var migrations = &migrate.PackrMigrationSource{Box: box} - nMigrations, err := migrate.ExecMax(d, "postgres", migrations, migrate.Up, n) - if err != nil { - return err - } - if nMigrations != n { - return fmt.Errorf("Unexpected amount of migrations: expected: %d, actual: %d", n, nMigrations) - } - return nil -} - -func runMigrationsDown(d *sql.DB, n int, packrName string) error { - box, ok := packrMigrations[packrName] - if !ok { - return fmt.Errorf("packr box not found with name: %v", packrName) - } - - var migrations = &migrate.PackrMigrationSource{Box: box} - nMigrations, err := migrate.ExecMax(d, "postgres", migrations, migrate.Down, n) - if err != nil { - return err - } - if nMigrations != n { - return fmt.Errorf("Unexpected amount of migrations: expected: %d, actual: %d", n, nMigrations) - } - return nil -}
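With the revert applied, future migration tests keep following the restored pattern; a hypothetical skeleton for a next migration (number and assertions invented for illustration, relying on the migrationTester interface and runMigrationTest helper shown above):

	type migrationTest0012 struct{}

	func (m migrationTest0012) InsertData(db *sql.DB) error {
		// insert rows against the pre-migration schema here
		return nil
	}

	func (m migrationTest0012) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) {
		// assert the data inserted above is still readable after migrating up
	}

	func (m migrationTest0012) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) {
		// assert the data is still readable after migrating back down
	}

	func TestMigration0012(t *testing.T) {
		runMigrationTest(t, 12, migrationTest0012{})
	}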