Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/jrpc log limit #2572

Merged
merged 16 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/dumpstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func dumpState(ctx *cli.Context) error {
if err != nil {
return err
}
stateDB := state.NewPostgresStorage(stateSqlDB)
stateDB := state.NewPostgresStorage(state.Config{}, stateSqlDB)

dump := dumpedState{
Description: description,
Expand Down
4 changes: 3 additions & 1 deletion cmd/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func waitSignal(cancelFuncs []context.CancelFunc) {
}

func newState(ctx context.Context, c *config.Config, l2ChainID uint64, forkIDIntervals []state.ForkIDInterval, sqlDB *pgxpool.Pool, eventLog *event.EventLog, needsExecutor, needsStateTree bool) *state.State {
stateDb := state.NewPostgresStorage(sqlDB)
stateDb := state.NewPostgresStorage(c.State, sqlDB)

// Executor
var executorClient executor.ExecutorServiceClient
Expand All @@ -487,6 +487,8 @@ func newState(ctx context.Context, c *config.Config, l2ChainID uint64, forkIDInt
WaitOnResourceExhaustion: c.Executor.WaitOnResourceExhaustion,
ForkUpgradeBatchNumber: c.ForkUpgradeBatchNumber,
ForkUpgradeNewForkId: c.ForkUpgradeNewForkId,
MaxLogsCount: c.RPC.MaxLogsCount,
MaxLogsBlockRange: c.RPC.MaxLogsBlockRange,
}

st := state.NewState(stateCfg, stateDb, executorClient, stateTree, eventLog)
Expand Down
8 changes: 8 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,14 @@ func Test_Defaults(t *testing.T) {
path: "RPC.BatchRequestsLimit",
expectedValue: uint(20),
},
{
path: "RPC.MaxLogsCount",
expectedValue: uint64(10000),
},
{
path: "RPC.MaxLogsBlockRange",
expectedValue: uint64(10000),
},
{
path: "RPC.WebSockets.Enabled",
expectedValue: true,
Expand Down
2 changes: 2 additions & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ SequencerNodeURI = ""
EnableL2SuggestedGasPricePolling = true
BatchRequestsEnabled = false
BatchRequestsLimit = 20
MaxLogsCount = 10000
MaxLogsBlockRange = 10000
[RPC.WebSockets]
Enabled = true
Host = "0.0.0.0"
Expand Down
17 changes: 17 additions & 0 deletions db/migrations/state/0011.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
-- +migrate Up
CREATE INDEX IF NOT EXISTS l2block_created_at_idx ON state.l2block (created_at);

CREATE INDEX IF NOT EXISTS log_log_index_idx ON state.log (log_index);
CREATE INDEX IF NOT EXISTS log_topic0_idx ON state.log (topic0);
CREATE INDEX IF NOT EXISTS log_topic1_idx ON state.log (topic1);
CREATE INDEX IF NOT EXISTS log_topic2_idx ON state.log (topic2);
CREATE INDEX IF NOT EXISTS log_topic3_idx ON state.log (topic3);

-- +migrate Down
DROP INDEX IF EXISTS state.l2block_created_at_idx;

DROP INDEX IF EXISTS state.log_log_index_idx;
DROP INDEX IF EXISTS state.log_topic0_idx;
DROP INDEX IF EXISTS state.log_topic1_idx;
DROP INDEX IF EXISTS state.log_topic2_idx;
DROP INDEX IF EXISTS state.log_topic3_idx;
59 changes: 59 additions & 0 deletions db/migrations/state/0011_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package migrations_test

import (
"database/sql"
"testing"

"github.com/stretchr/testify/assert"
)

// this migration changes length of the token name
type migrationTest0011 struct{}

func (m migrationTest0011) InsertData(db *sql.DB) error {
return nil
}

func (m migrationTest0011) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) {
indexes := []string{
"l2block_created_at_idx",
"log_log_index_idx",
"log_topic0_idx",
"log_topic1_idx",
"log_topic2_idx",
"log_topic3_idx",
}
// Check indexes adding
for _, idx := range indexes {
// getIndex
const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;`
row := db.QueryRow(getIndex, idx)
var result int
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 1, result)
}
}

func (m migrationTest0011) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) {
indexes := []string{
"l2block_created_at_idx",
"log_log_index_idx",
"log_topic0_idx",
"log_topic1_idx",
"log_topic2_idx",
"log_topic3_idx",
}
// Check indexes removing
for _, idx := range indexes {
// getIndex
const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;`
row := db.QueryRow(getIndex, idx)
var result int
assert.NoError(t, row.Scan(&result))
assert.Equal(t, 0, result)
}
}

func TestMigration0011(t *testing.T) {
runMigrationTest(t, 11, migrationTest0011{})
}
4 changes: 2 additions & 2 deletions docs/config-file/node-config-doc.html

Large diffs are not rendered by default.

114 changes: 89 additions & 25 deletions docs/config-file/node-config-doc.md

Large diffs are not rendered by default.

20 changes: 20 additions & 0 deletions docs/config-file/node-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,16 @@
"maxItems": 20,
"minItems": 20,
"description": "L2Coinbase defines which address is going to receive the fees"
},
"MaxLogsCount": {
"type": "integer",
"description": "MaxLogsCount is a configuration to set the max number of logs that can be returned\nin a single call to the state, if zero it means no limit",
"default": 10000
},
"MaxLogsBlockRange": {
"type": "integer",
"description": "MaxLogsBlockRange is a configuration to set the max range for block number when querying TXs\nlogs in a single call to the state, if zero it means no limit",
"default": 10000
}
},
"additionalProperties": false,
Expand Down Expand Up @@ -1457,6 +1467,16 @@
"additionalProperties": false,
"type": "object",
"description": "Configuration for the batch constraints"
},
"MaxLogsCount": {
"type": "integer",
"description": "MaxLogsCount is a configuration to set the max number of logs that can be returned\nin a single call to the state, if zero it means no limit",
"default": 0
},
"MaxLogsBlockRange": {
"type": "integer",
"description": "MaxLogsBlockRange is a configuration to set the max range for block number when querying TXs\nlogs in a single call to the state, if zero it means no limit",
"default": 0
}
},
"additionalProperties": false,
Expand Down
8 changes: 8 additions & 0 deletions jsonrpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,14 @@ type Config struct {

// L2Coinbase defines which address is going to receive the fees
L2Coinbase common.Address

// MaxLogsCount is a configuration to set the max number of logs that can be returned
// in a single call to the state, if zero it means no limit
MaxLogsCount uint64 `mapstructure:"MaxLogsCount"`

// MaxLogsBlockRange is a configuration to set the max range for block number when querying TXs
// logs in a single call to the state, if zero it means no limit
MaxLogsBlockRange uint64 `mapstructure:"MaxLogsBlockRange"`
}

