Skip to content

Commit

Permalink
Extract sequence tracking from the Broadcaster (#12353)
Browse files Browse the repository at this point in the history
* Extracted sequence tracking from the Broadcaster into a separate component

* Fixed test and linting

* Updated NonceTracker to use TXM client

* Fixed tests

* Moved NonceTracker initialization into the EVM Broadcaster builder

* Fixed issue with in-progress tx during startup

* Fixed linting

* Added changeset

* Updated changeset message

---------

Co-authored-by: Prashant Yadav <34992934+prashantkumar1982@users.noreply.github.com>
  • Loading branch information
amit-momin and prashantkumar1982 committed Mar 19, 2024
1 parent 331c1cb commit 07c9f6c
Show file tree
Hide file tree
Showing 17 changed files with 625 additions and 739 deletions.
5 changes: 5 additions & 0 deletions .changeset/silver-months-glow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

Fixed a race condition bug around EVM nonce management, which could cause the Node to skip a nonce and get stuck.
172 changes: 14 additions & 158 deletions common/txmgr/broadcaster.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"database/sql"
"errors"
"fmt"
"slices"
"sync"
"time"

Expand Down Expand Up @@ -112,13 +111,13 @@ type Broadcaster[
txStore txmgrtypes.TransactionStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, SEQ, FEE]
client txmgrtypes.TransactionClient[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ]
resumeCallback ResumeCallback
chainID CHAIN_ID
config txmgrtypes.BroadcasterChainConfig
feeConfig txmgrtypes.BroadcasterFeeConfig
txConfig txmgrtypes.BroadcasterTransactionsConfig
listenerConfig txmgrtypes.BroadcasterListenerConfig
sequenceTracker txmgrtypes.SequenceTracker[ADDR, SEQ]
resumeCallback ResumeCallback
chainID CHAIN_ID
config txmgrtypes.BroadcasterChainConfig
feeConfig txmgrtypes.BroadcasterFeeConfig
txConfig txmgrtypes.BroadcasterTransactionsConfig
listenerConfig txmgrtypes.BroadcasterListenerConfig

// autoSyncSequence, if set, will cause Broadcaster to fast-forward the sequence
// when Start is called
Expand All @@ -141,10 +140,6 @@ type Broadcaster[

initSync sync.Mutex
isStarted bool

sequenceLock sync.RWMutex
nextSequenceMap map[ADDR]SEQ
generateNextSequence types.GenerateNextSequenceFunc[SEQ]
}

func NewBroadcaster[
Expand All @@ -164,19 +159,17 @@ func NewBroadcaster[
listenerConfig txmgrtypes.BroadcasterListenerConfig,
keystore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ],
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ],
sequenceTracker txmgrtypes.SequenceTracker[ADDR, SEQ],
lggr logger.Logger,
checkerFactory TransmitCheckerFactory[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
autoSyncSequence bool,
generateNextSequence types.GenerateNextSequenceFunc[SEQ],
) *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] {
lggr = logger.Named(lggr, "Broadcaster")
b := &Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]{
lggr: logger.Sugared(lggr),
txStore: txStore,
client: client,
TxAttemptBuilder: txAttemptBuilder,
sequenceSyncer: sequenceSyncer,
chainID: client.ConfiguredChainID(),
config: config,
feeConfig: feeConfig,
Expand All @@ -185,10 +178,10 @@ func NewBroadcaster[
ks: keystore,
checkerFactory: checkerFactory,
autoSyncSequence: autoSyncSequence,
sequenceTracker: sequenceTracker,
}

b.processUnstartedTxsImpl = b.processUnstartedTxs
b.generateNextSequence = generateNextSequence
return b
}

Expand Down Expand Up @@ -222,9 +215,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) star
eb.wg = sync.WaitGroup{}
eb.wg.Add(len(eb.enabledAddresses))
eb.triggers = make(map[ADDR]chan struct{})
eb.sequenceLock.Lock()
eb.nextSequenceMap = eb.loadNextSequenceMap(ctx, eb.enabledAddresses)
eb.sequenceLock.Unlock()
eb.sequenceTracker.LoadNextSequences(ctx, eb.enabledAddresses)
for _, addr := range eb.enabledAddresses {
triggerCh := make(chan struct{}, 1)
eb.triggers[addr] = triggerCh
Expand Down Expand Up @@ -284,46 +275,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) Trig
}
}

