diff --git a/.changeset/gold-bottles-tell.md b/.changeset/gold-bottles-tell.md new file mode 100644 index 00000000000..5289f368a55 --- /dev/null +++ b/.changeset/gold-bottles-tell.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#added : Re-enable abandoned transaction tracker diff --git a/common/txmgr/resender.go b/common/txmgr/resender.go index 8c2dd6b827e..b752ec63f13 100644 --- a/common/txmgr/resender.go +++ b/common/txmgr/resender.go @@ -140,9 +140,6 @@ func (er *Resender[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) resendUnco return fmt.Errorf("Resender failed getting enabled keys for chain %s: %w", er.chainID.String(), err) } - // Tracker currently disabled for BCI-2638; refactor required - // resendAddresses = append(resendAddresses, er.tracker.GetAbandonedAddresses()...) - ageThreshold := er.txConfig.ResendAfterThreshold() maxInFlightTransactions := er.txConfig.MaxInFlight() olderThan := time.Now().Add(-ageThreshold) diff --git a/common/txmgr/tracker.go b/common/txmgr/tracker.go index c63d9c264fc..a7236472710 100644 --- a/common/txmgr/tracker.go +++ b/common/txmgr/tracker.go @@ -8,6 +8,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" feetypes "github.com/smartcontractkit/chainlink/v2/common/fee/types" @@ -22,17 +23,10 @@ const ( // handleTxesTimeout represents a sanity limit on how long handleTxesByState // should take to complete handleTxesTimeout = 10 * time.Minute + // batchSize is the number of txes to fetch from the txStore at once + batchSize = 1000 ) -// AbandonedTx is a transaction who's 'FromAddress' was removed from the KeyStore(by the Node Operator). -// Thus, any new attempts for this Tx can't be signed/created. This means no fee bumping can be done. -// However, the Tx may still have live attempts in the chain's mempool, and could get confirmed on the -// chain as-is. Thus, the TXM should not directly discard this Tx. -type AbandonedTx[ADDR types.Hashable] struct { - id int64 - fromAddress ADDR -} - // Tracker tracks all transactions which have abandoned fromAddresses. // The fromAddresses can be deleted by Node Operators from the KeyStore. In such cases, // existing in-flight transactions for these fromAddresses are considered abandoned too. @@ -48,19 +42,22 @@ type Tracker[ FEE feetypes.Fee, ] struct { services.StateMachine - txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] - keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] - chainID CHAIN_ID - lggr logger.Logger - enabledAddrs map[ADDR]bool - txCache map[int64]AbandonedTx[ADDR] - ttl time.Duration + txStore txmgrtypes.TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE] + keyStore txmgrtypes.KeyStore[ADDR, CHAIN_ID, SEQ] + chainID CHAIN_ID + lggr logger.Logger + lock sync.Mutex - mb *mailbox.Mailbox[int64] - wg sync.WaitGroup - isStarted bool - ctx context.Context - ctxCancel context.CancelFunc + enabledAddrs map[ADDR]bool + txCache map[int64]ADDR // cache tx fromAddress by txID + + ttl time.Duration + mb *mailbox.Mailbox[int64] + + initSync sync.Mutex + wg sync.WaitGroup + chStop services.StopChan + isStarted bool } func NewTracker[ @@ -83,7 +80,7 @@ func NewTracker[ chainID: chainID, lggr: logger.Named(lggr, "TxMgrTracker"), enabledAddrs: map[ADDR]bool{}, - txCache: map[int64]AbandonedTx[ADDR]{}, + txCache: map[int64]ADDR{}, ttl: defaultTTL, mb: mailbox.NewSingle[int64](), lock: sync.Mutex{}, @@ -99,75 +96,84 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx c } func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) startInternal(ctx context.Context) (err error) { - tr.lock.Lock() - defer tr.lock.Unlock() - - tr.ctx, tr.ctxCancel = context.WithCancel(context.Background()) + tr.initSync.Lock() + defer tr.initSync.Unlock() if err := tr.setEnabledAddresses(ctx); err != nil { return fmt.Errorf("failed to set enabled addresses: %w", err) } - tr.lggr.Info("Enabled addresses set") + tr.lggr.Infof("enabled addresses set for chainID %v", tr.chainID) - if err := tr.trackAbandonedTxes(ctx); err != nil { - return fmt.Errorf("failed to track abandoned txes: %w", err) - } - - tr.isStarted = true - if len(tr.txCache) == 0 { - tr.lggr.Info("no abandoned txes found, skipping runLoop") - return nil - } - - tr.lggr.Infof("%d abandoned txes found, starting runLoop", len(tr.txCache)) + tr.chStop = make(chan struct{}) tr.wg.Add(1) - go tr.runLoop() + go tr.runLoop(tr.chStop.NewCtx()) + tr.isStarted = true return nil } func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() error { - tr.lock.Lock() - defer tr.lock.Unlock() return tr.StopOnce("Tracker", func() error { return tr.closeInternal() }) } func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) closeInternal() error { + tr.initSync.Lock() + defer tr.initSync.Unlock() + tr.lggr.Info("stopping tracker") if !tr.isStarted { - return fmt.Errorf("tracker not started") + return fmt.Errorf("tracker is not started: %w", services.ErrAlreadyStopped) } - tr.ctxCancel() + + close(tr.chStop) tr.wg.Wait() tr.isStarted = false return nil } -func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() { +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop(ctx context.Context, cancel context.CancelFunc) { defer tr.wg.Done() + defer cancel() + + if err := tr.trackAbandonedTxes(ctx); err != nil { + tr.lggr.Errorf("failed to track abandoned txes: %v", err) + return + } + if err := tr.handleTxesByState(ctx); err != nil { + tr.lggr.Errorf("failed to handle txes by state: %v", err) + return + } + if tr.AbandonedTxCount() == 0 { + tr.lggr.Info("no abandoned txes found, skipping runLoop") + return + } + tr.lggr.Infof("%d abandoned txes found, starting runLoop", tr.AbandonedTxCount()) + ttlExceeded := time.NewTicker(tr.ttl) defer ttlExceeded.Stop() for { select { case <-tr.mb.Notify(): for { - if tr.ctx.Err() != nil { - return - } - blockHeight, exists := tr.mb.Retrieve() - if !exists { + blockHeight := tr.mb.RetrieveLatestAndClear() + if blockHeight == 0 { break } - if err := tr.HandleTxesByState(tr.ctx, blockHeight); err != nil { - tr.lggr.Errorw(fmt.Errorf("failed to handle txes by state: %w", err).Error()) + if err := tr.handleTxesByState(ctx); err != nil { + tr.lggr.Errorf("failed to handle txes by state: %v", err) + return + } + if tr.AbandonedTxCount() == 0 { + tr.lggr.Info("all abandoned txes handled, stopping runLoop") + return } } case <-ttlExceeded.C: tr.lggr.Info("ttl exceeded") - tr.MarkAllTxesFatal(tr.ctx) + tr.markAllTxesFatal(ctx) return - case <-tr.ctx.Done(): + case <-ctx.Done(): return } } @@ -177,24 +183,31 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetAbandone tr.lock.Lock() defer tr.lock.Unlock() - if !tr.isStarted { - return []ADDR{} - } - abandonedAddrs := make([]ADDR, len(tr.txCache)) - for _, atx := range tr.txCache { - abandonedAddrs = append(abandonedAddrs, atx.fromAddress) + for _, fromAddress := range tr.txCache { + abandonedAddrs = append(abandonedAddrs, fromAddress) } return abandonedAddrs } -func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsStarted() bool { +// AbandonedTxCount returns the number of abandoned txes currently being tracked +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) AbandonedTxCount() int { tr.lock.Lock() defer tr.lock.Unlock() + return len(tr.txCache) +} + +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) IsStarted() bool { + tr.initSync.Lock() + defer tr.initSync.Unlock() return tr.isStarted } +// setEnabledAddresses is called on startup to set the enabled addresses for the chain func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) setEnabledAddresses(ctx context.Context) error { + tr.lock.Lock() + defer tr.lock.Unlock() + enabledAddrs, err := tr.keyStore.EnabledAddressesForChain(ctx, tr.chainID) if err != nil { return fmt.Errorf("failed to get enabled addresses for chain: %w", err) @@ -210,54 +223,58 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) setEnabledA return nil } -// trackAbandonedTxes called once to find and insert all abandoned txes into the tracker. +// trackAbandonedTxes called on startup to find and insert all abandoned txes into the tracker. func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) trackAbandonedTxes(ctx context.Context) (err error) { - if tr.isStarted { - return fmt.Errorf("tracker already started") - } - - tr.lggr.Info("Retrieving non fatal transactions from txStore") - nonFatalTxes, err := tr.txStore.GetNonFatalTransactions(ctx, tr.chainID) - if err != nil { - return fmt.Errorf("failed to get non fatal txes from txStore: %w", err) - } - - // insert abandoned txes - for _, tx := range nonFatalTxes { - if !tr.enabledAddrs[tx.FromAddress] { - tr.insertTx(tx) + return sqlutil.Batch(func(offset, limit uint) (count uint, err error) { + var enabledAddrs []ADDR + for addr := range tr.enabledAddrs { + enabledAddrs = append(enabledAddrs, addr) } - } - if err := tr.handleTxesByState(ctx, 0); err != nil { - return fmt.Errorf("failed to handle txes by state: %w", err) - } - - return nil + nonFatalTxes, err := tr.txStore.GetAbandonedTransactionsByBatch(ctx, tr.chainID, enabledAddrs, offset, limit) + if err != nil { + return 0, fmt.Errorf("failed to get non fatal txes from txStore: %w", err) + } + // insert abandoned txes + tr.lock.Lock() + for _, tx := range nonFatalTxes { + if !tr.enabledAddrs[tx.FromAddress] { + tr.txCache[tx.ID] = tx.FromAddress + tr.lggr.Debugf("inserted tx %v", tx.ID) + } + } + tr.lock.Unlock() + return uint(len(nonFatalTxes)), nil + }, batchSize) } -func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) HandleTxesByState(ctx context.Context, blockHeight int64) error { +// handleTxesByState handles all txes in the txCache by their state +// It's called on every new blockHeight and also on startup to handle all txes in the txCache +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) handleTxesByState(ctx context.Context) error { tr.lock.Lock() defer tr.lock.Unlock() - tr.ctx, tr.ctxCancel = context.WithTimeout(ctx, handleTxesTimeout) - defer tr.ctxCancel() - return tr.handleTxesByState(ctx, blockHeight) -} + ctx, cancel := context.WithTimeout(ctx, handleTxesTimeout) + defer cancel() -func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) handleTxesByState(ctx context.Context, blockHeight int64) error { - tr.lggr.Info("Handling transactions by state") + for id := range tr.txCache { + if ctx.Err() != nil { + return ctx.Err() + } - for id, atx := range tr.txCache { - tx, err := tr.txStore.GetTxByID(ctx, atx.id) + tx, err := tr.txStore.GetTxByID(ctx, id) if err != nil { - return fmt.Errorf("failed to get tx by ID: %w", err) + tr.lggr.Errorf("failed to get tx by ID: %v", err) + continue + } + if tx == nil { + tr.lggr.Warnf("tx with ID %v no longer exists, removing from tracker", id) + delete(tr.txCache, id) + continue } switch tx.State { case TxConfirmed: - if err := tr.handleConfirmedTx(tx, blockHeight); err != nil { - return fmt.Errorf("failed to handle confirmed txes: %w", err) - } + // TODO: Handle finalized state https://smartcontract-it.atlassian.net/browse/BCI-2920 case TxConfirmedMissingReceipt, TxUnconfirmed: // Keep tracking tx case TxInProgress, TxUnstarted: @@ -266,50 +283,20 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) handleTxesB // is deleted, we can't sign it. errMsg := "The FromAddress for this Tx was deleted before this Tx could be broadcast to the chain." if err := tr.markTxFatal(ctx, tx, errMsg); err != nil { - return fmt.Errorf("failed to mark tx as fatal: %w", err) + tr.lggr.Errorf("failed to mark tx as fatal: %v", err) + continue } delete(tr.txCache, id) case TxFatalError: delete(tr.txCache, id) default: - tr.lggr.Errorw(fmt.Sprintf("unhandled transaction state: %v", tx.State)) + tr.lggr.Errorf("unhandled transaction state: %v", tx.State) } } return nil } -// handleConfirmedTx removes a transaction from the tracker if it's been finalized on chain -func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) handleConfirmedTx( - tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], - blockHeight int64, -) error { - finalized, err := tr.txStore.IsTxFinalized(tr.ctx, blockHeight, tx.ID, tr.chainID) - if err != nil { - return fmt.Errorf("failed to check if tx is finalized: %w", err) - } - - if finalized { - delete(tr.txCache, tx.ID) - } - - return nil -} - -// insertTx inserts a transaction into the tracker as an AbandonedTx -func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) insertTx( - tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) { - if _, contains := tr.txCache[tx.ID]; contains { - return - } - - tr.txCache[tx.ID] = AbandonedTx[ADDR]{ - id: tx.ID, - fromAddress: tx.FromAddress, - } - tr.lggr.Debugw(fmt.Sprintf("inserted tx %v", tx.ID)) -} - func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markTxFatal(ctx context.Context, tx *txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], errMsg string) error { @@ -323,22 +310,26 @@ func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markTxFatal return nil } -func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) MarkAllTxesFatal(ctx context.Context) { +// markAllTxesFatal tries to mark all txes in the txCache as fatal and removes them from the cache +func (tr *Tracker[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) markAllTxesFatal(ctx context.Context) { tr.lock.Lock() defer tr.lock.Unlock() + errMsg := fmt.Sprintf( - "fromAddress for this Tx was deleted, and existing attempts onchain didn't finalize within %d hours, thus this Tx was abandoned.", + "tx abandoned: fromAddress for this tx was deleted and existing attempts didn't finalize onchain within %d hours", int(tr.ttl.Hours())) - for _, atx := range tr.txCache { - tx, err := tr.txStore.GetTxByID(ctx, atx.id) + for id := range tr.txCache { + tx, err := tr.txStore.GetTxByID(ctx, id) if err != nil { - tr.lggr.Errorw(fmt.Errorf("failed to get tx by ID: %w", err).Error()) + tr.lggr.Errorf("failed to get tx by ID: %v", err) + delete(tr.txCache, id) continue } if err := tr.markTxFatal(ctx, tx, errMsg); err != nil { - tr.lggr.Errorw(fmt.Errorf("failed to mark tx as abandoned: %w", err).Error()) + tr.lggr.Errorf("failed to mark tx as abandoned: %v", err) } + delete(tr.txCache, id) } } diff --git a/common/txmgr/txmgr.go b/common/txmgr/txmgr.go index 39895941ffd..4d4eabe5c40 100644 --- a/common/txmgr/txmgr.go +++ b/common/txmgr/txmgr.go @@ -190,12 +190,9 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Start(ctx return fmt.Errorf("Txm: Estimator failed to start: %w", err) } - /* Tracker currently disabled for BCI-2638; refactor required - b.logger.Info("Txm starting tracker") if err := ms.Start(ctx, b.tracker); err != nil { return fmt.Errorf("Txm: Tracker failed to start: %w", err) } - */ b.logger.Info("Txm starting runLoop") b.wg.Add(1) @@ -275,12 +272,6 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) Close() (m merr = errors.Join(merr, fmt.Errorf("Txm: failed to close TxAttemptBuilder: %w", err)) } - /* Tracker currently disabled for BCI-2638; refactor required - if err := b.tracker.Close(); err != nil { - merr = errors.Join(merr, fmt.Errorf("Txm: failed to close Tracker: %w", err)) - } - */ - return nil }) } @@ -329,6 +320,9 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() if err := b.broadcaster.closeInternal(); err != nil { b.logger.Panicw(fmt.Sprintf("Failed to Close Broadcaster: %v", err), "err", err) } + if err := b.tracker.closeInternal(); err != nil { + b.logger.Panicw(fmt.Sprintf("Failed to Close Tracker: %v", err), "err", err) + } if err := b.confirmer.closeInternal(); err != nil { b.logger.Panicw(fmt.Sprintf("Failed to Close Confirmer: %v", err), "err", err) } @@ -337,16 +331,17 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() close(r.done) } var wg sync.WaitGroup - // two goroutines to handle independent backoff retries starting: + // three goroutines to handle independent backoff retries starting: // - Broadcaster // - Confirmer + // - Tracker // If chStop is closed, we mark stopped=true so that the main runloop // can check and exit early if necessary // // execReset will not return until either: - // 1. Both Broadcaster and Confirmer started successfully + // 1. Broadcaster, Confirmer, and Tracker all started successfully // 2. chStop was closed (txmgr exit) - wg.Add(2) + wg.Add(3) go func() { defer wg.Done() // Retry indefinitely on failure @@ -366,6 +361,25 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() } } }() + go func() { + defer wg.Done() + // Retry indefinitely on failure + backoff := iutils.NewRedialBackoff() + for { + select { + case <-time.After(backoff.Duration()): + if err := b.tracker.startInternal(ctx); err != nil { + b.logger.Criticalw("Failed to start Tracker", "err", err) + b.SvcErrBuffer.Append(err) + continue + } + return + case <-b.chStop: + stopOnce.Do(func() { stopped = true }) + return + } + } + }() go func() { defer wg.Done() // Retry indefinitely on failure @@ -395,8 +409,7 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() b.broadcaster.Trigger(address) case head := <-b.chHeads: b.confirmer.mb.Deliver(head) - // Tracker currently disabled for BCI-2638; refactor required - // b.tracker.mb.Deliver(head.BlockNumber()) + b.tracker.mb.Deliver(head.BlockNumber()) case reset := <-b.reset: // This check prevents the weird edge-case where you can select // into this block after chStop has already been closed and the @@ -424,12 +437,10 @@ func (b *Txm[CHAIN_ID, HEAD, ADDR, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) runLoop() if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) { b.logger.Errorw(fmt.Sprintf("Failed to Close Confirmer: %v", err), "err", err) } - /* Tracker currently disabled for BCI-2638; refactor required err = b.tracker.Close() if err != nil && (!errors.Is(err, services.ErrAlreadyStopped) || !errors.Is(err, services.ErrCannotStopUnstarted)) { b.logger.Errorw(fmt.Sprintf("Failed to Close Tracker: %v", err), "err", err) } - */ return case <-keysChanged: // This check prevents the weird edge-case where you can select diff --git a/common/txmgr/types/mocks/tx_store.go b/common/txmgr/types/mocks/tx_store.go index 8d70fe6b5a9..64193afff5b 100644 --- a/common/txmgr/types/mocks/tx_store.go +++ b/common/txmgr/types/mocks/tx_store.go @@ -700,29 +700,29 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) FindTxsRequ return r0, r1 } -// GetInProgressTxAttempts provides a mock function with given fields: ctx, address, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { - ret := _m.Called(ctx, address, chainID) +// GetAbandonedTransactionsByBatch provides a mock function with given fields: ctx, chainID, enabledAddrs, offset, limit +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetAbandonedTransactionsByBatch(ctx context.Context, chainID CHAIN_ID, enabledAddrs []ADDR, offset uint, limit uint) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + ret := _m.Called(ctx, chainID, enabledAddrs, offset, limit) if len(ret) == 0 { - panic("no return value specified for GetInProgressTxAttempts") + panic("no return value specified for GetAbandonedTransactionsByBatch") } - var r0 []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + var r0 []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, ADDR, CHAIN_ID) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)); ok { - return rf(ctx, address, chainID) + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID, []ADDR, uint, uint) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)); ok { + return rf(ctx, chainID, enabledAddrs, offset, limit) } - if rf, ok := ret.Get(0).(func(context.Context, ADDR, CHAIN_ID) []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]); ok { - r0 = rf(ctx, address, chainID) + if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID, []ADDR, uint, uint) []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]); ok { + r0 = rf(ctx, chainID, enabledAddrs, offset, limit) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) + r0 = ret.Get(0).([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) } } - if rf, ok := ret.Get(1).(func(context.Context, ADDR, CHAIN_ID) error); ok { - r1 = rf(ctx, address, chainID) + if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID, []ADDR, uint, uint) error); ok { + r1 = rf(ctx, chainID, enabledAddrs, offset, limit) } else { r1 = ret.Error(1) } @@ -730,29 +730,29 @@ func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetInProgre return r0, r1 } -// GetNonFatalTransactions provides a mock function with given fields: ctx, chainID -func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetNonFatalTransactions(ctx context.Context, chainID CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { - ret := _m.Called(ctx, chainID) +// GetInProgressTxAttempts provides a mock function with given fields: ctx, address, chainID +func (_m *TxStore[ADDR, CHAIN_ID, TX_HASH, BLOCK_HASH, R, SEQ, FEE]) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error) { + ret := _m.Called(ctx, address, chainID) if len(ret) == 0 { - panic("no return value specified for GetNonFatalTransactions") + panic("no return value specified for GetInProgressTxAttempts") } - var r0 []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] + var r0 []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) ([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)); ok { - return rf(ctx, chainID) + if rf, ok := ret.Get(0).(func(context.Context, ADDR, CHAIN_ID) ([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], error)); ok { + return rf(ctx, address, chainID) } - if rf, ok := ret.Get(0).(func(context.Context, CHAIN_ID) []*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]); ok { - r0 = rf(ctx, chainID) + if rf, ok := ret.Get(0).(func(context.Context, ADDR, CHAIN_ID) []txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]); ok { + r0 = rf(ctx, address, chainID) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]*txmgrtypes.Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) + r0 = ret.Get(0).([]txmgrtypes.TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) } } - if rf, ok := ret.Get(1).(func(context.Context, CHAIN_ID) error); ok { - r1 = rf(ctx, chainID) + if rf, ok := ret.Get(1).(func(context.Context, ADDR, CHAIN_ID) error); ok { + r1 = rf(ctx, address, chainID) } else { r1 = ret.Error(1) } diff --git a/common/txmgr/types/tx_store.go b/common/txmgr/types/tx_store.go index 43d41cb4d31..bca2d1e3647 100644 --- a/common/txmgr/types/tx_store.go +++ b/common/txmgr/types/tx_store.go @@ -85,7 +85,7 @@ type TransactionStore[ FindEarliestUnconfirmedTxAttemptBlock(ctx context.Context, chainID CHAIN_ID) (null.Int, error) GetTxInProgress(ctx context.Context, fromAddress ADDR) (etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetInProgressTxAttempts(ctx context.Context, address ADDR, chainID CHAIN_ID) (attempts []TxAttempt[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) - GetNonFatalTransactions(ctx context.Context, chainID CHAIN_ID) (txs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) + GetAbandonedTransactionsByBatch(ctx context.Context, chainID CHAIN_ID, enabledAddrs []ADDR, offset, limit uint) (txs []*Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) GetTxByID(ctx context.Context, id int64) (tx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE], err error) HasInProgressTransaction(ctx context.Context, account ADDR, chainID CHAIN_ID) (exists bool, err error) LoadTxAttempts(ctx context.Context, etx *Tx[CHAIN_ID, ADDR, TX_HASH, BLOCK_HASH, SEQ, FEE]) error diff --git a/core/chains/evm/txmgr/evm_tx_store.go b/core/chains/evm/txmgr/evm_tx_store.go index 7a05f32c9e4..c8e664e8cfe 100644 --- a/core/chains/evm/txmgr/evm_tx_store.go +++ b/core/chains/evm/txmgr/evm_tx_store.go @@ -1292,26 +1292,28 @@ func (o *evmTxStore) SaveInProgressAttempt(ctx context.Context, attempt *TxAttem return nil } -func (o *evmTxStore) GetNonFatalTransactions(ctx context.Context, chainID *big.Int) (txes []*Tx, err error) { +func (o *evmTxStore) GetAbandonedTransactionsByBatch(ctx context.Context, chainID *big.Int, enabledAddrs []common.Address, offset, limit uint) (txes []*Tx, err error) { var cancel context.CancelFunc ctx, cancel = o.mergeContexts(ctx) defer cancel() - err = o.Transaction(ctx, true, func(orm *evmTxStore) error { - stmt := `SELECT * FROM evm.txes WHERE state <> 'fatal_error' AND evm_chain_id = $1` - var dbEtxs []DbEthTx - if err = orm.q.SelectContext(ctx, &dbEtxs, stmt, chainID.String()); err != nil { - return fmt.Errorf("failed to load evm.txes: %w", err) - } - txes = make([]*Tx, len(dbEtxs)) - dbEthTxsToEvmEthTxPtrs(dbEtxs, txes) - err = o.LoadTxesAttempts(ctx, txes) - if err != nil { - return fmt.Errorf("failed to load evm.txes: %w", err) - } - return nil - }) - return txes, nil + var enabledAddrsBytea [][]byte + for _, addr := range enabledAddrs { + enabledAddrsBytea = append(enabledAddrsBytea, addr[:]) + } + + // TODO: include confirmed txes https://smartcontract-it.atlassian.net/browse/BCI-2920 + query := `SELECT * FROM evm.txes WHERE state <> 'fatal_error' AND state <> 'confirmed' AND evm_chain_id = $1 + AND from_address <> ALL($2) ORDER BY nonce ASC OFFSET $3 LIMIT $4` + + var dbEtxs []DbEthTx + if err = o.q.SelectContext(ctx, &dbEtxs, query, chainID.String(), enabledAddrsBytea, offset, limit); err != nil { + return nil, fmt.Errorf("failed to load evm.txes: %w", err) + } + txes = make([]*Tx, len(dbEtxs)) + dbEthTxsToEvmEthTxPtrs(dbEtxs, txes) + + return txes, err } func (o *evmTxStore) GetTxByID(ctx context.Context, id int64) (txe *Tx, err error) { diff --git a/core/chains/evm/txmgr/evm_tx_store_test.go b/core/chains/evm/txmgr/evm_tx_store_test.go index 5d3fdcfafd8..ff5c7ec4abc 100644 --- a/core/chains/evm/txmgr/evm_tx_store_test.go +++ b/core/chains/evm/txmgr/evm_tx_store_test.go @@ -7,6 +7,8 @@ import ( "testing" "time" + "github.com/ethereum/go-ethereum/common" + commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" @@ -1470,7 +1472,7 @@ func TestORM_GetTxInProgress(t *testing.T) { }) } -func TestORM_GetNonFatalTransactions(t *testing.T) { +func TestORM_GetAbandonedTransactionsByBatch(t *testing.T) { t.Parallel() db := pgtest.NewSqlxDB(t) @@ -1479,9 +1481,19 @@ func TestORM_GetNonFatalTransactions(t *testing.T) { ethKeyStore := cltest.NewKeyStore(t, db, cfg.Database()).Eth() ethClient := evmtest.NewEthClientMockWithDefaultChain(t) _, fromAddress := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + _, enabled := cltest.MustInsertRandomKeyReturningState(t, ethKeyStore) + enabledAddrs := []common.Address{enabled} + + t.Run("get 0 abandoned transactions", func(t *testing.T) { + txes, err := txStore.GetAbandonedTransactionsByBatch(testutils.Context(t), ethClient.ConfiguredChainID(), enabledAddrs, 0, 10) + require.NoError(t, err) + require.Empty(t, txes) + }) - t.Run("gets 0 non finalized eth transaction", func(t *testing.T) { - txes, err := txStore.GetNonFatalTransactions(testutils.Context(t), ethClient.ConfiguredChainID()) + t.Run("do not return enabled addresses", func(t *testing.T) { + _ = mustInsertInProgressEthTxWithAttempt(t, txStore, 123, enabled) + _ = mustCreateUnstartedGeneratedTx(t, txStore, enabled, ethClient.ConfiguredChainID()) + txes, err := txStore.GetAbandonedTransactionsByBatch(testutils.Context(t), ethClient.ConfiguredChainID(), enabledAddrs, 0, 10) require.NoError(t, err) require.Empty(t, txes) }) @@ -1490,13 +1502,32 @@ func TestORM_GetNonFatalTransactions(t *testing.T) { inProgressTx := mustInsertInProgressEthTxWithAttempt(t, txStore, 123, fromAddress) unstartedTx := mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, ethClient.ConfiguredChainID()) - txes, err := txStore.GetNonFatalTransactions(testutils.Context(t), ethClient.ConfiguredChainID()) + txes, err := txStore.GetAbandonedTransactionsByBatch(testutils.Context(t), ethClient.ConfiguredChainID(), enabledAddrs, 0, 10) require.NoError(t, err) + require.Len(t, txes, 2) for _, tx := range txes { require.True(t, tx.ID == inProgressTx.ID || tx.ID == unstartedTx.ID) } }) + + t.Run("get batches of transactions", func(t *testing.T) { + var batchSize uint = 10 + numTxes := 55 + for i := 0; i < numTxes; i++ { + _ = mustCreateUnstartedGeneratedTx(t, txStore, fromAddress, ethClient.ConfiguredChainID()) + } + + allTxes := make([]*txmgr.Tx, 0) + err := sqlutil.Batch(func(offset, limit uint) (count uint, err error) { + batchTxes, err := txStore.GetAbandonedTransactionsByBatch(testutils.Context(t), ethClient.ConfiguredChainID(), enabledAddrs, offset, limit) + require.NoError(t, err) + allTxes = append(allTxes, batchTxes...) + return uint(len(batchTxes)), nil + }, batchSize) + require.NoError(t, err) + require.Len(t, allTxes, numTxes+2) + }) } func TestORM_GetTxByID(t *testing.T) { diff --git a/core/chains/evm/txmgr/mocks/evm_tx_store.go b/core/chains/evm/txmgr/mocks/evm_tx_store.go index b6806f34d76..a05f2a22c60 100644 --- a/core/chains/evm/txmgr/mocks/evm_tx_store.go +++ b/core/chains/evm/txmgr/mocks/evm_tx_store.go @@ -821,29 +821,29 @@ func (_m *EvmTxStore) FindTxsRequiringResubmissionDueToInsufficientFunds(ctx con return r0, r1 } -// GetInProgressTxAttempts provides a mock function with given fields: ctx, address, chainID -func (_m *EvmTxStore) GetInProgressTxAttempts(ctx context.Context, address common.Address, chainID *big.Int) ([]types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { - ret := _m.Called(ctx, address, chainID) +// GetAbandonedTransactionsByBatch provides a mock function with given fields: ctx, chainID, enabledAddrs, offset, limit +func (_m *EvmTxStore) GetAbandonedTransactionsByBatch(ctx context.Context, chainID *big.Int, enabledAddrs []common.Address, offset uint, limit uint) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { + ret := _m.Called(ctx, chainID, enabledAddrs, offset, limit) if len(ret) == 0 { - panic("no return value specified for GetInProgressTxAttempts") + panic("no return value specified for GetAbandonedTransactionsByBatch") } - var r0 []types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] + var r0 []*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) ([]types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error)); ok { - return rf(ctx, address, chainID) + if rf, ok := ret.Get(0).(func(context.Context, *big.Int, []common.Address, uint, uint) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error)); ok { + return rf(ctx, chainID, enabledAddrs, offset, limit) } - if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) []types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]); ok { - r0 = rf(ctx, address, chainID) + if rf, ok := ret.Get(0).(func(context.Context, *big.Int, []common.Address, uint, uint) []*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]); ok { + r0 = rf(ctx, chainID, enabledAddrs, offset, limit) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) + r0 = ret.Get(0).([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) } } - if rf, ok := ret.Get(1).(func(context.Context, common.Address, *big.Int) error); ok { - r1 = rf(ctx, address, chainID) + if rf, ok := ret.Get(1).(func(context.Context, *big.Int, []common.Address, uint, uint) error); ok { + r1 = rf(ctx, chainID, enabledAddrs, offset, limit) } else { r1 = ret.Error(1) } @@ -851,29 +851,29 @@ func (_m *EvmTxStore) GetInProgressTxAttempts(ctx context.Context, address commo return r0, r1 } -// GetNonFatalTransactions provides a mock function with given fields: ctx, chainID -func (_m *EvmTxStore) GetNonFatalTransactions(ctx context.Context, chainID *big.Int) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { - ret := _m.Called(ctx, chainID) +// GetInProgressTxAttempts provides a mock function with given fields: ctx, address, chainID +func (_m *EvmTxStore) GetInProgressTxAttempts(ctx context.Context, address common.Address, chainID *big.Int) ([]types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error) { + ret := _m.Called(ctx, address, chainID) if len(ret) == 0 { - panic("no return value specified for GetNonFatalTransactions") + panic("no return value specified for GetInProgressTxAttempts") } - var r0 []*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] + var r0 []types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee] var r1 error - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) ([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error)); ok { - return rf(ctx, chainID) + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) ([]types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee], error)); ok { + return rf(ctx, address, chainID) } - if rf, ok := ret.Get(0).(func(context.Context, *big.Int) []*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]); ok { - r0 = rf(ctx, chainID) + if rf, ok := ret.Get(0).(func(context.Context, common.Address, *big.Int) []types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]); ok { + r0 = rf(ctx, address, chainID) } else { if ret.Get(0) != nil { - r0 = ret.Get(0).([]*types.Tx[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) + r0 = ret.Get(0).([]types.TxAttempt[*big.Int, common.Address, common.Hash, common.Hash, evmtypes.Nonce, gas.EvmFee]) } } - if rf, ok := ret.Get(1).(func(context.Context, *big.Int) error); ok { - r1 = rf(ctx, chainID) + if rf, ok := ret.Get(1).(func(context.Context, common.Address, *big.Int) error); ok { + r1 = rf(ctx, address, chainID) } else { r1 = ret.Error(1) } diff --git a/core/chains/evm/txmgr/tracker_test.go b/core/chains/evm/txmgr/tracker_test.go index e95c005dc77..eefd89c69eb 100644 --- a/core/chains/evm/txmgr/tracker_test.go +++ b/core/chains/evm/txmgr/tracker_test.go @@ -1,7 +1,6 @@ package txmgr_test import ( - "context" "math/big" "testing" "time" @@ -10,6 +9,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr" ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/evmtest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -44,25 +44,23 @@ func containsID(txes []*txmgr.Tx, id int64) bool { } func TestEvmTracker_Initialization(t *testing.T) { - t.Skip("BCI-2638 tracker disabled") t.Parallel() tracker, _, _, _ := newTestEvmTrackerSetup(t) + ctx := testutils.Context(t) - err := tracker.Start(context.Background()) - require.NoError(t, err) + require.NoError(t, tracker.Start(ctx)) require.True(t, tracker.IsStarted()) t.Run("stop tracker", func(t *testing.T) { - err := tracker.Close() - require.NoError(t, err) + require.NoError(t, tracker.Close()) require.False(t, tracker.IsStarted()) }) } func TestEvmTracker_AddressTracking(t *testing.T) { - t.Skip("BCI-2638 tracker disabled") t.Parallel() + ctx := testutils.Context(t) t.Run("track abandoned addresses", func(t *testing.T) { ethClient := evmtest.NewEthClientMockWithDefaultChain(t) @@ -76,33 +74,37 @@ func TestEvmTracker_AddressTracking(t *testing.T) { _ = mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1) _ = mustCreateUnstartedTx(t, txStore, unstartedAddr, cltest.MustGenerateRandomKey(t).Address, []byte{}, 0, big.Int{}, ethClient.ConfiguredChainID()) - err := tracker.Start(context.Background()) + err := tracker.Start(ctx) require.NoError(t, err) defer func(tracker *txmgr.Tracker) { err = tracker.Close() require.NoError(t, err) }(tracker) + time.Sleep(waitTime) addrs := tracker.GetAbandonedAddresses() require.NotContains(t, addrs, inProgressAddr) require.NotContains(t, addrs, unstartedAddr) - require.Contains(t, addrs, confirmedAddr) require.Contains(t, addrs, unconfirmedAddr) }) + /* TODO: finalized tx state https://smartcontract-it.atlassian.net/browse/BCI-2920 t.Run("stop tracking finalized tx", func(t *testing.T) { - t.Skip("BCI-2638 tracker disabled") tracker, txStore, _, _ := newTestEvmTrackerSetup(t) confirmedAddr := cltest.MustGenerateRandomKey(t).Address _ = mustInsertConfirmedEthTxWithReceipt(t, txStore, confirmedAddr, 123, 1) - err := tracker.Start(context.Background()) + err := tracker.Start(ctx) require.NoError(t, err) defer func(tracker *txmgr.Tracker) { err = tracker.Close() require.NoError(t, err) }(tracker) + // deliver block before minConfirmations + tracker.XXXDeliverBlock(1) + time.Sleep(waitTime) + addrs := tracker.GetAbandonedAddresses() require.Contains(t, addrs, confirmedAddr) @@ -113,26 +115,12 @@ func TestEvmTracker_AddressTracking(t *testing.T) { addrs = tracker.GetAbandonedAddresses() require.NotContains(t, addrs, confirmedAddr) }) + */ } func TestEvmTracker_ExceedingTTL(t *testing.T) { - t.Skip("BCI-2638 tracker disabled") t.Parallel() - - t.Run("confirmed but unfinalized transaction still tracked", func(t *testing.T) { - tracker, txStore, _, _ := newTestEvmTrackerSetup(t) - addr1 := cltest.MustGenerateRandomKey(t).Address - _ = mustInsertConfirmedEthTxWithReceipt(t, txStore, addr1, 123, 1) - - err := tracker.Start(context.Background()) - require.NoError(t, err) - defer func(tracker *txmgr.Tracker) { - err = tracker.Close() - require.NoError(t, err) - }(tracker) - - require.Contains(t, tracker.GetAbandonedAddresses(), addr1) - }) + ctx := testutils.Context(t) t.Run("exceeding ttl", func(t *testing.T) { tracker, txStore, _, _ := newTestEvmTrackerSetup(t) @@ -142,17 +130,17 @@ func TestEvmTracker_ExceedingTTL(t *testing.T) { tx2 := cltest.MustInsertUnconfirmedEthTx(t, txStore, 123, addr2) tracker.XXXTestSetTTL(time.Nanosecond) - err := tracker.Start(context.Background()) + err := tracker.Start(ctx) require.NoError(t, err) defer func(tracker *txmgr.Tracker) { err = tracker.Close() require.NoError(t, err) }(tracker) - time.Sleep(waitTime) + time.Sleep(100 * waitTime) require.NotContains(t, tracker.GetAbandonedAddresses(), addr1, addr2) - fatalTxes, err := txStore.GetFatalTransactions(context.Background()) + fatalTxes, err := txStore.GetFatalTransactions(ctx) require.NoError(t, err) require.True(t, containsID(fatalTxes, tx1.ID)) require.True(t, containsID(fatalTxes, tx2.ID))