Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do fatal when datastream channel is full (workaround to fix datastream blocking issue) #3650

Merged
merged 2 commits into from
May 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/0xPolygonHermez/zkevm-node
go 1.21

require (
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b
github.com/didip/tollbooth/v6 v6.1.2
github.com/dop251/goja v0.0.0-20230806174421-c933cf95e127
github.com/ethereum/go-ethereum v1.13.11
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1 h1:4wbCJOGcZ8BTuOfNFrcZ1cAVfTWaX1W9EYHaDx3imLc=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240426122934-6f47d2485fc1/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b h1:BzQRXbSnW7BsFvJrnZbCgnxD5+nCGyrYUgqH+3vsnrM=
github.com/0xPolygonHermez/zkevm-data-streamer v0.2.3-0.20240527085154-ca3561dd370b/go.mod h1:0QkAXcFa92mFJrCbN3UPUJGJYes851yEgYHLONnaosE=
github.com/AndreasBriese/bbloom v0.0.0-20190306092124-e2d15f34fcf9/go.mod h1:bOvUY6CB00SOBii9/FifXqc0awNKxLFCL/+pkDPuyl8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
Expand Down
2 changes: 1 addition & 1 deletion sequencer/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ func (f *finalizer) insertSIPBatch(ctx context.Context, batchNumber uint64, stat
}

// Send batch bookmark to the datastream
f.DSSendBatchBookmark(batchNumber)
f.DSSendBatchBookmark(ctx, batchNumber)

// Check if synchronizer is up-to-date
//TODO: review if this is needed
Expand Down
23 changes: 19 additions & 4 deletions sequencer/datastreamer.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package sequencer

import (
"github.com/0xPolygonHermez/zkevm-node/log"
"context"
"fmt"

"github.com/0xPolygonHermez/zkevm-node/state"
)

func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32) error {
func (f *finalizer) DSSendL2Block(ctx context.Context, batchNumber uint64, blockResponse *state.ProcessBlockResponse, l1InfoTreeIndex uint32) error {
forkID := f.stateIntf.GetForkIDByBatchNumber(batchNumber)

// Send data to streamer
Expand Down Expand Up @@ -43,23 +45,36 @@ func (f *finalizer) DSSendL2Block(batchNumber uint64, blockResponse *state.Proce
l2Transactions = append(l2Transactions, l2Transaction)
}

log.Infof("[ds-debug] sending l2block %d to datastream channel", blockResponse.BlockNumber)
f.checkDSBufferIsFull(ctx)

f.dataToStream <- state.DSL2FullBlock{
DSL2Block: l2Block,
Txs: l2Transactions,
}

f.dataToStreamCount.Add(1)
}

return nil
}

func (f *finalizer) DSSendBatchBookmark(batchNumber uint64) {
func (f *finalizer) DSSendBatchBookmark(ctx context.Context, batchNumber uint64) {
// Check if stream server enabled
if f.streamServer != nil {
f.checkDSBufferIsFull(ctx)

// Send batch bookmark to the streamer
f.dataToStream <- state.DSBookMark{
Type: state.BookMarkTypeBatch,
Value: batchNumber,
}

f.dataToStreamCount.Add(1)
}
}

func (f *finalizer) checkDSBufferIsFull(ctx context.Context) {
if f.dataToStreamCount.Load() == datastreamChannelBufferSize {
f.Halt(ctx, fmt.Errorf("datastream channel buffer full"), true)
}
}
10 changes: 8 additions & 2 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,9 @@ type finalizer struct {
// interval metrics
metrics *intervalMetrics
// stream server
streamServer *datastreamer.StreamServer
dataToStream chan interface{}
streamServer *datastreamer.StreamServer
dataToStream chan interface{}
dataToStreamCount atomic.Int32
}

// newFinalizer returns a new instance of Finalizer.
Expand Down Expand Up @@ -878,6 +879,11 @@ func (f *finalizer) logZKCounters(counters state.ZKCounters) string {
counters.Binaries, counters.Sha256Hashes_V2, counters.Steps)
}

// Decrease datastreamChannelCount variable
func (f *finalizer) DatastreamChannelCountAdd(ct int32) {
f.dataToStreamCount.Add(ct)
}

// Halt halts the finalizer
func (f *finalizer) Halt(ctx context.Context, err error, isFatal bool) {
f.haltFinalizer.Store(true)
Expand Down
2 changes: 1 addition & 1 deletion sequencer/forcedbatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,7 @@ func (f *finalizer) handleProcessForcedBatchResponse(ctx context.Context, newBat
}

// Send L2 block to data streamer
err = f.DSSendL2Block(newBatchNumber, forcedL2BlockResponse, 0)
err = f.DSSendL2Block(ctx, newBatchNumber, forcedL2BlockResponse, 0)
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d to data streamer, error: %v", forcedL2BlockResponse.BlockNumber, err)
Expand Down
11 changes: 1 addition & 10 deletions sequencer/l2block.go
Original file line number Diff line number Diff line change
Expand Up @@ -475,9 +475,6 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
return err
}

//TODO: remove this Log
log.Infof("[ds-debug] l2 block %d [%d] stored in statedb", blockResponse.BlockNumber, l2Block.trackingNum)