// Load the next sequence map using the tx table or on-chain (if not found in tx table)
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) loadNextSequenceMap(ctx context.Context, addresses []ADDR) map[ADDR]SEQ {
nextSequenceMap := make(map[ADDR]SEQ)
for _, address := range addresses {
seq, err := eb.getSequenceForAddr(ctx, address)
if err == nil {
nextSequenceMap[address] = seq
}
}

return nextSequenceMap
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) getSequenceForAddr(ctx context.Context, address ADDR) (seq SEQ, err error) {
// Get the highest sequence from the tx table
// Will need to be incremented since this sequence is already used
seq, err = eb.txStore.FindLatestSequence(ctx, address, eb.chainID)
if err == nil {
seq = eb.generateNextSequence(seq)
return seq, nil
}
// Look for nonce on-chain if no tx found for address in TxStore or if error occurred
// Returns the nonce that should be used for the next transaction so no need to increment
seq, err = eb.client.PendingSequenceAt(ctx, address)
if err == nil {
return seq, nil
}
eb.lggr.Criticalw("failed to retrieve next sequence from on-chain for address: ", "address", address.String())
return seq, err

}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) newSequenceSyncBackoff() backoff.Backoff {
return backoff.Backoff{
Min: 100 * time.Millisecond,
Max: 5 * time.Second,
Jitter: true,
}
}

func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) newResendBackoff() backoff.Backoff {
return backoff.Backoff{
Min: 1 * time.Second,
Expand All @@ -340,7 +291,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) moni

if eb.autoSyncSequence {
eb.lggr.Debugw("Auto-syncing sequence", "address", addr.String())
eb.SyncSequence(ctx, addr)
eb.sequenceTracker.SyncSequence(ctx, addr, eb.chStop)
if ctx.Err() != nil {
return
}
Expand Down Expand Up @@ -393,46 +344,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) moni
}
}

// syncSequence tries to sync the key sequence, retrying indefinitely until success or stop signal is sent
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SyncSequence(ctx context.Context, addr ADDR) {
sequenceSyncRetryBackoff := eb.newSequenceSyncBackoff()
localSequence, err := eb.GetNextSequence(ctx, addr)
// Address not found in map so skip sync
if err != nil {
eb.lggr.Criticalw("Failed to retrieve local next sequence for address", "address", addr.String(), "err", err)
return
}

// Enter loop with retries
var attempt int
for {
select {
case <-eb.chStop:
return
case <-time.After(sequenceSyncRetryBackoff.Duration()):
attempt++
newNextSequence, err := eb.sequenceSyncer.Sync(ctx, addr, localSequence)
if err != nil {
if attempt > 5 {
eb.lggr.Criticalw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
eb.SvcErrBuffer.Append(err)
} else {
eb.lggr.Warnw("Failed to sync with on-chain sequence", "address", addr.String(), "attempt", attempt, "err", err)
}
continue
}
// Found new sequence to use from on-chain
if localSequence.String() != newNextSequence.String() {
eb.lggr.Infow("Fast-forward sequence", "address", addr, "newNextSequence", newNextSequence, "oldNextSequence", localSequence)
// Set new sequence in the map
eb.SetNextSequence(addr, newNextSequence)
}
return

}
}
}

// ProcessUnstartedTxs picks up and handles all txes in the queue
// revive:disable:error-return
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) ProcessUnstartedTxs(ctx context.Context, addr ADDR) (retryable bool, err error) {
Expand Down Expand Up @@ -619,18 +530,12 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
// and hand off to the confirmer to get the receipt (or mark as
// failed).
observeTimeUntilBroadcast(eb.chainID, etx.CreatedAt, time.Now())
// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return err, true
}
err = eb.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, &etx, attempt, txmgrtypes.TxAttemptBroadcast)
if err != nil {
return err, true
}
// Increment sequence if successfully broadcasted
eb.IncrementNextSequence(etx.FromAddress, sequence)
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence)
return err, true
case client.Underpriced:
return eb.tryAgainBumpingGas(ctx, lgr, err, etx, attempt, initialBroadcastAt)
Expand Down Expand Up @@ -677,18 +582,12 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) hand
// transaction to have been accepted. In this case, the right thing to
// do is assume success and hand off to Confirmer

// Check if from_address exists in map to ensure it is valid before broadcasting
var sequence SEQ
sequence, err = eb.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return err, true
}
err = eb.txStore.UpdateTxAttemptInProgressToBroadcast(ctx, &etx, attempt, txmgrtypes.TxAttemptBroadcast)
if err != nil {
return err, true
}
// Increment sequence if successfully broadcasted
eb.IncrementNextSequence(etx.FromAddress, sequence)
eb.sequenceTracker.GenerateNextSequence(etx.FromAddress, *etx.Sequence)
return err, true
}
// Either the unknown error prevented the transaction from being mined, or
Expand Down Expand Up @@ -716,7 +615,7 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) next
return nil, fmt.Errorf("findNextUnstartedTransactionFromAddress failed: %w", err)
}

