Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve WS subscription #2635

Merged
merged 12 commits into from
Oct 13, 2023
125 changes: 96 additions & 29 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"math/big"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/client"
Expand Down Expand Up @@ -822,7 +825,7 @@ func (e *EthEndpoints) NewBlockFilter() (interface{}, types.Error) {
}

// internal
func (e *EthEndpoints) newBlockFilter(wsConn *websocket.Conn) (interface{}, types.Error) {
func (e *EthEndpoints) newBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) {
id, err := e.storage.NewBlockFilter(wsConn)
if err != nil {
return RPCErrorResponse(types.DefaultErrorCode, "failed to create new block filter", err, true)
Expand All @@ -841,7 +844,7 @@ func (e *EthEndpoints) NewFilter(filter LogFilter) (interface{}, types.Error) {
}

// internal
func (e *EthEndpoints) newFilter(ctx context.Context, wsConn *websocket.Conn, filter LogFilter, dbTx pgx.Tx) (interface{}, types.Error) {
func (e *EthEndpoints) newFilter(ctx context.Context, wsConn *atomic.Pointer[websocket.Conn], filter LogFilter, dbTx pgx.Tx) (interface{}, types.Error) {
shouldFilterByBlockRange := filter.FromBlock != nil || filter.ToBlock != nil

if shouldFilterByBlockRange {
Expand Down Expand Up @@ -883,7 +886,7 @@ func (e *EthEndpoints) NewPendingTransactionFilter() (interface{}, types.Error)
}

// internal
func (e *EthEndpoints) newPendingTransactionFilter(wsConn *websocket.Conn) (interface{}, types.Error) {
func (e *EthEndpoints) newPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) {
return nil, types.NewRPCError(types.DefaultErrorCode, "not supported yet")
// id, err := e.storage.NewPendingTransactionFilter(wsConn)
// if err != nil {
Expand Down Expand Up @@ -935,7 +938,6 @@ func (e *EthEndpoints) tryToAddTxToPool(input, ip string) (interface{}, types.Er
if err != nil {
return RPCErrorResponse(types.InvalidParamsErrorCode, "invalid tx input", err, false)
}

log.Infof("adding TX to the pool: %v", tx.Hash().Hex())
if err := e.pool.AddTx(context.Background(), *tx, ip); err != nil {
// it's not needed to log the error here, because we check and log if needed
Expand Down Expand Up @@ -1047,7 +1049,7 @@ func (e *EthEndpoints) updateFilterLastPoll(filterID string) types.Error {
// The node will return a subscription id.
// For each event that matches the subscription a notification with relevant
// data is sent together with the subscription id.
func (e *EthEndpoints) Subscribe(wsConn *websocket.Conn, name string, logFilter *LogFilter) (interface{}, types.Error) {
func (e *EthEndpoints) Subscribe(wsConn *atomic.Pointer[websocket.Conn], name string, logFilter *LogFilter) (interface{}, types.Error) {
switch name {
case "newHeads":
return e.newBlockFilter(wsConn)
Expand Down Expand Up @@ -1075,43 +1077,104 @@ func (e *EthEndpoints) Unsubscribe(wsConn *websocket.Conn, filterID string) (int

// uninstallFilterByWSConn uninstalls the filters connected to the
// provided web socket connection
func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *websocket.Conn) error {
func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error {
return e.storage.UninstallFilterByWSConn(wsConn)
}

// onNewL2Block is triggered when the state triggers the event for a new l2 block
func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
e.processBlockFilters(event)
e.processLogFilters(event)
log.Debugf("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64())
wg := sync.WaitGroup{}

wg.Add(1)
go e.notifyNewHeads(&wg, event)

wg.Add(1)
go e.notifyNewLogs(&wg, event)

wg.Wait()
}

// processBlockFilters answer filters subscribed for block updates when a new l2 block event
// is detected
func (e *EthEndpoints) processBlockFilters(event state.NewL2BlockEvent) {
func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
defer wg.Done()
start := time.Now()
blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn()
if err != nil {
log.Errorf("failed to get all block filters with web sockets connections: %v", err)
} else {
b, err := types.NewBlock(&event.Block, nil, false, false)
if err != nil {
log.Errorf("failed to build block response to subscription: %v", err)
return
}
data, err := json.Marshal(b)
if err != nil {
log.Errorf("failed to marshal block response to subscription: %v", err)
tclemos marked this conversation as resolved.
Show resolved Hide resolved
return
}
for _, filter := range blockFilters {
b, err := types.NewBlock(&event.Block, nil, false, false)
if err != nil {
log.Errorf("failed to build block response to subscription: %v", err)
} else {
e.sendSubscriptionResponse(filter, b)
}
e.sendSubscriptionResponse(filter, data)
}
}
log.Debugf("[notifyNewHeads] new l2 block event for block %v took %vms to send all the messages for block filters", event.Block.NumberU64(), time.Since(start).Milliseconds())
}

// processLogFilters answer filters subscribed for log updates when a new l2 block event
// is detected
func (e *EthEndpoints) processLogFilters(event state.NewL2BlockEvent) {
func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
defer wg.Done()
start := time.Now()
logFilters, err := e.storage.GetAllLogFiltersWithWSConn()
if err != nil {
log.Errorf("failed to get all log filters with web sockets connections: %v", err)
} else {
for _, filter := range logFilters {
changes, err := e.GetFilterChanges(filter.ID)
filterParameters := filter.Parameters.(LogFilter)
bn := types.BlockNumber(event.Block.NumberU64())

// if from and to blocks are new, set it to the current block to make
// the query faster
if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil {
filterParameters.FromBlock = &bn
filterParameters.ToBlock = &bn
} else {
// if the filter has a fromBlock value set
// and the event block number is smaller than the
// from block, skip this filter
if filterParameters.FromBlock != nil {
fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
if rpcErr != nil {
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
if fromBlock > event.Block.NumberU64() {
continue
}
// otherwise set the from block to a fixed number
// to avoid querying it again in the next step
fixedFromBlock := types.BlockNumber(fromBlock)
filterParameters.FromBlock = &fixedFromBlock
}

// if the filter has a toBlock value set
// and the event block number is greater than the
// to block, skip this filter
if filterParameters.ToBlock != nil {
toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
if rpcErr != nil {
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
if toBlock > event.Block.NumberU64() {
continue
}
// otherwise set the to block to a fixed number
// to avoid querying it again in the next step
fixedToBlock := types.BlockNumber(toBlock)
filterParameters.ToBlock = &fixedToBlock
}
}

// get new logs for this specific filter
changes, err := e.internalGetLogs(context.Background(), nil, filterParameters)
if errors.Is(err, state.ErrMaxLogsCountLimitExceeded) {
log.Infof("failed to get filters changes for filter %v, the filter seems to be returning more results than allowed and was removed: %v", filter.ID, err)
err := e.storage.UninstallFilter(filter.ID)
Expand All @@ -1126,39 +1189,43 @@ func (e *EthEndpoints) processLogFilters(event state.NewL2BlockEvent) {
continue
}

// if there are new logs for the filter, send it
if changes != nil {
ethLogs := changes.([]types.Log)
for _, ethLog := range ethLogs {
e.sendSubscriptionResponse(filter, ethLog)
data, err := json.Marshal(ethLog)
if err != nil {
log.Errorf("failed to marshal ethLog response to subscription: %v", err)
}
e.sendSubscriptionResponse(filter, data)
}
}
}
}
log.Debugf("[notifyNewLogs] new l2 block event for block %v took %vms to send all the messages for log filters", event.Block.NumberU64(), time.Since(start).Milliseconds())
}

func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data interface{}) {
func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data []byte) {
const errMessage = "Unable to write WS message to filter %v, %s"
result, err := json.Marshal(data)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
}

res := types.SubscriptionResponse{
JSONRPC: "2.0",
Method: "eth_subscription",
Params: types.SubscriptionResponseParams{
Subscription: filter.ID,
Result: result,
Result: data,
},
}
message, err := json.Marshal(res)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
return
}

err = filter.WsConn.WriteMessage(websocket.TextMessage, message)
err = filter.WsConn.Load().WriteMessage(websocket.TextMessage, message)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
return
}
log.Debugf("WS message sent: %v", string(message))
}
Loading