Skip to content

Commit

Permalink
Improve WS subscription (#2635)
Browse files Browse the repository at this point in the history
  • Loading branch information
tclemos authored Oct 13, 2023
1 parent bc33559 commit 9ea4ab4
Show file tree
Hide file tree
Showing 16 changed files with 470 additions and 125 deletions.
125 changes: 96 additions & 29 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import (
"math/big"
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/0xPolygonHermez/zkevm-node/hex"
"github.com/0xPolygonHermez/zkevm-node/jsonrpc/client"
Expand Down Expand Up @@ -822,7 +825,7 @@ func (e *EthEndpoints) NewBlockFilter() (interface{}, types.Error) {
}

// internal
func (e *EthEndpoints) newBlockFilter(wsConn *websocket.Conn) (interface{}, types.Error) {
func (e *EthEndpoints) newBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) {
id, err := e.storage.NewBlockFilter(wsConn)
if err != nil {
return RPCErrorResponse(types.DefaultErrorCode, "failed to create new block filter", err, true)
Expand All @@ -841,7 +844,7 @@ func (e *EthEndpoints) NewFilter(filter LogFilter) (interface{}, types.Error) {
}

// internal
func (e *EthEndpoints) newFilter(ctx context.Context, wsConn *websocket.Conn, filter LogFilter, dbTx pgx.Tx) (interface{}, types.Error) {
func (e *EthEndpoints) newFilter(ctx context.Context, wsConn *atomic.Pointer[websocket.Conn], filter LogFilter, dbTx pgx.Tx) (interface{}, types.Error) {
shouldFilterByBlockRange := filter.FromBlock != nil || filter.ToBlock != nil

if shouldFilterByBlockRange {
Expand Down Expand Up @@ -883,7 +886,7 @@ func (e *EthEndpoints) NewPendingTransactionFilter() (interface{}, types.Error)
}

// internal
func (e *EthEndpoints) newPendingTransactionFilter(wsConn *websocket.Conn) (interface{}, types.Error) {
func (e *EthEndpoints) newPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) {
return nil, types.NewRPCError(types.DefaultErrorCode, "not supported yet")
// id, err := e.storage.NewPendingTransactionFilter(wsConn)
// if err != nil {
Expand Down Expand Up @@ -935,7 +938,6 @@ func (e *EthEndpoints) tryToAddTxToPool(input, ip string) (interface{}, types.Er
if err != nil {
return RPCErrorResponse(types.InvalidParamsErrorCode, "invalid tx input", err, false)
}

log.Infof("adding TX to the pool: %v", tx.Hash().Hex())
if err := e.pool.AddTx(context.Background(), *tx, ip); err != nil {
// it's not needed to log the error here, because we check and log if needed
Expand Down Expand Up @@ -1047,7 +1049,7 @@ func (e *EthEndpoints) updateFilterLastPoll(filterID string) types.Error {
// The node will return a subscription id.
// For each event that matches the subscription a notification with relevant
// data is sent together with the subscription id.
func (e *EthEndpoints) Subscribe(wsConn *websocket.Conn, name string, logFilter *LogFilter) (interface{}, types.Error) {
func (e *EthEndpoints) Subscribe(wsConn *atomic.Pointer[websocket.Conn], name string, logFilter *LogFilter) (interface{}, types.Error) {
switch name {
case "newHeads":
return e.newBlockFilter(wsConn)
Expand Down Expand Up @@ -1075,43 +1077,104 @@ func (e *EthEndpoints) Unsubscribe(wsConn *websocket.Conn, filterID string) (int

// uninstallFilterByWSConn uninstalls the filters connected to the
// provided web socket connection
func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *websocket.Conn) error {
func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error {
return e.storage.UninstallFilterByWSConn(wsConn)
}

// onNewL2Block is triggered when the state triggers the event for a new l2 block
func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
e.processBlockFilters(event)
e.processLogFilters(event)
log.Debugf("[onNewL2Block] new l2 block event detected for block %v", event.Block.NumberU64())
wg := sync.WaitGroup{}

wg.Add(1)
go e.notifyNewHeads(&wg, event)

wg.Add(1)
go e.notifyNewLogs(&wg, event)

wg.Wait()
}

// processBlockFilters answer filters subscribed for block updates when a new l2 block event
// is detected
func (e *EthEndpoints) processBlockFilters(event state.NewL2BlockEvent) {
func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
defer wg.Done()
start := time.Now()
blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn()
if err != nil {
log.Errorf("failed to get all block filters with web sockets connections: %v", err)
} else {
b, err := types.NewBlock(&event.Block, nil, false, false)
if err != nil {
log.Errorf("failed to build block response to subscription: %v", err)
return
}
data, err := json.Marshal(b)
if err != nil {
log.Errorf("failed to marshal block response to subscription: %v", err)
return
}
for _, filter := range blockFilters {
b, err := types.NewBlock(&event.Block, nil, false, false)
if err != nil {
log.Errorf("failed to build block response to subscription: %v", err)
} else {
e.sendSubscriptionResponse(filter, b)
}
e.sendSubscriptionResponse(filter, data)
}
}
log.Debugf("[notifyNewHeads] new l2 block event for block %v took %vms to send all the messages for block filters", event.Block.NumberU64(), time.Since(start).Milliseconds())
}

// processLogFilters answer filters subscribed for log updates when a new l2 block event
// is detected
func (e *EthEndpoints) processLogFilters(event state.NewL2BlockEvent) {
func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockEvent) {
defer wg.Done()
start := time.Now()
logFilters, err := e.storage.GetAllLogFiltersWithWSConn()
if err != nil {
log.Errorf("failed to get all log filters with web sockets connections: %v", err)
} else {
for _, filter := range logFilters {
changes, err := e.GetFilterChanges(filter.ID)
filterParameters := filter.Parameters.(LogFilter)
bn := types.BlockNumber(event.Block.NumberU64())

// if from and to blocks are new, set it to the current block to make
// the query faster
if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil {
filterParameters.FromBlock = &bn
filterParameters.ToBlock = &bn
} else {
// if the filter has a fromBlock value set
// and the event block number is smaller than the
// from block, skip this filter
if filterParameters.FromBlock != nil {
fromBlock, rpcErr := filterParameters.FromBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
if rpcErr != nil {
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
if fromBlock > event.Block.NumberU64() {
continue
}
// otherwise set the from block to a fixed number
// to avoid querying it again in the next step
fixedFromBlock := types.BlockNumber(fromBlock)
filterParameters.FromBlock = &fixedFromBlock
}

// if the filter has a toBlock value set
// and the event block number is greater than the
// to block, skip this filter
if filterParameters.ToBlock != nil {
toBlock, rpcErr := filterParameters.ToBlock.GetNumericBlockNumber(context.Background(), e.state, e.etherman, nil)
if rpcErr != nil {
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
if toBlock > event.Block.NumberU64() {
continue
}
// otherwise set the to block to a fixed number
// to avoid querying it again in the next step
fixedToBlock := types.BlockNumber(toBlock)
filterParameters.ToBlock = &fixedToBlock
}
}

// get new logs for this specific filter
changes, err := e.internalGetLogs(context.Background(), nil, filterParameters)
if errors.Is(err, state.ErrMaxLogsCountLimitExceeded) {
log.Infof("failed to get filters changes for filter %v, the filter seems to be returning more results than allowed and was removed: %v", filter.ID, err)
err := e.storage.UninstallFilter(filter.ID)
Expand All @@ -1126,39 +1189,43 @@ func (e *EthEndpoints) processLogFilters(event state.NewL2BlockEvent) {
continue
}

// if there are new logs for the filter, send it
if changes != nil {
ethLogs := changes.([]types.Log)
for _, ethLog := range ethLogs {
e.sendSubscriptionResponse(filter, ethLog)
data, err := json.Marshal(ethLog)
if err != nil {
log.Errorf("failed to marshal ethLog response to subscription: %v", err)
}
e.sendSubscriptionResponse(filter, data)
}
}
}
}
log.Debugf("[notifyNewLogs] new l2 block event for block %v took %vms to send all the messages for log filters", event.Block.NumberU64(), time.Since(start).Milliseconds())
}

func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data interface{}) {
func (e *EthEndpoints) sendSubscriptionResponse(filter *Filter, data []byte) {
const errMessage = "Unable to write WS message to filter %v, %s"
result, err := json.Marshal(data)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
}

res := types.SubscriptionResponse{
JSONRPC: "2.0",
Method: "eth_subscription",
Params: types.SubscriptionResponseParams{
Subscription: filter.ID,
Result: result,
Result: data,
},
}
message, err := json.Marshal(res)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
return
}

err = filter.WsConn.WriteMessage(websocket.TextMessage, message)
err = filter.WsConn.Load().WriteMessage(websocket.TextMessage, message)
if err != nil {
log.Errorf(fmt.Sprintf(errMessage, filter.ID, err.Error()))
return
}
log.Debugf("WS message sent: %v", string(message))
}
Loading

0 comments on commit 9ea4ab4

Please sign in to comment.