Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

replace atomic wsConn by concurrentWsConn #2782

Merged
merged 8 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions db/migrations/state/0011.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- +migrate Up
CREATE INDEX IF NOT EXISTS l2block_created_at_idx ON state.l2block (created_at);

CREATE INDEX IF NOT EXISTS log_log_index_idx ON state.log (log_index);
CREATE INDEX IF NOT EXISTS log_topic0_idx ON state.log (topic0);
CREATE INDEX IF NOT EXISTS log_topic1_idx ON state.log (topic1);
CREATE INDEX IF NOT EXISTS log_topic2_idx ON state.log (topic2);
CREATE INDEX IF NOT EXISTS log_topic3_idx ON state.log (topic3);

ALTER TABLE state.transaction ADD COLUMN egp_log JSONB;

-- +migrate Down
DROP INDEX IF EXISTS state.l2block_created_at_idx;

DROP INDEX IF EXISTS state.log_log_index_idx;
DROP INDEX IF EXISTS state.log_topic0_idx;
DROP INDEX IF EXISTS state.log_topic1_idx;
DROP INDEX IF EXISTS state.log_topic2_idx;
DROP INDEX IF EXISTS state.log_topic3_idx;

ALTER TABLE state.transaction DROP COLUMN egp_log;
73 changes: 73 additions & 0 deletions db/migrations/state/0011_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package migrations_test

import (
"database/sql"
"testing"

"github.com/stretchr/testify/assert"
)

// this migration changes length of the token name
type migrationTest0011 struct{}

func (m migrationTest0011) InsertData(db *sql.DB) error {
return nil
}

func (m migrationTest0011) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) {
indexes := []string{
"l2block_created_at_idx",
"log_log_index_idx",
"log_topic0_idx",
"log_topic1_idx",
"log_topic2_idx",
"log_topic3_idx",
}
// Check indexes adding
for _, idx := range indexes {
// getIndex
const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;`
row := db.QueryRow(getIndex, idx)
var result int
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 1, result)
}

// Check column egp_log exists in state.transactions table
const getFinalDeviationColumn = `SELECT count(*) FROM information_schema.columns WHERE table_name='transaction' and column_name='egp_log'`
row := db.QueryRow(getFinalDeviationColumn)
var result int
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 1, result)
}

func (m migrationTest0011) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) {
indexes := []string{
"l2block_created_at_idx",
"log_log_index_idx",
"log_topic0_idx",
"log_topic1_idx",
"log_topic2_idx",
"log_topic3_idx",
}
// Check indexes removing
for _, idx := range indexes {
// getIndex
const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;`
row := db.QueryRow(getIndex, idx)
var result int
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 0, result)
}

// Check column egp_log doesn't exists in state.transactions table
const getFinalDeviationColumn = `SELECT count(*) FROM information_schema.columns WHERE table_name='transaction' and column_name='egp_log'`
row := db.QueryRow(getFinalDeviationColumn)
var result int
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 0, result)
}

