diff --git a/CHANGELOG.md b/CHANGELOG.md index 06882a793a4..c519d607778 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ - https://github.com/filecoin-project/lotus/pull/12292: feat: p2p: allow overriding bootstrap nodes with environmemnt variable - https://github.com/filecoin-project/lotus/pull/12319: feat: `lotus send CLI`: allow sending to ETH addresses - https://github.com/filecoin-project/lotus/pull/12332: fix: ETH RPC: receipts: use correct txtype in receipts +- https://github.com/filecoin-project/lotus/pull/12335: fix: lotus-shed: store processed tipset after backfilling events ## New features diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 812d8059bb7..f81f89f1eab 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -26,9 +26,10 @@ import ( const ( // same as in chain/events/index.go - eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` - insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` - insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` + eventExists = `SELECT MAX(id) FROM event WHERE height=? AND tipset_key=? AND tipset_key_cid=? AND emitter_addr=? AND event_index=? AND message_cid=? AND message_index=?` + insertEvent = `INSERT OR IGNORE INTO event(height, tipset_key, tipset_key_cid, emitter_addr, event_index, message_cid, message_index, reverted) VALUES(?, ?, ?, ?, ?, ?, ?, ?)` + insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` + upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` ) func withCategory(cat string, cmd *cli.Command) *cli.Command { @@ -176,6 +177,11 @@ var backfillEventsCmd = &cli.Command{ return err } + stmtUpsertEventSeen, err := db.Prepare(upsertEventsSeen) + if err != nil { + return err + } + processHeight := func(ctx context.Context, cnt int, msgs []lapi.Message, receipts []*types.MessageReceipt) error { var tx *sql.Tx for { @@ -196,6 +202,11 @@ var backfillEventsCmd = &cli.Command{ var eventsAffected int64 var entriesAffected int64 + tsKeyCid, err := currTs.Key().Cid() + if err != nil { + return fmt.Errorf("failed to get tipset key cid: %w", err) + } + // loop over each message receipt and backfill the events for idx, receipt := range receipts { msg := msgs[idx] @@ -225,11 +236,6 @@ var backfillEventsCmd = &cli.Command{ addressLookups[event.Emitter] = addr } - tsKeyCid, err := currTs.Key().Cid() - if err != nil { - return fmt.Errorf("failed to get tipset key cid: %w", err) - } - // select the highest event id that exists in database, or null if none exists var entryID sql.NullInt64 err = tx.Stmt(stmtEventExists).QueryRow( @@ -299,6 +305,15 @@ var backfillEventsCmd = &cli.Command{ } } + // mark the tipset as processed + _, err = tx.Stmt(stmtUpsertEventSeen).Exec( + currTs.Height(), + tsKeyCid.Bytes(), + ) + if err != nil { + return xerrors.Errorf("exec upsert events seen: %w", err) + } + err = tx.Commit() if err != nil { return fmt.Errorf("failed to commit transaction: %w", err)