Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

synchronizer: unittest and separation of pool_reorg to unittest that #3111

Merged
merged 9 commits into from
Jan 30, 2024
67 changes: 1 addition & 66 deletions synchronizer/actions/etrog/processor_l1_sequence_batches.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
syncCommon "github.com/0xPolygonHermez/zkevm-node/synchronizer/common"
"github.com/0xPolygonHermez/zkevm-node/synchronizer/common/syncinterfaces"
"github.com/ethereum/go-ethereum/common"
ethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/jackc/pgx/v4"
)

Expand All @@ -32,19 +31,9 @@ type stateProcessSequenceBatches interface {
AddSequence(ctx context.Context, sequence state.Sequence, dbTx pgx.Tx) error
AddVirtualBatch(ctx context.Context, virtualBatch *state.VirtualBatch, dbTx pgx.Tx) error
AddTrustedReorg(ctx context.Context, trustedReorg *state.TrustedReorg, dbTx pgx.Tx) error
GetReorgedTransactions(ctx context.Context, batchNumber uint64, dbTx pgx.Tx) ([]*ethTypes.Transaction, error)
GetL1InfoTreeDataFromBatchL2Data(ctx context.Context, batchL2Data []byte, dbTx pgx.Tx) (map[uint32]state.L1DataV2, common.Hash, common.Hash, error)
}

type ethermanProcessSequenceBatches interface {
GetLatestBatchNumber() (uint64, error)
}

type poolProcessSequenceBatchesInterface interface {
DeleteReorgedTransactions(ctx context.Context, txs []*ethTypes.Transaction) error
StoreTx(ctx context.Context, tx ethTypes.Transaction, ip string, isWIP bool) error
}

