Skip to content

Commit

Permalink
replace atomic wsConn by concurrentWsConn (#2782) (#2790)
Browse files Browse the repository at this point in the history
  • Loading branch information
tclemos committed Nov 14, 2023
1 parent 59e43db commit 1a29bc7
Show file tree
Hide file tree
Showing 14 changed files with 211 additions and 144 deletions.
2 changes: 1 addition & 1 deletion jsonrpc/endpoints_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (d *DebugEndpoints) TraceBatchByNumber(httpRequest *http.Request, number ty

requests := make(chan (ethTypes.Receipt), bufferSize)

mu := sync.Mutex{}
mu := &sync.Mutex{}
wg := sync.WaitGroup{}
wg.Add(len(receipts))
responses := make([]traceResponse, 0, len(receipts))
Expand Down
43 changes: 26 additions & 17 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/0xPolygonHermez/zkevm-node/hex"
Expand All @@ -21,7 +20,6 @@ import (
"github.com/0xPolygonHermez/zkevm-node/state/runtime"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/gorilla/websocket"
"github.com/jackc/pgx/v4"
)

Expand Down Expand Up @@ -816,7 +814,7 @@ func (e *EthEndpoints) NewBlockFilter() (interface{}, types.Error) {
}

// internal
func (e *EthEndpoints) newBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) {
func (e *EthEndpoints) newBlockFilter(wsConn *concurrentWsConn) (interface{}, types.Error) {
id, err := e.storage.NewBlockFilter(wsConn)
if err != nil {
return RPCErrorResponse(types.DefaultErrorCode, "failed to create new block filter", err, true)
Expand All @@ -835,7 +833,7 @@ func (e *EthEndpoints) NewFilter(filter LogFilter) (interface{}, types.Error) {
}

// internal
func (e *EthEndpoints) newFilter(ctx context.Context, wsConn *atomic.Pointer[websocket.Conn], filter LogFilter, dbTx pgx.Tx) (interface{}, types.Error) {
func (e *EthEndpoints) newFilter(ctx context.Context, wsConn *concurrentWsConn, filter LogFilter, dbTx pgx.Tx) (interface{}, types.Error) {
if filter.ShouldFilterByBlockRange() {
_, _, rpcErr := filter.GetNumericBlockNumbers(ctx, e.cfg, e.state, e.etherman, nil)
if rpcErr != nil {
Expand All @@ -861,7 +859,7 @@ func (e *EthEndpoints) NewPendingTransactionFilter() (interface{}, types.Error)
}

// internal
func (e *EthEndpoints) newPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) {
func (e *EthEndpoints) newPendingTransactionFilter(wsConn *concurrentWsConn) (interface{}, types.Error) {
return nil, types.NewRPCError(types.DefaultErrorCode, "not supported yet")
// id, err := e.storage.NewPendingTransactionFilter(wsConn)
// if err != nil {
Expand Down Expand Up @@ -1024,7 +1022,7 @@ func (e *EthEndpoints) updateFilterLastPoll(filterID string) types.Error {
// The node will return a subscription id.
// For each event that matches the subscription a notification with relevant
// data is sent together with the subscription id.
func (e *EthEndpoints) Subscribe(wsConn *atomic.Pointer[websocket.Conn], name string, logFilter *LogFilter) (interface{}, types.Error) {
func (e *EthEndpoints) Subscribe(wsConn *concurrentWsConn, name string, logFilter *LogFilter) (interface{}, types.Error) {
switch name {
case "newHeads":
return e.newBlockFilter(wsConn)
Expand All @@ -1046,13 +1044,13 @@ func (e *EthEndpoints) Subscribe(wsConn *atomic.Pointer[websocket.Conn], name st
}

// Unsubscribe uninstalls the filter based on the provided filterID
func (e *EthEndpoints) Unsubscribe(wsConn *websocket.Conn, filterID string) (interface{}, types.Error) {
func (e *EthEndpoints) Unsubscribe(wsConn *concurrentWsConn, filterID string) (interface{}, types.Error) {
return e.UninstallFilter(filterID)
}

// uninstallFilterByWSConn uninstalls the filters connected to the
// provided web socket connection
func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error {
func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *concurrentWsConn) error {
return e.storage.UninstallFilterByWSConn(wsConn)
}

Expand Down Expand Up @@ -1089,7 +1087,6 @@ func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2Block
}
for _, filter := range blockFilters {
filter.EnqueueSubscriptionDataToBeSent(data)
go filter.SendEnqueuedSubscriptionData()
}
}
log.Debugf("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start))
Expand All @@ -1106,9 +1103,16 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE
filterParameters := filter.Parameters.(LogFilter)
bn := types.BlockNumber(event.Block.NumberU64())

// if from and to blocks are new, set it to the current block to make
// the query faster
if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil {
if filterParameters.BlockHash != nil {
// if the filter block hash is set, we check if the block is the
// one with the expected hash, otherwise we ignore the filter
bh := *filterParameters.BlockHash
if bh.String() != event.Block.Hash().String() {
continue
}
} else if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil {
// in case the block hash is nil and also from and to blocks are nil, set it
// to the current block to make the query faster
filterParameters.FromBlock = &bn
filterParameters.ToBlock = &bn
} else {
Expand All @@ -1121,12 +1125,15 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
if fromBlock > event.Block.NumberU64() {
// if the block number is smaller than the fromBlock value
// this means this block is out of the block range for this
// filter, so we skip it
if event.Block.NumberU64() < fromBlock {
continue
}
// otherwise set the from block to a fixed number
// to avoid querying it again in the next step
fixedFromBlock := types.BlockNumber(fromBlock)
fixedFromBlock := types.BlockNumber(event.Block.NumberU64())
filterParameters.FromBlock = &fixedFromBlock
}

Expand All @@ -1139,12 +1146,15 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
if toBlock > event.Block.NumberU64() {
// if the block number is greater than the toBlock value
// this means this block is out of the block range for this
// filter, so we skip it
if event.Block.NumberU64() > toBlock {
continue
}
// otherwise set the to block to a fixed number
// to avoid querying it again in the next step
fixedToBlock := types.BlockNumber(toBlock)
fixedToBlock := types.BlockNumber(event.Block.NumberU64())
filterParameters.ToBlock = &fixedToBlock
}
}
Expand Down Expand Up @@ -1174,7 +1184,6 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE
log.Errorf("failed to marshal ethLog response to subscription: %v", err)
}
filter.EnqueueSubscriptionDataToBeSent(data)
go filter.SendEnqueuedSubscriptionData()
}
}
}
Expand Down
28 changes: 13 additions & 15 deletions jsonrpc/endpoints_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"math/big"
"strings"
"sync/atomic"
"testing"
"time"

Expand All @@ -24,7 +23,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie"
"github.com/gorilla/websocket"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -3630,7 +3628,7 @@ func TestNewFilter(t *testing.T) {
Once()

m.Storage.
On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})).
On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})).
Return("1", nil).
Once()
},
Expand All @@ -3654,7 +3652,7 @@ func TestNewFilter(t *testing.T) {
Once()

m.Storage.
On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})).
On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})).
Return("1", nil).
Once()
},
Expand Down Expand Up @@ -3717,7 +3715,7 @@ func TestNewFilter(t *testing.T) {
Return(m.DbTx, nil).
Once()
m.Storage.
On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})).
On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})).
Return("", errors.New("failed to add new filter")).
Once()
},
Expand Down Expand Up @@ -3768,7 +3766,7 @@ func TestNewBlockFilter(t *testing.T) {
ExpectedError: nil,
SetupMocks: func(m *mocksWrapper, tc testCase) {
m.Storage.
On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})).
On("NewBlockFilter", mock.IsType(&concurrentWsConn{})).
Return("1", nil).
Once()
},
Expand All @@ -3779,7 +3777,7 @@ func TestNewBlockFilter(t *testing.T) {
ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new block filter"),
SetupMocks: func(m *mocksWrapper, tc testCase) {
m.Storage.
On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})).
On("NewBlockFilter", mock.IsType(&concurrentWsConn{})).
Return("", errors.New("failed to add new block filter")).
Once()
},
Expand Down Expand Up @@ -3828,9 +3826,9 @@ func TestNewPendingTransactionFilter(t *testing.T) {
// Name: "New pending transaction filter created successfully",
// ExpectedResult: "1",
// ExpectedError: nil,
// SetupMocks: func(m *mocks, tc testCase) {
// SetupMocks: func(m *mocksWrapper, tc testCase) {
// m.Storage.
// On("NewPendingTransactionFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})).
// On("NewPendingTransactionFilter", mock.IsType(&concurrentWsConn{})).
// Return("1", nil).
// Once()
// },
Expand All @@ -3839,9 +3837,9 @@ func TestNewPendingTransactionFilter(t *testing.T) {
// Name: "failed to create new pending transaction filter",
// ExpectedResult: "",
// ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new pending transaction filter"),
// SetupMocks: func(m *mocks, tc testCase) {
// SetupMocks: func(m *mocksWrapper, tc testCase) {
// m.Storage.
// On("NewPendingTransactionFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})).
// On("NewPendingTransactionFilter", mock.IsType(&concurrentWsConn{})).
// Return("", errors.New("failed to add new pending transaction filter")).
// Once()
// },
Expand Down Expand Up @@ -4898,7 +4896,7 @@ func TestSubscribeNewHeads(t *testing.T) {
Name: "Subscribe to new heads Successfully",
SetupMocks: func(m *mocksWrapper, tc testCase) {
m.Storage.
On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})).
On("NewBlockFilter", mock.IsType(&concurrentWsConn{})).
Return("0x1", nil).
Once()
},
Expand All @@ -4908,7 +4906,7 @@ func TestSubscribeNewHeads(t *testing.T) {
ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new block filter"),
SetupMocks: func(m *mocksWrapper, tc testCase) {
m.Storage.
On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})).
On("NewBlockFilter", mock.IsType(&concurrentWsConn{})).
Return("", fmt.Errorf("failed to add filter to storage")).
Once()
},
Expand Down Expand Up @@ -4976,7 +4974,7 @@ func TestSubscribeNewLogs(t *testing.T) {
Once()

m.Storage.
On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})).
On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})).
Return("0x1", nil).
Once()
},
Expand All @@ -5001,7 +4999,7 @@ func TestSubscribeNewLogs(t *testing.T) {
Once()

m.Storage.
On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})).
On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})).
Return("", fmt.Errorf("failed to add filter to storage")).
Once()
},
Expand Down
10 changes: 4 additions & 6 deletions jsonrpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import (
"net/http"
"reflect"
"strings"
"sync/atomic"
"unicode"

"github.com/0xPolygonHermez/zkevm-node/jsonrpc/types"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/gorilla/websocket"
)

