cherry pick the latest WS improvements into v0.4.0 #2800

Merged · 9 commits · Nov 16, 2023
jsonrpc/endpoints_eth.go: 319 changes (220 additions, 99 deletions)

@@ -28,6 +28,9 @@ const (
 	// to communicate with the state for eth_EstimateGas and eth_Call when
 	// the From field is not specified because it is optional
 	DefaultSenderAddress = "0x1111111111111111111111111111111111111111"
+
+	// maxTopics is the max number of topics a log can have
+	maxTopics = 4
 )

 // EthEndpoints contains implementations for the "eth" RPC endpoints
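Note: maxTopics = 4 mirrors the EVM itself, which caps a log at four topics (opcodes LOG0 through LOG4); by convention topics[0] is the keccak256 hash of the event signature and topics[1..3] hold the indexed arguments. A minimal sketch of that derivation, assuming the go-ethereum crypto package is available:

package main

import (
	"fmt"

	"github.com/ethereum/go-ethereum/crypto"
)

func main() {
	// topics[0] for an ERC-20 Transfer event: keccak256 of the canonical signature
	topic0 := crypto.Keccak256Hash([]byte("Transfer(address,address,uint256)"))
	fmt.Println(topic0.Hex())
	// 0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef
}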
@@ -1056,7 +1059,8 @@ func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *concurrentWsConn) error {

 // onNewL2Block is triggered when the state triggers the event for a new l2 block
 func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
-	log.Debugf("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64())
+	log.Infof("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64())
+	start := time.Now()
 	wg := sync.WaitGroup{}

 	wg.Add(1)
@@ -1066,127 +1070,244 @@ func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
 	go e.notifyNewLogs(&wg, event)

 	wg.Wait()
+	log.Infof("[onNewL2Block] new l2 block %v took %v to send the messages to all ws connections", event.Block.NumberU64(), time.Since(start))
 }

 func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
 	defer wg.Done()
 	start := time.Now()
-	blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn()
-	if err != nil {
-		log.Errorf("failed to get all block filters with web sockets connections: %v", err)
-	} else {
-		b, err := types.NewBlock(&event.Block, nil, false, false)
-		if err != nil {
-			log.Errorf("failed to build block response to subscription: %v", err)
-			return
-		}
-		data, err := json.Marshal(b)
-		if err != nil {
-			log.Errorf("failed to marshal block response to subscription: %v", err)
-			return
-		}
-		for _, filter := range blockFilters {
-			filter.EnqueueSubscriptionDataToBeSent(data)
-		}
-	}
-	log.Debugf("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start))
+
+	b, err := types.NewBlock(&event.Block, nil, false, false)
+	if err != nil {
+		log.Errorf("failed to build block response to subscription: %v", err)
+		return
+	}
+	data, err := json.Marshal(b)
+	if err != nil {
+		log.Errorf("failed to marshal block response to subscription: %v", err)
+		return
+	}
+
+	filters := e.storage.GetAllBlockFiltersWithWSConn()
+	log.Infof("[notifyNewHeads] took %v to get block filters with ws connections", time.Since(start))
+
+	const maxWorkers = 32
+	parallelize(maxWorkers, filters, func(worker int, filters []*Filter) {
+		for _, filter := range filters {
+			f := filter
+			start := time.Now()
+			f.EnqueueSubscriptionDataToBeSent(data)
+			log.Infof("[notifyNewHeads] took %v to enqueue new l2 block messages", time.Since(start))
+		}
+	})
+
+	log.Infof("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start))
 }
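For context, this function is the producer side of the newHeads WebSocket subscription. A minimal client sketch to observe the messages enqueued above; the ws://localhost:8546 endpoint and the gorilla/websocket dependency are assumptions for illustration, not part of this diff:

package main

import (
	"fmt"
	"log"

	"github.com/gorilla/websocket"
)

func main() {
	conn, _, err := websocket.DefaultDialer.Dial("ws://localhost:8546", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	// install a newHeads subscription; the node replies with a subscription id,
	// then pushes one block header message per new L2 block
	sub := `{"jsonrpc":"2.0","id":1,"method":"eth_subscribe","params":["newHeads"]}`
	if err := conn.WriteMessage(websocket.TextMessage, []byte(sub)); err != nil {
		log.Fatal(err)
	}
	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%s\n", msg)
	}
}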

 func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
 	defer wg.Done()
 	start := time.Now()
-	logFilters, err := e.storage.GetAllLogFiltersWithWSConn()
-	if err != nil {
-		log.Errorf("failed to get all log filters with web sockets connections: %v", err)
-	} else {
-		for _, filter := range logFilters {
-			filterParameters := filter.Parameters.(LogFilter)
-			bn := types.BlockNumber(event.Block.NumberU64())
-
-			if filterParameters.BlockHash != nil {
-				// if the filter block hash is set, we check if the block is the
-				// one with the expected hash, otherwise we ignore the filter
-				bh := *filterParameters.BlockHash
-				if bh.String() != event.Block.Hash().String() {
-					continue
-				}
-			} else if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil {
-				// in case the block hash is nil and also from and to blocks are nil, set it
-				// to the current block to make the query faster
-				filterParameters.FromBlock = &bn
-				filterParameters.ToBlock = &bn
-			} else {
-				// if the filter has a fromBlock value set
-				// and the event block number is smaller than the
-				// from block, skip this filter
-				if filterParameters.FromBlock != nil {
-					fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
-					if rpcErr != nil {
-						log.Errorf(rpcErr.Error(), filter.ID, err)
-						continue
-					}
-					// if the block number is smaller than the fromBlock value
-					// this means this block is out of the block range for this
-					// filter, so we skip it
-					if event.Block.NumberU64() < fromBlock {
-						continue
-					}
-					// otherwise set the from block to a fixed number
-					// to avoid querying it again in the next step
-					fixedFromBlock := types.BlockNumber(event.Block.NumberU64())
-					filterParameters.FromBlock = &fixedFromBlock
-				}
-
-				// if the filter has a toBlock value set
-				// and the event block number is greater than the
-				// to block, skip this filter
-				if filterParameters.ToBlock != nil {
-					toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
-					if rpcErr != nil {
-						log.Errorf(rpcErr.Error(), filter.ID, err)
-						continue
-					}
-					// if the block number is greater than the toBlock value
-					// this means this block is out of the block range for this
-					// filter, so we skip it
-					if event.Block.NumberU64() > toBlock {
-						continue
-					}
-					// otherwise set the to block to a fixed number
-					// to avoid querying it again in the next step
-					fixedToBlock := types.BlockNumber(event.Block.NumberU64())
-					filterParameters.ToBlock = &fixedToBlock
-				}
-			}
-
-			// get new logs for this specific filter
-			changes, err := e.internalGetLogs(context.Background(), nil, filterParameters)
-			if errors.Is(err, state.ErrMaxLogsCountLimitExceeded) {
-				log.Infof("failed to get filters changes for filter %v, the filter seems to be returning more results than allowed and was removed: %v", filter.ID, err)
-				err := e.storage.UninstallFilter(filter.ID)
-				if !errors.Is(err, ErrNotFound) && err != nil {
-					log.Errorf("failed to automatically uninstall filter %v: %v", filter.ID, err)
-				} else {
-					log.Infof("Filter %v automatically uninstalled", filter.ID)
-				}
-				continue
-			} else if err != nil {
-				log.Errorf("failed to get filters changes for filter %v with web sockets connections: %v", filter.ID, err)
-				continue
-			}
-
-			// if there are new logs for the filter, send it
-			if changes != nil {
-				ethLogs := changes.([]types.Log)
-				for _, ethLog := range ethLogs {
-					data, err := json.Marshal(ethLog)
-					if err != nil {
-						log.Errorf("failed to marshal ethLog response to subscription: %v", err)
-					}
-					filter.EnqueueSubscriptionDataToBeSent(data)
-				}
-			}
-		}
-	}
-	log.Debugf("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", event.Block.NumberU64(), time.Since(start))
+
+	filters := e.storage.GetAllLogFiltersWithWSConn()
+	log.Infof("[notifyNewLogs] took %v to get log filters with ws connections", time.Since(start))
+
+	const maxWorkers = 32
+	parallelize(maxWorkers, filters, func(worker int, filters []*Filter) {
+		for _, filter := range filters {
+			f := filter
+			start := time.Now()
+			if e.shouldSkipLogFilter(event, filter) {
+				return
+			}
+			log.Infof("[notifyNewLogs] took %v to check if should skip log filter", time.Since(start))
+
+			start = time.Now()
+			// get new logs for this specific filter
+			logs := filterLogs(event.Logs, filter)
+			log.Infof("[notifyNewLogs] took %v to filter logs", time.Since(start))
+
+			start = time.Now()
+			for _, l := range logs {
+				data, err := json.Marshal(l)
+				if err != nil {
+					log.Errorf("failed to marshal ethLog response to subscription: %v", err)
+				}
+				f.EnqueueSubscriptionDataToBeSent(data)
+			}
+			log.Infof("[notifyNewLogs] took %v to enqueue log messages", time.Since(start))
+		}
+	})
+
+	log.Infof("[notifyNewLogs] new l2 block event for block %v took %v to send all the messages for log filters", event.Block.NumberU64(), time.Since(start))
 }
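The key change here: instead of running a per-filter state query (internalGetLogs) on every new block, logs are now matched in memory against event.Logs, and the fan-out runs across workers. For reference, a sketch of the eth_subscribe request that installs one of the log filters this loop serves; the address and topic values are placeholders:

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	// a "logs" subscription carries an optional address/topics filter object
	req := map[string]any{
		"jsonrpc": "2.0",
		"id":      1,
		"method":  "eth_subscribe",
		"params": []any{"logs", map[string]any{
			"address": "0x1111111111111111111111111111111111111111",
			"topics":  []any{"0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"},
		}},
	}
	b, _ := json.Marshal(req)
	fmt.Println(string(b))
}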

+// shouldSkipLogFilter checks if the log filter can be skipped while notifying new logs.
+// it checks the log filter information against the block in the event to decide if the
+// information in the event is required by the filter or can be ignored to save resources.
+func (e *EthEndpoints) shouldSkipLogFilter(event state.NewL2BlockEvent, filter *Filter) bool {
+	logFilter := filter.Parameters.(LogFilter)
+
+	if logFilter.BlockHash != nil {
+		// if the filter block hash is set, we check if the block is the
+		// one with the expected hash, otherwise we ignore the filter
+		bh := *logFilter.BlockHash
+		if bh.String() != event.Block.Hash().String() {
+			return true
+		}
+	} else {
+		// if the filter has a fromBlock value set
+		// and the event block number is smaller than the
+		// from block, skip this filter
+		if logFilter.FromBlock != nil {
+			fromBlock, rpcErr := logFilter.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
+			if rpcErr != nil {
+				log.Errorf("failed to get numeric block number for FromBlock field for filter %v: %v", filter.ID, rpcErr)
+				return true
+			}
+			// if the block number is smaller than the fromBlock value
+			// this means this block is out of the block range for this
+			// filter, so we skip it
+			if event.Block.NumberU64() < fromBlock {
+				return true
+			}
+		}
+
+		// if the filter has a toBlock value set
+		// and the event block number is greater than the
+		// to block, skip this filter
+		if logFilter.ToBlock != nil {
+			toBlock, rpcErr := logFilter.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
+			if rpcErr != nil {
+				log.Errorf("failed to get numeric block number for ToBlock field for filter %v: %v", filter.ID, rpcErr)
+				return true
+			}
+			// if the block number is greater than the toBlock value
+			// this means this block is out of the block range for this
+			// filter, so we skip it
+			if event.Block.NumberU64() > toBlock {
+				return true
+			}
+		}
+	}
+	return false
+}
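The block-range logic above reduces to a simple interval check where a nil FromBlock/ToBlock means unbounded on that side. A standalone sketch of the predicate, with hypothetical integer inputs:

package main

import "fmt"

// outOfRange reports whether block falls outside [from, to];
// a nil bound means that side is unbounded
func outOfRange(block uint64, from, to *uint64) bool {
	if from != nil && block < *from {
		return true
	}
	if to != nil && block > *to {
		return true
	}
	return false
}

func main() {
	from, to := uint64(100), uint64(200)
	fmt.Println(outOfRange(99, &from, &to))  // true: below fromBlock
	fmt.Println(outOfRange(150, &from, &to)) // false: inside the range
	fmt.Println(outOfRange(201, &from, nil)) // false: no upper bound
}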

+// filterLogs will filter the provided logsToFilter according to the filter provided
+func filterLogs(logsToFilter []*ethTypes.Log, filter *Filter) []types.Log {
+	logFilter := filter.Parameters.(LogFilter)
+
+	logs := make([]types.Log, 0)
+	for _, l := range logsToFilter {
+		// check address filter
+		if len(logFilter.Addresses) > 0 {
+			// if the log address doesn't match any address in the filter, skip this log
+			if !contains(logFilter.Addresses, l.Address) {
+				continue
+			}
+		}
+
+		// check topics
+		match := true
+		if len(logFilter.Topics) > 0 {
+		out:
+			// check all topics
+			for i := 0; i < maxTopics; i++ {
+				// check if the filter contains information
+				// to filter this topic position
+				checkTopic := len(logFilter.Topics) > i
+				if !checkTopic {
+					// if we shouldn't check this topic, we can assume
+					// no more topics need to be checked, because there
+					// will be no more topic filters, so we can break out
+					break out
+				}
+
+				// check if the topic filter allows any topic
+				acceptAnyTopic := len(logFilter.Topics[i]) == 0
+				if acceptAnyTopic {
+					// since any topic is allowed, we continue to the next topic filters
+					continue
+				}
+
+				// check if the log has the required topic set
+				logHasTopic := len(l.Topics) > i
+				if !logHasTopic {
+					// if the log doesn't have the required topic set, skip this log
+					match = false
+					break out
+				}
+
+				// check if any topic in the filter matches the log topic
+				if !contains(logFilter.Topics[i], l.Topics[i]) {
+					match = false
+					// if the log topic doesn't match any topic in the filter, skip this log
+					break out
+				}
+			}
+		}
+		if match {
+			logs = append(logs, types.NewLog(*l))
+		}
+	}
+	return logs
+}
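The topic rules above follow the standard Ethereum filter semantics: positions are ANDed, alternatives within one position are ORed, and an empty position is a wildcard. A self-contained sketch of the same rule over plain strings:

package main

import "fmt"

// topicsMatch applies the position-based topic rule: every filter position
// must be satisfied, a position with alternatives matches if any of them
// equals the log topic, and an empty position matches anything
func topicsMatch(logTopics []string, filter [][]string) bool {
	for i, alternatives := range filter {
		if len(alternatives) == 0 {
			continue // wildcard position
		}
		if i >= len(logTopics) {
			return false // filter requires a topic the log doesn't have
		}
		ok := false
		for _, t := range alternatives {
			if t == logTopics[i] {
				ok = true
				break
			}
		}
		if !ok {
			return false
		}
	}
	return true
}

func main() {
	logTopics := []string{"A", "B"}
	fmt.Println(topicsMatch(logTopics, [][]string{{"A"}}))      // true
	fmt.Println(topicsMatch(logTopics, [][]string{{}, {"X"}})) // false: position 1 mismatch
	fmt.Println(topicsMatch(logTopics, [][]string{{"A", "C"}})) // true: OR within a position
}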

+// contains checks if the item can be found in the items
+func contains[T comparable](items []T, itemsToFind T) bool {
+	for _, item := range items {
+		if item == itemsToFind {
+			return true
+		}
+	}
+	return false
+}
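Since Go 1.21, the standard library offers the same membership test as this helper; a usage sketch for comparison (whether the repo can take that dependency is an assumption):

package main

import (
	"fmt"
	"slices"
)

func main() {
	addrs := []string{"0xaaa", "0xbbb"}
	fmt.Println(slices.Contains(addrs, "0xbbb")) // true; same semantics as contains
}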

+// parallelize splits the items across workers according
+// to the max number of workers and the number of items,
+// allowing fn to be executed concurrently for different
+// chunks of items.
+func parallelize[T any](maxWorkers int, items []T, fn func(worker int, items []T)) {
+	if len(items) == 0 {
+		return
+	}
+
+	var workersCount = maxWorkers
+	if workersCount > len(items) {
+		workersCount = len(items)
+	}
+
+	var jobSize = len(items) / workersCount
+	var rest = len(items) % workersCount
+	if rest > 0 {
+		jobSize++
+	}
+
+	wg := sync.WaitGroup{}
+	for worker := 0; worker < workersCount; worker++ {
+		rangeStart := worker * jobSize
+		rangeEnd := ((worker + 1) * jobSize)
+
+		if rangeStart > len(items) {
+			continue
+		}
+
+		if rangeEnd > len(items) {
+			rangeEnd = len(items)
+		}
+
+		jobItems := items[rangeStart:rangeEnd]
+
+		wg.Add(1)
+		go func(worker int, filteredItems []T, fn func(worker int, items []T)) {
+			defer func() {
+				wg.Done()
+				err := recover()
+				if err != nil {
+					fmt.Println(err)
+				}
+			}()
+			fn(worker, filteredItems)
+		}(worker, jobItems, fn)
+	}
+	wg.Wait()
+}
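To make the chunking concrete: jobSize is the ceiling of len(items)/workersCount, so early workers get full chunks and the last worker gets the remainder; with uneven splits a trailing worker can receive an empty slice, which fn must tolerate. A standalone sketch of just that arithmetic, with hypothetical inputs:

package main

import "fmt"

// chunks reproduces the start/end ranges parallelize hands to each worker
func chunks(n, maxWorkers int) [][2]int {
	workers := maxWorkers
	if workers > n {
		workers = n
	}
	jobSize := n / workers
	if n%workers > 0 {
		jobSize++ // round up so all items are covered
	}
	var out [][2]int
	for w := 0; w < workers; w++ {
		start, end := w*jobSize, (w+1)*jobSize
		if start > n {
			continue
		}
		if end > n {
			end = n
		}
		out = append(out, [2]int{start, end})
	}
	return out
}

func main() {
	fmt.Println(chunks(5, 2))  // [[0 3] [3 5]]
	fmt.Println(chunks(10, 3)) // [[0 4] [4 8] [8 10]]
	fmt.Println(chunks(9, 4))  // [[0 3] [3 6] [6 9] [9 9]]: last chunk is empty
}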