diff --git a/go.mod b/go.mod index db2f719558..c94597b1ab 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/0xPolygonHermez/zkevm-node go 1.21 require ( - github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1 + github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b github.com/didip/tollbooth/v6 v6.1.2 github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127 github.com/ethereum/go-ethereum v1.13.11 diff --git a/go.sum b/go.sum index a9de27ba91..bff27c6313 100644 --- a/go.sum +++ b/go.sum @@ -39,8 +39,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9 dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= -github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1 h1:4wbCJOGcZ8BTuOfNFrcZ1cAVfTWaX1W9EYHaDx3imLc= -github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE= +github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b h1:BzQRXbSnW7BsFvJrnZbCgnxD5+nCGyrYUgqH+3vsnrM= +github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE= github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= diff --git a/sequencer/batch.go b/sequencer/batch.go index ecaa93aca7..cd69343e26 100644 --- a/sequencer/batch.go +++ b/sequencer/batch.go @@ -335,7 +335,7 @@ func (f *finalizer) insertSIPBatch(ctx context.Context, batchNumber uint64, stat } // Send batch bookmark to the datastream - f.DSSendBatchBookmark(batchNumber) + f.DSSendBatchBookmark(ctx, batchNumber) // Check if synchronizer is up-to-date //TODO: review if this is needed diff --git a/sequencer/datastreamer.go b/sequencer/datastreamer.go index 7f5e7e763a..2c2b24ec13 100644 --- a/sequencer/datastreamer.go +++ b/sequencer/datastreamer.go @@ -1,11 +1,13 @@ package sequencer import ( - "github.com/0xPolygonHermez/zkevm-node/log" + "context" + "fmt" + "github.com/0xPolygonHermez/zkevm-node/state" ) -func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32) error { +func (f *finalizer) DSSendL2Block(ctx context.Context, batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32) error { forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber) // Send data to streamer @@ -43,23 +45,36 @@ func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.Proce l2Transactions = append(l2Transactions, l2Transaction) } - log.Infof("[ds-debug] sending l2block %d to datastream channel", blockResponse.BlockNumber) + f.checkDSBufferIsFull(ctx) + f.dataToStream <- state.DSL2FullBlock{ DSL2Block: l2Block, Txs: l2Transactions, } + + f.dataToStreamCount.Add(1) } return nil } -func (f *finalizer) DSSendBatchBookmark(batchNumber uint64) { +func (f *finalizer) DSSendBatchBookmark(ctx context.Context, batchNumber uint64) { // Check if stream server enabled if f.streamServer != nil { + f.checkDSBufferIsFull(ctx) + // Send batch bookmark to the streamer f.dataToStream <- state.DSBookMark{ Type: state.BookMarkTypeBatch, Value: batchNumber, } + + f.dataToStreamCount.Add(1) + } +} + +func (f *finalizer) checkDSBufferIsFull(ctx context.Context) { + if f.dataToStreamCount.Load() == datastreamChannelBufferSize { + f.Halt(ctx, fmt.Errorf("datastream channel buffer full"), true) } } diff --git a/sequencer/finalizer.go b/sequencer/finalizer.go index a06f2341fa..dc94b36810 100644 --- a/sequencer/finalizer.go +++ b/sequencer/finalizer.go @@ -83,8 +83,9 @@ type finalizer struct { // interval metrics metrics *intervalMetrics // stream server - streamServer *datastreamer.StreamServer - dataToStream chan interface{} + streamServer *datastreamer.StreamServer + dataToStream chan interface{} + dataToStreamCount atomic.Int32 } // newFinalizer returns a new instance of Finalizer. @@ -878,6 +879,11 @@ func (f *finalizer) logZKCounters(counters state.ZKCounters) string { counters.Binaries, counters.Sha256Hashes_V2, counters.Steps) } +// Decrease datastreamChannelCount variable +func (f *finalizer) DatastreamChannelCountAdd(ct int32) { + f.dataToStreamCount.Add(ct) +} + // Halt halts the finalizer func (f *finalizer) Halt(ctx context.Context, err error, isFatal bool) { f.haltFinalizer.Store(true) diff --git a/sequencer/forcedbatch.go b/sequencer/forcedbatch.go index 85f74abee1..c34628611a 100644 --- a/sequencer/forcedbatch.go +++ b/sequencer/forcedbatch.go @@ -198,7 +198,7 @@ func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, newBat } // Send L2 block to data streamer - err = f.DSSendL2Block(newBatchNumber, forcedL2BlockResponse, 0) + err = f.DSSendL2Block(ctx, newBatchNumber, forcedL2BlockResponse, 0) if err != nil { //TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer? log.Errorf("error sending L2 block %d to data streamer, error: %v", forcedL2BlockResponse.BlockNumber, err) diff --git a/sequencer/l2block.go b/sequencer/l2block.go index 8370918106..d9f7b1d8f6 100644 --- a/sequencer/l2block.go +++ b/sequencer/l2block.go @@ -475,9 +475,6 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error { return err } - //TODO: remove this Log - log.Infof("[ds-debug] l2 block %d [%d] stored in statedb", blockResponse.BlockNumber, l2Block.trackingNum) - // Update txs status in the pool for _, txResponse := range blockResponse.TransactionResponses { // Change Tx status to selected @@ -487,19 +484,13 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error { } } - //TODO: remove this log - log.Infof("[ds-debug] l2 block %d [%d] transactions updated as selected in the pooldb", blockResponse.BlockNumber, l2Block.trackingNum) - // Send L2 block to data streamer - err = f.DSSendL2Block(l2Block.batch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex()) + err = f.DSSendL2Block(ctx, l2Block.batch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex()) if err != nil { //TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer? log.Errorf("error sending L2 block %d [%d] to data streamer, error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err) } - //TODO: remove this log - log.Infof("[ds-debug] l2 block %d [%d] sent to datastream", blockResponse.BlockNumber, l2Block.trackingNum) - for _, tx := range l2Block.transactions { // Delete the tx from the pending list in the worker (addrQueue) f.workerIntf.DeleteTxPendingToStore(tx.Hash, tx.From) diff --git a/sequencer/sequencer.go b/sequencer/sequencer.go index b79ad26c17..1e290a53e3 100644 --- a/sequencer/sequencer.go +++ b/sequencer/sequencer.go @@ -15,7 +15,7 @@ import ( ) const ( - datastreamChannelMultiplier = 2 + datastreamChannelBufferSize = 10 ) // Sequencer represents a sequencer @@ -59,9 +59,7 @@ func New(cfg Config, batchCfg state.BatchConfig, poolCfg pool.Config, txPool txP eventLog: eventLog, } - // TODO: Make configurable - channelBufferSize := 200 * datastreamChannelMultiplier // nolint:gomnd - sequencer.dataToStream = make(chan interface{}, channelBufferSize) + sequencer.dataToStream = make(chan interface{}, datastreamChannelBufferSize) return sequencer, nil } @@ -270,8 +268,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) { case state.DSL2FullBlock: l2Block := data - //TODO: remove this log - log.Infof("[ds-debug] start atomic op for l2block %d", l2Block.L2BlockNumber) err = s.streamServer.StartAtomicOp() if err != nil { log.Errorf("failed to start atomic op for l2block %d, error: %v ", l2Block.L2BlockNumber, err) @@ -283,8 +279,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) { Value: l2Block.L2BlockNumber, } - //TODO: remove this log - log.Infof("[ds-debug] add stream bookmark for l2block %d", l2Block.L2BlockNumber) _, err = s.streamServer.AddStreamBookmark(bookMark.Encode()) if err != nil { log.Errorf("failed to add stream bookmark for l2block %d, error: %v", l2Block.L2BlockNumber, err) @@ -299,8 +293,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) { Value: l2Block.L2BlockNumber - 1, } - //TODO: remove this log - log.Infof("[ds-debug] get previous l2block %d", l2Block.L2BlockNumber-1) previousL2BlockEntry, err := s.streamServer.GetFirstEventAfterBookmark(bookMark.Encode()) if err != nil { log.Errorf("failed to get previous l2block %d, error: %v", l2Block.L2BlockNumber-1, err) @@ -323,16 +315,12 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) { ChainID: uint32(chainID), } - //TODO: remove this log - log.Infof("[ds-debug] add l2blockStart stream entry for l2block %d", l2Block.L2BlockNumber) _, err = s.streamServer.AddStreamEntry(state.EntryTypeL2BlockStart, blockStart.Encode()) if err != nil { log.Errorf("failed to add stream entry for l2block %d, error: %v", l2Block.L2BlockNumber, err) continue } - //TODO: remove this log - log.Infof("[ds-debug] adding l2tx stream entries for l2block %d", l2Block.L2BlockNumber) for _, l2Transaction := range l2Block.Txs { _, err = s.streamServer.AddStreamEntry(state.EntryTypeL2Tx, l2Transaction.Encode()) if err != nil { @@ -347,25 +335,17 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) { StateRoot: l2Block.StateRoot, } - //TODO: remove this log - log.Infof("[ds-debug] add l2blockEnd stream entry for l2block %d", l2Block.L2BlockNumber) _, err = s.streamServer.AddStreamEntry(state.EntryTypeL2BlockEnd, blockEnd.Encode()) if err != nil { log.Errorf("failed to add stream entry for l2block %d, error: %v", l2Block.L2BlockNumber, err) continue } - //TODO: remove this log - log.Infof("[ds-debug] commit atomic op for l2block %d", l2Block.L2BlockNumber) err = s.streamServer.CommitAtomicOp() if err != nil { log.Errorf("failed to commit atomic op for l2block %d, error: %v ", l2Block.L2BlockNumber, err) continue } - - //TODO: remove this log - log.Infof("[ds-debug] l2block %d sent to datastream", l2Block.L2BlockNumber) - // Stream a bookmark case state.DSBookMark: bookmark := data @@ -392,6 +372,8 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) { log.Errorf("invalid stream message type received") } } + + s.finalizer.DatastreamChannelCountAdd(-1) } }