// WebSocketsConfig has parameters to config the rpc websocket support
Expand Down
71 changes: 62 additions & 9 deletions jsonrpc/endpoints_eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,7 +499,13 @@ func (e *EthEndpoints) internalGetLogs(ctx context.Context, dbTx pgx.Tx, filter
}

logs, err := e.state.GetLogs(ctx, fromBlock, toBlock, filter.Addresses, filter.Topics, filter.BlockHash, filter.Since, dbTx)
if err != nil {
if errors.Is(err, state.ErrMaxLogsCountLimitExceeded) {
errMsg := fmt.Sprintf(state.ErrMaxLogsCountLimitExceeded.Error(), e.cfg.MaxLogsCount)
return RPCErrorResponse(types.InvalidParamsErrorCode, errMsg, nil, false)
} else if errors.Is(err, state.ErrMaxLogsBlockRangeLimitExceeded) {
errMsg := fmt.Sprintf(state.ErrMaxLogsBlockRangeLimitExceeded.Error(), e.cfg.MaxLogsBlockRange)
return RPCErrorResponse(types.InvalidParamsErrorCode, errMsg, nil, false)
} else if err != nil {
return RPCErrorResponse(types.DefaultErrorCode, "failed to get logs from state", err, true)
}

Expand Down Expand Up @@ -829,11 +835,36 @@ func (e *EthEndpoints) newBlockFilter(wsConn *websocket.Conn) (interface{}, type
// to notify when the state changes (logs). To check if the state
// has changed, call eth_getFilterChanges.
func (e *EthEndpoints) NewFilter(filter LogFilter) (interface{}, types.Error) {
return e.newFilter(nil, filter)
return e.txMan.NewDbTxScope(e.state, func(ctx context.Context, dbTx pgx.Tx) (interface{}, types.Error) {
return e.newFilter(ctx, nil, filter, dbTx)
})
}

// internal
func (e *EthEndpoints) newFilter(wsConn *websocket.Conn, filter LogFilter) (interface{}, types.Error) {
func (e *EthEndpoints) newFilter(ctx context.Context, wsConn *websocket.Conn, filter LogFilter, dbTx pgx.Tx) (interface{}, types.Error) {
shouldFilterByBlockRange := filter.FromBlock != nil || filter.ToBlock != nil

if shouldFilterByBlockRange {
toBlockNumber, rpcErr := filter.ToBlock.GetNumericBlockNumber(ctx, e.state, e.etherman, dbTx)
if rpcErr != nil {
return nil, rpcErr
}
fromBlockNumber, rpcErr := filter.FromBlock.GetNumericBlockNumber(ctx, e.state, e.etherman, dbTx)
if rpcErr != nil {
return nil, rpcErr
}

if toBlockNumber < fromBlockNumber {
return RPCErrorResponse(types.InvalidParamsErrorCode, state.ErrInvalidBlockRange.Error(), nil, false)
}

blockRange := toBlockNumber - fromBlockNumber
if e.cfg.MaxLogsBlockRange > 0 && blockRange > e.cfg.MaxLogsBlockRange {
errMsg := fmt.Sprintf(state.ErrMaxLogsBlockRangeLimitExceeded.Error(), e.cfg.MaxLogsBlockRange)
return RPCErrorResponse(types.InvalidParamsErrorCode, errMsg, nil, false)
}
}

id, err := e.storage.NewLogFilter(wsConn, filter)
if errors.Is(err, ErrFilterInvalidPayload) {
return RPCErrorResponse(types.InvalidParamsErrorCode, err.Error(), nil, false)
Expand Down Expand Up @@ -1021,11 +1052,13 @@ func (e *EthEndpoints) Subscribe(wsConn *websocket.Conn, name string, logFilter
case "newHeads":
return e.newBlockFilter(wsConn)
case "logs":
var lf LogFilter
if logFilter != nil {
lf = *logFilter
}
return e.newFilter(wsConn, lf)
return e.txMan.NewDbTxScope(e.state, func(ctx context.Context, dbTx pgx.Tx) (interface{}, types.Error) {
var lf LogFilter
if logFilter != nil {
lf = *logFilter
}
return e.newFilter(ctx, wsConn, lf, dbTx)
})
case "pendingTransactions", "newPendingTransactions":
return e.newPendingTransactionFilter(wsConn)
case "syncing":
Expand All @@ -1048,6 +1081,13 @@ func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *websocket.Conn) error {

// onNewL2Block is triggered when the state triggers the event for a new l2 block
func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
e.processBlockFilters(event)
e.processLogFilters(event)
}

// processBlockFilters answer filters subscribed for block updates when a new l2 block event
// is detected
func (e *EthEndpoints) processBlockFilters(event state.NewL2BlockEvent) {
blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn()
if err != nil {
log.Errorf("failed to get all block filters with web sockets connections: %v", err)
Expand All @@ -1061,14 +1101,27 @@ func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) {
}
}
}
}

// processLogFilters answer filters subscribed for log updates when a new l2 block event
// is detected
func (e *EthEndpoints) processLogFilters(event state.NewL2BlockEvent) {
logFilters, err := e.storage.GetAllLogFiltersWithWSConn()
if err != nil {
log.Errorf("failed to get all log filters with web sockets connections: %v", err)
} else {
for _, filter := range logFilters {
changes, err := e.GetFilterChanges(filter.ID)
if err != nil {
if errors.Is(err, state.ErrMaxLogsCountLimitExceeded) {
log.Infof("failed to get filters changes for filter %v, the filter seems to be returning more results than allowed and was removed: %v", filter.ID, err)
err := e.storage.UninstallFilter(filter.ID)
if !errors.Is(err, ErrNotFound) && err != nil {
log.Errorf("failed to automatically uninstall filter %v: %v", filter.ID, err)
} else {
log.Infof("Filter %v automatically uninstalled", filter.ID)
}
continue
} else if err != nil {
log.Errorf("failed to get filters changes for filter %v with web sockets connections: %v", filter.ID, err)
continue
}
Expand Down
Loading