Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: a new ChainIndexer to index tipsets, messages and events #12421

Open
wants to merge 67 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
fb19d79
chain index complete for msgs and txns
aarshkshah1992 Aug 29, 2024
58569d6
dont need observer changes for now
aarshkshah1992 Aug 29, 2024
5a3f76f
changes
aarshkshah1992 Aug 29, 2024
9ea48f3
fix tests
aarshkshah1992 Aug 29, 2024
1e3a9d5
fix tests
aarshkshah1992 Aug 29, 2024
4c34bc7
use th right context
aarshkshah1992 Aug 29, 2024
285ce26
index empty tipsets correctly
aarshkshah1992 Aug 30, 2024
12e67fe
implement automated backfilling
aarshkshah1992 Aug 30, 2024
3377987
add event indexing and remove all old indices
aarshkshah1992 Sep 4, 2024
e6331da
fix test
aarshkshah1992 Sep 4, 2024
f1f24c8
revert deployment test changes
aarshkshah1992 Sep 5, 2024
3f09e1e
revert test changes and better error handling for eth tx index lookups
aarshkshah1992 Sep 5, 2024
c3865b7
fix sql statments naming convention
aarshkshah1992 Sep 5, 2024
8ec7cd2
address review for Index GC
aarshkshah1992 Sep 5, 2024
a1c5201
more changes as per review
aarshkshah1992 Sep 5, 2024
e0831ee
changes as per review
aarshkshah1992 Sep 5, 2024
06ca87a
fix config
aarshkshah1992 Sep 5, 2024
c30079e
mark events as reverted during reconciliation
aarshkshah1992 Sep 5, 2024
01d78e3
better reconciliation; pens down and code complete; also reconcile ev…
aarshkshah1992 Sep 6, 2024
994a717
fix tests
aarshkshah1992 Sep 6, 2024
ad9bcb1
improve config and docs
aarshkshah1992 Sep 6, 2024
0c9a0ca
improve docs and error handling
aarshkshah1992 Sep 6, 2024
89cedb2
improve read logic
aarshkshah1992 Sep 6, 2024
a2a2b76
improve docs
aarshkshah1992 Sep 6, 2024
44af9f8
better logging and handle ennable event storage
aarshkshah1992 Sep 9, 2024
6608b80
improve logs and index init proc
aarshkshah1992 Sep 9, 2024
625d8c8
better logging
aarshkshah1992 Sep 9, 2024
fed08b0
fix bugs based on calibnet testing
aarshkshah1992 Sep 9, 2024
a5c56c1
create sqliite Indices
aarshkshah1992 Sep 10, 2024
7acd481
gc should be based on epochs
aarshkshah1992 Sep 10, 2024
821dcd4
fix event query
aarshkshah1992 Sep 10, 2024
cde46cb
foreign keys should be enabled on the DB
aarshkshah1992 Sep 10, 2024
727dae3
reverted tipsets should be removed as part of GC
aarshkshah1992 Sep 10, 2024
c07784d
release read lock
aarshkshah1992 Sep 10, 2024
896048a
make it easy to backfill an empty index using reconciliation
aarshkshah1992 Sep 10, 2024
602f660
better docs for reconciliation
aarshkshah1992 Sep 10, 2024
9750571
Merge remote-tracking branch 'origin/master' into feat/msg-eth-tx-index
aarshkshah1992 Sep 12, 2024
13c2824
fix conflicts with master
aarshkshah1992 Sep 12, 2024
37d6746
Apply suggestions from code review
aarshkshah1992 Sep 13, 2024
c4490bb
fix go mod
aarshkshah1992 Sep 13, 2024
93a8b76
fix formatting
aarshkshah1992 Sep 13, 2024
6f8530e
revert config changes
aarshkshah1992 Sep 13, 2024
627aff2
address changes in observer
aarshkshah1992 Sep 13, 2024
7244b66
remove top level chainindex package
aarshkshah1992 Sep 13, 2024
531cd38
changes as per review
aarshkshah1992 Sep 13, 2024
77fc462
changes as per review
aarshkshah1992 Sep 13, 2024
c2e5f68
changes as per review
aarshkshah1992 Sep 13, 2024
286af22
handle index with reverted tipsets during reconciliation
aarshkshah1992 Sep 13, 2024
d67a30a
changes as per review
aarshkshah1992 Sep 13, 2024
5f5ef3a
fix type of max reconcile epoch
aarshkshah1992 Sep 13, 2024
f5a5c61
changes to reconciliation as per review
aarshkshah1992 Sep 14, 2024
730d00a
log ipld error
aarshkshah1992 Sep 14, 2024
c099abf
better logging of progress
aarshkshah1992 Sep 14, 2024
951ce77
disable chain indexer hydrate from snapshot based on config
aarshkshah1992 Sep 14, 2024
ad6c086
always populate index
aarshkshah1992 Sep 14, 2024
52e104d
make config easy to reason about
aarshkshah1992 Sep 14, 2024
9c6c728
fix config
aarshkshah1992 Sep 14, 2024
1da1e07
fix messaging
aarshkshah1992 Sep 14, 2024
efe90f8
revert config changes
aarshkshah1992 Sep 14, 2024
c121321
Apply suggestions from code review
aarshkshah1992 Sep 16, 2024
2ff1d42
changes as per review
aarshkshah1992 Sep 16, 2024
c945bb5
make error messages homogenous
aarshkshah1992 Sep 16, 2024
432e09a
fix indentation
aarshkshah1992 Sep 16, 2024
af9bc23
changes as per review
aarshkshah1992 Sep 16, 2024
6d84b03
feat: recompute tipset to generate missing events if event indexing i…
aarshkshah1992 Sep 16, 2024
b9f1583
better docs for gc retention epoch
aarshkshah1992 Sep 16, 2024
1921abd
imrpove DB handling (#12485)
aarshkshah1992 Sep 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 0 additions & 150 deletions chain/ethhashlookup/eth_transaction_hash_lookup.go

This file was deleted.

60 changes: 24 additions & 36 deletions chain/events/filter/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/go-state-types/abi"
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"

"github.com/filecoin-project/lotus/chain/index"
cstore "github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
)
Expand All @@ -32,7 +33,7 @@ type AddressResolver func(context.Context, abi.ActorID, *types.TipSet) (address.
type EventFilter interface {
Filter

TakeCollectedEvents(context.Context) []*CollectedEvent
TakeCollectedEvents(context.Context) []*index.CollectedEvent
CollectEvents(context.Context, *TipSetEvents, bool, AddressResolver) error
}

Expand All @@ -47,24 +48,13 @@ type eventFilter struct {
maxResults int // maximum number of results to collect, 0 is unlimited

mu sync.Mutex
collected []*CollectedEvent
collected []*index.CollectedEvent
lastTaken time.Time
ch chan<- interface{}
}

var _ Filter = (*eventFilter)(nil)

type CollectedEvent struct {
rvagg marked this conversation as resolved.
Show resolved Hide resolved
Entries []types.EventEntry
EmitterAddr address.Address // address of emitter
EventIdx int // index of the event within the list of emitted events in a given tipset
Reverted bool
Height abi.ChainEpoch
TipSetKey types.TipSetKey // tipset that contained the message
MsgIdx int // index of the message in the tipset
MsgCid cid.Cid // cid of message that produced event
}

func (f *eventFilter) ID() types.FilterID {
return f.id
}
Expand Down Expand Up @@ -119,7 +109,7 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
}