func TestMigration0011(t *testing.T) {
runMigrationTest(t, 11, migrationTest0011{})
}
2 changes: 1 addition & 1 deletion jsonrpc/endpoints_debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func (d *DebugEndpoints) TraceBatchByNumber(httpRequest *http.Request, number ty

buffer := make(chan byte, bufferSize)

mu := sync.Mutex{}
mu := &sync.Mutex{}
agnusmor marked this conversation as resolved.
Show resolved Hide resolved
wg := sync.WaitGroup{}
wg.Add(len(receipts))
responses := make([]traceResponse, 0, len(receipts))
Expand Down
43 changes: 26 additions & 17 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"net/http"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/0xPolygonHermez/zkevm-node/hex"
Expand All @@ -21,7 +20,6 @@ import (
"github.com/0xPolygonHermez/zkevm-node/state/runtime"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/gorilla/websocket"
"github.com/jackc/pgx/v4"
)

Expand Down Expand Up @@ -793,7 +791,7 @@ func (e *EthEndpoints) NewBlockFilter() (interface{}, types.Error) {
}

// internal
func (e *EthEndpoints) newBlockFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) {
func (e *EthEndpoints) newBlockFilter(wsConn *concurrentWsConn) (interface{}, types.Error) {
id, err := e.storage.NewBlockFilter(wsConn)
if err != nil {
return RPCErrorResponse(types.DefaultErrorCode, "failed to create new block filter", err)
Expand All @@ -810,7 +808,7 @@ func (e *EthEndpoints) NewFilter(filter LogFilter) (interface{}, types.Error) {
}

// internal
func (e *EthEndpoints) newFilter(wsConn *atomic.Pointer[websocket.Conn], filter LogFilter) (interface{}, types.Error) {
func (e *EthEndpoints) newFilter(wsConn *concurrentWsConn, filter LogFilter) (interface{}, types.Error) {
id, err := e.storage.NewLogFilter(wsConn, filter)
if errors.Is(err, ErrFilterInvalidPayload) {
return RPCErrorResponse(types.InvalidParamsErrorCode, err.Error(), nil)
Expand All @@ -829,7 +827,7 @@ func (e *EthEndpoints) NewPendingTransactionFilter() (interface{}, types.Error)
}

// internal
func (e *EthEndpoints) newPendingTransactionFilter(wsConn *atomic.Pointer[websocket.Conn]) (interface{}, types.Error) {
func (e *EthEndpoints) newPendingTransactionFilter(wsConn *concurrentWsConn) (interface{}, types.Error) {
return nil, types.NewRPCError(types.DefaultErrorCode, "not supported yet")
// id, err := e.storage.NewPendingTransactionFilter(wsConn)
// if err != nil {
Expand Down Expand Up @@ -991,7 +989,7 @@ func (e *EthEndpoints) updateFilterLastPoll(filterID string) types.Error {
// The node will return a subscription id.
// For each event that matches the subscription a notification with relevant
// data is sent together with the subscription id.
func (e *EthEndpoints) Subscribe(wsConn *atomic.Pointer[websocket.Conn], name string, logFilter *LogFilter) (interface{}, types.Error) {
func (e *EthEndpoints) Subscribe(wsConn *concurrentWsConn, name string, logFilter *LogFilter) (interface{}, types.Error) {
switch name {
case "newHeads":
return e.newBlockFilter(wsConn)
Expand All @@ -1011,13 +1009,13 @@ func (e *EthEndpoints) Subscribe(wsConn *atomic.Pointer[websocket.Conn], name st
}

// Unsubscribe uninstalls the filter based on the provided filterID
func (e *EthEndpoints) Unsubscribe(wsConn *websocket.Conn, filterID string) (interface{}, types.Error) {
func (e *EthEndpoints) Unsubscribe(wsConn *concurrentWsConn, filterID string) (interface{}, types.Error) {
return e.UninstallFilter(filterID)
}

// uninstallFilterByWSConn uninstalls the filters connected to the
// provided web socket connection
func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *atomic.Pointer[websocket.Conn]) error {
func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *concurrentWsConn) error {
return e.storage.UninstallFilterByWSConn(wsConn)
}

Expand Down Expand Up @@ -1054,7 +1052,6 @@ func (e *EthEndpoints) notifyNewHeads(wg *sync.WaitGroup, event state.NewL2Block
}
for _, filter := range blockFilters {
filter.EnqueueSubscriptionDataToBeSent(data)
go filter.SendEnqueuedSubscriptionData()
}
}
log.Debugf("[notifyNewHeads] new l2 block event for block %v took %v to send all the messages for block filters", event.Block.NumberU64(), time.Since(start))
Expand All @@ -1071,9 +1068,16 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE
filterParameters := filter.Parameters.(LogFilter)
bn := types.BlockNumber(event.Block.NumberU64())

// if from and to blocks are nil, set it to the current block to make
// the query faster
if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil {
if filterParameters.BlockHash != nil {
// if the filter block hash is set, we check if the block is the
// one with the expected hash, otherwise we ignore the filter
bh := *filterParameters.BlockHash
if bh.String() != event.Block.Hash().String() {
continue
}
} else if filterParameters.FromBlock == nil && filterParameters.ToBlock == nil {
// in case the block hash is nil and also from and to blocks are nil, set it
// to the current block to make the query faster
filterParameters.FromBlock = &bn
filterParameters.ToBlock = &bn
} else {
Expand All @@ -1086,12 +1090,15 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
if fromBlock > event.Block.NumberU64() {
// if the block number is smaller than the fromBlock value
// this means this block is out of the block range for this
// filter, so we skip it
if event.Block.NumberU64() < fromBlock {
continue
}
// otherwise set the from block to a fixed number
// to avoid querying it again in the next step
fixedFromBlock := types.BlockNumber(fromBlock)
fixedFromBlock := types.BlockNumber(event.Block.NumberU64())
filterParameters.FromBlock = &fixedFromBlock
}

Expand All @@ -1104,12 +1111,15 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE
log.Errorf(rpcErr.Error(), filter.ID, err)
continue
}
if toBlock > event.Block.NumberU64() {
// if the block number is greater than the toBlock value
// this means this block is out of the block range for this
// filter, so we skip it
if event.Block.NumberU64() > toBlock {
continue
}
// otherwise set the to block to a fixed number
// to avoid querying it again in the next step
fixedToBlock := types.BlockNumber(toBlock)
fixedToBlock := types.BlockNumber(event.Block.NumberU64())
filterParameters.ToBlock = &fixedToBlock
}
}
Expand All @@ -1130,7 +1140,6 @@ func (e *EthEndpoints) notifyNewLogs(wg *sync.WaitGroup, event state.NewL2BlockE
log.Errorf("failed to marshal ethLog response to subscription: %v", err)
}
filter.EnqueueSubscriptionDataToBeSent(data)
go filter.SendEnqueuedSubscriptionData()
}
}
}
Expand Down
20 changes: 9 additions & 11 deletions jsonrpc/endpoints_eth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"fmt"
"math/big"
"strings"
"sync/atomic"
"testing"
"time"

Expand All @@ -24,7 +23,6 @@ import (
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/rpc"
"github.com/ethereum/go-ethereum/trie"
"github.com/gorilla/websocket"
"github.com/jackc/pgx/v4"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
Expand Down Expand Up @@ -3558,7 +3556,7 @@ func TestNewFilter(t *testing.T) {
ExpectedError: nil,
SetupMocks: func(m *mocksWrapper, tc testCase) {
m.Storage.
On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})).
On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})).
Return("1", nil).
Once()
},
Expand All @@ -3572,7 +3570,7 @@ func TestNewFilter(t *testing.T) {
ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new log filter"),
SetupMocks: func(m *mocksWrapper, tc testCase) {
m.Storage.
On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})).
On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})).
Return("", errors.New("failed to add new filter")).
Once()
},
Expand All @@ -3587,7 +3585,7 @@ func TestNewFilter(t *testing.T) {
ExpectedError: types.NewRPCError(types.InvalidParamsErrorCode, "invalid argument 0: cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other"),
SetupMocks: func(m *mocksWrapper, tc testCase) {
m.Storage.
On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})).
On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})).
Once().
Return("", ErrFilterInvalidPayload).
Once()
Expand Down Expand Up @@ -3639,7 +3637,7 @@ func TestNewBlockFilter(t *testing.T) {
ExpectedError: nil,
SetupMocks: func(m *mocksWrapper, tc testCase) {
m.Storage.
On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})).
On("NewBlockFilter", mock.IsType(&concurrentWsConn{})).
Return("1", nil).
Once()
},
Expand All @@ -3650,7 +3648,7 @@ func TestNewBlockFilter(t *testing.T) {
ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new block filter"),
SetupMocks: func(m *mocksWrapper, tc testCase) {
m.Storage.
On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})).
On("NewBlockFilter", mock.IsType(&concurrentWsConn{})).
Return("", errors.New("failed to add new block filter")).
Once()
},
Expand Down Expand Up @@ -4717,7 +4715,7 @@ func TestSubscribeNewHeads(t *testing.T) {
Name: "Subscribe to new heads Successfully",
SetupMocks: func(m *mocksWrapper, tc testCase) {
m.Storage.
On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})).
On("NewBlockFilter", mock.IsType(&concurrentWsConn{})).
Return("0x1", nil).
Once()
},
Expand All @@ -4727,7 +4725,7 @@ func TestSubscribeNewHeads(t *testing.T) {
ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new block filter"),
SetupMocks: func(m *mocksWrapper, tc testCase) {
m.Storage.
On("NewBlockFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{})).
On("NewBlockFilter", mock.IsType(&concurrentWsConn{})).
Return("", fmt.Errorf("failed to add filter to storage")).
Once()
},
Expand Down Expand Up @@ -4785,7 +4783,7 @@ func TestSubscribeNewLogs(t *testing.T) {
},
SetupMocks: func(m *mocksWrapper, tc testCase) {
m.Storage.
On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})).
On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})).
Return("0x1", nil).
Once()
},
Expand All @@ -4800,7 +4798,7 @@ func TestSubscribeNewLogs(t *testing.T) {
},
SetupMocks: func(m *mocksWrapper, tc testCase) {
m.Storage.
On("NewLogFilter", mock.IsType(&atomic.Pointer[websocket.Conn]{}), mock.IsType(LogFilter{})).
On("NewLogFilter", mock.IsType(&concurrentWsConn{}), mock.IsType(LogFilter{})).
Return("", fmt.Errorf("failed to add filter to storage")).
Once()
},
Expand Down
10 changes: 4 additions & 6 deletions jsonrpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import (
"net/http"
"reflect"
"strings"
"sync/atomic"
"unicode"

"github.com/0xPolygonHermez/zkevm-node/jsonrpc/types"
"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/gorilla/websocket"
)

