LogPoller CLI command to resolve reorg greater than finality depth #12867

Merged
merged 11 commits into from
Apr 26, 2024
7 changes: 7 additions & 0 deletions .changeset/brave-dots-breathe.md
@@ -0,0 +1,7 @@
---
"chainlink": minor
---

Added a new CLI command, `blocks find-lca`, which finds the latest block that is available both in the database and on chain for the specified chain.
Added a new CLI command, `node remove-blocks`, which removes all blocks and logs greater than or equal to the specified block number.
Contributor

What was the rationale behind placing one of these commands under `blocks` and the other one under `node`? It seems like it might be more difficult to find them both if they're not in the same place.

Because these are both EVM-specific commands, I think they should probably both go under `chainlink chains evm`... although I do notice there are some commands, such as `chainlink blocks replay`, which are also EVM-specific.
(We should probably move that one under `chainlink chains evm` soon as well. I'll mention that to those more involved with LOOP arch planning.)

Collaborator Author

I agree that it's confusing. I just tried to follow the existing structure of the commands.

The existing `blocks replay` interacts with the LogPoller and triggers it to replay blocks. `find-lca` also interacts with the LogPoller and requests the latest good block, so it made sense to me to group them together.
At the same time, `chains evm` simply lists the available EVM chains.

Initially I planned to have `remove-blocks` in the `blocks` section, but realised that it needs a unique lock on the database. The `node`/`local` section seems to contain all commands that must be executed on the local node only. It also contains `rebroadcast-transactions`, which is EVM/TXM-related but still resides there. I figured that all local commands should be grouped together to highlight that they are executed on the local node.

Foundations are the owners of this package, so it's up to you to decide where to put the commands.
Should I move them?

Contributor

Yeah, given that one of them is local and the other isn't, I agree that it makes sense to have `remove-blocks` with `rebroadcast-transactions` and `find-lca` with `replay`. Unfortunately, I think we're going to want to move `remove-blocks` and `rebroadcast-transactions` soon to be under `chainlink blocks evm` and `chainlink node evm`. So I'm not sure whether it's better to add them now in the same place and move them later, or just add them in the right place now and move the other two later. I'm asking the rest of my team for advice on that.

Contributor

@dhaidashenko Alright, yeah let's just add them here now and we can move them all together later.

#nops #added
8 changes: 8 additions & 0 deletions core/chains/evm/logpoller/disabled.go
@@ -114,3 +114,11 @@ func (d disabled) LatestBlockByEventSigsAddrsWithConfs(ctx context.Context, from
func (d disabled) LogsDataWordBetween(ctx context.Context, eventSig common.Hash, address common.Address, wordIndexMin, wordIndexMax int, wordValue common.Hash, confs Confirmations) ([]Log, error) {
return nil, ErrDisabled
}

func (d disabled) FindLCA(ctx context.Context) (*LogPollerBlock, error) {
return nil, ErrDisabled
}

func (d disabled) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error {
return ErrDisabled
}
99 changes: 99 additions & 0 deletions core/chains/evm/logpoller/log_poller.go
@@ -44,6 +44,8 @@ type LogPoller interface {
GetFilters() map[string]Filter
LatestBlock(ctx context.Context) (LogPollerBlock, error)
GetBlocksRange(ctx context.Context, numbers []uint64) ([]LogPollerBlock, error)
FindLCA(ctx context.Context) (*LogPollerBlock, error)
DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error

// General querying
Logs(ctx context.Context, start, end int64, eventSig common.Hash, address common.Address) ([]Log, error)
@@ -1422,6 +1424,103 @@ func (lp *logPoller) IndexedLogsWithSigsExcluding(ctx context.Context, address c
return lp.orm.SelectIndexedLogsWithSigsExcluding(ctx, eventSigA, eventSigB, topicIndex, address, fromBlock, toBlock, confs)
}

// DeleteLogsAndBlocksAfter removes all blocks and logs with a block number greater than or equal to start.
func (lp *logPoller) DeleteLogsAndBlocksAfter(ctx context.Context, start int64) error {
return lp.orm.DeleteLogsAndBlocksAfter(ctx, start)
}

// FindLCA returns the latest block that is available both in the LogPoller database and on chain.
func (lp *logPoller) FindLCA(ctx context.Context) (*LogPollerBlock, error) {
latest, err := lp.orm.SelectLatestBlock(ctx)
if err != nil {
return nil, fmt.Errorf("failed to select the latest block: %w", err)
}

oldest, err := lp.orm.SelectOldestBlock(ctx, 0)
if err != nil {
return nil, fmt.Errorf("failed to select the oldest block: %w", err)
}

if latest == nil || oldest == nil {
return nil, fmt.Errorf("expected at least one block to be present in DB")
}

lp.lggr.Debugf("Received request to find LCA. Searching in range [%d, %d]", oldest.BlockNumber, latest.BlockNumber)

// Find the largest block number for which the block hash stored in the DB matches the one returned by the RPC.
// `sort.Find` expects a sequence of the form s = [1, ..., 0, ..., -1] and returns the smallest index i for which s[i] <= 0.
// To use `sort.Find`, we represent the block range as the slice [latestBlock, latestBlock-1, ..., oldestBlock+1, oldestBlock]
// and return 1 if the DB block was reorged or 0 if it is still present on chain.
lcaI, found := sort.Find(int(latest.BlockNumber-oldest.BlockNumber)+1, func(i int) int {
const notFound = 1
const found = 0
// if there is an error - stop the search
if err != nil {
return notFound
}

// canceled search
if ctx.Err() != nil {
err = fmt.Errorf("aborted, FindLCA request cancelled: %w", ctx.Err())
return notFound
}
iBlockNumber := latest.BlockNumber - int64(i)
var dbBlock *LogPollerBlock
// Block with specified block number might not exist in the database, to address that we check closest child
// of the iBlockNumber. If the child is present on chain, it's safe to assume that iBlockNumber is present too
dbBlock, err = lp.orm.SelectOldestBlock(ctx, iBlockNumber)
if err != nil {
err = fmt.Errorf("failed to select block %d by number: %w", iBlockNumber, err)
return notFound
}

if dbBlock == nil {
err = fmt.Errorf("expected block to exist with blockNumber >= %d as observed block with number %d", iBlockNumber, latest.BlockNumber)
return notFound
}

lp.lggr.Debugf("Looking for matching block on chain blockNumber: %d blockHash: %s",
dbBlock.BlockNumber, dbBlock.BlockHash)
var chainBlock *evmtypes.Head
chainBlock, err = lp.ec.HeadByHash(ctx, dbBlock.BlockHash)
// our block in DB does not exist on chain
if (chainBlock == nil && err == nil) || errors.Is(err, ethereum.NotFound) {
err = nil
return notFound
}
if err != nil {
err = fmt.Errorf("failed to get block %s from RPC: %w", dbBlock.BlockHash, err)
return notFound
}

if chainBlock.BlockNumber() != dbBlock.BlockNumber {
err = fmt.Errorf("expected block numbers to match (db: %d, chain: %d), if block hashes match "+
"(db: %s, chain: %s)", dbBlock.BlockNumber, chainBlock.BlockNumber(), dbBlock.BlockHash, chainBlock.Hash)
return notFound
}

return found
})
if err != nil {
return nil, fmt.Errorf("failed to find: %w", err)
}

if !found {
return nil, fmt.Errorf("failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured")
}

lcaBlockNumber := latest.BlockNumber - int64(lcaI)
lca, err := lp.orm.SelectBlockByNumber(ctx, lcaBlockNumber)
if err != nil {
return nil, fmt.Errorf("failed to select lca from db: %w", err)
}

if lca == nil {
return nil, fmt.Errorf("expected lca (blockNum: %d) to exist in DB", lcaBlockNumber)
}

return lca, nil
}

func EvmWord(i uint64) common.Hash {
var b = make([]byte, 8)
binary.BigEndian.PutUint64(b, i)
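
The search in `FindLCA` depends on the ordering that `sort.Find` requires: walking from the latest block down to the oldest, reorged blocks (comparator result 1) must all come before blocks that are still on chain (result 0). The following is a minimal, self-contained sketch of that idea only. It assumes in-memory maps in place of the ORM and the RPC client; `findLCA`, `db`, and `chain` are hypothetical names for illustration and are not part of this PR.

```go
package main

import (
	"fmt"
	"sort"
)

// findLCA returns the highest block number in [oldest, latest] whose hash
// stored in db is also known to chain. Both maps are hypothetical stand-ins
// for the LogPoller ORM and the RPC client.
func findLCA(db map[int64]string, chain map[string]bool, oldest, latest int64) (int64, bool) {
	n := int(latest-oldest) + 1
	// Index i maps to block latest-i, so the virtual slice runs from newest
	// to oldest. The comparator returns 1 for reorged blocks and 0 for blocks
	// still on chain, which yields the [1, ..., 1, 0, ..., 0] shape that
	// sort.Find needs for its binary search.
	i, found := sort.Find(n, func(i int) int {
		if chain[db[latest-int64(i)]] {
			return 0
		}
		return 1
	})
	if !found {
		return 0, false
	}
	return latest - int64(i), true
}

func main() {
	// Blocks 12 and 13 were reorged: their stored hashes are unknown on chain.
	db := map[int64]string{10: "a", 11: "b", 12: "c-old", 13: "d-old"}
	chain := map[string]bool{"a": true, "b": true, "c": true, "d": true}
	lca, ok := findLCA(db, chain, 10, 13)
	fmt.Println(lca, ok) // prints "11 true"
}
```

Because the search is binary, only about log2(n) hash lookups are needed, which matters when every lookup is an RPC call.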
116 changes: 116 additions & 0 deletions core/chains/evm/logpoller/log_poller_test.go
@@ -1921,3 +1921,119 @@ func markBlockAsFinalizedByHash(t *testing.T, th TestHarness, blockHash common.H
require.NoError(t, err)
th.Client.Blockchain().SetFinalized(b.Header())
}

func TestFindLCA(t *testing.T) {
ctx := testutils.Context(t)
ec := evmtest.NewEthClientMockWithDefaultChain(t)
lggr := logger.Test(t)
chainID := testutils.NewRandomEVMChainID()
db := pgtest.NewSqlxDB(t)

orm := logpoller.NewORM(chainID, db, lggr)

lpOpts := logpoller.Opts{
PollPeriod: time.Hour,
FinalityDepth: 2,
BackfillBatchSize: 20,
RpcBatchSize: 10,
KeepFinalizedBlocksDepth: 1000,
}

lp := logpoller.NewLogPoller(orm, ec, lggr, lpOpts)
t.Run("Fails, if failed to select latest block", func(t *testing.T) {
_, err := lp.FindLCA(ctx)
require.ErrorContains(t, err, "failed to select the latest block")
})
// oldest
require.NoError(t, orm.InsertBlock(ctx, common.HexToHash("0x123"), 10, time.Now(), 0))
// latest
latestBlockHash := common.HexToHash("0x124")
require.NoError(t, orm.InsertBlock(ctx, latestBlockHash, 16, time.Now(), 0))
t.Run("Fails, if caller's context canceled", func(t *testing.T) {
lCtx, cancel := context.WithCancel(ctx)
ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(nil, nil).Run(func(_ mock.Arguments) {
cancel()
}).Once()
_, err := lp.FindLCA(lCtx)
require.ErrorContains(t, err, "aborted, FindLCA request cancelled")

})
t.Run("Fails, if RPC returns an error", func(t *testing.T) {
expectedError := fmt.Errorf("failed to call RPC")
ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(nil, expectedError).Once()
_, err := lp.FindLCA(ctx)
require.ErrorContains(t, err, expectedError.Error())
})
t.Run("Fails, if block numbers do not match", func(t *testing.T) {
ec.On("HeadByHash", mock.Anything, latestBlockHash).Return(&evmtypes.Head{
Number: 123,
}, nil).Once()
_, err := lp.FindLCA(ctx)
require.ErrorContains(t, err, "expected block numbers to match")
})
t.Run("Fails, if none of the blocks in db matches on chain", func(t *testing.T) {
ec.On("HeadByHash", mock.Anything, mock.Anything).Return(nil, nil).Times(3)
_, err := lp.FindLCA(ctx)
require.ErrorContains(t, err, "failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured")
})

type block struct {
BN int
Exists bool
}
testCases := []struct {
Name string
Blocks []block
ExpectedBlockNumber int
ExpectedError error
}{
{
Name: "All of the blocks are present on chain - returns the latest",
Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: true}, {BN: 3, Exists: true}, {BN: 4, Exists: true}},
ExpectedBlockNumber: 4,
},
{
Name: "None of the blocks exists on chain - returns an error",
Blocks: []block{{BN: 1, Exists: false}, {BN: 2, Exists: false}, {BN: 3, Exists: false}, {BN: 4, Exists: false}},
ExpectedBlockNumber: 0,
ExpectedError: fmt.Errorf("failed to find LCA, this means that whole database LogPoller state was reorged out of chain or RPC/Core node is misconfigured"),
},
{
Name: "Only latest block does not exist",
Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: true}, {BN: 3, Exists: true}, {BN: 4, Exists: false}},
ExpectedBlockNumber: 3,
},
{
Name: "Only oldest block exists on chain",
Blocks: []block{{BN: 1, Exists: true}, {BN: 2, Exists: false}, {BN: 3, Exists: false}, {BN: 4, Exists: false}},
ExpectedBlockNumber: 1,
},
}

blockHashI := int64(0)
for _, tc := range testCases {
t.Run(tc.Name, func(t *testing.T) {
// reset the database
require.NoError(t, orm.DeleteLogsAndBlocksAfter(ctx, 0))
for _, b := range tc.Blocks {
blockHashI++
hash := common.BigToHash(big.NewInt(blockHashI))
require.NoError(t, orm.InsertBlock(ctx, hash, int64(b.BN), time.Now(), 0))
// Hashes are unique for all test cases
var onChainBlock *evmtypes.Head
if b.Exists {
onChainBlock = &evmtypes.Head{Number: int64(b.BN)}
}
ec.On("HeadByHash", mock.Anything, hash).Return(onChainBlock, nil).Maybe()
}

result, err := lp.FindLCA(ctx)
if tc.ExpectedError != nil {
require.ErrorContains(t, err, tc.ExpectedError.Error())
} else {
require.NoError(t, err)
require.NotNil(t, result)
require.Equal(t, int64(tc.ExpectedBlockNumber), result.BlockNumber, "expected block numbers to match")
}
})
}
}
48 changes: 48 additions & 0 deletions core/chains/evm/logpoller/mocks/log_poller.go

Some generated files are not rendered by default.

6 changes: 6 additions & 0 deletions core/chains/evm/logpoller/observability.go
@@ -151,6 +151,12 @@ func (o *ObservedORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, e
})
}

func (o *ObservedORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) {
return withObservedQuery(o, "SelectOldestBlock", func() (*LogPollerBlock, error) {
return o.ORM.SelectOldestBlock(ctx, minAllowedBlockNumber)
})
}

func (o *ObservedORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs Confirmations) (*Log, error) {
return withObservedQuery(o, "SelectLatestLogByEventSigWithConfs", func() (*Log, error) {
return o.ORM.SelectLatestLogByEventSigWithConfs(ctx, eventSig, address, confs)
9 changes: 9 additions & 0 deletions core/chains/evm/logpoller/orm.go
@@ -38,6 +38,7 @@ type ORM interface {
SelectBlockByNumber(ctx context.Context, blockNumber int64) (*LogPollerBlock, error)
SelectBlockByHash(ctx context.Context, hash common.Hash) (*LogPollerBlock, error)
SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error)
SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error)

SelectLogs(ctx context.Context, start, end int64, address common.Address, eventSig common.Hash) ([]Log, error)
SelectLogsWithSigs(ctx context.Context, start, end int64, address common.Address, eventSigs []common.Hash) ([]Log, error)
@@ -202,6 +203,14 @@ func (o *DSORM) SelectLatestBlock(ctx context.Context) (*LogPollerBlock, error)
return &b, nil
}

func (o *DSORM) SelectOldestBlock(ctx context.Context, minAllowedBlockNumber int64) (*LogPollerBlock, error) {
var b LogPollerBlock
if err := o.ds.GetContext(ctx, &b, `SELECT * FROM evm.log_poller_blocks WHERE evm_chain_id = $1 AND block_number >= $2 ORDER BY block_number ASC LIMIT 1`, ubig.New(o.chainID), minAllowedBlockNumber); err != nil {
return nil, err
}
return &b, nil
}

func (o *DSORM) SelectLatestLogByEventSigWithConfs(ctx context.Context, eventSig common.Hash, address common.Address, confs Confirmations) (*Log, error) {
args, err := newQueryArgsForEvent(o.chainID, address, eventSig).
withConfs(confs).
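
`SelectOldestBlock` is what lets `FindLCA` tolerate gaps in the stored block numbers: asking for the oldest block with a number at or above N returns the closest stored child of N when N itself is missing. As a rough in-memory illustration of the query's semantics (`WHERE block_number >= $2 ORDER BY block_number ASC LIMIT 1`), assuming a sorted slice in place of the `evm.log_poller_blocks` table; `oldestAtOrAbove` is a hypothetical helper, not part of the ORM:

```go
package main

import (
	"fmt"
	"sort"
)

// oldestAtOrAbove returns the smallest stored block number that is greater
// than or equal to minNumber. numbers must be sorted in ascending order.
// The boolean is false when no stored block satisfies the bound.
func oldestAtOrAbove(numbers []int64, minNumber int64) (int64, bool) {
	i := sort.Search(len(numbers), func(i int) bool { return numbers[i] >= minNumber })
	if i == len(numbers) {
		return 0, false
	}
	return numbers[i], true
}

func main() {
	stored := []int64{11, 13, 15} // block 12 and 14 are not stored

	fmt.Println(oldestAtOrAbove(stored, 12)) // prints "13 true"
	fmt.Println(oldestAtOrAbove(stored, 0))  // prints "11 true"
	fmt.Println(oldestAtOrAbove(stored, 16)) // prints "0 false"
}
```

The first call mirrors the "Does not select blocks older than specified limit" test case: with blocks 11, 13, and 15 stored and a bound of 12, block 13 is returned.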
30 changes: 30 additions & 0 deletions core/chains/evm/logpoller/orm_test.go
@@ -1759,3 +1759,33 @@ func Benchmark_DeleteExpiredLogs(b *testing.B) {
assert.NoError(b, err1)
}
}

func TestSelectOldestBlock(t *testing.T) {
th := SetupTH(t, lpOpts)
o1 := th.ORM
o2 := th.ORM2
ctx := testutils.Context(t)
t.Run("Selects oldest within given chain", func(t *testing.T) {
// insert blocks
require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1231"), 11, time.Now(), 0))
require.NoError(t, o2.InsertBlock(ctx, common.HexToHash("0x1232"), 12, time.Now(), 0))
// insert newer block from different chain
require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 13, time.Now(), 0))
require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1231"), 14, time.Now(), 0))
block, err := o1.SelectOldestBlock(ctx, 0)
require.NoError(t, err)
require.NotNil(t, block)
require.Equal(t, int64(13), block.BlockNumber)
require.Equal(t, common.HexToHash("0x1233"), block.BlockHash)
})
t.Run("Does not select blocks older than specified limit", func(t *testing.T) {
require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1232"), 11, time.Now(), 0))
require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1233"), 13, time.Now(), 0))
require.NoError(t, o1.InsertBlock(ctx, common.HexToHash("0x1234"), 15, time.Now(), 0))
block, err := o1.SelectOldestBlock(ctx, 12)
require.NoError(t, err)
require.NotNil(t, block)
require.Equal(t, int64(13), block.BlockNumber)
require.Equal(t, common.HexToHash("0x1233"), block.BlockHash)
})
}