Skip to content

Commit

Permalink
fix pool pre-execution nonce (#1704)
Browse files Browse the repository at this point in the history
* fix pool pre-execution nonce

* fix test

* fix query

* update prover image

* undo prover image

* undo prover image

* undo prover image

* undo prover image

* fix test
  • Loading branch information
ToniRamirezM authored Feb 27, 2023
1 parent 99ade28 commit 8b3ce02
Show file tree
Hide file tree
Showing 12 changed files with 145 additions and 52 deletions.
7 changes: 7 additions & 0 deletions db/migrations/pool/0002.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- +migrate Up
ALTER TABLE pool.transaction
ADD COLUMN is_wip BOOLEAN;

-- +migrate Down
ALTER TABLE pool.transaction
DROP COLUMN is_wip;
4 changes: 3 additions & 1 deletion pool/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ type storage interface {
GetPendingTxHashesSince(ctx context.Context, since time.Time) ([]common.Hash, error)
GetTxsByFromAndNonce(ctx context.Context, from common.Address, nonce uint64) ([]Transaction, error)
GetTxsByStatus(ctx context.Context, state TxStatus, isClaims bool, limit uint64) ([]Transaction, error)
GetNonWIPTxsByStatus(ctx context.Context, status TxStatus, isClaims bool, limit uint64) ([]Transaction, error)
IsTxPending(ctx context.Context, hash common.Hash) (bool, error)
SetGasPrice(ctx context.Context, gasPrice uint64) error
UpdateTxsStatus(ctx context.Context, hashes []string, newStatus TxStatus) error
UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus TxStatus) error
UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus TxStatus, isWIP bool) error
UpdateTxWIPStatus(ctx context.Context, hash common.Hash, isWIP bool) error
GetTxs(ctx context.Context, filterStatus TxStatus, isClaims bool, minGasPrice, limit uint64) ([]*Transaction, error)
GetTxFromAddressFromByHash(ctx context.Context, hash common.Hash) (common.Address, uint64, error)
GetTxByHash(ctx context.Context, hash common.Hash) (*Transaction, error)
Expand Down
95 changes: 74 additions & 21 deletions pool/pgpoolstorage/pgpoolstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ func (p *PostgresPoolStorage) AddTx(ctx context.Context, tx pool.Transaction) er
used_binaries,
used_steps,
received_at,
from_address
from_address,
is_wip
)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17)
($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18)
`

// Get FromAddress from the JSON data
Expand All @@ -106,7 +107,8 @@ func (p *PostgresPoolStorage) AddTx(ctx context.Context, tx pool.Transaction) er
tx.UsedBinaries,
tx.UsedSteps,
tx.ReceivedAt,
fromAddress); err != nil {
fromAddress,
tx.IsWIP); err != nil {
return err
}
return nil
Expand All @@ -122,10 +124,43 @@ func (p *PostgresPoolStorage) GetTxsByStatus(ctx context.Context, status pool.Tx
sql string
)
if limit == 0 {
sql = "SELECT encoded, status, received_at FROM pool.transaction WHERE status = $1 ORDER BY gas_price DESC"
sql = "SELECT encoded, status, received_at, is_wip FROM pool.transaction WHERE status = $1 ORDER BY gas_price DESC"
rows, err = p.db.Query(ctx, sql, status.String())
} else {
sql = "SELECT encoded, status, received_at FROM pool.transaction WHERE status = $1 AND is_claims = $2 ORDER BY gas_price DESC LIMIT $3"
sql = "SELECT encoded, status, received_at, is_wip FROM pool.transaction WHERE status = $1 AND is_claims = $2 ORDER BY gas_price DESC LIMIT $3"
rows, err = p.db.Query(ctx, sql, status.String(), isClaims, limit)
}
if err != nil {
return nil, err
}
defer rows.Close()

txs := make([]pool.Transaction, 0, len(rows.RawValues()))
for rows.Next() {
tx, err := scanTx(rows)
if err != nil {
return nil, err
}
txs = append(txs, *tx)
}

return txs, nil
}

// GetNonWIPTxsByStatus returns an array of transactions filtered by status
// limit parameter is used to limit amount txs from the db,
// if limit = 0, then there is no limit
func (p *PostgresPoolStorage) GetNonWIPTxsByStatus(ctx context.Context, status pool.TxStatus, isClaims bool, limit uint64) ([]pool.Transaction, error) {
var (
rows pgx.Rows
err error
sql string
)
if limit == 0 {
sql = "SELECT encoded, status, received_at, is_wip FROM pool.transaction WHERE is_wip IS FALSE and status = $1 ORDER BY gas_price DESC"
rows, err = p.db.Query(ctx, sql, status.String())
} else {
sql = "SELECT encoded, status, received_at, is_wip FROM pool.transaction WHERE is_wip IS FALSE and status = $1 AND is_claims = $2 ORDER BY gas_price DESC LIMIT $3"
rows, err = p.db.Query(ctx, sql, status.String(), isClaims, limit)
}
if err != nil {
Expand All @@ -146,10 +181,9 @@ func (p *PostgresPoolStorage) GetTxsByStatus(ctx context.Context, status pool.Tx
}

// GetPendingTxHashesSince returns the pending tx since the given time.
// It takes also into account the WIP txs
func (p *PostgresPoolStorage) GetPendingTxHashesSince(ctx context.Context, since time.Time) ([]common.Hash, error) {
sql := "SELECT hash FROM pool.transaction WHERE status IN ($1, $2) AND received_at >= $3"
rows, err := p.db.Query(ctx, sql, pool.TxStatusPending, pool.TxStatusWIP, since)
sql := "SELECT hash FROM pool.transaction WHERE status = $1 AND received_at >= $2"
rows, err := p.db.Query(ctx, sql, pool.TxStatusPending, since)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -183,7 +217,8 @@ func (p *PostgresPoolStorage) GetTxs(ctx context.Context, filterStatus pool.TxSt
used_steps,
received_at,
nonce,
failed_counter
failed_counter,
is_wip
FROM
pool.transaction p1
WHERE
Expand Down Expand Up @@ -211,7 +246,8 @@ func (p *PostgresPoolStorage) GetTxs(ctx context.Context, filterStatus pool.TxSt
used_steps,
received_at,
nonce,
failed_counter
failed_counter,
is_wip
FROM
pool.transaction p1
WHERE
Expand All @@ -234,6 +270,7 @@ func (p *PostgresPoolStorage) GetTxs(ctx context.Context, filterStatus pool.TxSt
usedKeccakHashes, usedPoseidonHashes, usedPoseidonPaddings,
usedMemAligns, usedArithmetics, usedBinaries, usedSteps uint32
nonce, failedCounter uint64
isWIP bool
)

args := []interface{}{filterStatus, minGasPrice, isClaims, limit}
Expand Down Expand Up @@ -261,6 +298,7 @@ func (p *PostgresPoolStorage) GetTxs(ctx context.Context, filterStatus pool.TxSt
&receivedAt,
&nonce,
&failedCounter,
&isWIP,
)

if err != nil {
Expand Down Expand Up @@ -288,6 +326,7 @@ func (p *PostgresPoolStorage) GetTxs(ctx context.Context, filterStatus pool.TxSt
UsedSteps: usedSteps,
}
tx.FailedCounter = failedCounter
tx.IsWIP = isWIP

txs = append(txs, tx)
}
Expand All @@ -309,9 +348,9 @@ func (p *PostgresPoolStorage) CountTransactionsByStatus(ctx context.Context, sta

// UpdateTxStatus updates a transaction status accordingly to the
// provided status and hash
func (p *PostgresPoolStorage) UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus pool.TxStatus) error {
sql := "UPDATE pool.transaction SET status = $1 WHERE hash = $2"
if _, err := p.db.Exec(ctx, sql, newStatus, hash.Hex()); err != nil {
func (p *PostgresPoolStorage) UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus pool.TxStatus, isWIP bool) error {
sql := "UPDATE pool.transaction SET status = $1, is_wip = $2 WHERE hash = $3"
if _, err := p.db.Exec(ctx, sql, newStatus, isWIP, hash.Hex()); err != nil {
return err
}
return nil
Expand Down Expand Up @@ -388,7 +427,7 @@ func (p *PostgresPoolStorage) IsTxPending(ctx context.Context, hash common.Hash)

// GetTxsByFromAndNonce get all the transactions from the pool with the same from and nonce
func (p *PostgresPoolStorage) GetTxsByFromAndNonce(ctx context.Context, from common.Address, nonce uint64) ([]pool.Transaction, error) {
sql := `SELECT encoded, status, received_at
sql := `SELECT encoded, status, received_at, is_wip
FROM pool.transaction
WHERE from_address = $1
AND nonce = $2`
Expand Down Expand Up @@ -445,7 +484,7 @@ func (p *PostgresPoolStorage) GetNonce(ctx context.Context, address common.Addre
sql := `SELECT MAX(nonce)
FROM pool.transaction
WHERE from_address = $1
AND (status = $2 OR status = $3)`
AND status IN ($2, $3)`
rows, err := p.db.Query(ctx, sql, address.String(), pool.TxStatusPending, pool.TxStatusSelected)
if errors.Is(err, pgx.ErrNoRows) {
return 0, nil
Expand Down Expand Up @@ -480,12 +519,13 @@ func (p *PostgresPoolStorage) GetTxByHash(ctx context.Context, hash common.Hash)
var (
encoded, status string
receivedAt time.Time
isWIP bool
)

sql := `SELECT encoded, status, received_at
sql := `SELECT encoded, status, received_at, is_wip
FROM pool.transaction
WHERE hash = $1`
err := p.db.QueryRow(ctx, sql, hash.String()).Scan(&encoded, &status, &receivedAt)
err := p.db.QueryRow(ctx, sql, hash.String()).Scan(&encoded, &status, &receivedAt, &isWIP)
if errors.Is(err, pgx.ErrNoRows) {
return nil, ErrNotFound
} else if err != nil {
Expand All @@ -506,6 +546,7 @@ func (p *PostgresPoolStorage) GetTxByHash(ctx context.Context, hash common.Hash)
ReceivedAt: receivedAt,
Status: pool.TxStatus(status),
Transaction: *tx,
IsWIP: isWIP,
}

return poolTx, nil
Expand All @@ -515,9 +556,10 @@ func scanTx(rows pgx.Rows) (*pool.Transaction, error) {
var (
encoded, status string
receivedAt time.Time
isWIP bool
)

if err := rows.Scan(&encoded, &status, &receivedAt); err != nil {
if err := rows.Scan(&encoded, &status, &receivedAt, &isWIP); err != nil {
return nil, err
}

Expand All @@ -534,6 +576,7 @@ func scanTx(rows pgx.Rows) (*pool.Transaction, error) {

tx.Status = pool.TxStatus(status)
tx.ReceivedAt = receivedAt
tx.IsWIP = isWIP

return tx, nil
}
Expand Down Expand Up @@ -565,10 +608,20 @@ func (p *PostgresPoolStorage) GetTxZkCountersByHash(ctx context.Context, hash co
return &zkCounters, nil
}

// MarkWIPTxsAsPending updates WIP txs status to pending
// MarkWIPTxsAsPending updates WIP status to non WIP
func (p *PostgresPoolStorage) MarkWIPTxsAsPending(ctx context.Context) error {
const query = `UPDATE pool.transaction SET status = $1 WHERE status = $2`
if _, err := p.db.Exec(ctx, query, pool.TxStatusPending, pool.TxStatusWIP); err != nil {
const query = `UPDATE pool.transaction SET is_wip = false WHERE is_wip = true`
if _, err := p.db.Exec(ctx, query); err != nil {
return err
}
return nil
}

// UpdateTxWIPStatus updates a transaction wip status accordingly to the
// provided WIP status and hash
func (p *PostgresPoolStorage) UpdateTxWIPStatus(ctx context.Context, hash common.Hash, isWIP bool) error {
sql := "UPDATE pool.transaction SET is_wip = $1 WHERE hash = $2"
if _, err := p.db.Exec(ctx, sql, isWIP, hash.Hex()); err != nil {
return err
}
return nil
Expand Down
20 changes: 18 additions & 2 deletions pool/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/big"
"time"

"github.com/0xPolygonHermez/zkevm-node/log"
"github.com/0xPolygonHermez/zkevm-node/state"
"github.com/0xPolygonHermez/zkevm-node/state/runtime/executor"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -73,6 +74,7 @@ func (p *Pool) AddTx(ctx context.Context, tx types.Transaction) error {
Status: TxStatusPending,
IsClaims: false,
ReceivedAt: time.Now(),
IsWIP: false,
}

poolTx.IsClaims = poolTx.IsClaimTx(p.l2BridgeAddr, p.cfg.FreeClaimGasLimit)
Expand All @@ -99,6 +101,7 @@ func (p *Pool) PreExecuteTx(ctx context.Context, tx types.Transaction) (state.ZK

nonce, err := p.storage.GetNonce(ctx, sender)
if err != nil {
log.Errorf("Error getting nonce for sender %s: %v", sender.Hex(), err)
return state.ZKCounters{}, err
}

Expand All @@ -116,6 +119,13 @@ func (p *Pool) GetPendingTxs(ctx context.Context, isClaims bool, limit uint64) (
return p.storage.GetTxsByStatus(ctx, TxStatusPending, isClaims, limit)
}

// GetNonWIPPendingTxs from the pool
// limit parameter is used to limit amount of pending txs from the db,
// if limit = 0, then there is no limit
func (p *Pool) GetNonWIPPendingTxs(ctx context.Context, isClaims bool, limit uint64) ([]Transaction, error) {
return p.storage.GetNonWIPTxsByStatus(ctx, TxStatusPending, isClaims, limit)
}

// GetSelectedTxs gets selected txs from the pool db
func (p *Pool) GetSelectedTxs(ctx context.Context, limit uint64) ([]Transaction, error) {
return p.storage.GetTxsByStatus(ctx, TxStatusSelected, false, limit)
Expand All @@ -128,8 +138,8 @@ func (p *Pool) GetPendingTxHashesSince(ctx context.Context, since time.Time) ([]

// UpdateTxStatus updates a transaction state accordingly to the
// provided state and hash
func (p *Pool) UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus TxStatus) error {
return p.storage.UpdateTxStatus(ctx, hash, newStatus)
func (p *Pool) UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus TxStatus, isWIP bool) error {
return p.storage.UpdateTxStatus(ctx, hash, newStatus, isWIP)
}

// SetGasPrice allows an external component to define the gas price
Expand Down Expand Up @@ -296,6 +306,12 @@ func (p *Pool) DeleteReorgedTransactions(ctx context.Context, transactions []*ty
return p.storage.DeleteTransactionsByHashes(ctx, hashes)
}

// UpdateTxWIPStatus updates a transaction wip status accordingly to the
// provided WIP status and hash
func (p *Pool) UpdateTxWIPStatus(ctx context.Context, hash common.Hash, isWIP bool) error {
return p.storage.UpdateTxWIPStatus(ctx, hash, isWIP)
}

const (
txDataNonZeroGas uint64 = 16
txGasContractCreation uint64 = 53000
Expand Down
2 changes: 1 addition & 1 deletion pool/pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -618,7 +618,7 @@ func Test_UpdateTxStatus(t *testing.T) {
t.Error(err)
}

err = p.UpdateTxStatus(ctx, signedTx.Hash(), pool.TxStatusInvalid)
err = p.UpdateTxStatus(ctx, signedTx.Hash(), pool.TxStatusInvalid, false)
if err != nil {
t.Error(err)
}
Expand Down
3 changes: 1 addition & 2 deletions pool/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ const (
TxStatusSelected TxStatus = "selected"
// TxStatusFailed represents a tx that has been failed after processing, but can be processed in the future
TxStatusFailed TxStatus = "failed"
// TxStatusWIP represents a tx that is in a sequencer worker memory
TxStatusWIP TxStatus = "wip"
)

// TxStatus represents the state of a tx
Expand All @@ -39,6 +37,7 @@ type Transaction struct {
FailedCounter uint64
ReceivedAt time.Time
PreprocessedStateRoot common.Hash
IsWIP bool
}

// IsClaimTx checks, if tx is a claim tx
Expand Down
12 changes: 6 additions & 6 deletions sequencer/dbmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (d *dbManager) loadFromPool() {
// TODO: Move this to a config parameter
time.Sleep(wait * time.Second)

poolTransactions, err := d.txPool.GetPendingTxs(d.ctx, false, 0)
poolTransactions, err := d.txPool.GetNonWIPPendingTxs(d.ctx, false, 0)
if err != nil && err != pgpoolstorage.ErrNotFound {
log.Errorf("load tx from pool: %v", err)
}
Expand All @@ -119,7 +119,7 @@ func (d *dbManager) loadFromPool() {
}
}

poolClaims, err := d.txPool.GetPendingTxs(d.ctx, true, 0)
poolClaims, err := d.txPool.GetNonWIPPendingTxs(d.ctx, true, 0)
if err != nil && err != pgpoolstorage.ErrNotFound {
log.Errorf("load claims from pool: %v", err)
}
Expand All @@ -139,7 +139,7 @@ func (d *dbManager) addTxToWorker(tx pool.Transaction, isClaim bool) error {
return err
}
d.worker.AddTx(d.ctx, txTracker)
return d.txPool.UpdateTxStatus(d.ctx, tx.Hash(), pool.TxStatusWIP)
return d.txPool.UpdateTxWIPStatus(d.ctx, tx.Hash(), true)
}

// BeginStateTransaction starts a db transaction in the state
Expand Down Expand Up @@ -223,7 +223,7 @@ func (d *dbManager) storeProcessedTxAndDeleteFromPool() {
}

// Change Tx status to selected
err = d.txPool.UpdateTxStatus(d.ctx, txToStore.txResponse.TxHash, pool.TxStatusSelected)
err = d.txPool.UpdateTxStatus(d.ctx, txToStore.txResponse.TxHash, pool.TxStatusSelected, false)
if err != nil {
err = dbTx.Rollback(d.ctx)
if err != nil {
Expand Down Expand Up @@ -561,8 +561,8 @@ func (d *dbManager) GetTransactionsByBatchNumber(ctx context.Context, batchNumbe
return d.state.GetTransactionsByBatchNumber(ctx, batchNumber, nil)
}

func (d *dbManager) UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus pool.TxStatus) error {
return d.txPool.UpdateTxStatus(ctx, hash, newStatus)
func (d *dbManager) UpdateTxStatus(ctx context.Context, hash common.Hash, newStatus pool.TxStatus, isWIP bool) error {
return d.txPool.UpdateTxStatus(ctx, hash, newStatus, isWIP)
}

// GetLatestVirtualBatchTimestamp gets last virtual batch timestamp
Expand Down
Loading

0 comments on commit 8b3ce02

Please sign in to comment.