const (
Expand All @@ -36,7 +34,7 @@ func (f *funcData) numParams() int {

type handleRequest struct {
types.Request
wsConn *atomic.Pointer[websocket.Conn]
wsConn *concurrentWsConn
HttpRequest *http.Request
}

Expand Down Expand Up @@ -93,7 +91,7 @@ func (h *Handler) Handle(req handleRequest) types.Response {
firstFuncParamIsWebSocketConn := false
firstFuncParamIsHttpRequest := false
if funcHasMoreThanOneInputParams {
firstFuncParamIsWebSocketConn = fd.reqt[1].AssignableTo(reflect.TypeOf(&atomic.Pointer[websocket.Conn]{}))
firstFuncParamIsWebSocketConn = fd.reqt[1].AssignableTo(reflect.TypeOf(&concurrentWsConn{}))
firstFuncParamIsHttpRequest = fd.reqt[1].AssignableTo(reflect.TypeOf(&http.Request{}))
}
if requestHasWebSocketConn && firstFuncParamIsWebSocketConn {
Expand Down Expand Up @@ -143,7 +141,7 @@ func (h *Handler) Handle(req handleRequest) types.Response {
}

// HandleWs handle websocket requests
func (h *Handler) HandleWs(reqBody []byte, wsConn *atomic.Pointer[websocket.Conn], httpReq *http.Request) ([]byte, error) {
func (h *Handler) HandleWs(reqBody []byte, wsConn *concurrentWsConn, httpReq *http.Request) ([]byte, error) {
log.Debugf("WS message received: %v", string(reqBody))
var req types.Request
if err := json.Unmarshal(reqBody, &req); err != nil {
Expand All @@ -160,7 +158,7 @@ func (h *Handler) HandleWs(reqBody []byte, wsConn *atomic.Pointer[websocket.Conn
}

// RemoveFilterByWsConn uninstalls the filter attached to this websocket connection
func (h *Handler) RemoveFilterByWsConn(wsConn *atomic.Pointer[websocket.Conn]) {
func (h *Handler) RemoveFilterByWsConn(wsConn *concurrentWsConn) {
service, ok := h.serviceMap[APIEth]
if !ok {
return
Expand Down
Loading
Loading