diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go
index 3835c33a89..faa0de47fe 100644
--- a/jsonrpc/endpoints_eth.go
+++ b/jsonrpc/endpoints_eth.go
@@ -28,6 +28,9 @@ const (
 	// to communicate with the state for eth_EstimateGas and eth_Call when
 	// the From field is not specified because it is optional
 	DefaultSenderAddress = "0x1111111111111111111111111111111111111111"
+
+	// maxTopics is the max number of topics a log can have
+	maxTopics = 4
 )
 
 // EthEndpoints contains implementations for the "eth" RPC endpoints
@@ -1056,7 +1059,8 @@ func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *concurrentWsConn) error {
 
 // onNewL2Block is triggered when the state triggers the event for a new l2 block
 func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
-	log.Debugf("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64())
+	log.Infof("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64())
+	start := time.Now()
 	wg := sync.WaitGroup{}
 	wg.Add(1)
@@ -1066,127 +1070,244 @@ func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
 	go e.notifyNewLogs(&wg, event)
 
 	wg.Wait()
+	log.Infof("[onNewL2Block] new l2 block %v took %v to send the messages to all ws connections", event.Block.NumberU64(), time.Since(start))
 }
 
 func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
 	defer wg.Done()
 	start := time.Now()
-	blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn()
+
+	b, err := types.NewBlock(&event.Block, nil, false, false)
 	if err != nil {
-		log.Errorf("failed to get all block filters with web sockets connections: %v", err)
-	} else {
-		b, err := types.NewBlock(&event.Block, nil, false, false)
-		if err != nil {
-			log.Errorf("failed to build block response to subscription: %v", err)
-			return
-		}
-		data, err := json.Marshal(b)
-		if err != nil {
-			log.Errorf("failed to marshal block response to subscription: %v", err)
-			return
-		}
-		for _, filter := range blockFilters {
-			filter.EnqueueSubscriptionDataToBeSent(data)
-		}
+		log.Errorf("failed to build block response to subscription: %v", err)
+		return
+	}
+	data, err := json.Marshal(b)
+	if err != nil {
+		log.Errorf("failed to marshal block response to subscription: %v", err)
+		return
 	}
-	log.Debugf("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start))
+
+	filters := e.storage.GetAllBlockFiltersWithWSConn()
+	log.Infof("[notifyNewHeads] took %v to get block filters with ws connections", time.Since(start))
+
+	const maxWorkers = 32
+	parallelize(maxWorkers, filters, func(worker int, filters []*Filter) {
+		for _, filter := range filters {
+			f := filter
+			start := time.Now()
+			f.EnqueueSubscriptionDataToBeSent(data)
+			log.Infof("[notifyNewHeads] took %v to enqueue new l2 block messages", time.Since(start))
+		}
+	})
+
+	log.Infof("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start))
 }
 
 func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
 	defer wg.Done()
 	start := time.Now()
-	logFilters, err := e.storage.GetAllLogFiltersWithWSConn()
-	if err != nil {
-		log.Errorf("failed to get all log filters with web sockets connections: %v", err)
-	} else {
-		for _, filter := range logFilters {
-			filterParameters := filter.Parameters.(LogFilter)
-			bn := types.BlockNumber(event.Block.NumberU64())
-			if filterParameters.BlockHash != nil {
-				// if the filter block hash is set, we check if the block is the
-				// one with the expected hash, otherwise we ignore the filter
-				bh := *filterParameters.BlockHash
-				if bh.String() != event.Block.Hash().String() {
-					continue
-				}
-			} else if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil {
-				// in case the block hash is nil and also from and to blocks are nil, set it
-				// to the current block to make the query faster
-				filterParameters.FromBlock = &bn
-				filterParameters.ToBlock = &bn
-			} else {
-				// if the filter has a fromBlock value set
-				// and the event block number is smaller than the
-				// from block, skip this filter
-				if filterParameters.FromBlock != nil {
-					fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
-					if rpcErr != nil {
-						log.Errorf(rpcErr.Error(), filter.ID, err)
-						continue
-					}
-					// if the block number is smaller than the fromBlock value
-					// this means this block is out of the block range for this
-					// filter, so we skip it
-					if event.Block.NumberU64() < fromBlock {
-						continue
-					}
-					// otherwise set the from block to a fixed number
-					// to avoid querying it again in the next step
-					fixedFromBlock := types.BlockNumber(event.Block.NumberU64())
-					filterParameters.FromBlock = &fixedFromBlock
-				}
+	filters := e.storage.GetAllLogFiltersWithWSConn()
+	log.Infof("[notifyNewLogs] took %v to get log filters with ws connections", time.Since(start))
 
-				// if the filter has a toBlock value set
-				// and the event block number is greater than the
-				// to block, skip this filter
-				if filterParameters.ToBlock != nil {
-					toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
-					if rpcErr != nil {
-						log.Errorf(rpcErr.Error(), filter.ID, err)
-						continue
-					}
-					// if the block number is greater than the toBlock value
-					// this means this block is out of the block range for this
-					// filter, so we skip it
-					if event.Block.NumberU64() > toBlock {
-						continue
-					}
-					// otherwise set the to block to a fixed number
-					// to avoid querying it again in the next step
-					fixedToBlock := types.BlockNumber(event.Block.NumberU64())
-					filterParameters.ToBlock = &fixedToBlock
-				}
+	const maxWorkers = 32
+	parallelize(maxWorkers, filters, func(worker int, filters []*Filter) {
+		for _, filter := range filters {
+			f := filter
+			start := time.Now()
+			if e.shouldSkipLogFilter(event, filter) {
+				// skip only this filter; a return here would drop the
+				// remaining filters assigned to this worker
+				continue
 			}
+			log.Infof("[notifyNewLogs] took %v to check if should skip log filter", time.Since(start))
 
+			start = time.Now()
 			// get new logs for this specific filter
-			changes, err := e.internalGetLogs(context.Background(), nil, filterParameters)
-			if errors.Is(err, state.ErrMaxLogsCountLimitExceeded) {
-				log.Infof("failed to get filters changes for filter %v, the filter seems to be returning more results than allowed and was removed: %v", filter.ID, err)
-				err := e.storage.UninstallFilter(filter.ID)
-				if !errors.Is(err, ErrNotFound) && err != nil {
-					log.Errorf("failed to automatically uninstall filter %v: %v", filter.ID, err)
-				} else {
-					log.Infof("Filter %v automatically uninstalled", filter.ID)
+			logs := filterLogs(event.Logs, filter)
+			log.Infof("[notifyNewLogs] took %v to filter logs", time.Since(start))
+
+			start = time.Now()
+			for _, l := range logs {
+				data, err := json.Marshal(l)
+				if err != nil {
+					log.Errorf("failed to marshal ethLog response to subscription: %v", err)
 				}
-				continue
-			} else if err != nil {
-				log.Errorf("failed to get filters changes for filter %v with web sockets connections: %v", filter.ID, err)
web sockets connections: %v", filter.ID, err) + f.EnqueueSubscriptionDataToBeSent(data) + } + log.Infof("[notifyNewLogs] took %v to enqueue log messages", time.Since(start)) + } + }) + + log.Infof("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", event.Block.NumberU64(), time.Since(start)) +} + +// shouldSkipLogFilter checks if the log filter can be skipped while notifying new logs. +// it checks the log filter information against the block in the event to decide if the +// information in the event is required by the filter or can be ignored to save resources. +func (e *EthEndpoints) shouldSkipLogFilter(event state.NewL2BlockEvent, filter *Filter) bool { + logFilter := filter.Parameters.(LogFilter) + + if logFilter.BlockHash != nil { + // if the filter block hash is set, we check if the block is the + // one with the expected hash, otherwise we ignore the filter + bh := *logFilter.BlockHash + if bh.String() != event.Block.Hash().String() { + return true + } + } else { + // if the filter has a fromBlock value set + // and the event block number is smaller than the + // from block, skip this filter + if logFilter.FromBlock != nil { + fromBlock, rpcErr := logFilter.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) + if rpcErr != nil { + log.Errorf("failed to get numeric block number for FromBlock field for filter %v: %v", filter.ID, rpcErr) + return true + } + // if the block number is smaller than the fromBlock value + // this means this block is out of the block range for this + // filter, so we skip it + if event.Block.NumberU64() < fromBlock { + return true + } + } + + // if the filter has a toBlock value set + // and the event block number is greater than the + // to block, skip this filter + if logFilter.ToBlock != nil { + toBlock, rpcErr := logFilter.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil) + if rpcErr != nil { + log.Errorf("failed to get numeric block number for ToBlock field for filter %v: %v", filter.ID, rpcErr) + return true + } + // if the block number is greater than the toBlock value + // this means this block is out of the block range for this + // filter, so we skip it + if event.Block.NumberU64() > toBlock { + return true + } + } + } + return false +} + +// filterLogs will filter the provided logsToFilter accordingly to the filters provided +func filterLogs(logsToFilter []*ethTypes.Log, filter *Filter) []types.Log { + logFilter := filter.Parameters.(LogFilter) + + logs := make([]types.Log, 0) + for _, l := range logsToFilter { + // check address filter + if len(logFilter.Addresses) > 0 { + // if the log address doesn't match any address in the filter, skip this log + if !contains(logFilter.Addresses, l.Address) { continue } + } - // if there are new logs for the filter, send it - if changes != nil { - ethLogs := changes.([]types.Log) - for _, ethLog := range ethLogs { - data, err := json.Marshal(ethLog) - if err != nil { - log.Errorf("failed to marshal ethLog response to subscription: %v", err) - } - filter.EnqueueSubscriptionDataToBeSent(data) + // check topics + match := true + if len(logFilter.Topics) > 0 { + out: + // check all topics + for i := 0; i < maxTopics; i++ { + // check if the filter contains information + // to filter this topic position + checkTopic := len(logFilter.Topics) > i + if !checkTopic { + // if we shouldn't check this topic, we can assume + // no more topics needs to be checked, because there + // will be no more topic filters, so we 
+					break out
+				}
+
+				// check if the topic filter allows any topic
+				acceptAnyTopic := len(logFilter.Topics[i]) == 0
+				if acceptAnyTopic {
+					// since any topic is allowed, we continue to the next topic filters
+					continue
+				}
+
+				// check if the log has the required topic set
+				logHasTopic := len(l.Topics) > i
+				if !logHasTopic {
+					// if the log doesn't have the required topic set, skip this log
+					match = false
+					break out
+				}
+
+				// check if any topic in the filter matches the log topic
+				if !contains(logFilter.Topics[i], l.Topics[i]) {
+					match = false
+					// if the log topic doesn't match any topic in the filter, skip this log
+					break out
+				}
+			}
+		}
+		if match {
+			logs = append(logs, types.NewLog(*l))
+		}
 	}
-	log.Debugf("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", event.Block.NumberU64(), time.Since(start))
+	return logs
+}
+
+// contains checks if the item can be found in the given items
+func contains[T comparable](items []T, itemToFind T) bool {
+	for _, item := range items {
+		if item == itemToFind {
+			return true
+		}
+	}
+	return false
+}
+
+// parallelize splits the items across workers according
+// to the max number of workers and the number of items,
+// allowing fn to be executed concurrently for different
+// chunks of items.
+func parallelize[T any](maxWorkers int, items []T, fn func(worker int, items []T)) {
+	if len(items) == 0 {
+		return
+	}
+
+	var workersCount = maxWorkers
+	if workersCount > len(items) {
+		workersCount = len(items)
+	}
+
+	var jobSize = len(items) / workersCount
+	var rest = len(items) % workersCount
+	if rest > 0 {
+		// round the chunk size up so every item gets assigned to a worker
+		jobSize++
+	}
+
+	wg := sync.WaitGroup{}
+	for worker := 0; worker < workersCount; worker++ {
+		rangeStart := worker * jobSize
+		rangeEnd := (worker + 1) * jobSize
+
+		if rangeStart > len(items) {
+			// with a rounded-up chunk size, the last workers may have no items left
+			continue
+		}
+
+		if rangeEnd > len(items) {
+			rangeEnd = len(items)
+		}
+
+		jobItems := items[rangeStart:rangeEnd]
+
+		wg.Add(1)
+		go func(worker int, filteredItems []T, fn func(worker int, items []T)) {
+			defer func() {
+				wg.Done()
+				err := recover()
+				if err != nil {
+					fmt.Println(err)
+				}
+			}()
+			fn(worker, filteredItems)
+		}(worker, jobItems, fn)
+	}
+	wg.Wait()
+}
diff --git a/jsonrpc/endpoints_eth_test.go b/jsonrpc/endpoints_eth_test.go
index 4ab2d80890..ab796c193b 100644
--- a/jsonrpc/endpoints_eth_test.go
+++ b/jsonrpc/endpoints_eth_test.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"math/big"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -5054,3 +5055,130 @@ func TestSubscribeNewLogs(t *testing.T) {
 		})
 	}
 }
+
+func TestFilterLogs(t *testing.T) {
+	logs := []*ethTypes.Log{{
+		Address: common.HexToAddress("0x1"),
+		Topics: []common.Hash{
+			common.HexToHash("0xA"),
+			common.HexToHash("0xB"),
+		},
+	}}
+
+	// empty filter
+	filteredLogs := filterLogs(logs, &Filter{Parameters: LogFilter{}})
+	assert.Equal(t, 1, len(filteredLogs))
+
+	// filtered by the log address
+	filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Addresses: []common.Address{
+		common.HexToAddress("0x1"),
+	}}})
+	assert.Equal(t, 1, len(filteredLogs))
+
+	// filtered by the log address and another random address
+	filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Addresses: []common.Address{
+		common.HexToAddress("0x1"),
+		common.HexToAddress("0x2"),
+	}}})
+	assert.Equal(t, 1, len(filteredLogs))
+
+	// filtered by unknown address
+	filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Addresses: []common.Address{
+		common.HexToAddress("0x2"),
+	}}})
+	assert.Equal(t, 0, len(filteredLogs))
+
+	// filtered by topic0
+	filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{
+		{common.HexToHash("0xA")},
+	}}})
+	assert.Equal(t, 1, len(filteredLogs))
+
+	// filtered by topic0 but allows any topic1
+	filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{
+		{common.HexToHash("0xA")},
+		{},
+	}}})
+	assert.Equal(t, 1, len(filteredLogs))
+
+	// filtered by any topic0 but forcing topic1
+	filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{
+		{},
+		{common.HexToHash("0xB")},
+	}}})
+	assert.Equal(t, 1, len(filteredLogs))
+
+	// filtered by forcing topic0 and topic1
+	filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{
+		{common.HexToHash("0xA")},
+		{common.HexToHash("0xB")},
+	}}})
+	assert.Equal(t, 1, len(filteredLogs))
+
+	// filtered by forcing topic0 and topic1 to be any of the values
+	filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{
+		{common.HexToHash("0xA"), common.HexToHash("0xB")},
+		{common.HexToHash("0xA"), common.HexToHash("0xB")},
+	}}})
+	assert.Equal(t, 1, len(filteredLogs))
+
+	// filtered by forcing topic0 and topic1 to wrong values
+	filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{
+		{common.HexToHash("0xB")},
+		{common.HexToHash("0xA")},
+	}}})
+	assert.Equal(t, 0, len(filteredLogs))
+
+	// filtered by forcing topic0 to a wrong value
+	filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{
+		{common.HexToHash("0xB")},
+	}}})
+	assert.Equal(t, 0, len(filteredLogs))
+
+	// filtered by accepting any topic0 but forcing topic1 to a wrong value
+	filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{
+		{},
+		{common.HexToHash("0xA")},
+	}}})
+	assert.Equal(t, 0, len(filteredLogs))
+
+	// filtered by accepting any topic0 and topic1 but forcing topic2 that doesn't exist
+	filteredLogs = filterLogs(logs, &Filter{Parameters: LogFilter{Topics: [][]common.Hash{
+		{},
+		{},
+		{common.HexToHash("0xA")},
+	}}})
+	assert.Equal(t, 0, len(filteredLogs))
+}
+
+func TestContains(t *testing.T) {
+	items := []int{1, 2, 3}
+	assert.Equal(t, false, contains(items, 0))
+	assert.Equal(t, true, contains(items, 1))
+	assert.Equal(t, true, contains(items, 2))
+	assert.Equal(t, true, contains(items, 3))
+	assert.Equal(t, false, contains(items, 4))
+}
+
+func TestParallelize(t *testing.T) {
+	items := []int{
+		1, 2, 3, 4, 5, 6, 7, 8, 9,
+		10, 11, 12, 13, 14, 15, 16,
+	}
+
+	results := map[int][]int{}
+	mu := &sync.Mutex{}
+
+	parallelize(7, items, func(worker int, items []int) {
+		mu.Lock()
+		results[worker] = items
+		mu.Unlock()
+	})
+
+	assert.ElementsMatch(t, []int{1, 2, 3}, results[0])
+	assert.ElementsMatch(t, []int{4, 5, 6}, results[1])
+	assert.ElementsMatch(t, []int{7, 8, 9}, results[2])
+	assert.ElementsMatch(t, []int{10, 11, 12}, results[3])
+	assert.ElementsMatch(t, []int{13, 14, 15}, results[4])
+	assert.ElementsMatch(t, []int{16}, results[5])
+}
diff --git a/jsonrpc/interfaces.go b/jsonrpc/interfaces.go
index fd6de2538f..acfec7205b 100644
--- a/jsonrpc/interfaces.go
+++ b/jsonrpc/interfaces.go
@@ -2,8 +2,8 @@ package jsonrpc
 
 // storageInterface json rpc internal storage to persist data
 type storageInterface interface {
-	GetAllBlockFiltersWithWSConn() ([]*Filter, error)
-	GetAllLogFiltersWithWSConn() ([]*Filter, error)
+	GetAllBlockFiltersWithWSConn() []*Filter
+	GetAllLogFiltersWithWSConn() []*Filter
 	GetFilter(filterID string) (*Filter, error)
 	NewBlockFilter(wsConn *concurrentWsConn) (string, error)
 	NewLogFilter(wsConn *concurrentWsConn, filter LogFilter) (string, error)
diff --git a/jsonrpc/mock_storage.go b/jsonrpc/mock_storage.go
index 4ee466de93..e32d1205b9 100644
--- a/jsonrpc/mock_storage.go
+++ b/jsonrpc/mock_storage.go
@@ -2,9 +2,7 @@
 
 package jsonrpc
 
-import (
-	mock "github.com/stretchr/testify/mock"
-)
+import mock "github.com/stretchr/testify/mock"
 
 // storageMock is an autogenerated mock type for the storageInterface type
 type storageMock struct {
@@ -12,14 +10,10 @@ type storageMock struct {
 }
 
 // GetAllBlockFiltersWithWSConn provides a mock function with given fields:
-func (_m *storageMock) GetAllBlockFiltersWithWSConn() ([]*Filter, error) {
+func (_m *storageMock) GetAllBlockFiltersWithWSConn() []*Filter {
 	ret := _m.Called()
 
 	var r0 []*Filter
-	var r1 error
-	if rf, ok := ret.Get(0).(func() ([]*Filter, error)); ok {
-		return rf()
-	}
 	if rf, ok := ret.Get(0).(func() []*Filter); ok {
 		r0 = rf()
 	} else {
@@ -28,24 +22,14 @@ func (_m *storageMock) GetAllBlockFiltersWithWSConn() ([]*Filter, error) {
 		}
 	}
 
-	if rf, ok := ret.Get(1).(func() error); ok {
-		r1 = rf()
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
+	return r0
 }
 
 // GetAllLogFiltersWithWSConn provides a mock function with given fields:
-func (_m *storageMock) GetAllLogFiltersWithWSConn() ([]*Filter, error) {
+func (_m *storageMock) GetAllLogFiltersWithWSConn() []*Filter {
 	ret := _m.Called()
 
 	var r0 []*Filter
-	var r1 error
-	if rf, ok := ret.Get(0).(func() ([]*Filter, error)); ok {
-		return rf()
-	}
 	if rf, ok := ret.Get(0).(func() []*Filter); ok {
 		r0 = rf()
 	} else {
@@ -54,13 +38,7 @@ func (_m *storageMock) GetAllLogFiltersWithWSConn() ([]*Filter, error) {
 		}
 	}
 
-	if rf, ok := ret.Get(1).(func() error); ok {
-		r1 = rf()
-	} else {
-		r1 = ret.Error(1)
-	}
-
-	return r0, r1
+	return r0
 }
 
 // GetFilter provides a mock function with given fields: filterID
diff --git a/jsonrpc/query.go b/jsonrpc/query.go
index 02b355fce7..57cf626bcc 100644
--- a/jsonrpc/query.go
+++ b/jsonrpc/query.go
@@ -94,7 +94,7 @@ func (f *Filter) sendSubscriptionResponse(data []byte) {
 		return
 	}
 	log.Debugf("WS message sent: %v", string(message))
-	log.Debugf("[SendSubscriptionResponse] took %v", time.Since(start))
+	log.Infof("[SendSubscriptionResponse] took %v", time.Since(start))
 }
 
 // FilterType express the type of the filter, block, logs, pending transactions
diff --git a/jsonrpc/storage.go b/jsonrpc/storage.go
index a1381bc619..c9f0dc1619 100644
--- a/jsonrpc/storage.go
+++ b/jsonrpc/storage.go
@@ -20,13 +20,28 @@ var ErrFilterInvalidPayload = errors.New("invalid argument 0: cannot specify bot
 
 // Storage uses memory to store the data
 // related to the json rpc server
 type Storage struct {
-	filters sync.Map
+	allFilters                 map[string]*Filter
+	allFiltersWithWSConn       map[*concurrentWsConn]map[string]*Filter
+	blockFiltersWithWSConn     map[string]*Filter
+	logFiltersWithWSConn       map[string]*Filter
+	pendingTxFiltersWithWSConn map[string]*Filter
+
+	blockMutex     *sync.Mutex
+	logMutex       *sync.Mutex
+	pendingTxMutex *sync.Mutex
 }
 
 // NewStorage creates and initializes an instance of Storage
 func NewStorage() *Storage {
 	return &Storage{
-		filters: sync.Map{},
+		allFilters:                 make(map[string]*Filter),
+		allFiltersWithWSConn:       make(map[*concurrentWsConn]map[string]*Filter),
+		blockFiltersWithWSConn:     make(map[string]*Filter),
+		logFiltersWithWSConn:       make(map[string]*Filter),
+		pendingTxFiltersWithWSConn: make(map[string]*Filter),
+		blockMutex:                 &sync.Mutex{},
+		logMutex:                   &sync.Mutex{},
+		pendingTxMutex:             &sync.Mutex{},
 	}
 }
 
@@ -56,6 +71,14 @@ func (s *Storage) createFilter(t FilterType, parameters interface{}, wsConn *con
 	if err != nil {
 		return "", fmt.Errorf("failed to generate filter ID: %w", err)
 	}
+
+	s.blockMutex.Lock()
+	s.logMutex.Lock()
+	s.pendingTxMutex.Lock()
+	defer s.blockMutex.Unlock()
+	defer s.logMutex.Unlock()
+	defer s.pendingTxMutex.Unlock()
+
 	f := &Filter{
 		ID:   id,
 		Type: t,
@@ -68,8 +91,21 @@ func (s *Storage) createFilter(t FilterType, parameters interface{}, wsConn *con
 
 	go state.InfiniteSafeRun(f.SendEnqueuedSubscriptionData, fmt.Sprintf("failed to send enqueued subscription data to filter %v", id), time.Second)
 
-	s.filters.Store(id, f)
+	s.allFilters[id] = f
+	if f.WsConn != nil {
+		if _, found := s.allFiltersWithWSConn[f.WsConn]; !found {
+			s.allFiltersWithWSConn[f.WsConn] = make(map[string]*Filter)
+		}
+		s.allFiltersWithWSConn[f.WsConn][id] = f
+		if t == FilterTypeBlock {
+			s.blockFiltersWithWSConn[id] = f
+		} else if t == FilterTypeLog {
+			s.logFiltersWithWSConn[id] = f
+		} else if t == FilterTypePendingTx {
+			s.pendingTxFiltersWithWSConn[id] = f
+		}
+	}
 	return id, nil
 }
 
@@ -90,87 +126,122 @@ func (s *Storage) generateFilterID() (string, error) {
 
 // GetAllBlockFiltersWithWSConn returns an array with all filter that have
 // a web socket connection and are filtering by new blocks
-func (s *Storage) GetAllBlockFiltersWithWSConn() ([]*Filter, error) {
-	filtersWithWSConn := []*Filter{}
-	s.filters.Range(func(key, value any) bool {
-		filter := value.(*Filter)
-		if filter.WsConn == nil || filter.Type != FilterTypeBlock {
-			return true
-		}
+func (s *Storage) GetAllBlockFiltersWithWSConn() []*Filter {
+	s.blockMutex.Lock()
+	defer s.blockMutex.Unlock()
+	filters := []*Filter{}
+	for _, filter := range s.blockFiltersWithWSConn {
 		f := filter
-		filtersWithWSConn = append(filtersWithWSConn, f)
-		return true
-	})
-
-	return filtersWithWSConn, nil
+		filters = append(filters, f)
+	}
+	return filters
 }
 
 // GetAllLogFiltersWithWSConn returns an array with all filter that have
 // a web socket connection and are filtering by new logs
-func (s *Storage) GetAllLogFiltersWithWSConn() ([]*Filter, error) {
-	filtersWithWSConn := []*Filter{}
-	s.filters.Range(func(key, value any) bool {
-		filter := value.(*Filter)
-		if filter.WsConn == nil || filter.Type != FilterTypeLog {
-			return true
-		}
+func (s *Storage) GetAllLogFiltersWithWSConn() []*Filter {
+	s.logMutex.Lock()
+	defer s.logMutex.Unlock()
+	filters := []*Filter{}
+	for _, filter := range s.logFiltersWithWSConn {
 		f := filter
-		filtersWithWSConn = append(filtersWithWSConn, f)
-		return true
-	})
-
-	return filtersWithWSConn, nil
+		filters = append(filters, f)
+	}
+	return filters
 }
 
 // GetFilter gets a filter by its id
 func (s *Storage) GetFilter(filterID string) (*Filter, error) {
-	filter, found := s.filters.Load(filterID)
+	s.blockMutex.Lock()
+	s.logMutex.Lock()
+	s.pendingTxMutex.Lock()
+	defer s.blockMutex.Unlock()
+	defer s.logMutex.Unlock()
+	defer s.pendingTxMutex.Unlock()
+
+	filter, found := s.allFilters[filterID]
 	if !found {
 		return nil, ErrNotFound
 	}
 
-	return filter.(*Filter), nil
+	return filter, nil
 }
 
 // UpdateFilterLastPoll updates the last poll to now
 func (s *Storage) UpdateFilterLastPoll(filterID string) error {
-	filterValue, found := s.filters.Load(filterID)
+	s.blockMutex.Lock()
+	s.logMutex.Lock()
+	s.pendingTxMutex.Lock()
+	defer s.blockMutex.Unlock()
+	defer s.logMutex.Unlock()
+	defer s.pendingTxMutex.Unlock()
+
+	filter, found := s.allFilters[filterID]
 	if !found {
 		return ErrNotFound
 	}
-	filter := filterValue.(*Filter)
 	filter.LastPoll = time.Now().UTC()
-	s.filters.Store(filterID, filter)
+	s.allFilters[filterID] = filter
 	return nil
 }
 
 // UninstallFilter deletes a filter by its id
 func (s *Storage) UninstallFilter(filterID string) error {
-	_, found := s.filters.Load(filterID)
+	s.blockMutex.Lock()
+	s.logMutex.Lock()
+	s.pendingTxMutex.Lock()
+	defer s.blockMutex.Unlock()
+	defer s.logMutex.Unlock()
+	defer s.pendingTxMutex.Unlock()
+
+	filter, found := s.allFilters[filterID]
 	if !found {
 		return ErrNotFound
 	}
-	s.filters.Delete(filterID)
+
+	s.deleteFilter(filter)
 	return nil
 }
 
 // UninstallFilterByWSConn deletes all filters connected to the provided web socket connection
 func (s *Storage) UninstallFilterByWSConn(wsConn *concurrentWsConn) error {
-	filterIDsToDelete := []string{}
-	s.filters.Range(func(key, value any) bool {
-		id := key.(string)
-		filter := value.(*Filter)
-		if filter.WsConn == wsConn {
-			filterIDsToDelete = append(filterIDsToDelete, id)
-		}
-		return true
-	})
+	s.blockMutex.Lock()
+	s.logMutex.Lock()
+	s.pendingTxMutex.Lock()
+	defer s.blockMutex.Unlock()
+	defer s.logMutex.Unlock()
+	defer s.pendingTxMutex.Unlock()
+
+	filters, found := s.allFiltersWithWSConn[wsConn]
+	if !found {
+		return nil
+	}
 
-	for _, filterID := range filterIDsToDelete {
-		s.filters.Delete(filterID)
+	for _, filter := range filters {
+		s.deleteFilter(filter)
 	}
 	return nil
 }
+
+// deleteFilter deletes a filter from all the maps
+func (s *Storage) deleteFilter(filter *Filter) {
+	if filter.Type == FilterTypeBlock {
+		delete(s.blockFiltersWithWSConn, filter.ID)
+	} else if filter.Type == FilterTypeLog {
+		delete(s.logFiltersWithWSConn, filter.ID)
+	} else if filter.Type == FilterTypePendingTx {
+		delete(s.pendingTxFiltersWithWSConn, filter.ID)
+	}
+
+	if filter.WsConn != nil {
+		delete(s.allFiltersWithWSConn[filter.WsConn], filter.ID)
+		if len(s.allFiltersWithWSConn[filter.WsConn]) == 0 {
+			delete(s.allFiltersWithWSConn, filter.WsConn)
+		}
+	}
+
+	delete(s.allFilters, filter.ID)
+}
diff --git a/state/l2block.go b/state/l2block.go
index f76eed208a..ed2205c2a6 100644
--- a/state/l2block.go
+++ b/state/l2block.go
@@ -20,6 +20,7 @@ type NewL2BlockEventHandler func(e NewL2BlockEvent)
 
 // when a new l2 block is detected with data related to this new l2 block.
 type NewL2BlockEvent struct {
 	Block types.Block
+	Logs  []*types.Log
 }
 
 // StartToMonitorNewL2Blocks starts 2 go routines that will
@@ -76,7 +77,7 @@ func (s *State) monitorNewL2Blocks() {
 			fromBlockNumber := lastL2BlockNumberSeen + uint64(1)
 			toBlockNumber := lastL2BlockNumber
-			log.Debugf("[monitorNewL2Blocks] new l2 block detected from block %v to %v", fromBlockNumber, toBlockNumber)
+			log.Infof("[monitorNewL2Blocks] new l2 block detected from block %v to %v", fromBlockNumber, toBlockNumber)
 
 			for bn := fromBlockNumber; bn <= toBlockNumber; bn++ {
 				block, err := s.GetL2BlockByNumber(context.Background(), bn, nil)
@@ -84,13 +85,20 @@ func (s *State) monitorNewL2Blocks() {
 					log.Errorf("failed to get l2 block while monitoring new blocks: %v", err)
 					break
 				}
+				logs, err := s.GetLogsByBlockNumber(context.Background(), bn, nil)
+				if err != nil {
+					log.Errorf("failed to get l2 block logs while monitoring new blocks: %v", err)
+					break
+				}
+
 				log.Debugf("[monitorNewL2Blocks] sending NewL2BlockEvent for block %v", block.NumberU64())
 				start := time.Now()
 				s.newL2BlockEvents <- NewL2BlockEvent{
 					Block: *block,
+					Logs:  logs,
 				}
 				lastL2BlockNumberSeen = block.NumberU64()
-				log.Debugf("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %v to be sent", block.NumberU64(), time.Since(start))
+				log.Infof("[monitorNewL2Blocks] NewL2BlockEvent for block %v took %v to be sent", block.NumberU64(), time.Since(start))
 
 				log.Infof("new l2 block detected: number %v, hash %v", block.NumberU64(), block.Hash().String())
 			}
@@ -101,7 +109,7 @@ func (s *State) handleEvents() {
 	for newL2BlockEvent := range s.newL2BlockEvents {
-		log.Debugf("[handleEvents] new l2 block event detected for block: %v", newL2BlockEvent.Block.NumberU64())
+		log.Infof("[handleEvents] new l2 block event detected for block: %v", newL2BlockEvent.Block.NumberU64())
 		if len(s.newL2BlockEventHandlers) == 0 {
 			continue
 		}
@@ -116,10 +124,10 @@ func (s *State) handleEvents() {
 					log.Errorf("failed and recovered in NewL2BlockEventHandler: %v", r)
 				}
 			}()
-			log.Debugf("[handleEvents] triggering new l2 block event handler for block: %v", e.Block.NumberU64())
+			log.Infof("[handleEvents] triggering new l2 block event handler for block: %v", e.Block.NumberU64())
 			start := time.Now()
 			h(e)
-			log.Debugf("[handleEvents] new l2 block event handler for block %v took %v to be executed", e.Block.NumberU64(), time.Since(start))
+			log.Infof("[handleEvents] new l2 block event handler for block %v took %v to be executed", e.Block.NumberU64(), time.Since(start))
 		}(handler, newL2BlockEvent)
 	}
 	wg.Wait()
diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go
index 9bf866c3c9..f3a8fff2e9 100644
--- a/state/pgstatestorage.go
+++ b/state/pgstatestorage.go
@@ -1487,7 +1487,7 @@ func (p *PostgresStorage) GetTransactionEGPLogByHash(ctx context.Context, transa
 
 // AddL2Block adds a new L2 block to the State Store
 func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2Block *types.Block, receipts []*types.Receipt, txsEGPData []StoreTxEGPData, dbTx pgx.Tx) error {
-	log.Debugf("[AddL2Block] adding l2 block: %v", l2Block.NumberU64())
+	log.Infof("[AddL2Block] adding l2 block: %v", l2Block.NumberU64())
 	start := time.Now()
 
 	e := p.getExecQuerier(dbTx)
@@ -1562,7 +1562,7 @@ func (p *PostgresStorage) AddL2Block(ctx context.Context, batchNumber uint64, l2
 			}
 		}
 	}
-	log.Debugf("[AddL2Block] l2 block %v took %v to be added", l2Block.NumberU64(), time.Since(start))
+	log.Infof("[AddL2Block] l2 block %v took %v to be added", l2Block.NumberU64(), time.Since(start))
 	return nil
 }
 
@@ -1968,6 +1968,25 @@ func (p *PostgresStorage) IsL2BlockVirtualized(ctx context.Context, blockNumber
 	return isVirtualized, nil
 }
 
+// GetLogsByBlockNumber gets all the logs from a specific block ordered by log index
+func (p *PostgresStorage) GetLogsByBlockNumber(ctx context.Context, blockNumber uint64, dbTx pgx.Tx) ([]*types.Log, error) {
+	const query = `
+	SELECT t.l2_block_num, b.block_hash, l.tx_hash, l.log_index, l.address, l.data, l.topic0, l.topic1, l.topic2, l.topic3
+	  FROM state.log l
+	 INNER JOIN state.transaction t ON t.hash = l.tx_hash
+	 INNER JOIN state.l2block b ON b.block_num = t.l2_block_num
+	 WHERE b.block_num = $1
+	 ORDER BY l.log_index ASC`
+
+	q := p.getExecQuerier(dbTx)
+	rows, err := q.Query(ctx, query, blockNumber)
+	if err != nil {
+		return nil, err
+	}
+
+	return scanLogs(rows)
+}
+
 // GetLogs returns the logs that match the filter
 func (p *PostgresStorage) GetLogs(ctx context.Context, fromBlock uint64, toBlock uint64, addresses []common.Address, topics [][]common.Hash, blockHash *common.Hash, since *time.Time, dbTx pgx.Tx) ([]*types.Log, error) {
 	// query parts
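
A note for readers on the new `parallelize` helper in jsonrpc/endpoints_eth.go: the chunk size is the item count divided by the worker count, rounded up, so the last workers can receive fewer items or none at all. A minimal, self-contained sketch of that behavior — the helper body is copied from the diff above (with the panic-recovery wrapper omitted for brevity), while the `main` function and its expected output are illustrative only:

```go
package main

import (
	"fmt"
	"sync"
)

// parallelize is copied from the diff above: it splits items into chunks of
// ceil(len(items)/workersCount) and runs fn concurrently on each chunk.
func parallelize[T any](maxWorkers int, items []T, fn func(worker int, items []T)) {
	if len(items) == 0 {
		return
	}

	workersCount := maxWorkers
	if workersCount > len(items) {
		workersCount = len(items)
	}

	jobSize := len(items) / workersCount
	if len(items)%workersCount > 0 {
		// round the chunk size up so every item gets assigned
		jobSize++
	}

	wg := sync.WaitGroup{}
	for worker := 0; worker < workersCount; worker++ {
		rangeStart := worker * jobSize
		rangeEnd := (worker + 1) * jobSize

		if rangeStart > len(items) {
			// later workers may have no items left
			continue
		}
		if rangeEnd > len(items) {
			rangeEnd = len(items)
		}

		wg.Add(1)
		go func(worker int, jobItems []T) {
			defer wg.Done()
			fn(worker, jobItems)
		}(worker, items[rangeStart:rangeEnd])
	}
	wg.Wait()
}

func main() {
	items := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16}

	var mu sync.Mutex
	parallelize(7, items, func(worker int, chunk []int) {
		mu.Lock()
		defer mu.Unlock()
		fmt.Println(worker, chunk)
	})
	// 16 items / 7 workers -> chunk size 3 (rounded up): workers 0-4 get
	// 3 items each, worker 5 gets the remaining {16}, and worker 6 gets
	// nothing, matching the expectations in TestParallelize above.
}
```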
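
One design note on the jsonrpc/storage.go rewrite above: the single `sync.Map` was replaced by plain maps guarded by three mutexes, and every method that touches more than one map acquires the mutexes in the same fixed order (block, then log, then pendingTx). That consistent acquisition order is what keeps concurrent `createFilter`/`UninstallFilter`/`GetFilter` calls from deadlocking. A hypothetical helper pair — not part of the PR, shown only to name the convention — could make it explicit:

```go
package jsonrpc

import "sync"

// storageLocks is a hypothetical illustration of the lock-ordering
// convention the Storage methods in the diff already follow.
type storageLocks struct {
	blockMutex     *sync.Mutex
	logMutex       *sync.Mutex
	pendingTxMutex *sync.Mutex
}

// lockAll acquires the mutexes in the one fixed order used everywhere:
// block, then log, then pendingTx.
func (s *storageLocks) lockAll() {
	s.blockMutex.Lock()
	s.logMutex.Lock()
	s.pendingTxMutex.Lock()
}

// unlockAll releases in reverse acquisition order, matching the effect of
// the deferred unlocks in createFilter and the other write paths.
func (s *storageLocks) unlockAll() {
	s.pendingTxMutex.Unlock()
	s.logMutex.Unlock()
	s.blockMutex.Unlock()
}
```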