Skip to content

Commit

Permalink
Cherry-pick 0xPolygonHermez#3669: Add WriteTimeout config parameter t…
Browse files Browse the repository at this point in the history
…o StreamServer (0xPolygonHermez#3690)

* Add WriteTimeout config parameter to StreamServer (0xPolygonHermez#3669)

* Add WriteTimeout config parameter to StreamServer. Update DS library

* update doc

* update default value for StreamServer.WriteTimeout config parameter. Increase buffer for datastream channel

* fix doc

* fix config test

* fix doc
  • Loading branch information
agnusmor authored and zjg555543 committed Jun 19, 2024
1 parent b87a614 commit 79ab633
Show file tree
Hide file tree
Showing 16 changed files with 379 additions and 1,764 deletions.
4 changes: 4 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ func Test_Defaults(t *testing.T) {
path: "Sequencer.StreamServer.Version",
expectedValue: uint8(0),
},
{
path: "Sequencer.StreamServer.WriteTimeout",
expectedValue: types.NewDuration(5 * time.Second),
},
{
path: "Sequencer.StreamServer.Enabled",
expectedValue: false,
Expand Down
1 change: 1 addition & 0 deletions config/default.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ StateConsistencyCheckInterval = "5s"
Port = 0
Filename = ""
Version = 0
WriteTimeout = "5s"
Enabled = false
[SequenceSender]
Expand Down
1 change: 1 addition & 0 deletions config/environments/local/local.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ StateConsistencyCheckInterval = "5s"
[Sequencer.StreamServer]
Port = 0
Filename = ""
WriteTimeout = "5s"
Enabled = false

[SequenceSender]
Expand Down
44 changes: 19 additions & 25 deletions docs/config-file/node-config-doc.html

Large diffs are not rendered by default.

2,055 changes: 327 additions & 1,728 deletions docs/config-file/node-config-doc.md

Large diffs are not rendered by default.

10 changes: 10 additions & 0 deletions docs/config-file/node-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -1402,6 +1402,16 @@
"type": "integer",
"description": "UpgradeEtrogBatchNumber is the batch number of the upgrade etrog",
"default": 0
},
"WriteTimeout": {
"type": "string",
"title": "Duration",
"description": "WriteTimeout is the TCP write timeout when sending data to a datastream client",
"default": "5s",
"examples": [
"1m",
"300ms"
]
}
},
"additionalProperties": false,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/0xPolygonHermez/zkevm-node
go 1.21

require (
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4
github.com/didip/tollbooth/v6 v6.1.2
github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127
github.com/ethereum/go-ethereum v1.13.11
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ github.com/0xPolygon/agglayer v0.0.1 h1:J6/DUo9rNUncDifquanouRCo2g7g069yvz0aFtu7
github.com/0xPolygon/agglayer v0.0.1/go.mod h1:UYp5O8THULoXVrUfzkRjVBzxHR5DxBdUN/Iq0EgxNxM=
github.com/0xPolygon/cdk-data-availability v0.0.5 h1://vg1oR/5tw2XfEIorpP+wIxLfNUmoKrdmX8YZvBKX4=
github.com/0xPolygon/cdk-data-availability v0.0.5/go.mod h1:aGwqHiJhL+mJbdepl3s58wsY18EuViDa9vZCpPuIYGw=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1 h1:4wbCJOGcZ8BTuOfNFrcZ1cAVfTWaX1W9EYHaDx3imLc=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4 h1:+4K+xSzv0ImbK30B/T9FauNTrTFUmWcNKYhIgwsE4C4=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-RC4/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
Expand Down
2 changes: 2 additions & 0 deletions sequencer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ type StreamServerCfg struct {
Log log.Config `mapstructure:"Log"`
// UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
UpgradeEtrogBatchNumber uint64 `mapstructure:"UpgradeEtrogBatchNumber"`
// WriteTimeout is the TCP write timeout when sending data to a datastream client
WriteTimeout types.Duration `mapstructure:"WriteTimeout"`
}

// FinalizerCfg contains the finalizer's configuration properties
Expand Down
4 changes: 2 additions & 2 deletions sequencer/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,15 +245,15 @@ func (f *finalizer) processL2Block(ctx context.Context, l2Block *L2Block) error
if subOverflow { // Sanity check, this cannot happen as reservedZKCounters should be >= that usedZKCounters
return fmt.Errorf("error subtracting L2 block %d [%d] needed resources from the batch %d, overflow resource: %s, batch bytes: %d, L2 block bytes: %d, counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}",
blockResponse.BlockNumber, l2Block.trackingNum, l2Block.batch.batchNumber, overflowResource, l2Block.batch.finalRemainingResources.Bytes, batchL2DataSize,
f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.imHighReservedZKCounters))
f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.finalHighReservedZKCounters))
}