type syncProcessSequenceBatchesInterface interface {
PendingFlushID(flushID uint64, proverID string)
IsTrustedSequencer() bool
Expand All @@ -55,17 +44,13 @@ type syncProcessSequenceBatchesInterface interface {
type ProcessorL1SequenceBatchesEtrog struct {
actions.ProcessorBase[ProcessorL1SequenceBatchesEtrog]
state stateProcessSequenceBatches
etherMan ethermanProcessSequenceBatches
pool poolProcessSequenceBatchesInterface
sync syncProcessSequenceBatchesInterface
timeProvider syncCommon.TimeProvider
halter syncinterfaces.CriticalErrorHandler
}

// NewProcessorL1SequenceBatches returns instance of a processor for SequenceBatchesOrder
func NewProcessorL1SequenceBatches(state stateProcessSequenceBatches,
etherMan ethermanProcessSequenceBatches,
pool poolProcessSequenceBatchesInterface,
sync syncProcessSequenceBatchesInterface,
timeProvider syncCommon.TimeProvider,
halter syncinterfaces.CriticalErrorHandler) *ProcessorL1SequenceBatchesEtrog {
Expand All @@ -74,8 +59,6 @@ func NewProcessorL1SequenceBatches(state stateProcessSequenceBatches,
SupportedEvent: []etherman.EventOrder{etherman.SequenceBatchesOrder},
SupportedForkdIds: &ForksIdOnlyEtrog},
state: state,
etherMan: etherMan,
pool: pool,
sync: sync,
timeProvider: timeProvider,
halter: halter,
Expand Down Expand Up @@ -294,18 +277,6 @@ func (p *ProcessorL1SequenceBatchesEtrog) processSequenceBatches(ctx context.Con
// Call the check trusted state method to compare trusted and virtual state
status := p.checkTrustedState(ctx, batch, tBatch, newRoot, dbTx)
if status {
// Reorg Pool
err := p.reorgPool(ctx, dbTx)
if err != nil {
rollbackErr := dbTx.Rollback(ctx)
if rollbackErr != nil {
log.Errorf("error rolling back state. BatchNumber: %d, BlockNumber: %d, rollbackErr: %s, error : %v", tBatch.BatchNumber, blockNumber, rollbackErr.Error(), err)
return rollbackErr
}
log.Errorf("error: %v. BatchNumber: %d, BlockNumber: %d", err, tBatch.BatchNumber, blockNumber)
return err
}

// Clean trustedState sync variables to avoid sync the trusted state from the wrong starting point.
// This wrong starting point would force the trusted sync to clean the virtualization of the batch reaching an inconsistency.
p.sync.CleanTrustedState()
Expand Down Expand Up @@ -377,43 +348,6 @@ func (p *ProcessorL1SequenceBatchesEtrog) processSequenceBatches(ctx context.Con
return nil
}

func (p *ProcessorL1SequenceBatchesEtrog) reorgPool(ctx context.Context, dbTx pgx.Tx) error {
latestBatchNum, err := p.etherMan.GetLatestBatchNumber()
if err != nil {
log.Error("error getting the latestBatchNumber virtualized in the smc. Error: ", err)
return err
}
batchNumber := latestBatchNum + 1
// Get transactions that have to be included in the pool again
txs, err := p.state.GetReorgedTransactions(ctx, batchNumber, dbTx)
if err != nil {
log.Errorf("error getting txs from trusted state. BatchNumber: %d, error: %v", batchNumber, err)
return err
}
log.Debug("Reorged transactions: ", txs)

// Remove txs from the pool
err = p.pool.DeleteReorgedTransactions(ctx, txs)
if err != nil {
log.Errorf("error deleting txs from the pool. BatchNumber: %d, error: %v", batchNumber, err)
return err
}
log.Debug("Delete reorged transactions")

// Add txs to the pool
for _, tx := range txs {
// Insert tx in WIP status to avoid the sequencer to grab them before it gets restarted
// When the sequencer restarts, it will update the status to pending non-wip
err = p.pool.StoreTx(ctx, *tx, "", true)
if err != nil {
log.Errorf("error storing tx into the pool again. TxHash: %s. BatchNumber: %d, error: %v", tx.Hash().String(), batchNumber, err)
return err
}
log.Debug("Reorged transactions inserted in the pool: ", tx.Hash())
}
return nil
}

func (p *ProcessorL1SequenceBatchesEtrog) checkTrustedState(ctx context.Context, batch state.Batch, tBatch *state.Batch, newRoot common.Hash, dbTx pgx.Tx) bool {
//Compare virtual state with trusted state
var reorgReasons strings.Builder
Expand Down Expand Up @@ -450,6 +384,7 @@ func (p *ProcessorL1SequenceBatchesEtrog) checkTrustedState(ctx context.Context,
if p.sync.IsTrustedSequencer() {
log.Errorf("TRUSTED REORG DETECTED! Batch: %d reson:%s", batch.BatchNumber, reason)
p.halt(ctx, fmt.Errorf("TRUSTED REORG DETECTED! Batch: %d", batch.BatchNumber))
panic("TRUSTED REORG DETECTED: halt function never have to finish! it blocks the process")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this panic is unused code. This code won't be executed. Why to put this line? Please remove it

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This panic have to reason to be there:

  • As a safeguard: if, for some bug, halt funcion return is going to prevent to execute more code avoiding a potential risk
  • For unittest: there are no way to test a infite loop function, so the way of doing that is mocking halt object and requiring that produce a panic

}
if !tBatch.WIP {
log.Warnf("missmatch in trusted state detected for Batch Number: %d. Reasons: %s", tBatch.BatchNumber, reason)
Expand Down
186 changes: 162 additions & 24 deletions synchronizer/actions/etrog/processor_l1_sequence_batches_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/0xPolygonHermez/zkevm-node/etherman"
"github.com/0xPolygonHermez/zkevm-node/etherman/smartcontracts/polygonzkevm"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/0xPolygonHermez/zkevm-node/synchronizer/actions"
syncCommon "github.com/0xPolygonHermez/zkevm-node/synchronizer/common"
mock_syncinterfaces "github.com/0xPolygonHermez/zkevm-node/synchronizer/common/syncinterfaces/mocks"
Expand Down Expand Up @@ -41,21 +42,19 @@ type mocksEtrogProcessorL1 struct {

func createMocks(t *testing.T) *mocksEtrogProcessorL1 {
mocks := &mocksEtrogProcessorL1{
Etherman: mock_syncinterfaces.NewEthermanFullInterface(t),
State: mock_syncinterfaces.NewStateFullInterface(t),
Pool: mock_syncinterfaces.NewPoolInterface(t),
Synchronizer: mock_syncinterfaces.NewSynchronizerFullInterface(t),
DbTx: syncMocks.NewDbTxMock(t),
//ZKEVMClient: mock_syncinterfaces.NewZKEVMClientInterface(t),
Etherman: mock_syncinterfaces.NewEthermanFullInterface(t),
State: mock_syncinterfaces.NewStateFullInterface(t),
Pool: mock_syncinterfaces.NewPoolInterface(t),
joanestebanr marked this conversation as resolved.
Show resolved Hide resolved
Synchronizer: mock_syncinterfaces.NewSynchronizerFullInterface(t),
DbTx: syncMocks.NewDbTxMock(t),
TimeProvider: &syncCommon.MockTimerProvider{},
CriticalErrorHandler: mock_syncinterfaces.NewCriticalErrorHandler(t),
//EventLog: &eventLogMock{},
}
return mocks
}

func createSUT(mocks *mocksEtrogProcessorL1) *ProcessorL1SequenceBatchesEtrog {
return NewProcessorL1SequenceBatches(mocks.State, mocks.Etherman, mocks.Pool, mocks.Synchronizer,
return NewProcessorL1SequenceBatches(mocks.State, mocks.Synchronizer,
mocks.TimeProvider, mocks.CriticalErrorHandler)
}

Expand All @@ -82,34 +81,173 @@ func TestL1SequenceBatchesPermissionlessNewBatchSequenced(t *testing.T) {
mocks := createMocks(t)
sut := createSUT(mocks)
ctx := context.Background()
batch := newStateBatch(3)
l1InfoRoot := common.HexToHash(hashExamplesValues[0])
expectationsPreExecution(t, mocks, ctx, batch, state.ErrNotFound)
executionResponse := newProcessBatchResponseV2(batch)
expectationsProcessAndStoreClosedBatchV2(t, mocks, ctx, executionResponse, nil)
expectationsAddSequencedBatch(t, mocks, ctx, executionResponse)
mocks.Synchronizer.EXPECT().PendingFlushID(mock.Anything, mock.Anything)
err := sut.Process(ctx, etherman.Order{Pos: 1}, newL1Block(mocks, batch, l1InfoRoot), mocks.DbTx)
require.NoError(t, err)
}

func TestL1SequenceBatchesTrustedBatchSequencedThatAlreadyExistsHappyPath(t *testing.T) {
mocks := createMocks(t)
sut := createSUT(mocks)
ctx := context.Background()
batch := newStateBatch(3)
l1InfoRoot := common.HexToHash(hashExamplesValues[0])
l1Block := newL1Block(mocks, batch, l1InfoRoot)
expectationsPreExecution(t, mocks, ctx, batch, nil)
executionResponse := newProcessBatchResponseV2(batch)
expectationsForExecution(t, mocks, ctx, l1Block.SequencedBatches[1][0], executionResponse)
mocks.State.EXPECT().AddAccumulatedInputHash(ctx, executionResponse.NewBatchNum, common.BytesToHash(executionResponse.NewAccInputHash), mocks.DbTx).Return(nil)
expectationsAddSequencedBatch(t, mocks, ctx, executionResponse)
err := sut.Process(ctx, etherman.Order{Pos: 1}, l1Block, mocks.DbTx)
require.NoError(t, err)
}

func TestL1SequenceBatchesPermissionlessBatchSequencedThatAlreadyExistsHappyPath(t *testing.T) {
mocks := createMocks(t)
sut := createSUT(mocks)
ctx := context.Background()
batch := newStateBatch(3)
l1InfoRoot := common.HexToHash(hashExamplesValues[0])
l1Block := newL1Block(mocks, batch, l1InfoRoot)
expectationsPreExecution(t, mocks, ctx, batch, nil)
executionResponse := newProcessBatchResponseV2(batch)
expectationsForExecution(t, mocks, ctx, l1Block.SequencedBatches[1][0], executionResponse)
mocks.State.EXPECT().AddAccumulatedInputHash(ctx, executionResponse.NewBatchNum, common.BytesToHash(executionResponse.NewAccInputHash), mocks.DbTx).Return(nil)
expectationsAddSequencedBatch(t, mocks, ctx, executionResponse)
err := sut.Process(ctx, etherman.Order{Pos: 1}, l1Block, mocks.DbTx)
require.NoError(t, err)
}

// CASE: A permissionless process a L1 sequenced batch that already is in state (presumably synced from Trusted)
// - Execute it
// - Check if match state batch
// - Don't match -> Reorg Pool and reset trusted state
// - Reprocess again as a new batch
func TestL1SequenceBatchesPermissionlessBatchSequencedThatAlreadyExistsMismatch(t *testing.T) {
mocks := createMocks(t)
sut := createSUT(mocks)
ctx := context.Background()
batch := newStateBatch(3)
l1InfoRoot := common.HexToHash(hashExamplesValues[0])
l1Block := newL1Block(mocks, batch, l1InfoRoot)
expectationsPreExecution(t, mocks, ctx, batch, nil)
executionResponse := newProcessBatchResponseV2(batch)
executionResponse.NewStateRoot = common.HexToHash(hashExamplesValues[2]).Bytes()
expectationsForExecution(t, mocks, ctx, l1Block.SequencedBatches[1][0], executionResponse)
mocks.State.EXPECT().AddAccumulatedInputHash(ctx, executionResponse.NewBatchNum, common.BytesToHash(executionResponse.NewAccInputHash), mocks.DbTx).Return(nil)
mocks.Synchronizer.EXPECT().IsTrustedSequencer().Return(false)
mocks.State.EXPECT().AddTrustedReorg(ctx, mock.Anything, mocks.DbTx).Return(nil)
mocks.State.EXPECT().ResetTrustedState(ctx, batch.BatchNumber-1, mocks.DbTx).Return(nil)
mocks.Synchronizer.EXPECT().CleanTrustedState()

// Reexecute it as a new batch
expectationsProcessAndStoreClosedBatchV2(t, mocks, ctx, executionResponse, nil)
expectationsAddSequencedBatch(t, mocks, ctx, executionResponse)
mocks.Synchronizer.EXPECT().PendingFlushID(mock.Anything, mock.Anything)
err := sut.Process(ctx, etherman.Order{Pos: 1}, l1Block, mocks.DbTx)
require.NoError(t, err)
}

// CASE: A TRUSTED SYNCHRONIZER process a L1 sequenced batch that already is in state but it doesnt match with the trusted State
// - Execute it
// - Check if match state batch
// - Don't match -> HALT
func TestL1SequenceBatchesTrustedBatchSequencedThatAlreadyExistsMismatch(t *testing.T) {
mocks := createMocks(t)
sut := createSUT(mocks)
ctx := context.Background()
batch := newStateBatch(3)
l1InfoRoot := common.HexToHash(hashExamplesValues[0])
l1Block := newL1Block(mocks, batch, l1InfoRoot)
expectationsPreExecution(t, mocks, ctx, batch, nil)
executionResponse := newProcessBatchResponseV2(batch)
executionResponse.NewStateRoot = common.HexToHash(hashExamplesValues[2]).Bytes()
expectationsForExecution(t, mocks, ctx, l1Block.SequencedBatches[1][0], executionResponse)
mocks.State.EXPECT().AddAccumulatedInputHash(ctx, executionResponse.NewBatchNum, common.BytesToHash(executionResponse.NewAccInputHash), mocks.DbTx).Return(nil)
// Here it says that is a TRUSTED NODE
mocks.Synchronizer.EXPECT().IsTrustedSequencer().Return(true)
// TODO: Really don't have to write a entry to `trusted_reorgs` table? how the rest of servicies known about that??!?
//mocks.State.EXPECT().AddTrustedReorg(ctx, mock.Anything, mocks.DbTx).Return(nil)
mocks.CriticalErrorHandler.EXPECT().CriticalError(mock.Anything, mock.Anything)
assertPanic(t, func() { sut.Process(ctx, etherman.Order{Pos: 1}, l1Block, mocks.DbTx) }) //nolint
}

// --------------------- Helper functions ----------------------------------------------------------------------------------------------------

func expectationsPreExecution(t *testing.T, mocks *mocksEtrogProcessorL1, ctx context.Context, trustedBatch *state.Batch, responseError error) {
mocks.State.EXPECT().GetL1InfoTreeDataFromBatchL2Data(ctx, mock.Anything, mocks.DbTx).Return(map[uint32]state.L1DataV2{}, state.ZeroHash, state.ZeroHash, nil)
mocks.State.EXPECT().GetBatchByNumber(ctx, trustedBatch.BatchNumber, mocks.DbTx).Return(trustedBatch, responseError)
}

func expectationsAddSequencedBatch(t *testing.T, mocks *mocksEtrogProcessorL1, ctx context.Context, response *executor.ProcessBatchResponseV2) {
mocks.State.EXPECT().AddVirtualBatch(ctx, mock.Anything, mocks.DbTx).Return(nil)
mocks.State.EXPECT().AddSequence(ctx, state.Sequence{FromBatchNumber: 3, ToBatchNumber: 3}, mocks.DbTx).Return(nil)
}

func expectationsProcessAndStoreClosedBatchV2(t *testing.T, mocks *mocksEtrogProcessorL1, ctx context.Context, response *executor.ProcessBatchResponseV2, responseError error) {
newStateRoot := common.BytesToHash(response.NewStateRoot)
mocks.State.EXPECT().ProcessAndStoreClosedBatchV2(ctx, mock.Anything, mocks.DbTx, mock.Anything).Return(newStateRoot, response.FlushId, response.ProverId, responseError)
}

func expectationsForExecution(t *testing.T, mocks *mocksEtrogProcessorL1, ctx context.Context, sequencedBatch etherman.SequencedBatch, response *executor.ProcessBatchResponseV2) {
mocks.State.EXPECT().ExecuteBatchV2(ctx,
mock.Anything, *sequencedBatch.L1InfoRoot, mock.Anything, mock.Anything, false,
mock.Anything, mock.Anything, mocks.DbTx).Return(response, nil)
}

func newProcessBatchResponseV2(batch *state.Batch) *executor.ProcessBatchResponseV2 {
return &executor.ProcessBatchResponseV2{
NewBatchNum: batch.BatchNumber,
NewAccInputHash: batch.AccInputHash[:],
NewStateRoot: batch.StateRoot[:],
FlushId: uint64(1234),
ProverId: "prover-id",
}
}

func newStateBatch(number uint64) *state.Batch {
return &state.Batch{
BatchNumber: number,
StateRoot: common.HexToHash(hashExamplesValues[3]),
Coinbase: common.HexToAddress(addrExampleValues[0]),
}
}

func newL1Block(mocks *mocksEtrogProcessorL1, batch *state.Batch, l1InfoRoot common.Hash) *etherman.Block {
l1Block := etherman.Block{
BlockNumber: 123,
ReceivedAt: mocks.TimeProvider.Now(),
SequencedBatches: [][]etherman.SequencedBatch{},
}
l1InfoRoot := common.HexToHash(hashExamplesValues[0])
//l1InfoRoot := common.HexToHash(hashExamplesValues[0])
joanestebanr marked this conversation as resolved.
Show resolved Hide resolved
l1Block.SequencedBatches = append(l1Block.SequencedBatches, []etherman.SequencedBatch{})
l1Block.SequencedBatches = append(l1Block.SequencedBatches, []etherman.SequencedBatch{
{
BatchNumber: 3,
BatchNumber: batch.BatchNumber,
L1InfoRoot: &l1InfoRoot,
TxHash: common.HexToHash(hashExamplesValues[1]),
Coinbase: common.HexToAddress(addrExampleValues[0]),
SequencerAddr: common.HexToAddress(addrExampleValues[1]),
TxHash: state.HashByteArray(batch.BatchL2Data),
Coinbase: batch.Coinbase,
SequencerAddr: common.HexToAddress(addrExampleValues[0]),
PolygonRollupBaseEtrogBatchData: &polygonzkevm.PolygonRollupBaseEtrogBatchData{
Transactions: []byte{},
},
},
})
mocks.State.EXPECT().GetL1InfoTreeDataFromBatchL2Data(ctx, mock.Anything, mocks.DbTx).Return(map[uint32]state.L1DataV2{}, state.ZeroHash, state.ZeroHash, nil)
mocks.State.EXPECT().GetBatchByNumber(ctx, uint64(3), mocks.DbTx).Return(nil, state.ErrNotFound)
mocks.Synchronizer.EXPECT().PendingFlushID(mock.Anything, mock.Anything)
mocks.State.EXPECT().AddVirtualBatch(ctx, mock.Anything, mocks.DbTx).Return(nil)
mocks.State.EXPECT().AddSequence(ctx, mock.Anything, mocks.DbTx).Return(nil)
newStateRoot := common.HexToHash(hashExamplesValues[2])
flushID := uint64(1234)
proverID := "prover-id"
mocks.State.EXPECT().ProcessAndStoreClosedBatchV2(ctx, mock.Anything, mocks.DbTx, mock.Anything).Return(newStateRoot, flushID, proverID, nil)
err := sut.Process(ctx, etherman.Order{Pos: 1}, &l1Block, mocks.DbTx)
require.NoError(t, err)
return &l1Block
}

// https://stackoverflow.com/questions/31595791/how-to-test-panics
func assertPanic(t *testing.T, f func()) {
defer func() {
if r := recover(); r == nil {
t.Errorf("The code did not panic")
}
}()
f()
}
2 changes: 1 addition & 1 deletion synchronizer/default_l1processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func defaultsL1EventProcessors(sync *ClientSynchronizer) *processor_manager.L1Ev
p.Register(incaberry.NewProcessL1SequenceForcedBatches(sync.state, sync))
p.Register(incaberry.NewProcessorForkId(sync.state, sync))
p.Register(etrog.NewProcessorL1InfoTreeUpdate(sync.state))
p.Register(etrog.NewProcessorL1SequenceBatches(sync.state, sync.etherMan, sync.pool, sync, common.DefaultTimeProvider{}, sync.halter))
p.Register(etrog.NewProcessorL1SequenceBatches(sync.state, sync, common.DefaultTimeProvider{}, sync.halter))
p.Register(incaberry.NewProcessorL1VerifyBatch(sync.state))
p.Register(etrog.NewProcessorL1UpdateEtrogSequence(sync.state, sync, common.DefaultTimeProvider{}))
return p.Build()
Expand Down