sequence, err := eb.GetNextSequence(ctx, etx.FromAddress)
sequence, err := eb.sequenceTracker.GetNextSequence(ctx, etx.FromAddress)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -805,49 +704,6 @@ func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) save
return eb.txStore.UpdateTxFatalError(ctx, etx)
}

// Used to get the next usable sequence for a transaction
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) GetNextSequence(ctx context.Context, address ADDR) (seq SEQ, err error) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
// Get next sequence from map
seq, exists := eb.nextSequenceMap[address]
if exists {
return seq, nil
}

eb.lggr.Infow("address not found in local next sequence map. Attempting to search and populate sequence.", "address", address.String())
// Check if address is in the enabled address list
if !slices.Contains(eb.enabledAddresses, address) {
return seq, fmt.Errorf("address disabled: %s", address)
}

// Try to retrieve next sequence from tx table or on-chain to load the map
// A scenario could exist where loading the map during startup failed (e.g. All configured RPC's are unreachable at start)
// The expectation is that the node does not fail startup so sequences need to be loaded during runtime
foundSeq, err := eb.getSequenceForAddr(ctx, address)
if err != nil {
return seq, fmt.Errorf("failed to find next sequence for address: %s", address)
}

// Set sequence in map
eb.nextSequenceMap[address] = foundSeq
return foundSeq, nil
}

// Used to increment the sequence in the mapping to have the next usable one available for the next transaction
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) IncrementNextSequence(address ADDR, seq SEQ) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap[address] = eb.generateNextSequence(seq)
}

// Used to set the next sequence explicitly to a certain value
func (eb *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) SetNextSequence(address ADDR, seq SEQ) {
eb.sequenceLock.Lock()
defer eb.sequenceLock.Unlock()
eb.nextSequenceMap[address] = seq
}

func observeTimeUntilBroadcast[CHAIN_ID types.ID](chainID CHAIN_ID, createdAt, broadcastAt time.Time) {
duration := float64(broadcastAt.Sub(createdAt))
promTimeUntilBroadcast.WithLabelValues(chainID.String()).Observe(duration)
Expand Down
11 changes: 0 additions & 11 deletions common/txmgr/sequence_syncer.go

This file was deleted.

3 changes: 0 additions & 3 deletions common/txmgr/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ type Txm[
tracker *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]
fwdMgr txmgrtypes.ForwarderManager[ADDR]
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ]
}

func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) RegisterResumeCallback(fn ResumeCallback) {
Expand Down Expand Up @@ -136,7 +135,6 @@ func NewTxm[
fwdMgr txmgrtypes.ForwarderManager[ADDR],
txAttemptBuilder txmgrtypes.TxAttemptBuilder[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
sequenceSyncer SequenceSyncer[ADDR, TX_HASH, BLOCK_HASH, SEQ],
broadcaster *Broadcaster[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE],
confirmer *Confirmer[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
resender *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE],
Expand All @@ -157,7 +155,6 @@ func NewTxm[
reset: make(chan reset),
fwdMgr: fwdMgr,
txAttemptBuilder: txAttemptBuilder,
sequenceSyncer: sequenceSyncer,
broadcaster: broadcaster,
confirmer: confirmer,
resender: resender,
Expand Down
26 changes: 26 additions & 0 deletions common/txmgr/types/sequence_tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package types

import (
"context"

"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink/v2/common/types"
)

type SequenceTracker[
// Represents an account address, in native chain format.
ADDR types.Hashable,
// Represents the sequence type for a chain. For example, nonce for EVM.
SEQ types.Sequence,
] interface {
// Load the next sequence needed for transactions for all enabled addresses
LoadNextSequences(context.Context, []ADDR)
// Get the next sequence to assign to a transaction
GetNextSequence(context.Context, ADDR) (SEQ, error)
// Signals the existing sequence has been used so generates and stores the next sequence
// Can be a no-op depending on the chain
GenerateNextSequence(ADDR, SEQ)
// Syncs the local sequence with the one on-chain in case the address as been used externally
// Can be a no-op depending on the chain
SyncSequence(context.Context, ADDR, services.StopChan)
}
Loading

0 comments on commit 07c9f6c

Please sign in to comment.