// Update txs status in the pool
for _, txResponse := range blockResponse.TransactionResponses {
// Change Tx status to selected
Expand All @@ -487,19 +484,13 @@ func (f *finalizer) storeL2Block(ctx context.Context, l2Block *L2Block) error {
}
}

//TODO: remove this log
log.Infof("[ds-debug] l2 block %d [%d] transactions updated as selected in the pooldb", blockResponse.BlockNumber, l2Block.trackingNum)

// Send L2 block to data streamer
err = f.DSSendL2Block(l2Block.batch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex())
err = f.DSSendL2Block(ctx, l2Block.batch.batchNumber, blockResponse, l2Block.getL1InfoTreeIndex())
if err != nil {
//TODO: we need to halt/rollback the L2 block if we had an error sending to the data streamer?
log.Errorf("error sending L2 block %d [%d] to data streamer, error: %v", blockResponse.BlockNumber, l2Block.trackingNum, err)
}

//TODO: remove this log
log.Infof("[ds-debug] l2 block %d [%d] sent to datastream", blockResponse.BlockNumber, l2Block.trackingNum)

for _, tx := range l2Block.transactions {
// Delete the tx from the pending list in the worker (addrQueue)
f.workerIntf.DeleteTxPendingToStore(tx.Hash, tx.From)
Expand Down
26 changes: 4 additions & 22 deletions sequencer/sequencer.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
)

const (
datastreamChannelMultiplier = 2
datastreamChannelBufferSize = 10
)

// Sequencer represents a sequencer
Expand Down Expand Up @@ -59,9 +59,7 @@ func New(cfg Config, batchCfg state.BatchConfig, poolCfg pool.Config, txPool txP
eventLog: eventLog,
}

// TODO: Make configurable
channelBufferSize := 200 * datastreamChannelMultiplier // nolint:gomnd
sequencer.dataToStream = make(chan interface{}, channelBufferSize)
sequencer.dataToStream = make(chan interface{}, datastreamChannelBufferSize)

return sequencer, nil
}
Expand Down Expand Up @@ -270,8 +268,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
case state.DSL2FullBlock:
l2Block := data

//TODO: remove this log
log.Infof("[ds-debug] start atomic op for l2block %d", l2Block.L2BlockNumber)
err = s.streamServer.StartAtomicOp()
if err != nil {
log.Errorf("failed to start atomic op for l2block %d, error: %v ", l2Block.L2BlockNumber, err)
Expand All @@ -283,8 +279,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
Value: l2Block.L2BlockNumber,
}

//TODO: remove this log
log.Infof("[ds-debug] add stream bookmark for l2block %d", l2Block.L2BlockNumber)
_, err = s.streamServer.AddStreamBookmark(bookMark.Encode())
if err != nil {
log.Errorf("failed to add stream bookmark for l2block %d, error: %v", l2Block.L2BlockNumber, err)
Expand All @@ -299,8 +293,6 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
Value: l2Block.L2BlockNumber - 1,
}

//TODO: remove this log
log.Infof("[ds-debug] get previous l2block %d", l2Block.L2BlockNumber-1)
previousL2BlockEntry, err := s.streamServer.GetFirstEventAfterBookmark(bookMark.Encode())
if err != nil {
log.Errorf("failed to get previous l2block %d, error: %v", l2Block.L2BlockNumber-1, err)
Expand All @@ -323,16 +315,12 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
ChainID: uint32(chainID),
}

//TODO: remove this log
log.Infof("[ds-debug] add l2blockStart stream entry for l2block %d", l2Block.L2BlockNumber)
_, err = s.streamServer.AddStreamEntry(state.EntryTypeL2BlockStart, blockStart.Encode())
if err != nil {
log.Errorf("failed to add stream entry for l2block %d, error: %v", l2Block.L2BlockNumber, err)
continue
}

//TODO: remove this log
log.Infof("[ds-debug] adding l2tx stream entries for l2block %d", l2Block.L2BlockNumber)
for _, l2Transaction := range l2Block.Txs {
_, err = s.streamServer.AddStreamEntry(state.EntryTypeL2Tx, l2Transaction.Encode())
if err != nil {
Expand All @@ -347,25 +335,17 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
StateRoot: l2Block.StateRoot,
}

//TODO: remove this log
log.Infof("[ds-debug] add l2blockEnd stream entry for l2block %d", l2Block.L2BlockNumber)
_, err = s.streamServer.AddStreamEntry(state.EntryTypeL2BlockEnd, blockEnd.Encode())
if err != nil {
log.Errorf("failed to add stream entry for l2block %d, error: %v", l2Block.L2BlockNumber, err)
continue
}

//TODO: remove this log
log.Infof("[ds-debug] commit atomic op for l2block %d", l2Block.L2BlockNumber)
err = s.streamServer.CommitAtomicOp()
if err != nil {
log.Errorf("failed to commit atomic op for l2block %d, error: %v ", l2Block.L2BlockNumber, err)
continue
}

//TODO: remove this log
log.Infof("[ds-debug] l2block %d sent to datastream", l2Block.L2BlockNumber)

// Stream a bookmark
case state.DSBookMark:
bookmark := data
Expand All @@ -392,6 +372,8 @@ func (s *Sequencer) sendDataToStreamer(chainID uint64) {
log.Errorf("invalid stream message type received")
}
}

s.finalizer.DatastreamChannelCountAdd(-1)
}
}

Expand Down
Loading