const (
Expand All @@ -36,7 +34,7 @@ func (f *funcData) numParams() int {

type handleRequest struct {
types.Request
wsConn *atomic.Pointer[websocket.Conn]
wsConn *concurrentWsConn
HttpRequest *http.Request
}

Expand Down Expand Up @@ -93,7 +91,7 @@ func (h *Handler) Handle(req handleRequest) types.Response {
firstFuncParamIsWebSocketConn := false
firstFuncParamIsHttpRequest := false
if funcHasMoreThanOneInputParams {
firstFuncParamIsWebSocketConn = fd.reqt[1].AssignableTo(reflect.TypeOf(&atomic.Pointer[websocket.Conn]{}))
firstFuncParamIsWebSocketConn = fd.reqt[1].AssignableTo(reflect.TypeOf(&concurrentWsConn{}))
firstFuncParamIsHttpRequest = fd.reqt[1].AssignableTo(reflect.TypeOf(&http.Request{}))
}
if requestHasWebSocketConn && firstFuncParamIsWebSocketConn {
Expand Down Expand Up @@ -143,7 +141,7 @@ func (h *Handler) Handle(req handleRequest) types.Response {
}

// HandleWs handle websocket requests
func (h *Handler) HandleWs(reqBody []byte, wsConn *atomic.Pointer[websocket.Conn], httpReq *http.Request) ([]byte, error) {
func (h *Handler) HandleWs(reqBody []byte, wsConn *concurrentWsConn, httpReq *http.Request) ([]byte, error) {
log.Debugf("WS message received: %v", string(reqBody))
var req types.Request
if err := json.Unmarshal(reqBody, &req); err != nil {
Expand All @@ -160,7 +158,7 @@ func (h *Handler) HandleWs(reqBody []byte, wsConn *atomic.Pointer[websocket.Conn
}

// RemoveFilterByWsConn uninstalls the filter attached to this websocket connection
func (h *Handler) RemoveFilterByWsConn(wsConn *atomic.Pointer[websocket.Conn]) {
func (h *Handler) RemoveFilterByWsConn(wsConn *concurrentWsConn) {
service, ok := h.serviceMap[APIEth]
if !ok {
return
Expand Down
14 changes: 4 additions & 10 deletions jsonrpc/interfaces.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,14 @@
package jsonrpc

import (
"sync/atomic"

"github.com/gorilla/websocket"
)

// storageInterface json rpc internal storage to persist data
type storageInterface interface {
GetAllBlockFiltersWithWSConn() ([]*Filter, error)
GetAllLogFiltersWithWSConn() ([]*Filter, error)
GetFilter(filterID string) (*Filter, error)
NewBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error)
NewLogFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (string, error)
NewPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (string, error)
NewBlockFilter(wsConn *concurrentWsConn) (string, error)
NewLogFilter(wsConn *concurrentWsConn, filter LogFilter) (string, error)
NewPendingTransactionFilter(wsConn *concurrentWsConn) (string, error)
UninstallFilter(filterID string) error
UninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error
UninstallFilterByWSConn(wsConn *concurrentWsConn) error
UpdateFilterLastPoll(filterID string) error
}
Loading

0 comments on commit 1a29bc7

Please sign in to comment.