// event matches filter, so record it
cev := &CollectedEvent{
cev := &index.CollectedEvent{
Entries: ev.Entries,
EmitterAddr: addr,
EventIdx: eventCount,
Expand Down Expand Up @@ -151,13 +141,13 @@ func (f *eventFilter) CollectEvents(ctx context.Context, te *TipSetEvents, rever
return nil
}

func (f *eventFilter) setCollectedEvents(ces []*CollectedEvent) {
func (f *eventFilter) setCollectedEvents(ces []*index.CollectedEvent) {
f.mu.Lock()
f.collected = ces
f.mu.Unlock()
}

func (f *eventFilter) TakeCollectedEvents(ctx context.Context) []*CollectedEvent {
func (f *eventFilter) TakeCollectedEvents(ctx context.Context) []*index.CollectedEvent {
f.mu.Lock()
collected := f.collected
f.collected = nil
Expand Down Expand Up @@ -307,7 +297,7 @@ type EventFilterManager struct {
ChainStore *cstore.ChainStore
AddressResolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)
MaxFilterResults int
EventIndex *EventIndex
ChainIndexer index.Indexer

mu sync.Mutex // guards mutations to filters
filters map[types.FilterID]EventFilter
Expand All @@ -319,7 +309,7 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
defer m.mu.Unlock()
m.currentHeight = to.Height()

if len(m.filters) == 0 && m.EventIndex == nil {
if len(m.filters) == 0 {
return nil
}

Expand All @@ -329,12 +319,6 @@ func (m *EventFilterManager) Apply(ctx context.Context, from, to *types.TipSet)
load: m.loadExecutedMessages,
}

if m.EventIndex != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
return err
}
}

// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, false, m.AddressResolver); err != nil {
Expand All @@ -350,7 +334,7 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
defer m.mu.Unlock()
m.currentHeight = to.Height()

if len(m.filters) == 0 && m.EventIndex == nil {
if len(m.filters) == 0 {
return nil
}

Expand All @@ -360,12 +344,6 @@ func (m *EventFilterManager) Revert(ctx context.Context, from, to *types.TipSet)
load: m.loadExecutedMessages,
}

if m.EventIndex != nil {
if err := m.EventIndex.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
return err
}
}

// TODO: could run this loop in parallel with errgroup if there are many filters
for _, f := range m.filters {
if err := f.CollectEvents(ctx, tse, true, m.AddressResolver); err != nil {
Expand All @@ -386,7 +364,7 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
currentHeight := m.currentHeight
m.mu.Unlock()

if m.EventIndex == nil && minHeight != -1 && minHeight < currentHeight {
if m.ChainIndexer == nil && minHeight != -1 && minHeight < currentHeight {
return nil, xerrors.Errorf("historic event index disabled")
}

Expand All @@ -405,11 +383,21 @@ func (m *EventFilterManager) Install(ctx context.Context, minHeight, maxHeight a
maxResults: m.MaxFilterResults,
}

if m.EventIndex != nil && minHeight != -1 && minHeight < currentHeight {
// Filter needs historic events
if err := m.EventIndex.prefillFilter(ctx, f, excludeReverted); err != nil {
return nil, err
if m.ChainIndexer != nil && minHeight != -1 && minHeight < currentHeight {
ef := &index.EventFilter{
MinHeight: minHeight,
MaxHeight: maxHeight,
TipsetCid: tipsetCid,
Addresses: addresses,
KeysWithCodec: keysWithCodec,
MaxResults: m.MaxFilterResults,
}
ces, err := m.ChainIndexer.GetEventsForFilter(ctx, ef, excludeReverted)
if err != nil {
return nil, xerrors.Errorf("get events for filter: %w", err)
}

f.setCollectedEvents(ces)
}

m.mu.Lock()
Expand Down
7 changes: 4 additions & 3 deletions chain/events/filter/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/actors/adt"
"github.com/filecoin-project/lotus/chain/index"
"github.com/filecoin-project/lotus/chain/types"
)

Expand Down Expand Up @@ -70,8 +71,8 @@ func TestEventFilterCollectEvents(t *testing.T) {
cid14000, err := events14000.msgTs.Key().Cid()
require.NoError(t, err, "tipset cid")

noCollectedEvents := []*CollectedEvent{}
oneCollectedEvent := []*CollectedEvent{
noCollectedEvents := []*index.CollectedEvent{}
oneCollectedEvent := []*index.CollectedEvent{
{
Entries: ev1.Entries,
EmitterAddr: a1,
Expand All @@ -88,7 +89,7 @@ func TestEventFilterCollectEvents(t *testing.T) {
name string
filter *eventFilter
te *TipSetEvents
want []*CollectedEvent
want []*index.CollectedEvent
}{
{
name: "nomatch tipset min height",
Expand Down
Loading
Loading