l2Block.batch.finalHighReservedZKCounters = newHighZKCounters
l2Block.highReservedZKCounters = l2Block.batch.finalHighReservedZKCounters
} else {
overflowLog := fmt.Sprintf("L2 block %d [%d] needed resources exceeds the remaining batch %d resources, overflow resource: %s, batch bytes: %d, L2 block bytes: %d, counters: {batch: %s, used: %s, reserved: %s, needed: %s, high: %s}",
blockResponse.BlockNumber, l2Block.trackingNum, l2Block.batch.batchNumber, overflowResource, l2Block.batch.finalRemainingResources.Bytes, batchL2DataSize,
f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.imHighReservedZKCounters))
f.logZKCounters(l2Block.batch.finalRemainingResources.ZKCounters), f.logZKCounters(batchResponse.UsedZkCounters), f.logZKCounters(batchResponse.ReservedZkCounters), f.logZKCounters(neededZKCounters), f.logZKCounters(l2Block.batch.finalHighReservedZKCounters))

f.LogEvent(ctx, event.Level_Warning, event.EventID_ReservedZKCountersOverflow, overflowLog, nil)

Expand Down
8 changes: 3 additions & 5 deletions sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

const (
datastreamChannelMultiplier = 2
datastreamChannelBufferSize = 50
)

// Sequencer represents a sequencer
Expand Down Expand Up @@ -61,9 +61,7 @@ func New(cfg Config, batchCfg state.BatchConfig, poolCfg pool.Config, txPool txP
eventLog: eventLog,
}

// TODO: Make configurable
channelBufferSize := 200 * datastreamChannelMultiplier // nolint:gomnd
sequencer.dataToStream = make(chan interface{}, channelBufferSize)
sequencer.dataToStream = make(chan interface{}, datastreamChannelBufferSize)

return sequencer, nil
}
Expand All @@ -83,7 +81,7 @@ func (s *Sequencer) Start(ctx context.Context) {

// Start stream server if enabled
if s.cfg.StreamServer.Enabled {
s.streamServer, err = datastreamer.NewServer(s.cfg.StreamServer.Port, s.cfg.StreamServer.Version, s.cfg.StreamServer.ChainID, state.StreamTypeSequencer, s.cfg.StreamServer.Filename, &s.cfg.StreamServer.Log)
s.streamServer, err = datastreamer.NewServer(s.cfg.StreamServer.Port, s.cfg.StreamServer.Version, s.cfg.StreamServer.ChainID, state.StreamTypeSequencer, s.cfg.StreamServer.Filename, s.cfg.StreamServer.WriteTimeout.Duration, &s.cfg.StreamServer.Log)
if err != nil {
log.Fatalf("failed to create stream server, error: %v", err)
}
Expand Down
1 change: 1 addition & 0 deletions test/config/debug.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ StateConsistencyCheckInterval = "5s"
Port = 6900
Filename = "/datastreamer/datastream.bin"
Version = 1
WriteTimeout = "5s"
Enabled = true

[SequenceSender]
Expand Down
1 change: 1 addition & 0 deletions test/config/test.node.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ StateConsistencyCheckInterval = "5s"
Filename = "/datastreamer/datastream.bin"
Version = 1
ChainID = 195
WriteTimeout = "5s"
Enabled = true

[SequenceSender]
Expand Down
3 changes: 3 additions & 0 deletions tools/datastreamer/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (

"github.com/0xPolygonHermez/zkevm-data-streamer/datastreamer"
"github.com/0xPolygonHermez/zkevm-data-streamer/log"
"github.com/0xPolygonHermez/zkevm-node/config/types"
"github.com/0xPolygonHermez/zkevm-node/db"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/mitchellh/mapstructure"
Expand Down Expand Up @@ -48,6 +49,8 @@ type StreamServerCfg struct {
Log log.Config `mapstructure:"Log"`
// UpgradeEtrogBatchNumber is the batch number of the upgrade etrog
UpgradeEtrogBatchNumber uint64 `mapstructure:"UpgradeEtrogBatchNumber"`
// WriteTimeout is the TCP write timeout when sending data to a datastream client
WriteTimeout types.Duration `mapstructure:"WriteTimeout"`
}

// Config is the configuration for the tool
Expand Down
1 change: 1 addition & 0 deletions tools/datastreamer/config/tool.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ Port = 6901
Filename = "datastream.bin"
Version = 1
ChainID = 1440
WriteTimeout = "5s"
UpgradeEtrogBatchNumber = 0

[StateDB]
Expand Down
2 changes: 1 addition & 1 deletion tools/datastreamer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func main() {

func initializeStreamServer(c *config.Config) (*datastreamer.StreamServer, error) {
// Create a stream server
streamServer, err := datastreamer.NewServer(c.Offline.Port, c.Offline.Version, c.Offline.ChainID, state.StreamTypeSequencer, c.Offline.Filename, &c.Log)
streamServer, err := datastreamer.NewServer(c.Offline.Port, c.Offline.Version, c.Offline.ChainID, state.StreamTypeSequencer, c.Offline.Filename, c.Offline.WriteTimeout.Duration, &c.Log)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit 79ab633

Please sign in to comment.