diff --git a/config/config_test.go b/config/config_test.go index 6ba1131a3b..d6c40ed30a 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -169,6 +169,10 @@ func Test_Defaults(t *testing.T) { path: "Sequencer.StreamServer.Version", expectedValue: uint8(0), }, + { + path: "Sequencer.StreamServer.WriteTimeout", + expectedValue: types.NewDuration(5 * time.Second), + }, { path: "Sequencer.StreamServer.Enabled", expectedValue: false, diff --git a/config/default.go b/config/default.go index 0b73699125..bff3d90a70 100644 --- a/config/default.go +++ b/config/default.go @@ -163,6 +163,7 @@ StateConsistencyCheckInterval = "5s" Port = 0 Filename = "" Version = 0 + WriteTimeout = "5s" Enabled = false [SequenceSender] diff --git a/config/environments/local/local.node.config.toml b/config/environments/local/local.node.config.toml index 11077845ca..c403a0718a 100644 --- a/config/environments/local/local.node.config.toml +++ b/config/environments/local/local.node.config.toml @@ -112,6 +112,7 @@ StateConsistencyCheckInterval = "5s" [Sequencer.StreamServer] Port = 0 Filename = "" + WriteTimeout = "5s" Enabled = false [SequenceSender] diff --git a/docs/config-file/node-config-doc.html b/docs/config-file/node-config-doc.html index 79d14baf44..14800feab3 100644 --- a/docs/config-file/node-config-doc.html +++ b/docs/config-file/node-config-doc.html @@ -56,7 +56,9 @@
"300ms"
 

Default: 0Type: integer

HaltOnBatchNumber specifies the batch number where the Sequencer will stop to process more transactions and generate new batches.
The Sequencer will halt after it closes the batch equal to this number


Default: falseType: boolean

SequentialBatchSanityCheck indicates if the reprocess of a closed batch (sanity check) must be done in a
sequential way (instead than in parallel)


Default: falseType: boolean

SequentialProcessL2Block indicates if the processing of a L2 Block must be done in the same finalizer go func instead
in the processPendingL2Blocks go func


Metrics is the config for the sequencer metrics
Default: "1h0m0s"Type: string

Interval is the interval of time to calculate sequencer metrics


Examples:

"1m"
 
"300ms"
-

Default: trueType: boolean

EnableLog is a flag to enable/disable metrics logs


StreamServerCfg is the config for the stream server
Default: 0Type: integer

Port to listen on


Default: ""Type: string

Filename of the binary data file


Default: 0Type: integer

Version of the binary data file


Default: 0Type: integer

ChainID is the chain ID


Default: falseType: boolean

Enabled is a flag to enable/disable the data streamer


Log is the log configuration
Default: ""Type: enum (of string)

Must be one of:

  • "production"
  • "development"

Default: ""Type: enum (of string)

Must be one of:

  • "debug"
  • "info"
  • "warn"
  • "error"
  • "dpanic"
  • "panic"
  • "fatal"

Type: array of string

Each item of this array must be:


Default: 0Type: integer

UpgradeEtrogBatchNumber is the batch number of the upgrade etrog


Configuration of the sequence sender service
Default: "5s"Type: string

WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1


Examples:

"1m"
+

Default: trueType: boolean

EnableLog is a flag to enable/disable metrics logs


StreamServerCfg is the config for the stream server
Default: 0Type: integer

Port to listen on


Default: ""Type: string

Filename of the binary data file


Default: 0Type: integer

Version of the binary data file


Default: 0Type: integer

ChainID is the chain ID


Default: falseType: boolean

Enabled is a flag to enable/disable the data streamer


Log is the log configuration
Default: ""Type: enum (of string)

Must be one of:

  • "production"
  • "development"

Default: ""Type: enum (of string)

Must be one of:

  • "debug"
  • "info"
  • "warn"
  • "error"
  • "dpanic"
  • "panic"
  • "fatal"

Type: array of string

Each item of this array must be:


Default: 0Type: integer

UpgradeEtrogBatchNumber is the batch number of the upgrade etrog


Default: "5s"Type: string

WriteTimeout is the TCP write timeout when sending data to a datastream client


Examples:

"1m"
+
"300ms"
+

Configuration of the sequence sender service
Default: "5s"Type: string

WaitPeriodSendSequence is the time the sequencer waits until
trying to send a sequence to L1


Examples:

"1m"
 
"300ms"
 

Default: "5s"Type: string

LastBatchVirtualizationTimeMaxWaitPeriod is time since sequences should be sent


Examples:

"1m"
 
"300ms"
diff --git a/docs/config-file/node-config-doc.md b/docs/config-file/node-config-doc.md
index afe68fdc2f..3dab591984 100644
--- a/docs/config-file/node-config-doc.md
+++ b/docs/config-file/node-config-doc.md
@@ -2487,6 +2487,7 @@ EnableLog=true
 | - [Enabled](#Sequencer_StreamServer_Enabled )                                 | No      | boolean | No         | -          | Enabled is a flag to enable/disable the data streamer            |
 | - [Log](#Sequencer_StreamServer_Log )                                         | No      | object  | No         | -          | Log is the log configuration                                     |
 | - [UpgradeEtrogBatchNumber](#Sequencer_StreamServer_UpgradeEtrogBatchNumber ) | No      | integer | No         | -          | UpgradeEtrogBatchNumber is the batch number of the upgrade etrog |
+| - [WriteTimeout](#Sequencer_StreamServer_WriteTimeout )                       | No      | string  | No         | -          | Duration                                                         |
 
 #### 10.9.1. `Sequencer.StreamServer.Port`
 
@@ -2624,6 +2625,32 @@ Must be one of:
 UpgradeEtrogBatchNumber=0
 ```
 
+#### 10.9.8. `Sequencer.StreamServer.WriteTimeout`
+
+**Title:** Duration
+
+**Type:** : `string`
+
+**Default:** `"5s"`
+
+**Description:** WriteTimeout is the TCP write timeout when sending data to a datastream client
+
+**Examples:** 
+
+```json
+"1m"
+```
+
+```json
+"300ms"
+```
+
+**Example setting the default value** ("5s"):
+```
+[Sequencer.StreamServer]
+WriteTimeout="5s"
+```
+
 ## 11. `[SequenceSender]`
 
 **Type:** : `object`
diff --git a/docs/config-file/node-config-schema.json b/docs/config-file/node-config-schema.json
index 5096caac0e..489ee17274 100644
--- a/docs/config-file/node-config-schema.json
+++ b/docs/config-file/node-config-schema.json
@@ -998,6 +998,16 @@
 							"type": "integer",
 							"description": "UpgradeEtrogBatchNumber is the batch number of the upgrade etrog",
 							"default": 0
+						},
+						"WriteTimeout": {
+							"type": "string",
+							"title": "Duration",
+							"description": "WriteTimeout is the TCP write timeout when sending data to a datastream client",
+							"default": "5s",
+							"examples": [
+								"1m",
+								"300ms"
+							]
 						}
 					},
 					"additionalProperties": false,
diff --git a/go.mod b/go.mod
index c94597b1ab..64ced2a895 100644
--- a/go.mod
+++ b/go.mod
@@ -3,7 +3,7 @@ module github.com/0xPolygonHermez/zkevm-node
 go 1.21
 
 require (
-	github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b
+	github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4
 	github.com/didip/tollbooth/v6 v6.1.2
 	github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127
 	github.com/ethereum/go-ethereum v1.13.11
diff --git a/go.sum b/go.sum
index bff27c6313..da7a1cbead 100644
--- a/go.sum
+++ b/go.sum
@@ -39,8 +39,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
 dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
 dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
-github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b h1:BzQRXbSnW7BsFvJrnZbCgnxD5+nCGyrYUgqH+3vsnrM=
-github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
+github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4 h1:+4K+xSzv0ImbK30B/T9FauNTrTFUmWcNKYhIgwsE4C4=
+github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
 github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
 github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
 github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
diff --git a/sequencer/config.go b/sequencer/config.go
index 03aeeb740b..5f34cad1d9 100644
--- a/sequencer/config.go
+++ b/sequencer/config.go
@@ -52,6 +52,8 @@ type StreamServerCfg struct {
 	Log log.Config `mapstructure:"Log"`
 	// UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
 	UpgradeEtrogBatchNumber uint64 `mapstructure:"UpgradeEtrogBatchNumber"`
+	// WriteTimeout is the TCP write timeout when sending data to a datastream client
+	WriteTimeout types.Duration `mapstructure:"WriteTimeout"`
 }
 
 // FinalizerCfg contains the finalizer's configuration properties
diff --git a/sequencer/l2block.go b/sequencer/l2block.go
index 7cc21fc928..511d233988 100644
--- a/sequencer/l2block.go
+++ b/sequencer/l2block.go
@@ -244,7 +244,7 @@ func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) error
 		if subOverflow { // Sanity check, this cannot happen as reservedZKCounters should be >= that usedZKCounters
 			return fmt.Errorf("error subtracting L2 block %d [%d] needed resources from the batch %d, overflow resource: %s, batch bytes: %d, L2 block bytes: %d, counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}",
 				blockResponse.BlockNumber, l2Block.trackingNum, l2Block.batch.batchNumber, overflowResource, l2Block.batch.finalRemainingResources.Bytes, batchL2DataSize,
-				f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.imHighReservedZKCounters))
+				f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.finalHighReservedZKCounters))
 		}
 
 		l2Block.batch.finalHighReservedZKCounters = newHighZKCounters
@@ -252,7 +252,7 @@ func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) error
 	} else {
 		overflowLog := fmt.Sprintf("L2 block %d [%d] needed resources exceeds the remaining batch %d resources, overflow resource: %s, batch bytes: %d, L2 block bytes: %d, counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}",
 			blockResponse.BlockNumber, l2Block.trackingNum, l2Block.batch.batchNumber, overflowResource, l2Block.batch.finalRemainingResources.Bytes, batchL2DataSize,
-			f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.imHighReservedZKCounters))
+			f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.finalHighReservedZKCounters))
 
 		f.LogEvent(ctx, event.Level_Warning, event.EventID_ReservedZKCountersOverflow, overflowLog, nil)
 
diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go
index f191973767..92909e2946 100644
--- a/sequencer/sequencer.go
+++ b/sequencer/sequencer.go
@@ -17,7 +17,7 @@ import (
 )
 
 const (
-	datastreamChannelBufferSize = 20
+	datastreamChannelBufferSize = 50
 )
 
 // Sequencer represents a sequencer
@@ -72,7 +72,7 @@ func (s *Sequencer) Start(ctx context.Context) {
 
 	// Start stream server if enabled
 	if s.cfg.StreamServer.Enabled {
-		s.streamServer, err = datastreamer.NewServer(s.cfg.StreamServer.Port, s.cfg.StreamServer.Version, s.cfg.StreamServer.ChainID, state.StreamTypeSequencer, s.cfg.StreamServer.Filename, &s.cfg.StreamServer.Log)
+		s.streamServer, err = datastreamer.NewServer(s.cfg.StreamServer.Port, s.cfg.StreamServer.Version, s.cfg.StreamServer.ChainID, state.StreamTypeSequencer, s.cfg.StreamServer.Filename, s.cfg.StreamServer.WriteTimeout.Duration, &s.cfg.StreamServer.Log)
 		if err != nil {
 			log.Fatalf("failed to create stream server, error: %v", err)
 		}
diff --git a/test/config/debug.node.config.toml b/test/config/debug.node.config.toml
index 68f7dd17ce..54ca62a789 100644
--- a/test/config/debug.node.config.toml
+++ b/test/config/debug.node.config.toml
@@ -112,6 +112,7 @@ StateConsistencyCheckInterval = "5s"
 		Port = 6900
 		Filename = "/datastreamer/datastream.bin"
 		Version = 1
+		WriteTimeout = "5s"
 		Enabled = true
 
 [SequenceSender]
diff --git a/test/config/test.node.config.toml b/test/config/test.node.config.toml
index 1df2fc1882..0358afa59e 100644
--- a/test/config/test.node.config.toml
+++ b/test/config/test.node.config.toml
@@ -128,6 +128,7 @@ StateConsistencyCheckInterval = "5s"
 		Filename = "/datastreamer/datastream.bin"
 		Version = 1
 		ChainID = 1337
+		WriteTimeout = "5s"
 		Enabled = true
 
 [SequenceSender]
diff --git a/tools/datastreamer/config/config.go b/tools/datastreamer/config/config.go
index 0acb225cf9..b6c841e591 100644
--- a/tools/datastreamer/config/config.go
+++ b/tools/datastreamer/config/config.go
@@ -7,6 +7,7 @@ import (
 
 	"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
 	"github.com/0xPolygonHermez/zkevm-data-streamer/log"
+	"github.com/0xPolygonHermez/zkevm-node/config/types"
 	"github.com/0xPolygonHermez/zkevm-node/db"
 	"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
 	"github.com/mitchellh/mapstructure"
@@ -48,6 +49,8 @@ type StreamServerCfg struct {
 	Log log.Config `mapstructure:"Log"`
 	// UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
 	UpgradeEtrogBatchNumber uint64 `mapstructure:"UpgradeEtrogBatchNumber"`
+	// WriteTimeout is the TCP write timeout when sending data to a datastream client
+	WriteTimeout types.Duration `mapstructure:"WriteTimeout"`
 }
 
 // Config is the configuration for the tool
diff --git a/tools/datastreamer/config/tool.config.toml b/tools/datastreamer/config/tool.config.toml
index 0e8fc09fc9..f5530b8271 100644
--- a/tools/datastreamer/config/tool.config.toml
+++ b/tools/datastreamer/config/tool.config.toml
@@ -7,6 +7,7 @@ Port = 6901
 Filename = "datastream.bin"
 Version = 3
 ChainID = 1440
+WriteTimeout = "5s"
 UpgradeEtrogBatchNumber = 0
 
 [StateDB]
diff --git a/tools/datastreamer/main.go b/tools/datastreamer/main.go
index a2e3d19003..fc069117af 100644
--- a/tools/datastreamer/main.go
+++ b/tools/datastreamer/main.go
@@ -184,7 +184,7 @@ func main() {
 
 func initializeStreamServer(c *config.Config) (*datastreamer.StreamServer, error) {
 	// Create a stream server
-	streamServer, err := datastreamer.NewServer(c.Offline.Port, c.Offline.Version, c.Offline.ChainID, state.StreamTypeSequencer, c.Offline.Filename, &c.Log)
+	streamServer, err := datastreamer.NewServer(c.Offline.Port, c.Offline.Version, c.Offline.ChainID, state.StreamTypeSequencer, c.Offline.Filename, c.Offline.WriteTimeout.Duration, &c.Log)
 	if err != nil {
 		return nil, err
 	}