diff --git a/cmd/dumpstate.go b/cmd/dumpstate.go index 6fc3b3156c..350048a3eb 100644 --- a/cmd/dumpstate.go +++ b/cmd/dumpstate.go @@ -113,7 +113,7 @@ func dumpState(ctx *cli.Context) error { if err != nil { return err } - stateDB := state.NewPostgresStorage(stateSqlDB) + stateDB := state.NewPostgresStorage(state.Config{}, stateSqlDB) dump := dumpedState{ Description: description, diff --git a/cmd/run.go b/cmd/run.go index 66556ad6cd..683ba4bccc 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -464,7 +464,7 @@ func waitSignal(cancelFuncs []context.CancelFunc) { } func newState(ctx context.Context, c *config.Config, l2ChainID uint64, forkIDIntervals []state.ForkIDInterval, sqlDB *pgxpool.Pool, eventLog *event.EventLog, needsExecutor, needsStateTree bool) *state.State { - stateDb := state.NewPostgresStorage(sqlDB) + stateDb := state.NewPostgresStorage(c.State, sqlDB) // Executor var executorClient executor.ExecutorServiceClient @@ -487,6 +487,8 @@ func newState(ctx context.Context, c *config.Config, l2ChainID uint64, forkIDInt WaitOnResourceExhaustion: c.Executor.WaitOnResourceExhaustion, ForkUpgradeBatchNumber: c.ForkUpgradeBatchNumber, ForkUpgradeNewForkId: c.ForkUpgradeNewForkId, + MaxLogsCount: c.RPC.MaxLogsCount, + MaxLogsBlockRange: c.RPC.MaxLogsBlockRange, } st := state.NewState(stateCfg, stateDb, executorClient, stateTree, eventLog) diff --git a/config/config_test.go b/config/config_test.go index d3c5e5c11c..90f2373734 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -341,6 +341,14 @@ func Test_Defaults(t *testing.T) { path: "RPC.BatchRequestsLimit", expectedValue: uint(20), }, + { + path: "RPC.MaxLogsCount", + expectedValue: uint64(10000), + }, + { + path: "RPC.MaxLogsBlockRange", + expectedValue: uint64(10000), + }, { path: "RPC.WebSockets.Enabled", expectedValue: true, diff --git a/config/default.go b/config/default.go index 008195cea7..c1b54cd0d8 100644 --- a/config/default.go +++ b/config/default.go @@ -77,6 +77,8 @@ SequencerNodeURI = "" 
EnableL2SuggestedGasPricePolling = true BatchRequestsEnabled = false BatchRequestsLimit = 20 +MaxLogsCount = 10000 +MaxLogsBlockRange = 10000 [RPC.WebSockets] Enabled = true Host = "0.0.0.0" diff --git a/db/migrations/state/0011.sql b/db/migrations/state/0011.sql new file mode 100644 index 0000000000..ed40c997f5 --- /dev/null +++ b/db/migrations/state/0011.sql @@ -0,0 +1,17 @@ +-- +migrate Up +CREATE INDEX IF NOT EXISTS l2block_created_at_idx ON state.l2block (created_at); + +CREATE INDEX IF NOT EXISTS log_log_index_idx ON state.log (log_index); +CREATE INDEX IF NOT EXISTS log_topic0_idx ON state.log (topic0); +CREATE INDEX IF NOT EXISTS log_topic1_idx ON state.log (topic1); +CREATE INDEX IF NOT EXISTS log_topic2_idx ON state.log (topic2); +CREATE INDEX IF NOT EXISTS log_topic3_idx ON state.log (topic3); + +-- +migrate Down +DROP INDEX IF EXISTS state.l2block_created_at_idx; + +DROP INDEX IF EXISTS state.log_log_index_idx; +DROP INDEX IF EXISTS state.log_topic0_idx; +DROP INDEX IF EXISTS state.log_topic1_idx; +DROP INDEX IF EXISTS state.log_topic2_idx; +DROP INDEX IF EXISTS state.log_topic3_idx; \ No newline at end of file diff --git a/db/migrations/state/0011_test.go b/db/migrations/state/0011_test.go new file mode 100644 index 0000000000..2ec3072c1d --- /dev/null +++ b/db/migrations/state/0011_test.go @@ -0,0 +1,59 @@ +package migrations_test + +import ( + "database/sql" + "testing" + + "github.com/stretchr/testify/assert" +) + +// this migration adds indexes on the state.l2block and state.log tables to speed up log queries +type migrationTest0011 struct{} + +func (m migrationTest0011) InsertData(db *sql.DB) error { + return nil +} + +func (m migrationTest0011) RunAssertsAfterMigrationUp(t *testing.T, db *sql.DB) { + indexes := []string{ + "l2block_created_at_idx", + "log_log_index_idx", + "log_topic0_idx", + "log_topic1_idx", + "log_topic2_idx", + "log_topic3_idx", + } + // Check indexes adding + for _, idx := range indexes { + // getIndex + const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname 
= $1;` + row := db.QueryRow(getIndex, idx) + var result int + assert.NoError(t, row.Scan(&result)) + assert.Equal(t, 1, result) + } +} + +func (m migrationTest0011) RunAssertsAfterMigrationDown(t *testing.T, db *sql.DB) { + indexes := []string{ + "l2block_created_at_idx", + "log_log_index_idx", + "log_topic0_idx", + "log_topic1_idx", + "log_topic2_idx", + "log_topic3_idx", + } + // Check indexes removing + for _, idx := range indexes { + // getIndex + const getIndex = `SELECT count(*) FROM pg_indexes WHERE indexname = $1;` + row := db.QueryRow(getIndex, idx) + var result int + assert.NoError(t, row.Scan(&result)) + assert.Equal(t, 0, result) + } +} + +func TestMigration0011(t *testing.T) { + runMigrationTest(t, 11, migrationTest0011{}) +} diff --git a/docs/config-file/node-config-doc.html b/docs/config-file/node-config-doc.html index d16ff3b38e..f402dad247 100644 --- a/docs/config-file/node-config-doc.html +++ b/docs/config-file/node-config-doc.html @@ -14,7 +14,7 @@
"300ms"
 

Default: "1m0s"Type: string

WriteTimeout is the HTTP server write timeout
check net/http.server.WriteTimeout


Examples:

"1m"
 
"300ms"
-

Default: 500Type: number

MaxRequestsPerIPAndSecond defines how much requests a single IP can
send within a single second


Default: ""Type: string

SequencerNodeURI is used allow Non-Sequencer nodes
to relay transactions to the Sequencer node


Default: 0Type: integer

MaxCumulativeGasUsed is the max gas allowed per batch


WebSockets configuration
Default: trueType: boolean

Enabled defines if the WebSocket requests are enabled or disabled


Default: "0.0.0.0"Type: string

Host defines the network adapter that will be used to serve the WS requests


Default: 8546Type: integer

Port defines the port to serve the endpoints via WS


Default: 104857600Type: integer

ReadLimit defines the maximum size of a message read from the client (in bytes)


Default: trueType: boolean

EnableL2SuggestedGasPricePolling enables polling of the L2 gas price to block tx in the RPC with lower gas price.


Default: falseType: boolean

BatchRequestsEnabled defines if the Batch requests are enabled or disabled


Default: 20Type: integer

BatchRequestsLimit defines the limit of requests that can be incorporated into each batch request


Type: array of integer

L2Coinbase defines which address is going to receive the fees

Must contain a minimum of 20 items

Must contain a maximum of 20 items

Each item of this array must be:

Type: integer

Configuration of service `Syncrhonizer`. For this service is also really important the value of `IsTrustedSequencer` because depending of this values is going to ask to a trusted node for trusted transactions or not
Default: "1s"Type: string

SyncInterval is the delay interval between reading new rollup information


Examples:

"1m"
+

Default: 500Type: number

MaxRequestsPerIPAndSecond defines how much requests a single IP can
send within a single second


Default: ""Type: string

SequencerNodeURI is used allow Non-Sequencer nodes
to relay transactions to the Sequencer node


Default: 0Type: integer

MaxCumulativeGasUsed is the max gas allowed per batch


WebSockets configuration
Default: trueType: boolean

Enabled defines if the WebSocket requests are enabled or disabled


Default: "0.0.0.0"Type: string

Host defines the network adapter that will be used to serve the WS requests


Default: 8546Type: integer

Port defines the port to serve the endpoints via WS


Default: 104857600Type: integer

ReadLimit defines the maximum size of a message read from the client (in bytes)


Default: trueType: boolean

EnableL2SuggestedGasPricePolling enables polling of the L2 gas price to block tx in the RPC with lower gas price.


Default: falseType: boolean

BatchRequestsEnabled defines if the Batch requests are enabled or disabled


Default: 20Type: integer

BatchRequestsLimit defines the limit of requests that can be incorporated into each batch request


Type: array of integer

L2Coinbase defines which address is going to receive the fees

Must contain a minimum of 20 items

Must contain a maximum of 20 items

Each item of this array must be:


Default: 10000Type: integer

MaxLogsCount is a configuration to set the max number of logs that can be returned
in a single call to the state, if zero it means no limit


Default: 10000Type: integer

MaxLogsBlockRange is a configuration to set the max range for block number when querying TXs
logs in a single call to the state, if zero it means no limit


Configuration of service `Synchronizer`. For this service the value of `IsTrustedSequencer` is also really important, because depending on this value it is going to ask a trusted node for trusted transactions or not
Default: "1s"Type: string

SyncInterval is the delay interval between reading new rollup information


Examples:

"1m"
 
"300ms"
 

Default: 100Type: integer

SyncChunkSize is the number of blocks to sync on each chunk


Default: ""Type: string

TrustedSequencerURL is the rpc url to connect and sync the trusted state


Default: trueType: boolean

L1ParallelSynchronization Use new L1 synchronization that do in parallel request to L1 and process the data
If false use the legacy sequential mode


L1ParallelSynchronization Configuration for parallel mode (if UseParallelModeForL1Synchronization is true)
Default: 2Type: integer

NumberOfParallelOfEthereumClients Number of clients used to synchronize with L1
(if UseParallelModeForL1Synchronization is true)


Default: 10Type: integer

CapacityOfBufferingRollupInfoFromL1 Size of the buffer used to store rollup information from L1, must be >= to NumberOfEthereumClientsToSync
sugested twice of NumberOfParallelOfEthereumClients
(if UseParallelModeForL1Synchronization is true)


Default: "5s"Type: string

TimeForCheckLastBlockOnL1Time is the time to wait to request the
last block to L1 to known if we need to retrieve more data.
This value only apply when the system is synchronized


Examples:

"1m"
 
"300ms"
@@ -78,4 +78,4 @@
 
"300ms"
 

Default: 100000000Type: integer

Configuration of the merkle tree client service. Not use in the node, only for testing
Default: "zkevm-prover:50061"Type: string

URI is the server URI.


Configuration of the metrics service, basically is where is going to publish the metrics
Default: "0.0.0.0"Type: string

Host is the address to bind the metrics server


Default: 9091Type: integer

Port is the port to bind the metrics server


Default: falseType: boolean

Enabled is the flag to enable/disable the metrics server


Default: ""Type: string

ProfilingHost is the address to bind the profiling server


Default: 0Type: integer

ProfilingPort is the port to bind the profiling server


Default: falseType: boolean

ProfilingEnabled is the flag to enable/disable the profiling server


Configuration of the event database connection

DB is the database configuration
Default: ""Type: string

Database name


Default: ""Type: string

Database User name


Default: ""Type: string

Database Password of the user


Default: ""Type: string

Host address of database


Default: ""Type: string

Port Number of database


Default: falseType: boolean

EnableLog


Default: 0Type: integer

MaxConns is the maximum number of connections in the pool.


Configuration of the hash database connection
Default: "prover_db"Type: string

Database name


Default: "prover_user"Type: string

Database User name


Default: "prover_pass"Type: string

Database Password of the user


Default: "zkevm-state-db"Type: string

Host address of database


Default: "5432"Type: string

Port Number of database


Default: falseType: boolean

EnableLog


Default: 200Type: integer

MaxConns is the maximum number of connections in the pool.


State service configuration
Default: 0Type: integer

MaxCumulativeGasUsed is the max gas allowed per batch


Default: 0Type: integer

ChainID is the L2 ChainID provided by the Network Config


Type: array of object

ForkIdIntervals is the list of fork id intervals

Each item of this array must be:


Default: 0Type: integer

MaxResourceExhaustedAttempts is the max number of attempts to make a transaction succeed because of resource exhaustion


Default: "0s"Type: string

WaitOnResourceExhaustion is the time to wait before retrying a transaction because of resource exhaustion


Examples:

"1m"
 
"300ms"
-

Default: 0Type: integer

Batch number from which there is a forkid change (fork upgrade)


Default: 0Type: integer

New fork id to be used for batches greaters than ForkUpgradeBatchNumber (fork upgrade)


DB is the database configuration
Default: "state_db"Type: string

Database name


Default: "state_user"Type: string

Database User name


Default: "state_password"Type: string

Database Password of the user


Default: "zkevm-state-db"Type: string

Host address of database


Default: "5432"Type: string

Port Number of database


Default: falseType: boolean

EnableLog


Default: 200Type: integer

MaxConns is the maximum number of connections in the pool.


Configuration for the batch constraints
Default: 300Type: integer

Default: 120000Type: integer

Default: 30000000Type: integer

Default: 2145Type: integer

Default: 252357Type: integer

Default: 135191Type: integer

Default: 236585Type: integer

Default: 236585Type: integer

Default: 473170Type: integer

Default: 7570538Type: integer

\ No newline at end of file +

Default: 0Type: integer

Batch number from which there is a forkid change (fork upgrade)


Default: 0Type: integer

New fork id to be used for batches greaters than ForkUpgradeBatchNumber (fork upgrade)


DB is the database configuration
Default: "state_db"Type: string

Database name


Default: "state_user"Type: string

Database User name


Default: "state_password"Type: string

Database Password of the user


Default: "zkevm-state-db"Type: string

Host address of database


Default: "5432"Type: string

Port Number of database


Default: falseType: boolean

EnableLog


Default: 200Type: integer

MaxConns is the maximum number of connections in the pool.


Configuration for the batch constraints
Default: 300Type: integer

Default: 120000Type: integer

Default: 30000000Type: integer

Default: 2145Type: integer

Default: 252357Type: integer

Default: 135191Type: integer

Default: 236585Type: integer

Default: 236585Type: integer

Default: 473170Type: integer

Default: 7570538Type: integer

Default: 0Type: integer

MaxLogsCount is a configuration to set the max number of logs that can be returned
in a single call to the state, if zero it means no limit


Default: 0Type: integer

MaxLogsBlockRange is a configuration to set the max range for block number when querying TXs
logs in a single call to the state, if zero it means no limit


\ No newline at end of file diff --git a/docs/config-file/node-config-doc.md b/docs/config-file/node-config-doc.md index 25a6762a26..07f50d875a 100644 --- a/docs/config-file/node-config-doc.md +++ b/docs/config-file/node-config-doc.md @@ -718,20 +718,22 @@ GlobalQueue=1024 **Type:** : `object` **Description:** Configuration for RPC service. THis one offers a extended Ethereum JSON-RPC API interface to interact with the node -| Property | Pattern | Type | Deprecated | Definition | Title/Description | -| ---------------------------------------------------------------------------- | ------- | ---------------- | ---------- | ---------- | ----------------------------------------------------------------------------------------------------------------- | -| - [Host](#RPC_Host ) | No | string | No | - | Host defines the network adapter that will be used to serve the HTTP requests | -| - [Port](#RPC_Port ) | No | integer | No | - | Port defines the port to serve the endpoints via HTTP | -| - [ReadTimeout](#RPC_ReadTimeout ) | No | string | No | - | Duration | -| - [WriteTimeout](#RPC_WriteTimeout ) | No | string | No | - | Duration | -| - [MaxRequestsPerIPAndSecond](#RPC_MaxRequestsPerIPAndSecond ) | No | number | No | - | MaxRequestsPerIPAndSecond defines how much requests a single IP can
send within a single second | -| - [SequencerNodeURI](#RPC_SequencerNodeURI ) | No | string | No | - | SequencerNodeURI is used allow Non-Sequencer nodes
to relay transactions to the Sequencer node | -| - [MaxCumulativeGasUsed](#RPC_MaxCumulativeGasUsed ) | No | integer | No | - | MaxCumulativeGasUsed is the max gas allowed per batch | -| - [WebSockets](#RPC_WebSockets ) | No | object | No | - | WebSockets configuration | -| - [EnableL2SuggestedGasPricePolling](#RPC_EnableL2SuggestedGasPricePolling ) | No | boolean | No | - | EnableL2SuggestedGasPricePolling enables polling of the L2 gas price to block tx in the RPC with lower gas price. | -| - [BatchRequestsEnabled](#RPC_BatchRequestsEnabled ) | No | boolean | No | - | BatchRequestsEnabled defines if the Batch requests are enabled or disabled | -| - [BatchRequestsLimit](#RPC_BatchRequestsLimit ) | No | integer | No | - | BatchRequestsLimit defines the limit of requests that can be incorporated into each batch request | -| - [L2Coinbase](#RPC_L2Coinbase ) | No | array of integer | No | - | L2Coinbase defines which address is going to receive the fees | +| Property | Pattern | Type | Deprecated | Definition | Title/Description | +| ---------------------------------------------------------------------------- | ------- | ---------------- | ---------- | ---------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| - [Host](#RPC_Host ) | No | string | No | - | Host defines the network adapter that will be used to serve the HTTP requests | +| - [Port](#RPC_Port ) | No | integer | No | - | Port defines the port to serve the endpoints via HTTP | +| - [ReadTimeout](#RPC_ReadTimeout ) | No | string | No | - | Duration | +| - [WriteTimeout](#RPC_WriteTimeout ) | No | string | No | - | Duration | +| - [MaxRequestsPerIPAndSecond](#RPC_MaxRequestsPerIPAndSecond ) | No | number | No | - | MaxRequestsPerIPAndSecond defines how much requests a single IP can
send within a single second | +| - [SequencerNodeURI](#RPC_SequencerNodeURI ) | No | string | No | - | SequencerNodeURI is used allow Non-Sequencer nodes
to relay transactions to the Sequencer node | +| - [MaxCumulativeGasUsed](#RPC_MaxCumulativeGasUsed ) | No | integer | No | - | MaxCumulativeGasUsed is the max gas allowed per batch | +| - [WebSockets](#RPC_WebSockets ) | No | object | No | - | WebSockets configuration | +| - [EnableL2SuggestedGasPricePolling](#RPC_EnableL2SuggestedGasPricePolling ) | No | boolean | No | - | EnableL2SuggestedGasPricePolling enables polling of the L2 gas price to block tx in the RPC with lower gas price. | +| - [BatchRequestsEnabled](#RPC_BatchRequestsEnabled ) | No | boolean | No | - | BatchRequestsEnabled defines if the Batch requests are enabled or disabled | +| - [BatchRequestsLimit](#RPC_BatchRequestsLimit ) | No | integer | No | - | BatchRequestsLimit defines the limit of requests that can be incorporated into each batch request | +| - [L2Coinbase](#RPC_L2Coinbase ) | No | array of integer | No | - | L2Coinbase defines which address is going to receive the fees | +| - [MaxLogsCount](#RPC_MaxLogsCount ) | No | integer | No | - | MaxLogsCount is a configuration to set the max number of logs that can be returned
in a single call to the state, if zero it means no limit | +| - [MaxLogsBlockRange](#RPC_MaxLogsBlockRange ) | No | integer | No | - | MaxLogsBlockRange is a configuration to set the max range for block number when querying TXs
logs in a single call to the state, if zero it means no limit | ### 8.1. `RPC.Host` @@ -974,6 +976,36 @@ BatchRequestsLimit=20 **Type:** : `array of integer` **Description:** L2Coinbase defines which address is going to receive the fees +### 8.13. `RPC.MaxLogsCount` + +**Type:** : `integer` + +**Default:** `10000` + +**Description:** MaxLogsCount is a configuration to set the max number of logs that can be returned +in a single call to the state, if zero it means no limit + +**Example setting the default value** (10000): +``` +[RPC] +MaxLogsCount=10000 +``` + +### 8.14. `RPC.MaxLogsBlockRange` + +**Type:** : `integer` + +**Default:** `10000` + +**Description:** MaxLogsBlockRange is a configuration to set the max range for block number when querying TXs +logs in a single call to the state, if zero it means no limit + +**Example setting the default value** (10000): +``` +[RPC] +MaxLogsBlockRange=10000 +``` + ## 9. `[Synchronizer]` **Type:** : `object` @@ -3160,17 +3192,19 @@ MaxConns=200 **Type:** : `object` **Description:** State service configuration -| Property | Pattern | Type | Deprecated | Definition | Title/Description | -| ---------------------------------------------------------------------- | ------- | --------------- | ---------- | ---------- | ----------------------------------------------------------------------------------------------------------------------- | -| - [MaxCumulativeGasUsed](#State_MaxCumulativeGasUsed ) | No | integer | No | - | MaxCumulativeGasUsed is the max gas allowed per batch | -| - [ChainID](#State_ChainID ) | No | integer | No | - | ChainID is the L2 ChainID provided by the Network Config | -| - [ForkIDIntervals](#State_ForkIDIntervals ) | No | array of object | No | - | ForkIdIntervals is the list of fork id intervals | -| - [MaxResourceExhaustedAttempts](#State_MaxResourceExhaustedAttempts ) | No | integer | No | - | MaxResourceExhaustedAttempts is the max number of attempts to make a transaction succeed because of resource 
exhaustion | -| - [WaitOnResourceExhaustion](#State_WaitOnResourceExhaustion ) | No | string | No | - | Duration | -| - [ForkUpgradeBatchNumber](#State_ForkUpgradeBatchNumber ) | No | integer | No | - | Batch number from which there is a forkid change (fork upgrade) | -| - [ForkUpgradeNewForkId](#State_ForkUpgradeNewForkId ) | No | integer | No | - | New fork id to be used for batches greaters than ForkUpgradeBatchNumber (fork upgrade) | -| - [DB](#State_DB ) | No | object | No | - | DB is the database configuration | -| - [Batch](#State_Batch ) | No | object | No | - | Configuration for the batch constraints | +| Property | Pattern | Type | Deprecated | Definition | Title/Description | +| ---------------------------------------------------------------------- | ------- | --------------- | ---------- | ---------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| - [MaxCumulativeGasUsed](#State_MaxCumulativeGasUsed ) | No | integer | No | - | MaxCumulativeGasUsed is the max gas allowed per batch | +| - [ChainID](#State_ChainID ) | No | integer | No | - | ChainID is the L2 ChainID provided by the Network Config | +| - [ForkIDIntervals](#State_ForkIDIntervals ) | No | array of object | No | - | ForkIdIntervals is the list of fork id intervals | +| - [MaxResourceExhaustedAttempts](#State_MaxResourceExhaustedAttempts ) | No | integer | No | - | MaxResourceExhaustedAttempts is the max number of attempts to make a transaction succeed because of resource exhaustion | +| - [WaitOnResourceExhaustion](#State_WaitOnResourceExhaustion ) | No | string | No | - | Duration | +| - [ForkUpgradeBatchNumber](#State_ForkUpgradeBatchNumber ) | No | integer | No | - | Batch number from which there is a forkid change (fork upgrade) | +| - [ForkUpgradeNewForkId](#State_ForkUpgradeNewForkId ) | No | integer | No | - | New fork id to be used for batches greaters than 
ForkUpgradeBatchNumber (fork upgrade) | +| - [DB](#State_DB ) | No | object | No | - | DB is the database configuration | +| - [Batch](#State_Batch ) | No | object | No | - | Configuration for the batch constraints | +| - [MaxLogsCount](#State_MaxLogsCount ) | No | integer | No | - | MaxLogsCount is a configuration to set the max number of logs that can be returned
in a single call to the state, if zero it means no limit | +| - [MaxLogsBlockRange](#State_MaxLogsBlockRange ) | No | integer | No | - | MaxLogsBlockRange is a configuration to set the max range for block number when querying TXs
logs in a single call to the state, if zero it means no limit | ### 20.1. `State.MaxCumulativeGasUsed` @@ -3577,5 +3611,35 @@ MaxBinaries=473170 MaxSteps=7570538 ``` +### 20.10. `State.MaxLogsCount` + +**Type:** : `integer` + +**Default:** `0` + +**Description:** MaxLogsCount is a configuration to set the max number of logs that can be returned +in a single call to the state, if zero it means no limit + +**Example setting the default value** (0): +``` +[State] +MaxLogsCount=0 +``` + +### 20.11. `State.MaxLogsBlockRange` + +**Type:** : `integer` + +**Default:** `0` + +**Description:** MaxLogsBlockRange is a configuration to set the max range for block number when querying TXs +logs in a single call to the state, if zero it means no limit + +**Example setting the default value** (0): +``` +[State] +MaxLogsBlockRange=0 +``` + ---------------------------------------------------------------------------------------------------------------------------- Generated using [json-schema-for-humans](https://github.com/coveooss/json-schema-for-humans) diff --git a/docs/config-file/node-config-schema.json b/docs/config-file/node-config-schema.json index 04ca2ba019..bd6e5e1424 100644 --- a/docs/config-file/node-config-schema.json +++ b/docs/config-file/node-config-schema.json @@ -368,6 +368,16 @@ "maxItems": 20, "minItems": 20, "description": "L2Coinbase defines which address is going to receive the fees" + }, + "MaxLogsCount": { + "type": "integer", + "description": "MaxLogsCount is a configuration to set the max number of logs that can be returned\nin a single call to the state, if zero it means no limit", + "default": 10000 + }, + "MaxLogsBlockRange": { + "type": "integer", + "description": "MaxLogsBlockRange is a configuration to set the max range for block number when querying TXs\nlogs in a single call to the state, if zero it means no limit", + "default": 10000 } }, "additionalProperties": false, @@ -1457,6 +1467,16 @@ "additionalProperties": false, "type": "object", 
"description": "Configuration for the batch constraints" + }, + "MaxLogsCount": { + "type": "integer", + "description": "MaxLogsCount is a configuration to set the max number of logs that can be returned\nin a single call to the state, if zero it means no limit", + "default": 0 + }, + "MaxLogsBlockRange": { + "type": "integer", + "description": "MaxLogsBlockRange is a configuration to set the max range for block number when querying TXs\nlogs in a single call to the state, if zero it means no limit", + "default": 0 } }, "additionalProperties": false, diff --git a/jsonrpc/config.go b/jsonrpc/config.go index 0908b3e7b2..ad92b8b6f0 100644 --- a/jsonrpc/config.go +++ b/jsonrpc/config.go @@ -46,6 +46,14 @@ type Config struct { // L2Coinbase defines which address is going to receive the fees L2Coinbase common.Address + + // MaxLogsCount is a configuration to set the max number of logs that can be returned + // in a single call to the state, if zero it means no limit + MaxLogsCount uint64 `mapstructure:"MaxLogsCount"` + + // MaxLogsBlockRange is a configuration to set the max range for block number when querying TXs + // logs in a single call to the state, if zero it means no limit + MaxLogsBlockRange uint64 `mapstructure:"MaxLogsBlockRange"` } // WebSocketsConfig has parameters to config the rpc websocket support diff --git a/jsonrpc/endpoints_eth.go b/jsonrpc/endpoints_eth.go index 9139a98bf4..988cafd5af 100644 --- a/jsonrpc/endpoints_eth.go +++ b/jsonrpc/endpoints_eth.go @@ -499,7 +499,13 @@ func (e *EthEndpoints) internalGetLogs(ctx context.Context, dbTx pgx.Tx, filter } logs, err := e.state.GetLogs(ctx, fromBlock, toBlock, filter.Addresses, filter.Topics, filter.BlockHash, filter.Since, dbTx) - if err != nil { + if errors.Is(err, state.ErrMaxLogsCountLimitExceeded) { + errMsg := fmt.Sprintf(state.ErrMaxLogsCountLimitExceeded.Error(), e.cfg.MaxLogsCount) + return RPCErrorResponse(types.InvalidParamsErrorCode, errMsg, nil, false) + } else if errors.Is(err, 
state.ErrMaxLogsBlockRangeLimitExceeded) { + errMsg := fmt.Sprintf(state.ErrMaxLogsBlockRangeLimitExceeded.Error(), e.cfg.MaxLogsBlockRange) + return RPCErrorResponse(types.InvalidParamsErrorCode, errMsg, nil, false) + } else if err != nil { return RPCErrorResponse(types.DefaultErrorCode, "failed to get logs from state", err, true) } @@ -829,11 +835,36 @@ func (e *EthEndpoints) newBlockFilter(wsConn *websocket.Conn) (interface{}, type // to notify when the state changes (logs). To check if the state // has changed, call eth_getFilterChanges. func (e *EthEndpoints) NewFilter(filter LogFilter) (interface{}, types.Error) { - return e.newFilter(nil, filter) + return e.txMan.NewDbTxScope(e.state, func(ctx context.Context, dbTx pgx.Tx) (interface{}, types.Error) { + return e.newFilter(ctx, nil, filter, dbTx) + }) } // internal -func (e *EthEndpoints) newFilter(wsConn *websocket.Conn, filter LogFilter) (interface{}, types.Error) { +func (e *EthEndpoints) newFilter(ctx context.Context, wsConn *websocket.Conn, filter LogFilter, dbTx pgx.Tx) (interface{}, types.Error) { + shouldFilterByBlockRange := filter.FromBlock != nil || filter.ToBlock != nil + + if shouldFilterByBlockRange { + toBlockNumber, rpcErr := filter.ToBlock.GetNumericBlockNumber(ctx, e.state, e.etherman, dbTx) + if rpcErr != nil { + return nil, rpcErr + } + fromBlockNumber, rpcErr := filter.FromBlock.GetNumericBlockNumber(ctx, e.state, e.etherman, dbTx) + if rpcErr != nil { + return nil, rpcErr + } + + if toBlockNumber < fromBlockNumber { + return RPCErrorResponse(types.InvalidParamsErrorCode, state.ErrInvalidBlockRange.Error(), nil, false) + } + + blockRange := toBlockNumber - fromBlockNumber + if e.cfg.MaxLogsBlockRange > 0 && blockRange > e.cfg.MaxLogsBlockRange { + errMsg := fmt.Sprintf(state.ErrMaxLogsBlockRangeLimitExceeded.Error(), e.cfg.MaxLogsBlockRange) + return RPCErrorResponse(types.InvalidParamsErrorCode, errMsg, nil, false) + } + } + id, err := e.storage.NewLogFilter(wsConn, filter) if 
errors.Is(err, ErrFilterInvalidPayload) { return RPCErrorResponse(types.InvalidParamsErrorCode, err.Error(), nil, false) @@ -1021,11 +1052,13 @@ func (e *EthEndpoints) Subscribe(wsConn *websocket.Conn, name string, logFilter case "newHeads": return e.newBlockFilter(wsConn) case "logs": - var lf LogFilter - if logFilter != nil { - lf = *logFilter - } - return e.newFilter(wsConn, lf) + return e.txMan.NewDbTxScope(e.state, func(ctx context.Context, dbTx pgx.Tx) (interface{}, types.Error) { + var lf LogFilter + if logFilter != nil { + lf = *logFilter + } + return e.newFilter(ctx, wsConn, lf, dbTx) + }) case "pendingTransactions", "newPendingTransactions": return e.newPendingTransactionFilter(wsConn) case "syncing": @@ -1048,6 +1081,13 @@ func (e *EthEndpoints) uninstallFilterByWSConn(wsConn *websocket.Conn) error { // onNewL2Block is triggered when the state triggers the event for a new l2 block func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) { + e.processBlockFilters(event) + e.processLogFilters(event) +} + +// processBlockFilters answer filters subscribed for block updates when a new l2 block event +// is detected +func (e *EthEndpoints) processBlockFilters(event state.NewL2BlockEvent) { blockFilters, err := e.storage.GetAllBlockFiltersWithWSConn() if err != nil { log.Errorf("failed to get all block filters with web sockets connections: %v", err) @@ -1061,14 +1101,27 @@ func (e *EthEndpoints) onNewL2Block(event state.NewL2BlockEvent) { } } } +} +// processLogFilters answer filters subscribed for log updates when a new l2 block event +// is detected +func (e *EthEndpoints) processLogFilters(event state.NewL2BlockEvent) { logFilters, err := e.storage.GetAllLogFiltersWithWSConn() if err != nil { log.Errorf("failed to get all log filters with web sockets connections: %v", err) } else { for _, filter := range logFilters { changes, err := e.GetFilterChanges(filter.ID) - if err != nil { + if errors.Is(err, state.ErrMaxLogsCountLimitExceeded) { + 
log.Infof("failed to get filters changes for filter %v, the filter seems to be returning more results than allowed and was removed: %v", filter.ID, err) + err := e.storage.UninstallFilter(filter.ID) + if !errors.Is(err, ErrNotFound) && err != nil { + log.Errorf("failed to automatically uninstall filter %v: %v", filter.ID, err) + } else { + log.Infof("Filter %v automatically uninstalled", filter.ID) + } + continue + } else if err != nil { log.Errorf("failed to get filters changes for filter %v with web sockets connections: %v", filter.ID, err) continue } diff --git a/jsonrpc/endpoints_eth_test.go b/jsonrpc/endpoints_eth_test.go index 641709173e..54d6292484 100644 --- a/jsonrpc/endpoints_eth_test.go +++ b/jsonrpc/endpoints_eth_test.go @@ -3604,16 +3604,29 @@ func TestNewFilter(t *testing.T) { } hash := common.HexToHash("0x42") - blockNumber := "8" + blockNumber10 := "10" + blockNumber10010 := "10010" + blockNumber10011 := "10011" testCases := []testCase{ { - Name: "New filter created successfully", + Name: "New filter by block range created successfully", Request: types.LogFilterRequest{ - ToBlock: &blockNumber, + FromBlock: &blockNumber10, + ToBlock: &blockNumber10010, }, ExpectedResult: "1", ExpectedError: nil, SetupMocks: func(m *mocksWrapper, tc testCase) { + m.DbTx. + On("Commit", context.Background()). + Return(nil). + Once() + + m.State. + On("BeginStateTransaction", context.Background()). + Return(m.DbTx, nil). + Once() + m.Storage. On("NewLogFilter", mock.IsType(&websocket.Conn{}), mock.IsType(LogFilter{})). Return("1", nil). @@ -3621,32 +3634,89 @@ func TestNewFilter(t *testing.T) { }, }, { - Name: "failed to create new filter", + Name: "New filter by block hash created successfully", Request: types.LogFilterRequest{ BlockHash: &hash, }, - ExpectedResult: "", - ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new log filter"), + ExpectedResult: "1", + ExpectedError: nil, SetupMocks: func(m *mocksWrapper, tc testCase) { + m.DbTx. 
+ On("Commit", context.Background()). + Return(nil). + Once() + + m.State. + On("BeginStateTransaction", context.Background()). + Return(m.DbTx, nil). + Once() + m.Storage. On("NewLogFilter", mock.IsType(&websocket.Conn{}), mock.IsType(LogFilter{})). - Return("", errors.New("failed to add new filter")). + Return("1", nil). + Once() + }, + }, + { + Name: "New filter not created due to from block greater than to block", + Request: types.LogFilterRequest{ + FromBlock: &blockNumber10010, + ToBlock: &blockNumber10, + }, + ExpectedResult: "", + ExpectedError: types.NewRPCError(types.InvalidParamsErrorCode, "invalid block range"), + SetupMocks: func(m *mocksWrapper, tc testCase) { + m.DbTx. + On("Rollback", context.Background()). + Return(nil). + Once() + + m.State. + On("BeginStateTransaction", context.Background()). + Return(m.DbTx, nil). + Once() + }, + }, + { + Name: "New filter not created due to block range bigger than allowed", + Request: types.LogFilterRequest{ + FromBlock: &blockNumber10, + ToBlock: &blockNumber10011, + }, + ExpectedResult: "", + ExpectedError: types.NewRPCError(types.InvalidParamsErrorCode, "logs are limited to a 10000 block range"), + SetupMocks: func(m *mocksWrapper, tc testCase) { + m.DbTx. + On("Rollback", context.Background()). + Return(nil). + Once() + + m.State. + On("BeginStateTransaction", context.Background()). + Return(m.DbTx, nil). Once() }, }, { - Name: "failed to create new filter because BlockHash and ToBlock are present", + Name: "failed to create new filter due to error to store", Request: types.LogFilterRequest{ BlockHash: &hash, - ToBlock: &blockNumber, }, ExpectedResult: "", - ExpectedError: types.NewRPCError(types.InvalidParamsErrorCode, "invalid argument 0: cannot specify both BlockHash and FromBlock/ToBlock, choose one or the other"), + ExpectedError: types.NewRPCError(types.DefaultErrorCode, "failed to create new log filter"), SetupMocks: func(m *mocksWrapper, tc testCase) { + m.DbTx. 
+ On("Rollback", context.Background()). + Return(nil). + Once() + + m.State. + On("BeginStateTransaction", context.Background()). + Return(m.DbTx, nil). + Once() m.Storage. On("NewLogFilter", mock.IsType(&websocket.Conn{}), mock.IsType(LogFilter{})). - Once(). - Return("", ErrFilterInvalidPayload). + Return("", errors.New("failed to add new filter")). Once() }, }, @@ -4011,6 +4081,64 @@ func TestGetLogs(t *testing.T) { Once() }, }, + { + Name: "Get logs fails due to max block range limit exceeded", + Prepare: func(t *testing.T, tc *testCase) { + tc.Filter = ethereum.FilterQuery{ + FromBlock: big.NewInt(1), ToBlock: big.NewInt(10002), + Addresses: []common.Address{common.HexToAddress("0x111")}, + Topics: [][]common.Hash{{common.HexToHash("0x222")}}, + } + tc.ExpectedResult = nil + tc.ExpectedError = types.NewRPCError(types.InvalidParamsErrorCode, "logs are limited to a 10000 block range") + }, + SetupMocks: func(m *mocksWrapper, tc testCase) { + var since *time.Time + m.DbTx. + On("Rollback", context.Background()). + Return(nil). + Once() + + m.State. + On("BeginStateTransaction", context.Background()). + Return(m.DbTx, nil). + Once() + + m.State. + On("GetLogs", context.Background(), tc.Filter.FromBlock.Uint64(), tc.Filter.ToBlock.Uint64(), tc.Filter.Addresses, tc.Filter.Topics, tc.Filter.BlockHash, since, m.DbTx). + Return(nil, state.ErrMaxLogsBlockRangeLimitExceeded). + Once() + }, + }, + { + Name: "Get logs fails due to max log count limit exceeded", + Prepare: func(t *testing.T, tc *testCase) { + tc.Filter = ethereum.FilterQuery{ + FromBlock: big.NewInt(1), ToBlock: big.NewInt(10002), + Addresses: []common.Address{common.HexToAddress("0x111")}, + Topics: [][]common.Hash{{common.HexToHash("0x222")}}, + } + tc.ExpectedResult = nil + tc.ExpectedError = types.NewRPCError(types.InvalidParamsErrorCode, "query returned more than 10000 results") + }, + SetupMocks: func(m *mocksWrapper, tc testCase) { + var since *time.Time + m.DbTx. 
+ On("Rollback", context.Background()). + Return(nil). + Once() + + m.State. + On("BeginStateTransaction", context.Background()). + Return(m.DbTx, nil). + Once() + + m.State. + On("GetLogs", context.Background(), tc.Filter.FromBlock.Uint64(), tc.Filter.ToBlock.Uint64(), tc.Filter.Addresses, tc.Filter.Topics, tc.Filter.BlockHash, since, m.DbTx). + Return(nil, state.ErrMaxLogsCountLimitExceeded). + Once() + }, + }, } for _, testCase := range testCases { diff --git a/jsonrpc/server.go b/jsonrpc/server.go index f3459b062d..bd6da338b7 100644 --- a/jsonrpc/server.go +++ b/jsonrpc/server.go @@ -455,10 +455,12 @@ func RPCErrorResponse(code int, message string, err error, logError bool) (inter // RPCErrorResponseWithData formats error to be returned through RPC func RPCErrorResponseWithData(code int, message string, data *[]byte, err error, logError bool) (interface{}, types.Error) { - if err != nil { - log.Errorf("%v: %v", message, err.Error()) - } else { - log.Error(message) + if logError { + if err != nil { + log.Errorf("%v: %v", message, err.Error()) + } else { + log.Error(message) + } } return nil, types.NewRPCErrorWithData(code, message, data) } diff --git a/jsonrpc/server_test.go b/jsonrpc/server_test.go index 4cce938e20..c02aa6d00a 100644 --- a/jsonrpc/server_test.go +++ b/jsonrpc/server_test.go @@ -150,6 +150,8 @@ func getSequencerDefaultConfig() Config { MaxRequestsPerIPAndSecond: maxRequestsPerIPAndSecond, MaxCumulativeGasUsed: 300000, BatchRequestsEnabled: true, + MaxLogsCount: 10000, + MaxLogsBlockRange: 10000, } return cfg } diff --git a/jsonrpc/storage.go b/jsonrpc/storage.go index 32de18fc27..ada5f32ffe 100644 --- a/jsonrpc/storage.go +++ b/jsonrpc/storage.go @@ -32,7 +32,10 @@ func NewStorage() *Storage { // NewLogFilter persists a new log filter func (s *Storage) NewLogFilter(wsConn *websocket.Conn, filter LogFilter) (string, error) { - if filter.BlockHash != nil && (filter.FromBlock != nil || filter.ToBlock != nil) { + shouldFilterByBlockHash := 
filter.BlockHash != nil + shouldFilterByBlockRange := filter.FromBlock != nil || filter.ToBlock != nil + + if shouldFilterByBlockHash && shouldFilterByBlockRange { return "", ErrFilterInvalidPayload } diff --git a/pool/pool_test.go b/pool/pool_test.go index 618428332c..10a354eed9 100644 --- a/pool/pool_test.go +++ b/pool/pool_test.go @@ -1007,7 +1007,7 @@ func Test_TryAddIncompatibleTxs(t *testing.T) { func newState(sqlDB *pgxpool.Pool, eventLog *event.EventLog) *state.State { ctx := context.Background() - stateDb := state.NewPostgresStorage(sqlDB) + stateDb := state.NewPostgresStorage(state.Config{}, sqlDB) zkProverURI := testutils.GetEnv("ZKPROVER_URI", "localhost") executorServerConfig := executor.Config{URI: fmt.Sprintf("%s:50071", zkProverURI), MaxGRPCMessageSize: 100000000} diff --git a/sequencer/closingsignalsmanager_test.go b/sequencer/closingsignalsmanager_test.go index 2b6b8fc179..a47ca14730 100644 --- a/sequencer/closingsignalsmanager_test.go +++ b/sequencer/closingsignalsmanager_test.go @@ -72,7 +72,7 @@ func setupTest(t *testing.T) { eventLog := event.NewEventLog(event.Config{}, eventStorage) localStateTree := merkletree.NewStateTree(localMtDBServiceClient) - localState = state.NewState(stateCfg, state.NewPostgresStorage(localStateDb), localExecutorClient, localStateTree, eventLog) + localState = state.NewState(stateCfg, state.NewPostgresStorage(state.Config{}, localStateDb), localExecutorClient, localStateTree, eventLog) batchConstraints := state.BatchConstraintsCfg{ MaxTxsPerBatch: 300, diff --git a/sequencer/dbmanager_test.go b/sequencer/dbmanager_test.go index 7cfa8d8ff1..aa13d97adc 100644 --- a/sequencer/dbmanager_test.go +++ b/sequencer/dbmanager_test.go @@ -65,7 +65,7 @@ func setupDBManager() { eventLog := event.NewEventLog(event.Config{}, eventStorage) stateTree = merkletree.NewStateTree(mtDBServiceClient) - testState = state.NewState(stateCfg, state.NewPostgresStorage(stateDb), executorClient, stateTree, eventLog) + testState = 
state.NewState(stateCfg, state.NewPostgresStorage(state.Config{}, stateDb), executorClient, stateTree, eventLog) // DBManager closingSignalCh := ClosingSignalCh{ diff --git a/state/config.go b/state/config.go index 9ecc741d1c..140892ac08 100644 --- a/state/config.go +++ b/state/config.go @@ -33,6 +33,14 @@ type Config struct { // Configuration for the batch constraints Batch BatchConfig `mapstructure:"Batch"` + + // MaxLogsCount is a configuration to set the max number of logs that can be returned + // in a single call to the state, if zero it means no limit + MaxLogsCount uint64 + + // MaxLogsBlockRange is a configuration to set the max range for block number when querying TXs + // logs in a single call to the state, if zero it means no limit + MaxLogsBlockRange uint64 } // BatchConfig represents the configuration of the batch constraints diff --git a/state/errors.go b/state/errors.go index 606ad910f7..22a99e08d0 100644 --- a/state/errors.go +++ b/state/errors.go @@ -57,6 +57,15 @@ var ( ErrInvalidData = errors.New("invalid data") // ErrBatchResourceBytesUnderflow happens when the batch runs out of Bytes ErrBatchResourceBytesUnderflow = NewBatchRemainingResourcesUnderflowError(nil, "Bytes") + // ErrInvalidBlockRange returned when the selected block range is invalid, generally + // because the toBlock is bigger than the fromBlock + ErrInvalidBlockRange = errors.New("invalid block range") + // ErrMaxLogsCountLimitExceeded returned when the number of logs is bigger than the + // configured limit + ErrMaxLogsCountLimitExceeded = errors.New("query returned more than %v results") + // ErrMaxLogsBlockRangeLimitExceeded returned when the range between block number range + // to filter logs is bigger than the configured limit + ErrMaxLogsBlockRangeLimitExceeded = errors.New("logs are limited to a %v block range") zkCounterErrPrefix = "ZKCounter: " ) diff --git a/state/pgstatestorage.go b/state/pgstatestorage.go index a97c0c9e7e..fb55f75aa8 100644 --- 
a/state/pgstatestorage.go +++ b/state/pgstatestorage.go @@ -25,12 +25,14 @@ const ( // PostgresStorage implements the Storage interface type PostgresStorage struct { + cfg Config *pgxpool.Pool } // NewPostgresStorage creates a new StateDB -func NewPostgresStorage(db *pgxpool.Pool) *PostgresStorage { +func NewPostgresStorage(cfg Config, db *pgxpool.Pool) *PostgresStorage { return &PostgresStorage{ + cfg, db, } } @@ -1933,49 +1935,57 @@ func (p *PostgresStorage) IsL2BlockVirtualized(ctx context.Context, blockNumber // GetLogs returns the logs that match the filter func (p *PostgresStorage) GetLogs(ctx context.Context, fromBlock uint64, toBlock uint64, addresses []common.Address, topics [][]common.Hash, blockHash *common.Hash, since *time.Time, dbTx pgx.Tx) ([]*types.Log, error) { - const getLogsByBlockHashSQL = ` - SELECT t.l2_block_num, b.block_hash, l.tx_hash, l.log_index, l.address, l.data, l.topic0, l.topic1, l.topic2, l.topic3 - FROM state.log l - INNER JOIN state.transaction t ON t.hash = l.tx_hash - INNER JOIN state.l2block b ON b.block_num = t.l2_block_num - WHERE b.block_hash = $1 - AND (l.address = any($2) OR $2 IS NULL) - AND (l.topic0 = any($3) OR $3 IS NULL) - AND (l.topic1 = any($4) OR $4 IS NULL) - AND (l.topic2 = any($5) OR $5 IS NULL) - AND (l.topic3 = any($6) OR $6 IS NULL) - AND (b.created_at >= $7 OR $7 IS NULL) - ORDER BY b.block_num ASC, l.log_index ASC` - const getLogsByBlockNumbersSQL = ` - SELECT t.l2_block_num, b.block_hash, l.tx_hash, l.log_index, l.address, l.data, l.topic0, l.topic1, l.topic2, l.topic3 - FROM state.log l + // query parts + const queryCount = `SELECT count(*) ` + const querySelect = `SELECT t.l2_block_num, b.block_hash, l.tx_hash, l.log_index, l.address, l.data, l.topic0, l.topic1, l.topic2, l.topic3 ` + + const queryBody = `FROM state.log l INNER JOIN state.transaction t ON t.hash = l.tx_hash INNER JOIN state.l2block b ON b.block_num = t.l2_block_num - WHERE b.block_num BETWEEN $1 AND $2 - AND (l.address = any($3) OR $3 
IS NULL) - AND (l.topic0 = any($4) OR $4 IS NULL) - AND (l.topic1 = any($5) OR $5 IS NULL) - AND (l.topic2 = any($6) OR $6 IS NULL) - AND (l.topic3 = any($7) OR $7 IS NULL) - AND (b.created_at >= $8 OR $8 IS NULL) - ORDER BY b.block_num ASC, l.log_index ASC` - - var args []interface{} - var query string - if blockHash != nil { - args = []interface{}{blockHash.String()} - query = getLogsByBlockHashSQL - } else { - args = []interface{}{fromBlock, toBlock} - query = getLogsByBlockNumbersSQL - } - + WHERE (l.address = any($1) OR $1 IS NULL) + AND (l.topic0 = any($2) OR $2 IS NULL) + AND (l.topic1 = any($3) OR $3 IS NULL) + AND (l.topic2 = any($4) OR $4 IS NULL) + AND (l.topic3 = any($5) OR $5 IS NULL) + AND (b.created_at >= $6 OR $6 IS NULL) ` + + const queryFilterByBlockHash = `AND b.block_hash = $7 ` + const queryFilterByBlockNumbers = `AND b.block_num BETWEEN $7 AND $8 ` + + const queryOrder = `ORDER BY b.block_num ASC, l.log_index ASC` + + // count queries + const queryToCountLogsByBlockHash = "" + + queryCount + + queryBody + + queryFilterByBlockHash + const queryToCountLogsByBlockNumbers = "" + + queryCount + + queryBody + + queryFilterByBlockNumbers + + // select queries + const queryToSelectLogsByBlockHash = "" + + querySelect + + queryBody + + queryFilterByBlockHash + + queryOrder + const queryToSelectLogsByBlockNumbers = "" + + querySelect + + queryBody + + queryFilterByBlockNumbers + + queryOrder + + args := []interface{}{} + + // address filter if len(addresses) > 0 { args = append(args, p.addressesToHex(addresses)) } else { args = append(args, nil) } + // topic filters for i := 0; i < maxTopics; i++ { if len(topics) > i && len(topics[i]) > 0 { args = append(args, p.hashesToHex(topics[i])) @@ -1984,11 +1994,45 @@ func (p *PostgresStorage) GetLogs(ctx context.Context, fromBlock uint64, toBlock } } + // since filter args = append(args, since) + // block filter + var queryToCount string + var queryToSelect string + if blockHash != nil { + args = append(args, 
blockHash.String()) + queryToCount = queryToCountLogsByBlockHash + queryToSelect = queryToSelectLogsByBlockHash + } else { + if toBlock < fromBlock { + return nil, ErrInvalidBlockRange + } + + blockRange := toBlock - fromBlock + if p.cfg.MaxLogsBlockRange > 0 && blockRange > p.cfg.MaxLogsBlockRange { + return nil, ErrMaxLogsBlockRangeLimitExceeded + } + + args = append(args, fromBlock, toBlock) + queryToCount = queryToCountLogsByBlockNumbers + queryToSelect = queryToSelectLogsByBlockNumbers + } + q := p.getExecQuerier(dbTx) - rows, err := q.Query(ctx, query, args...) + if p.cfg.MaxLogsCount > 0 { + var count uint64 + err := q.QueryRow(ctx, queryToCount, args...).Scan(&count) + if err != nil { + return nil, err + } + + if count > p.cfg.MaxLogsCount { + return nil, ErrMaxLogsCountLimitExceeded + } + } + rows, err := q.Query(ctx, queryToSelect, args...) if err != nil { return nil, err } diff --git a/state/pgstatestorage_test.go b/state/pgstatestorage_test.go index d33461d741..575e300c62 100644 --- a/state/pgstatestorage_test.go +++ b/state/pgstatestorage_test.go @@ -27,7 +27,11 @@ var ( ) func setup() { - pgStateStorage = state.NewPostgresStorage(stateDb) + cfg := state.Config{ + MaxLogsCount: 10000, + MaxLogsBlockRange: 10000, + } + pgStateStorage = state.NewPostgresStorage(cfg, stateDb) } func TestGetBatchByL2BlockNumber(t *testing.T) { @@ -676,3 +680,126 @@ func TestGetBatchByNumber(t *testing.T) { require.NoError(t, dbTx.Commit(ctx)) } + +func TestGetLogs(t *testing.T) { + initOrResetDB() + + ctx := context.Background() + + cfg := state.Config{ + MaxLogsCount: 8, + MaxLogsBlockRange: 10, + } + pgStateStorage = state.NewPostgresStorage(cfg, stateDb) + testState.PostgresStorage = pgStateStorage + + dbTx, err := testState.BeginStateTransaction(ctx) + require.NoError(t, err) + err = testState.AddBlock(ctx, block, dbTx) + assert.NoError(t, err) + + batchNumber := uint64(1) + _, err = testState.PostgresStorage.Exec(ctx, "INSERT INTO state.batch (batch_num) VALUES ($1)", 
batchNumber) + assert.NoError(t, err) + + time := time.Now() + blockNumber := big.NewInt(1) + + for i := 0; i < 3; i++ { + tx := types.NewTx(&types.LegacyTx{ + Nonce: uint64(i), + To: nil, + Value: new(big.Int), + Gas: 0, + GasPrice: big.NewInt(0), + }) + + logs := []*types.Log{} + for j := 0; j < 4; j++ { + logs = append(logs, &types.Log{TxHash: tx.Hash(), Index: uint(j)}) + } + + receipt := &types.Receipt{ + Type: uint8(tx.Type()), + PostState: state.ZeroHash.Bytes(), + CumulativeGasUsed: 0, + EffectiveGasPrice: big.NewInt(0), + BlockNumber: blockNumber, + GasUsed: tx.Gas(), + TxHash: tx.Hash(), + TransactionIndex: 0, + Status: types.ReceiptStatusSuccessful, + Logs: logs, + } + + transactions := []*types.Transaction{tx} + receipts := []*types.Receipt{receipt} + + header := &types.Header{ + Number: big.NewInt(int64(i) + 1), + ParentHash: state.ZeroHash, + Coinbase: state.ZeroAddress, + Root: state.ZeroHash, + GasUsed: 1, + GasLimit: 10, + Time: uint64(time.Unix()), + } + + l2Block := types.NewBlock(header, transactions, []*types.Header{}, receipts, &trie.StackTrie{}) + for _, receipt := range receipts { + receipt.BlockHash = l2Block.Hash() + } + + err = testState.AddL2Block(ctx, batchNumber, l2Block, receipts, state.MaxEffectivePercentage, dbTx) + require.NoError(t, err) + } + + type testCase struct { + name string + from uint64 + to uint64 + logCount int + expectedError error + } + + testCases := []testCase{ + { + name: "invalid block range", + from: 2, + to: 1, + logCount: 0, + expectedError: state.ErrInvalidBlockRange, + }, + { + name: "block range bigger than allowed", + from: 1, + to: 12, + logCount: 0, + expectedError: state.ErrMaxLogsBlockRangeLimitExceeded, + }, + { + name: "log count bigger than allowed", + from: 1, + to: 3, + logCount: 0, + expectedError: state.ErrMaxLogsCountLimitExceeded, + }, + { + name: "logs returned successfully", + from: 1, + to: 2, + logCount: 8, + expectedError: nil, + }, + } + + for _, testCase := range testCases { + 
t.Run(testCase.name, func(t *testing.T) { + logs, err := testState.GetLogs(ctx, testCase.from, testCase.to, []common.Address{}, [][]common.Hash{}, nil, nil, dbTx) + + assert.Equal(t, testCase.logCount, len(logs)) + assert.Equal(t, testCase.expectedError, err) + }) + } + require.NoError(t, dbTx.Commit(ctx)) +} diff --git a/state/state_test.go b/state/state_test.go index c8209d1cd6..3edb434040 100644 --- a/state/state_test.go +++ b/state/state_test.go @@ -58,6 +58,8 @@ var ( stateCfg = state.Config{ MaxCumulativeGasUsed: 800000, ChainID: 1000, + MaxLogsCount: 10000, + MaxLogsBlockRange: 10000, ForkIDIntervals: []state.ForkIDInterval{{ FromBatchNumber: 0, ToBatchNumber: math.MaxUint64, @@ -117,7 +119,7 @@ func TestMain(m *testing.M) { } eventLog := event.NewEventLog(event.Config{}, eventStorage) - testState = state.NewState(stateCfg, state.NewPostgresStorage(stateDb), executorClient, stateTree, eventLog) + testState = state.NewState(stateCfg, state.NewPostgresStorage(stateCfg, stateDb), executorClient, stateTree, eventLog) result := m.Run() diff --git a/test/operations/manager.go b/test/operations/manager.go index f14df086fa..6a35485be6 100644 --- a/test/operations/manager.go +++ b/test/operations/manager.go @@ -462,16 +462,16 @@ func initState(maxCumulativeGasUsed uint64) (*state.State, error) { return nil, err } + stateCfg := state.Config{ + MaxCumulativeGasUsed: maxCumulativeGasUsed, + } + ctx := context.Background() - stateDb := state.NewPostgresStorage(sqlDB) + stateDb := state.NewPostgresStorage(stateCfg, sqlDB) executorClient, _, _ := executor.NewExecutorClient(ctx, executorConfig) stateDBClient, _, _ := merkletree.NewMTDBServiceClient(ctx, merkleTreeConfig) stateTree := merkletree.NewStateTree(stateDBClient) - stateCfg := state.Config{ - MaxCumulativeGasUsed: maxCumulativeGasUsed, - } - eventStorage, err := nileventstorage.NewNilEventStorage() if err != nil { return nil, err diff --git a/tools/state/reprocess_cmd.go b/tools/state/reprocess_cmd.go index 
2d7200f309..88f03afb2a 100644 --- a/tools/state/reprocess_cmd.go +++ b/tools/state/reprocess_cmd.go @@ -138,7 +138,7 @@ func getL2ChainID(cliCtx *cli.Context, c *config.Config) uint64 { } func newState(ctx context.Context, c *config.Config, l2ChainID uint64, forkIDIntervals []state.ForkIDInterval, sqlDB *pgxpool.Pool, eventLog *event.EventLog, needsExecutor, needsStateTree bool) *state.State { - stateDb := state.NewPostgresStorage(sqlDB) + stateDb := state.NewPostgresStorage(state.Config{}, sqlDB) // Executor var executorClient executor.ExecutorServiceClient