From 9ad34e5c69e6480c12faa1af5f2857070cebc919 Mon Sep 17 00:00:00 2001
From: Zsolt Felfoldi <zsfelfoldi@gmail.com>
Date: Sat, 20 Jul 2024 14:35:52 +0200
Subject: [PATCH 1/4] core/filtermaps: two dimensional log filter

---
 core/filtermaps/filtermaps.go   | 582 ++++++++++++++++++++++++++++++
 core/filtermaps/indexer.go      | 618 ++++++++++++++++++++++++++++++++
 core/filtermaps/matcher.go      | 500 ++++++++++++++++++++++++++
 core/rawdb/accessors_indexes.go | 206 +++++++++++
 core/rawdb/schema.go            |  31 ++
 eth/api_backend.go              |   2 +
 eth/backend.go                  |  13 +-
 eth/filters/filter.go           |  41 ++-
 eth/filters/filter_system.go    |   5 +
 internal/ethapi/backend.go      |   5 +
 10 files changed, 1991 insertions(+), 12 deletions(-)
 create mode 100644 core/filtermaps/filtermaps.go
 create mode 100644 core/filtermaps/indexer.go
 create mode 100644 core/filtermaps/matcher.go

diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go
new file mode 100644
index 000000000000..a265696041fa
--- /dev/null
+++ b/core/filtermaps/filtermaps.go
@@ -0,0 +1,582 @@
+package filtermaps
+
+import (
+	"context"
+	"crypto/sha256"
+	"encoding/binary"
+	"errors"
+	"sort"
+	"sync"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/common/lru"
+	"github.com/ethereum/go-ethereum/core"
+	"github.com/ethereum/go-ethereum/core/rawdb"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/ethereum/go-ethereum/log"
+)
+
+const (
+	logMapHeight    = 12                    // log2(mapHeight)
+	mapHeight       = 1 << logMapHeight     // filter map height (number of rows)
+	logMapsPerEpoch = 6                     // log2(mapsPerEpoch)
+	mapsPerEpoch    = 1 << logMapsPerEpoch  // number of maps in an epoch
+	logValuesPerMap = 16                    // log2(valuesPerMap)
+	valuesPerMap    = 1 << logValuesPerMap  // number of log values marked on each filter map
+
+	headCacheSize = 8 // maximum number of recent filter maps cached in memory
+)
+
+// FilterMaps is the in-memory representation of the log index structure that is
+// responsible for building and updating the index according to the canonical
+// chain.
+// Note that FilterMaps implements the same data structure as proposed in EIP-7745
+// without the tree hashing and consensus changes:
+// https://eips.ethereum.org/EIPS/eip-7745
+type FilterMaps struct {
+	lock    sync.RWMutex
+	db      ethdb.KeyValueStore
+	closeCh chan chan struct{}
+
+	filterMapsRange
+	chain *core.BlockChain
+
+	// filterMapCache caches certain filter maps (headCacheSize most recent maps
+	// and one tail map) that are expected to be frequently accessed and modified
+	// while updating the structure. Note that the set of cached maps depends
+	// only on filterMapsRange and rows of other maps are not cached here.
+	filterMapLock  sync.Mutex
+	filterMapCache map[uint32]*filterMap
+	blockPtrCache  *lru.Cache[uint32, uint64]
+	lvPointerCache *lru.Cache[uint64, uint64]
+	revertPoints   map[uint64]*revertPoint
+}
+
+// filterMap is a full or partial in-memory representation of a filter map where
+// rows are allowed to have a nil value meaning the row is not stored in the
+// structure. Note that therefore a known empty row should be represented with
+// a zero-length slice.
+// It can be used as a memory cache or an overlay while preparing a batch of
+// changes to the structure. In either case a nil value should be interpreted
+// as transparent (uncached/unchanged).
+type filterMap [mapHeight]FilterRow
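+
+// Example (an illustrative sketch, not part of the original patch): with the
+// constants above, a log value index decomposes into its epoch, map and
+// in-map position as follows.
+//
+//	func locate(lvIndex uint64) (epochIndex, mapIndex, subIndex uint32) {
+//		mapIndex = uint32(lvIndex >> logValuesPerMap) // 2^16 log values per map
+//		epochIndex = mapIndex >> logMapsPerEpoch      // 2^6 maps per epoch
+//		subIndex = uint32(lvIndex % valuesPerMap)     // position within the map
+//		return
+//	}
+
+// FilterRow encodes a single row of a filter map as a list of column indices.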
+// Note that the values are always stored in the same order as they were added
+// and if the same column index is added twice, it is also stored twice.
+// Order of column indices and potential duplications do not matter when searching
+// for a value but leaving the original order makes reverting to a previous state
+// simpler.
+type FilterRow []uint32
+
+// emptyRow represents an empty FilterRow. Note that in case of decoded FilterRows
+// nil has a special meaning (transparent; not stored in the cache/overlay map)
+// and therefore an empty row is represented by a zero length slice.
+var emptyRow = FilterRow{}
+
+// filterMapsRange describes the block range that has been indexed and the log
+// value index range it has been mapped to.
+type filterMapsRange struct {
+	initialized                      bool
+	headLvPointer, tailLvPointer     uint64
+	headBlockNumber, tailBlockNumber uint64
+	headBlockHash, tailParentHash    common.Hash
+}
+
+// NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
+// the structure in sync with the given blockchain.
+func NewFilterMaps(db ethdb.KeyValueStore, chain *core.BlockChain) *FilterMaps {
+	rs, err := rawdb.ReadFilterMapsRange(db)
+	if err != nil {
+		log.Error("Error reading log index range", "error", err)
+	}
+	fm := &FilterMaps{
+		db:      db,
+		chain:   chain,
+		closeCh: make(chan chan struct{}),
+		filterMapsRange: filterMapsRange{
+			initialized:     rs.Initialized,
+			headLvPointer:   rs.HeadLvPointer,
+			tailLvPointer:   rs.TailLvPointer,
+			headBlockNumber: rs.HeadBlockNumber,
+			tailBlockNumber: rs.TailBlockNumber,
+			headBlockHash:   rs.HeadBlockHash,
+			tailParentHash:  rs.TailParentHash,
+		},
+		filterMapCache: make(map[uint32]*filterMap),
+		blockPtrCache:  lru.NewCache[uint32, uint64](1000),
+		lvPointerCache: lru.NewCache[uint64, uint64](1000),
+		revertPoints:   make(map[uint64]*revertPoint),
+	}
+	if !fm.initialized {
+		fm.resetDb()
+	}
+	fm.updateMapCache()
+	if rp, err := fm.newUpdateBatch().makeRevertPoint(); err == nil && rp != nil {
+		fm.revertPoints[rp.blockNumber] = rp
+	} else if err != nil {
+		log.Error("Error creating head revert point", "error", err)
+	}
+	go fm.updateLoop()
+	return fm
+}
+
+// Close ensures that the indexer is fully stopped before returning.
+func (f *FilterMaps) Close() {
+	ch := make(chan struct{})
+	f.closeCh <- ch
+	<-ch
+}
+
+// FilterMapsMatcherBackend implements MatcherBackend.
+type FilterMapsMatcherBackend FilterMaps
+
+// GetFilterMapRow returns the given row of the given map. If the row is empty
+// then a non-nil zero length row is returned.
+// Note that the returned slices should not be modified, they should be copied
+// on write.
+// GetFilterMapRow implements MatcherBackend.
+func (ff *FilterMapsMatcherBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error) {
+	f := (*FilterMaps)(ff)
+	return f.getFilterMapRow(mapIndex, rowIndex)
+}
+
+// GetBlockLvPointer returns the starting log value index where the log values
+// generated by the given block are located. If blockNumber is beyond the current
+// head then the first unoccupied log value index is returned.
+// GetBlockLvPointer implements MatcherBackend.
+func (ff *FilterMapsMatcherBackend) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) {
+	f := (*FilterMaps)(ff)
+	f.lock.RLock()
+	defer f.lock.RUnlock()
+
+	return f.getBlockLvPointer(blockNumber)
+}
+
+// GetLogByLvIndex returns the log at the given log value index. If the index does
+// not point to the first log value entry of a log then no log and no error are
+// returned as this can happen when the log value index was a false positive.
+// Note that this function assumes that the log index structure is consistent
+// with the canonical chain at the point where the given log value index points.
+// If this is not the case then an invalid result or an error may be returned.
+// GetLogByLvIndex implements MatcherBackend.
+func (ff *FilterMapsMatcherBackend) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) {
+	f := (*FilterMaps)(ff)
+	f.lock.RLock()
+	defer f.lock.RUnlock()
+
+	return f.getLogByLvIndex(lvIndex)
+}
+
+// reset un-initializes the FilterMaps structure and removes all related data from
+// the database.
+// Note that this function assumes that the read/write lock is being held.
+func (f *FilterMaps) reset() {
+	// deleting the range first ensures that resetDb will be called again at next
+	// startup and any leftover data will be removed even if it cannot finish now.
+	rawdb.DeleteFilterMapsRange(f.db)
+	f.resetDb()
+	f.filterMapsRange = filterMapsRange{}
+	f.filterMapCache = make(map[uint32]*filterMap)
+	f.revertPoints = make(map[uint64]*revertPoint)
+	f.blockPtrCache.Purge()
+	f.lvPointerCache.Purge()
+}
+
+// resetDb removes all log index data from the database.
+func (f *FilterMaps) resetDb() {
+	var logged bool
+	for {
+		it := f.db.NewIterator(rawdb.FilterMapsPrefix, nil)
+		batch := f.db.NewBatch()
+		var count int
+		for ; count < 10000 && it.Next(); count++ {
+			batch.Delete(it.Key())
+		}
+		it.Release()
+		if count == 0 {
+			break
+		}
+		if !logged {
+			log.Info("Resetting log index database...")
+			logged = true
+		}
+		batch.Write()
+	}
+	if logged {
+		log.Info("Resetting log index database finished")
+	}
+}
+
+// setRange updates the covered range and also adds the changes to the given batch.
+// Note that this function assumes that the read/write lock is being held.
+func (f *FilterMaps) setRange(batch ethdb.Batch, newRange filterMapsRange) {
+	f.filterMapsRange = newRange
+	rs := rawdb.FilterMapsRange{
+		Initialized:     newRange.initialized,
+		HeadLvPointer:   newRange.headLvPointer,
+		TailLvPointer:   newRange.tailLvPointer,
+		HeadBlockNumber: newRange.headBlockNumber,
+		TailBlockNumber: newRange.tailBlockNumber,
+		HeadBlockHash:   newRange.headBlockHash,
+		TailParentHash:  newRange.tailParentHash,
+	}
+	rawdb.WriteFilterMapsRange(batch, rs)
+	f.updateMapCache()
+}
+
+// updateMapCache updates the maps covered by the filterMapCache according to the
+// covered range.
+// Note that this function assumes that the read lock is being held.
+func (f *FilterMaps) updateMapCache() {
+	if !f.initialized {
+		return
+	}
+	f.filterMapLock.Lock()
+	defer f.filterMapLock.Unlock()
+
+	newFilterMapCache := make(map[uint32]*filterMap)
+	firstMap, afterLastMap := uint32(f.tailLvPointer>>logValuesPerMap), uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap)
+	headCacheFirst := firstMap + 1
+	if afterLastMap > headCacheFirst+headCacheSize {
+		headCacheFirst = afterLastMap - headCacheSize
+	}
+	fm := f.filterMapCache[firstMap]
+	if fm == nil {
+		fm = new(filterMap)
+	}
+	newFilterMapCache[firstMap] = fm
+	for mapIndex := headCacheFirst; mapIndex < afterLastMap; mapIndex++ {
+		fm := f.filterMapCache[mapIndex]
+		if fm == nil {
+			fm = new(filterMap)
+		}
+		newFilterMapCache[mapIndex] = fm
+	}
+	f.filterMapCache = newFilterMapCache
+}
+
+// getLogByLvIndex returns the log at the given log value index. If the index does
+// not point to the first log value entry of a log then no log and no error are
+// returned as this can happen when the log value index was a false positive.
+// Note that this function assumes that the log index structure is consistent
+// with the canonical chain at the point where the given log value index points.
+// If this is not the case then an invalid result or an error may be returned.
+// Note that this function assumes that the read lock is being held.
+func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
+	if lvIndex < f.tailLvPointer || lvIndex > f.headLvPointer {
+		return nil, errors.New("log value index outside available range")
+	}
+	// find possible block range based on map to block pointers
+	mapIndex := uint32(lvIndex >> logValuesPerMap)
+	firstBlockNumber, err := f.getMapBlockPtr(mapIndex)
+	if err != nil {
+		return nil, err
+	}
+	var lastBlockNumber uint64
+	if mapIndex+1 < uint32((f.headLvPointer+valuesPerMap-1)>>logValuesPerMap) {
+		lastBlockNumber, err = f.getMapBlockPtr(mapIndex + 1)
+		if err != nil {
+			return nil, err
+		}
+	} else {
+		lastBlockNumber = f.headBlockNumber
+	}
+	// find block with binary search based on block to log value index pointers
+	for firstBlockNumber < lastBlockNumber {
+		midBlockNumber := (firstBlockNumber + lastBlockNumber + 1) / 2
+		midLvPointer, err := f.getBlockLvPointer(midBlockNumber)
+		if err != nil {
+			return nil, err
+		}
+		if lvIndex < midLvPointer {
+			lastBlockNumber = midBlockNumber - 1
+		} else {
+			firstBlockNumber = midBlockNumber
+		}
+	}
+	// get block receipts
+	hash := f.chain.GetCanonicalHash(firstBlockNumber)
+	receipts := f.chain.GetReceiptsByHash(hash) //TODO small cache
+	if receipts == nil {
+		return nil, errors.New("receipts not found")
+	}
+	lvPointer, err := f.getBlockLvPointer(firstBlockNumber)
+	if err != nil {
+		return nil, err
+	}
+	// iterate through receipts to find the exact log starting at lvIndex
+	for _, receipt := range receipts {
+		for _, log := range receipt.Logs {
+			if lvPointer > lvIndex {
+				// lvIndex does not point to the first log value (address value)
+				// generated by a log as true matches should always do, so it
+				// is considered a false positive (no log and no error returned).
+				return nil, nil
+			}
+			if lvPointer == lvIndex {
+				return log, nil // potential match
+			}
+			lvPointer += uint64(len(log.Topics) + 1)
+		}
+	}
+	return nil, errors.New("log value index not found")
+}
+
+// getFilterMapRow returns the given row of the given map. If the row is empty
+// then a non-nil zero length row is returned.
+// Note that the returned slices should not be modified, they should be copied
+// on write.
+func (f *FilterMaps) getFilterMapRow(mapIndex, rowIndex uint32) (FilterRow, error) {
+	f.filterMapLock.Lock()
+	defer f.filterMapLock.Unlock()
+
+	fm := f.filterMapCache[mapIndex]
+	if fm != nil && fm[rowIndex] != nil {
+		return fm[rowIndex], nil
+	}
+	row, err := rawdb.ReadFilterMapRow(f.db, mapRowIndex(mapIndex, rowIndex))
+	if err != nil {
+		return nil, err
+	}
+	if fm != nil {
+		fm[rowIndex] = FilterRow(row)
+	}
+	return FilterRow(row), nil
+}
+
+// storeFilterMapRow stores a row at the given row index of the given map and also
+// caches it in filterMapCache if the given map is cached.
+// Note that empty rows are not stored in the database and therefore there is no
+// separate delete function; deleting a row is the same as storing an empty row.
+func (f *FilterMaps) storeFilterMapRow(batch ethdb.Batch, mapIndex, rowIndex uint32, row FilterRow) {
+	f.filterMapLock.Lock()
+	defer f.filterMapLock.Unlock()
+
+	if fm := f.filterMapCache[mapIndex]; fm != nil {
+		(*fm)[rowIndex] = row
+	}
+	rawdb.WriteFilterMapRow(batch, mapRowIndex(mapIndex, rowIndex), []uint32(row))
+}
+
+// mapRowIndex calculates the unified storage index where the given row of the
+// given map is stored. Note that this indexing scheme is the same as the one
+// proposed in EIP-7745 for tree-hashing the filter map structure and for the
+// same data proximity reasons it is also suitable for database representation.
+// See also:
+// https://eips.ethereum.org/EIPS/eip-7745#hash-tree-structure
+func mapRowIndex(mapIndex, rowIndex uint32) uint64 {
+	epochIndex, mapSubIndex := mapIndex>>logMapsPerEpoch, mapIndex%mapsPerEpoch
+	return (uint64(epochIndex)<<logMapHeight+uint64(rowIndex))<<logMapsPerEpoch + uint64(mapSubIndex)
+}
+
+// getBlockLvPointer returns the starting log value index where the log values
+// generated by the given block are located. If blockNumber is beyond the current
+// head then the first unoccupied log value index is returned.
+// Note that this function assumes that the read lock is being held.
+func (f *FilterMaps) getBlockLvPointer(blockNumber uint64) (uint64, error) {
+	if blockNumber > f.headBlockNumber {
+		return f.headLvPointer, nil
+	}
+	if lvPointer, ok := f.lvPointerCache.Get(blockNumber); ok {
+		return lvPointer, nil
+	}
+	lvPointer, err := rawdb.ReadBlockLvPointer(f.db, blockNumber)
+	if err != nil {
+		return 0, err
+	}
+	f.lvPointerCache.Add(blockNumber, lvPointer)
+	return lvPointer, nil
+}
+
+// storeBlockLvPointer stores the starting log value index where the log values
+// generated by the given block are located.
+func (f *FilterMaps) storeBlockLvPointer(batch ethdb.Batch, blockNumber, lvPointer uint64) {
+	f.lvPointerCache.Add(blockNumber, lvPointer)
+	rawdb.WriteBlockLvPointer(batch, blockNumber, lvPointer)
+}
+
+// deleteBlockLvPointer deletes the starting log value index where the log values
+// generated by the given block are located.
+func (f *FilterMaps) deleteBlockLvPointer(batch ethdb.Batch, blockNumber uint64) {
+	f.lvPointerCache.Remove(blockNumber)
+	rawdb.DeleteBlockLvPointer(batch, blockNumber)
+}
+
+// getMapBlockPtr returns the number of the block that generated the first log
+// value entry of the given map.
+func (f *FilterMaps) getMapBlockPtr(mapIndex uint32) (uint64, error) {
+	if blockPtr, ok := f.blockPtrCache.Get(mapIndex); ok {
+		return blockPtr, nil
+	}
+	blockPtr, err := rawdb.ReadFilterMapBlockPtr(f.db, mapIndex)
+	if err != nil {
+		return 0, err
+	}
+	f.blockPtrCache.Add(mapIndex, blockPtr)
+	return blockPtr, nil
+}
+
+// storeMapBlockPtr stores the number of the block that generated the first log
+// value entry of the given map.
+func (f *FilterMaps) storeMapBlockPtr(batch ethdb.Batch, mapIndex uint32, blockPtr uint64) {
+	f.blockPtrCache.Add(mapIndex, blockPtr)
+	rawdb.WriteFilterMapBlockPtr(batch, mapIndex, blockPtr)
+}
+
+// deleteMapBlockPtr deletes the number of the block that generated the first log
+// value entry of the given map.
+func (f *FilterMaps) deleteMapBlockPtr(batch ethdb.Batch, mapIndex uint32) {
+	f.blockPtrCache.Remove(mapIndex)
+	rawdb.DeleteFilterMapBlockPtr(batch, mapIndex)
+}
+
+// addressValue returns the log value hash of a log emitting address.
+func addressValue(address common.Address) common.Hash {
+	var result common.Hash
+	hasher := sha256.New()
+	hasher.Write(address[:])
+	hasher.Sum(result[:0])
+	return result
+}
+
+// topicValue returns the log value hash of a log topic.
+func topicValue(topic common.Hash) common.Hash {
+	var result common.Hash
+	hasher := sha256.New()
+	hasher.Write(topic[:])
+	hasher.Sum(result[:0])
+	return result
+}
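+
+// Example (an illustrative sketch, not part of the original patch): a log with
+// address A and topics T1, T2 occupies three consecutive log value indices
+// lv, lv+1 and lv+2. Each value lands in a row selected by rowIndex and a
+// column selected by columnIndex:
+//
+//	func markLog(lvIndex uint64, log *types.Log) uint64 {
+//		values := []common.Hash{addressValue(log.Address)}
+//		for _, topic := range log.Topics {
+//			values = append(values, topicValue(topic))
+//		}
+//		for _, value := range values {
+//			mapIndex := uint32(lvIndex >> logValuesPerMap)
+//			row := rowIndex(mapIndex>>logMapsPerEpoch, value)
+//			column := columnIndex(lvIndex, value)
+//			_, _ = row, column // mark (row, column) on map mapIndex
+//			lvIndex++
+//		}
+//		return lvIndex
+//	}
+
+// rowIndex returns the row index in which the given log value should be marked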
+// during the given epoch. Note that row assignments are re-shuffled in every
+// epoch in order to ensure that even though there are always a few more heavily
+// used rows due to very popular addresses and topics, these will not make search
+// for other log values very expensive. Even if certain values are occasionally
+// sorted into these heavy rows, in most of the epochs they are placed in average
+// length rows.
+func rowIndex(epochIndex uint32, logValue common.Hash) uint32 {
+	hasher := sha256.New()
+	hasher.Write(logValue[:])
+	var indexEnc [4]byte
+	binary.LittleEndian.PutUint32(indexEnc[:], epochIndex)
+	hasher.Write(indexEnc[:])
+	var hash common.Hash
+	hasher.Sum(hash[:0])
+	return binary.LittleEndian.Uint32(hash[:4]) % mapHeight
+}
+
+// columnIndex returns the column index that should be added to the appropriate
+// row in order to place a mark for the next log value.
+func columnIndex(lvIndex uint64, logValue common.Hash) uint32 {
+	x := uint32(lvIndex % valuesPerMap) // log value sub-index
+	transformHash := transformHash(uint32(lvIndex/valuesPerMap), logValue)
+	// apply column index transformation function
+	x += binary.LittleEndian.Uint32(transformHash[0:4])
+	x *= binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1
+	x ^= binary.LittleEndian.Uint32(transformHash[8:12])
+	x *= binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1
+	x += binary.LittleEndian.Uint32(transformHash[16:20])
+	x *= binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1
+	x ^= binary.LittleEndian.Uint32(transformHash[24:28])
+	x *= binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1
+	return x
+}
+
+// transformHash calculates a hash specific to a given map and log value hash
+// that defines a bijective function on the uint32 range. This function is used
+// to transform the log value sub-index (distance from the first index of the map)
+// into a 32 bit column index, then applied in reverse when searching for potential
+// matches for a given log value.
+func transformHash(mapIndex uint32, logValue common.Hash) (result common.Hash) {
+	hasher := sha256.New()
+	hasher.Write(logValue[:])
+	var indexEnc [4]byte
+	binary.LittleEndian.PutUint32(indexEnc[:], mapIndex)
+	hasher.Write(indexEnc[:])
+	hasher.Sum(result[:0])
+	return
+}
+
+// potentialMatches returns the list of log value indices potentially matching
+// the given log value hash in the range of the filter map the row belongs to.
+// Note that the list of indices is always sorted and potential duplicates are
+// removed. Though the column indices are stored in the same order they were
+// added and therefore the true matches are automatically reverse transformed
+// in the right order, false positives can ruin this property. Since these can
+// only be separated from true matches after the combined pattern matching of the
+// outputs of individual log value matchers and this pattern matcher assumes a
+// sorted and duplicate-free list of indices, we should ensure these properties
+// here.
+func (row FilterRow) potentialMatches(mapIndex uint32, logValue common.Hash) potentialMatches {
+	results := make(potentialMatches, 0, 8)
+	transformHash := transformHash(mapIndex, logValue)
+	sub1 := binary.LittleEndian.Uint32(transformHash[0:4])
+	mul1 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[4:8])*2 + 1)
+	xor1 := binary.LittleEndian.Uint32(transformHash[8:12])
+	mul2 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[12:16])*2 + 1)
+	sub2 := binary.LittleEndian.Uint32(transformHash[16:20])
+	mul3 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[20:24])*2 + 1)
+	xor2 := binary.LittleEndian.Uint32(transformHash[24:28])
+	mul4 := uint32ModInverse(binary.LittleEndian.Uint32(transformHash[28:32])*2 + 1)
+	// perform reverse column index transformation on all column indices of the row.
+	// if a column index was added by the searched log value then the reverse
+	// transform will yield a valid log value sub-index of the given map.
+	// Column index is 32 bits long while there are 2**16 valid log value indices
+	// in the map's range, so this can also happen by accident with 1 in 2**16
+	// chance, in which case we have a false positive.
+	for _, columnIndex := range row {
+		if potentialSubIndex := (((((((columnIndex * mul4) ^ xor2) * mul3) - sub2) * mul2) ^ xor1) * mul1) - sub1; potentialSubIndex < valuesPerMap {
+			results = append(results, uint64(mapIndex)*valuesPerMap+uint64(potentialSubIndex))
+		}
+	}
+	sort.Sort(results)
+	// remove duplicates
+	j := 0
+	for i, match := range results {
+		if i == 0 || match != results[i-1] {
+			results[j] = results[i]
+			j++
+		}
+	}
+	return results[:j]
+}
+
+// potentialMatches is a strictly monotonically increasing list of log value
+// indices in the range of a filter map that are potential matches for certain
+// filter criteria.
+// Note that nil is used as a wildcard and therefore means that all log value
+// indices in the filter map range are potential matches. If there are no
+// potential matches in the given map's range then an empty slice should be used.
+type potentialMatches []uint64
+
+// noMatches means there are no potential matches in a given filter map's range.
+var noMatches = potentialMatches{}
+
+func (p potentialMatches) Len() int           { return len(p) }
+func (p potentialMatches) Less(i, j int) bool { return p[i] < p[j] }
+func (p potentialMatches) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+
+// uint32ModInverse takes an odd 32 bit number and returns its modular
+// multiplicative inverse (mod 2**32), meaning that for any uint32 x and odd y
+// x * y * uint32ModInverse(y) == 1.
+func uint32ModInverse(v uint32) uint32 {
+	if v&1 == 0 {
+		panic("uint32ModInverse called with even argument")
+	}
+	m := int64(1) << 32
+	m0 := m
+	a := int64(v)
+	x, y := int64(1), int64(0)
+	for a > 1 {
+		q := a / m
+		m, a = a%m, m
+		x, y = y, x-q*y
+	}
+	if x < 0 {
+		x += m0
+	}
+	return uint32(x)
+}
diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go
new file mode 100644
index 000000000000..789bf4c4004d
--- /dev/null
+++ b/core/filtermaps/indexer.go
@@ -0,0 +1,618 @@
+package filtermaps
+
+import (
+	"errors"
+	"math"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core"
+	"github.com/ethereum/go-ethereum/core/rawdb"
+	"github.com/ethereum/go-ethereum/core/types"
+	"github.com/ethereum/go-ethereum/log"
+)
+
+const (
+	startLvPointer       = valuesPerMap << 31 // log value index assigned to init block
+	removedPointer       = math.MaxUint64     // used in updateBatch to signal removed items
+	revertPointFrequency = 256                // frequency of revert points in database
+	cachedRevertPoints   = 64                 // revert points for most recent blocks in memory
+)
+
+// updateLoop initializes and updates the log index structure according to the
+// canonical chain.
+func (f *FilterMaps) updateLoop() {
+	headEventCh := make(chan core.ChainHeadEvent)
+	sub := f.chain.SubscribeChainHeadEvent(headEventCh)
+	defer sub.Unsubscribe()
+
+	head := f.chain.CurrentBlock()
+	if head == nil {
+		select {
+		case ev := <-headEventCh:
+			head = ev.Block.Header()
+		case ch := <-f.closeCh:
+			close(ch)
+			return
+		}
+	}
+	fmr := f.getRange()
+
+	var stop bool
+	wait := func() {
+		if stop {
+			return
+		}
+		select {
+		case ev := <-headEventCh:
+			head = ev.Block.Header()
+		case <-time.After(time.Second * 20):
+			// keep updating log index during syncing
+			head = f.chain.CurrentBlock()
+		case ch := <-f.closeCh:
+			close(ch)
+			stop = true
+		}
+	}
+
+	for !stop {
+		if !fmr.initialized {
+			f.tryInit(head)
+			fmr = f.getRange()
+			if !fmr.initialized {
+				wait()
+				continue
+			}
+		}
+		// log index is initialized
+		if fmr.headBlockHash != head.Hash() {
+			f.tryUpdateHead(head)
+			fmr = f.getRange()
+			if fmr.headBlockHash != head.Hash() {
+				wait()
+				continue
+			}
+		}
+		// log index head is at latest chain head; process tail blocks if possible
+		f.tryExtendTail(func() bool {
+			// return true if tail processing needs to be stopped
+			select {
+			case ev := <-headEventCh:
+				head = ev.Block.Header()
+			case ch := <-f.closeCh:
+				close(ch)
+				stop = true
+				return true
+			default:
+				head = f.chain.CurrentBlock()
+			}
+			// stop if there is a new chain head (always prioritize head updates)
+			return fmr.headBlockHash != head.Hash()
+		})
+		if fmr.headBlockHash == head.Hash() {
+			// if tail processing exited while there is no new head then no more
+			// tail blocks can be processed
+			wait()
+		}
+	}
+}
+
+// getRange returns the current filterMapsRange.
+func (f *FilterMaps) getRange() filterMapsRange {
+	f.lock.RLock()
+	defer f.lock.RUnlock()
+
+	return f.filterMapsRange
+}
+
+// tryInit attempts to initialize the log index structure.
+func (f *FilterMaps) tryInit(head *types.Header) {
+	receipts := f.chain.GetReceiptsByHash(head.Hash())
+	if receipts == nil {
+		log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash())
+		return
+	}
+	update := f.newUpdateBatch()
+	if err := update.initWithBlock(head, receipts); err != nil {
+		log.Error("Could not initialize log index", "error", err)
+	}
+	f.applyUpdateBatch(update)
+}
+
+// tryUpdateHead attempts to update the log index with a new head. If necessary,
+// it reverts to a common ancestor with the old head before adding new block logs.
+// If no suitable revert point is available (probably a reorg just after init)
+// then it resets the index and tries to re-initialize with the new head.
+func (f *FilterMaps) tryUpdateHead(newHead *types.Header) {
+	// iterate back from new head until the log index head or a revert point and
+	// collect headers of blocks to be added
+	var (
+		newHeaders []*types.Header
+		chainPtr   = newHead
+		rp         *revertPoint
+	)
+	for {
+		if rp == nil || chainPtr.Number.Uint64() < rp.blockNumber {
+			var err error
+			rp, err = f.getRevertPoint(chainPtr.Number.Uint64())
+			if err != nil {
+				log.Error("Error fetching revert point", "block number", chainPtr.Number.Uint64(), "error", err)
+				return
+			}
+			if rp == nil {
+				// there are no more revert points available so we should reset and re-initialize
+				log.Warn("No suitable revert point exists; re-initializing log index", "block number", newHead.Number.Uint64())
+				f.reset()
+				f.tryInit(newHead)
+				return
+			}
+		}
+		if chainPtr.Hash() == rp.blockHash {
+			// revert point found at an ancestor of the new head
+			break
+		}
+		// keep iterating backwards and collecting headers; save the parent's
+		// number and hash before the lookup so that they are still available
+		// for the error log if the header is missing
+		newHeaders = append(newHeaders, chainPtr)
+		parentHash, parentNumber := chainPtr.ParentHash, chainPtr.Number.Uint64()-1
+		chainPtr = f.chain.GetHeader(parentHash, parentNumber)
+		if chainPtr == nil {
+			log.Error("Canonical header not found", "number", parentNumber, "hash", parentHash)
+			return
+		}
+	}
+	if rp.blockHash != f.headBlockHash {
+		if rp.blockNumber+128 <= f.headBlockNumber {
+			log.Warn("Rolling back log index", "old head", f.headBlockNumber, "new head", chainPtr.Number.Uint64())
+		}
+		if err := f.revertTo(rp); err != nil {
+			log.Error("Error applying revert point", "block number", chainPtr.Number.Uint64(), "error", err)
+			return
+		}
+	}
+
+	if newHeaders == nil {
+		return
+	}
+	// add logs of new blocks in reverse order
+	update := f.newUpdateBatch()
+	for i := len(newHeaders) - 1; i >= 0; i-- {
+		newHeader := newHeaders[i]
+		receipts := f.chain.GetReceiptsByHash(newHeader.Hash())
+		if receipts == nil {
+			log.Error("Could not retrieve block receipts for new block", "number", newHeader.Number, "hash", newHeader.Hash())
+			break
+		}
+		if err := update.addBlockToHead(newHeader, receipts); err != nil {
+			log.Error("Error adding new block", "number", newHeader.Number, "hash", newHeader.Hash(), "error", err)
+			break
+		}
+		if update.updatedRangeLength() >= mapsPerEpoch {
+			// limit the amount of data updated in a single batch
+			f.applyUpdateBatch(update)
+			update = f.newUpdateBatch()
+		}
+	}
+	f.applyUpdateBatch(update)
+}
+
+// tryExtendTail attempts to extend the log index backwards until it indexes the
+// genesis block or cannot find more block receipts. Since this is a long process,
+// stopFn is called after adding each tail block and if it returns true, the
+// latest batch is written and the function returns.
+func (f *FilterMaps) tryExtendTail(stopFn func() bool) {
+	fmr := f.getRange()
+	number, parentHash := fmr.tailBlockNumber, fmr.tailParentHash
+	if number == 0 {
+		return
+	}
+	update := f.newUpdateBatch()
+	lastTailEpoch := update.tailEpoch()
+	for number > 0 && !stopFn() {
+		if tailEpoch := update.tailEpoch(); tailEpoch < lastTailEpoch {
+			// limit the amount of data updated in a single batch
+			f.applyUpdateBatch(update)
+			update = f.newUpdateBatch()
+			lastTailEpoch = tailEpoch
+		}
+		newTail := f.chain.GetHeader(parentHash, number-1)
+		if newTail == nil {
+			log.Error("Tail header not found", "number", number-1, "hash", parentHash)
+			break
+		}
+		receipts := f.chain.GetReceiptsByHash(newTail.Hash())
+		if receipts == nil {
+			log.Error("Could not retrieve block receipts for tail block", "number", newTail.Number, "hash", newTail.Hash())
+			break
+		}
+		if err := update.addBlockToTail(newTail, receipts); err != nil {
+			log.Error("Error adding tail block", "number", newTail.Number, "hash", newTail.Hash(), "error", err)
+			break
+		}
+		number, parentHash = newTail.Number.Uint64(), newTail.ParentHash
+	}
+	f.applyUpdateBatch(update)
+}
+
+// updateBatch is a memory overlay collecting changes to the log index structure
+// that can be written to the database in a single batch while the in-memory
+// representations in FilterMaps are also updated.
+type updateBatch struct {
+	filterMapsRange
+	maps                   map[uint32]*filterMap // nil rows are unchanged
+	getFilterMapRow        func(mapIndex, rowIndex uint32) (FilterRow, error)
+	blockLvPointer         map[uint64]uint64 // removedPointer means delete
+	mapBlockPtr            map[uint32]uint64 // removedPointer means delete
+	revertPoints           map[uint64]*revertPoint
+	firstMap, afterLastMap uint32
+}
+
+// newUpdateBatch creates a new updateBatch.
+func (f *FilterMaps) newUpdateBatch() *updateBatch {
+	f.lock.RLock()
+	defer f.lock.RUnlock()
+
+	return &updateBatch{
+		filterMapsRange: f.filterMapsRange,
+		maps:            make(map[uint32]*filterMap),
+		getFilterMapRow: f.getFilterMapRow,
+		blockLvPointer:  make(map[uint64]uint64),
+		mapBlockPtr:     make(map[uint32]uint64),
+		revertPoints:    make(map[uint64]*revertPoint),
+	}
+}
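+
+// Example (an illustrative sketch, not part of the original patch): the typical
+// update cycle collects changes in an overlay and commits them in one batch.
+//
+//	update := f.newUpdateBatch()
+//	if err := update.addBlockToHead(header, receipts); err != nil {
+//		// handle error
+//	}
+//	f.applyUpdateBatch(update) // single database batch + in-memory update
+
+// applyUpdateBatch creates a write batch, writes all changes to the database
+// and also updates the in-memory representations of log index data.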
+func (f *FilterMaps) applyUpdateBatch(u *updateBatch) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	batch := f.db.NewBatch()
+	// write or remove block to log value index pointers
+	for blockNumber, lvPointer := range u.blockLvPointer {
+		if lvPointer != removedPointer {
+			f.storeBlockLvPointer(batch, blockNumber, lvPointer)
+		} else {
+			f.deleteBlockLvPointer(batch, blockNumber)
+		}
+	}
+	// write or remove filter map to block number pointers
+	for mapIndex, blockNumber := range u.mapBlockPtr {
+		if blockNumber != removedPointer {
+			f.storeMapBlockPtr(batch, mapIndex, blockNumber)
+		} else {
+			f.deleteMapBlockPtr(batch, mapIndex)
+		}
+	}
+	// write filter map rows
+	for rowIndex := uint32(0); rowIndex < mapHeight; rowIndex++ {
+		for mapIndex := u.firstMap; mapIndex < u.afterLastMap; mapIndex++ {
+			if fm := u.maps[mapIndex]; fm != nil {
+				if row := (*fm)[rowIndex]; row != nil {
+					f.storeFilterMapRow(batch, mapIndex, rowIndex, row)
+				}
+			}
+		}
+	}
+	// remove revert points of reverted blocks from memory and database
+	if u.headBlockNumber < f.headBlockNumber {
+		for b := u.headBlockNumber + 1; b <= f.headBlockNumber; b++ {
+			delete(f.revertPoints, b)
+			if b%revertPointFrequency == 0 {
+				rawdb.DeleteRevertPoint(batch, b)
+			}
+		}
+	}
+	// evict revert points that have moved out of the memory cache range
+	if u.headBlockNumber > f.headBlockNumber {
+		for b := f.headBlockNumber + 1; b <= u.headBlockNumber; b++ {
+			delete(f.revertPoints, b-cachedRevertPoints)
+		}
+	}
+	// store new revert points in database and/or memory
+	for b, rp := range u.revertPoints {
+		if b+cachedRevertPoints > u.headBlockNumber {
+			f.revertPoints[b] = rp
+		}
+		if b%revertPointFrequency == 0 {
+			rawdb.WriteRevertPoint(batch, b, &rawdb.RevertPoint{
+				BlockHash: rp.blockHash,
+				MapIndex:  rp.mapIndex,
+				RowLength: rp.rowLength[:],
+			})
+		}
+	}
+	// update filterMapsRange
+	f.setRange(batch, u.filterMapsRange)
+	if err := batch.Write(); err != nil {
+		log.Crit("Could not write update batch", "error", err)
+	}
+	log.Info("Log index block range updated", "tail", u.tailBlockNumber, "head", u.headBlockNumber, "log values", u.headLvPointer-u.tailLvPointer)
+}
+
+// updatedRangeLength returns the length of the updated filter map range.
+func (u *updateBatch) updatedRangeLength() uint32 {
+	return u.afterLastMap - u.firstMap
+}
+
+// tailEpoch returns the tail epoch index.
+func (u *updateBatch) tailEpoch() uint32 {
+	return uint32(u.tailLvPointer >> (logValuesPerMap + logMapsPerEpoch))
+}
+
+// getRowPtr returns a pointer to a FilterRow that can be modified. If the batch
+// did not have a modified version of the given row yet, it is retrieved using the
+// request function from the backing FilterMaps cache or database and copied
+// before modification.
+func (u *updateBatch) getRowPtr(mapIndex, rowIndex uint32) (*FilterRow, error) {
+	fm := u.maps[mapIndex]
+	if fm == nil {
+		fm = new(filterMap)
+		u.maps[mapIndex] = fm
+		if mapIndex < u.firstMap || u.afterLastMap == 0 {
+			u.firstMap = mapIndex
+		}
+		if mapIndex >= u.afterLastMap {
+			u.afterLastMap = mapIndex + 1
+		}
+	}
+	rowPtr := &(*fm)[rowIndex]
+	if *rowPtr == nil {
+		if filterRow, err := u.getFilterMapRow(mapIndex, rowIndex); err == nil {
+			// filterRow is read only, copy before write
+			*rowPtr = make(FilterRow, len(filterRow), len(filterRow)+8)
+			copy(*rowPtr, filterRow)
+		} else {
+			return nil, err
+		}
+	}
+	return rowPtr, nil
+}
+
+// initWithBlock initializes the log index with the given block as head.
+func (u *updateBatch) initWithBlock(header *types.Header, receipts types.Receipts) error {
+	if u.initialized {
+		return errors.New("already initialized")
+	}
+	u.initialized = true
+	u.headLvPointer, u.tailLvPointer = startLvPointer, startLvPointer
+	u.headBlockNumber, u.tailBlockNumber = header.Number.Uint64()-1, header.Number.Uint64() //TODO genesis?
+	u.headBlockHash, u.tailParentHash = header.ParentHash, header.ParentHash
+	return u.addBlockToHead(header, receipts)
+}
+
+// addValueToHead adds a single log value to the head of the log index.
+func (u *updateBatch) addValueToHead(logValue common.Hash) error {
+	mapIndex := uint32(u.headLvPointer >> logValuesPerMap)
+	rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue))
+	if err != nil {
+		return err
+	}
+	column := columnIndex(u.headLvPointer, logValue)
+	*rowPtr = append(*rowPtr, column)
+	u.headLvPointer++
+	return nil
+}
+
+// addBlockToHead adds the logs of the given block to the head of the log index.
+// It also adds block to log value index and filter map to block pointers and
+// a new revert point.
+func (u *updateBatch) addBlockToHead(header *types.Header, receipts types.Receipts) error {
+	if !u.initialized {
+		return errors.New("not initialized")
+	}
+	if header.ParentHash != u.headBlockHash {
+		return errors.New("addBlockToHead parent mismatch")
+	}
+	number := header.Number.Uint64()
+	u.blockLvPointer[number] = u.headLvPointer
+	startMap := uint32((u.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
+	if err := iterateReceipts(receipts, u.addValueToHead); err != nil {
+		return err
+	}
+	stopMap := uint32((u.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
+	for m := startMap; m < stopMap; m++ {
+		u.mapBlockPtr[m] = number
+	}
+	u.headBlockNumber, u.headBlockHash = number, header.Hash()
+	if (u.headBlockNumber-cachedRevertPoints)%revertPointFrequency != 0 {
+		delete(u.revertPoints, u.headBlockNumber-cachedRevertPoints)
+	}
+	if rp, err := u.makeRevertPoint(); err != nil {
+		return err
+	} else if rp != nil {
+		u.revertPoints[u.headBlockNumber] = rp
+	}
+	return nil
+}
+
+// addValueToTail adds a single log value to the tail of the log index.
+func (u *updateBatch) addValueToTail(logValue common.Hash) error {
+	if u.tailLvPointer == 0 {
+		return errors.New("tail log value pointer underflow")
+	}
+	u.tailLvPointer--
+	mapIndex := uint32(u.tailLvPointer >> logValuesPerMap)
+	rowPtr, err := u.getRowPtr(mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, logValue))
+	if err != nil {
+		return err
+	}
+	column := columnIndex(u.tailLvPointer, logValue)
+	*rowPtr = append(*rowPtr, 0)
+	copy((*rowPtr)[1:], (*rowPtr)[:len(*rowPtr)-1])
+	(*rowPtr)[0] = column
+	return nil
+}
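+
+// Example (illustrative arithmetic, not part of the original patch): a block
+// whose receipts contain two logs, with 2 and 3 topics respectively, occupies
+// (1+2)+(1+3) = 7 consecutive log value indices; the block's lv pointer is the
+// index of the first of them.
+//
+//	func blockLogValueCount(receipts types.Receipts) (count uint64) {
+//		for _, receipt := range receipts {
+//			for _, log := range receipt.Logs {
+//				count += uint64(len(log.Topics) + 1) // address value + topic values
+//			}
+//		}
+//		return count
+//	}
+
+// addBlockToTail adds the logs of the given block to the tail of the log index.
+// It also adds block to log value index and filter map to block pointers.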
+func (u *updateBatch) addBlockToTail(header *types.Header, receipts types.Receipts) error {
+	if !u.initialized {
+		return errors.New("not initialized")
+	}
+	if header.Hash() != u.tailParentHash {
+		return errors.New("addBlockToTail parent mismatch")
+	}
+	number := header.Number.Uint64()
+	stopMap := uint32((u.tailLvPointer + valuesPerMap - 1) >> logValuesPerMap)
+	if err := iterateReceiptsReverse(receipts, u.addValueToTail); err != nil {
+		return err
+	}
+	startMap := uint32(u.tailLvPointer >> logValuesPerMap)
+	for m := startMap; m < stopMap; m++ {
+		u.mapBlockPtr[m] = number
+	}
+	u.blockLvPointer[number] = u.tailLvPointer
+	u.tailBlockNumber, u.tailParentHash = number, header.ParentHash
+	return nil
+}
+
+// iterateReceipts iterates the given block receipts, generates log value hashes
+// and passes them to the given callback function as a parameter.
+func iterateReceipts(receipts types.Receipts, valueCb func(common.Hash) error) error {
+	for _, receipt := range receipts {
+		for _, log := range receipt.Logs {
+			if err := valueCb(addressValue(log.Address)); err != nil {
+				return err
+			}
+			for _, topic := range log.Topics {
+				if err := valueCb(topicValue(topic)); err != nil {
+					return err
+				}
+			}
+		}
+	}
+	return nil
+}
+
+// iterateReceiptsReverse iterates the given block receipts, generates log value
+// hashes in reverse order and passes them to the given callback function as a
+// parameter.
+func iterateReceiptsReverse(receipts types.Receipts, valueCb func(common.Hash) error) error {
+	for i := len(receipts) - 1; i >= 0; i-- {
+		logs := receipts[i].Logs
+		for j := len(logs) - 1; j >= 0; j-- {
+			log := logs[j]
+			for k := len(log.Topics) - 1; k >= 0; k-- {
+				if err := valueCb(topicValue(log.Topics[k])); err != nil {
+					return err
+				}
+			}
+			if err := valueCb(addressValue(log.Address)); err != nil {
+				return err
+			}
+		}
+	}
+	return nil
+}
+
+// revertPoint can be used to revert the log index to a certain head block.
+type revertPoint struct {
+	blockNumber uint64
+	blockHash   common.Hash
+	mapIndex    uint32
+	rowLength   [mapHeight]uint
+}
+
+// makeRevertPoint creates a new revertPoint.
+func (u *updateBatch) makeRevertPoint() (*revertPoint, error) {
+	rp := &revertPoint{
+		blockNumber: u.headBlockNumber,
+		blockHash:   u.headBlockHash,
+		mapIndex:    uint32(u.headLvPointer >> logValuesPerMap),
+	}
+	if u.tailLvPointer > uint64(rp.mapIndex)<<logValuesPerMap {
+		return nil, nil
+	}
+	for i := range rp.rowLength[:] {
+		var row FilterRow
+		if fm := u.maps[rp.mapIndex]; fm != nil {
+			row = (*fm)[i]
+		}
+		if row == nil {
+			var err error
+			row, err = u.getFilterMapRow(rp.mapIndex, uint32(i))
+			if err != nil {
+				return nil, err
+			}
+		}
+		rp.rowLength[i] = uint(len(row))
+	}
+	return rp, nil
+}
+
+// getRevertPoint retrieves the latest revert point at or before the given block
+// number from the memory cache or from the database. Note that a revert point
+// is not available for every block number; if no stored revert point exists at
+// or before the given block number then nil is returned with no error.
+func (f *FilterMaps) getRevertPoint(blockNumber uint64) (*revertPoint, error) {
+	if blockNumber > f.headBlockNumber {
+		blockNumber = f.headBlockNumber
+	}
+	if rp := f.revertPoints[blockNumber]; rp != nil {
+		return rp, nil
+	}
+	blockNumber -= blockNumber % revertPointFrequency
+	rps, err := rawdb.ReadRevertPoint(f.db, blockNumber)
+	if err != nil {
+		return nil, err
+	}
+	if rps == nil {
+		return nil, nil
+	}
+	if len(rps.RowLength) != mapHeight {
+		return nil, errors.New("invalid number of rows in stored revert point")
+	}
+	rp := &revertPoint{
+		blockNumber: blockNumber,
+		blockHash:   rps.BlockHash,
+		mapIndex:    rps.MapIndex,
+	}
+	copy(rp.rowLength[:], rps.RowLength)
+	return rp, nil
+}
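+
+// Example (illustrative arithmetic, not part of the original patch): with
+// revertPointFrequency = 256 and cachedRevertPoints = 64, a rollback to block
+// B is served from memory if B is within 64 blocks of the head, otherwise from
+// the latest persisted point at or below B:
+//
+//	persisted := blockNumber - blockNumber%revertPointFrequency
+
+// revertTo reverts the log index to the given revert point.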
+func (f *FilterMaps) revertTo(rp *revertPoint) error {
+	batch := f.db.NewBatch()
+	afterLastMap := uint32((f.headLvPointer + valuesPerMap - 1) >> logValuesPerMap)
+	if rp.mapIndex >= afterLastMap {
+		return errors.New("cannot revert (head map behind revert point)")
+	}
+	lvPointer := uint64(rp.mapIndex) << logValuesPerMap
+	for rowIndex, rowLen := range rp.rowLength[:] {
+		rowIndex := uint32(rowIndex)
+		row, err := f.getFilterMapRow(rp.mapIndex, rowIndex)
+		if err != nil {
+			return err
+		}
+		if uint(len(row)) < rowLen {
+			return errors.New("cannot revert (row too short)")
+		}
+		if uint(len(row)) > rowLen {
+			f.storeFilterMapRow(batch, rp.mapIndex, rowIndex, row[:rowLen])
+		}
+		for mapIndex := rp.mapIndex + 1; mapIndex < afterLastMap; mapIndex++ {
+			f.storeFilterMapRow(batch, mapIndex, rowIndex, emptyRow)
+		}
+		lvPointer += uint64(rowLen)
+	}
+	for mapIndex := rp.mapIndex + 1; mapIndex < afterLastMap; mapIndex++ {
+		f.deleteMapBlockPtr(batch, mapIndex)
+	}
+	for blockNumber := rp.blockNumber + 1; blockNumber <= f.headBlockNumber; blockNumber++ {
+		f.deleteBlockLvPointer(batch, blockNumber)
+	}
+	newRange := f.filterMapsRange
+	newRange.headLvPointer = lvPointer
+	newRange.headBlockNumber = rp.blockNumber
+	newRange.headBlockHash = rp.blockHash
+	f.setRange(batch, newRange)
+	if err := batch.Write(); err != nil {
+		log.Crit("Could not write update batch", "error", err)
+	}
+	return nil
+}
diff --git a/core/filtermaps/matcher.go b/core/filtermaps/matcher.go
new file mode 100644
index 000000000000..3bc08494e19a
--- /dev/null
+++ b/core/filtermaps/matcher.go
@@ -0,0 +1,500 @@
+package filtermaps
+
+import (
+	"context"
+	"math"
+	"sync"
+	"sync/atomic"
+
+	"github.com/ethereum/go-ethereum/common"
+	"github.com/ethereum/go-ethereum/core/types"
+)
+
+// MatcherBackend defines the functions required for searching in the log index
+// data structure. It is currently implemented by FilterMapsMatcherBackend but
+// once EIP-7745 is implemented and active, these functions can also be trustlessly
+// served by a remote prover.
+type MatcherBackend interface {
+	GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error)
+	GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (FilterRow, error)
+	GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error)
+}
+
+// GetPotentialMatches returns a list of logs that are potential matches for the
+// given filter criteria. Note that the returned list may still contain false
+// positives.
+//TODO add protection against reorgs during search
+func GetPotentialMatches(ctx context.Context, backend MatcherBackend, firstBlock, lastBlock uint64, addresses []common.Address, topics [][]common.Hash) ([]*types.Log, error) {
+	// find the log value index range to search
+	firstIndex, err := backend.GetBlockLvPointer(ctx, firstBlock)
+	if err != nil {
+		return nil, err
+	}
+	lastIndex, err := backend.GetBlockLvPointer(ctx, lastBlock+1)
+	if err != nil {
+		return nil, err
+	}
+	if lastIndex > 0 {
+		lastIndex--
+	}
+	firstMap, lastMap := uint32(firstIndex>>logValuesPerMap), uint32(lastIndex>>logValuesPerMap)
+	firstEpoch, lastEpoch := firstMap>>logMapsPerEpoch, lastMap>>logMapsPerEpoch
+
+	// build matcher according to the given filter criteria
+	matchers := make([]matcher, len(topics)+1)
+	// matchAddress signals a match when there is a match for any of the given
+	// addresses.
+	// If the list of addresses is empty then it creates a "wild card" matcher
+	// that signals every index as a potential match.
+	matchAddress := make(matchAny, len(addresses))
+	for i, address := range addresses {
+		matchAddress[i] = &singleMatcher{backend: backend, value: addressValue(address)}
+	}
+	matchers[0] = matchAddress
+	for i, topicList := range topics {
+		// matchTopic signals a match when there is a match for any of the topics
+		// specified for the given position (topicList).
+		// If topicList is empty then it creates a "wild card" matcher that signals
+		// every index as a potential match.
+		matchTopic := make(matchAny, len(topicList))
+		for j, topic := range topicList {
+			matchTopic[j] = &singleMatcher{backend: backend, value: topicValue(topic)}
+		}
+		matchers[i+1] = matchTopic
+	}
+	// matcher is the final sequence matcher that signals a match when all underlying
+	// matchers signal a match for consecutive log value indices.
+	matcher := newMatchSequence(matchers)
+
+	// processEpoch returns the potentially matching logs from the given epoch.
+	processEpoch := func(epochIndex uint32) ([]*types.Log, error) {
+		var logs []*types.Log
+		// create a list of map indices to process
+		fm, lm := epochIndex<<logMapsPerEpoch, (epochIndex+1)<<logMapsPerEpoch-1
+		if fm < firstMap {
+			fm = firstMap
+		}
+		if lm > lastMap {
+			lm = lastMap
+		}
+		mapIndices := make([]uint32, lm+1-fm)
+		for i := range mapIndices {
+			mapIndices[i] = fm + uint32(i)
+		}
+		// find potential matches
+		matches, err := matcher.getMatches(ctx, mapIndices)
+		if err != nil {
+			return logs, err
+		}
+		// get the actual logs located at the matching log value indices
+		for _, m := range matches {
+			mlogs, err := getLogsFromMatches(ctx, backend, firstIndex, lastIndex, m)
+			if err != nil {
+				return logs, err
+			}
+			logs = append(logs, mlogs...)
+		}
+		return logs, nil
+	}
+
+	type task struct {
+		epochIndex uint32
+		logs       []*types.Log
+		err        error
+		done       chan struct{}
+	}
+
+	taskCh := make(chan *task)
+	var wg sync.WaitGroup
+	defer func() {
+		close(taskCh)
+		wg.Wait()
+	}()
+
+	worker := func() {
+		for task := range taskCh {
+			if task == nil {
+				break
+			}
+			task.logs, task.err = processEpoch(task.epochIndex)
+			close(task.done)
+		}
+		wg.Done()
+	}
+
+	for i := 0; i < 4; i++ {
+		wg.Add(1)
+		go worker()
+	}
+
+	var logs []*types.Log
+	// startEpoch is the next task to send whenever a worker can accept it.
+	// waitEpoch is the next task we are waiting for to finish in order to append
+	// results in the correct order.
+	startEpoch, waitEpoch := firstEpoch, firstEpoch
+	tasks := make(map[uint32]*task)
+	tasks[startEpoch] = &task{epochIndex: startEpoch, done: make(chan struct{})}
+	for waitEpoch <= lastEpoch {
+		select {
+		case taskCh <- tasks[startEpoch]:
+			startEpoch++
+			if startEpoch <= lastEpoch {
+				if tasks[startEpoch] == nil {
+					tasks[startEpoch] = &task{epochIndex: startEpoch, done: make(chan struct{})}
+				}
+			}
+		case <-tasks[waitEpoch].done:
+			logs = append(logs, tasks[waitEpoch].logs...)
+			if err := tasks[waitEpoch].err; err != nil {
+				return logs, err
+			}
+			delete(tasks, waitEpoch)
+			waitEpoch++
+			if waitEpoch <= lastEpoch {
+				if tasks[waitEpoch] == nil {
+					tasks[waitEpoch] = &task{epochIndex: waitEpoch, done: make(chan struct{})}
+				}
+			}
+		}
+	}
+	return logs, nil
+}
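+
+// Example (an illustrative sketch, not part of the original patch): a caller
+// filtering for one address and one first-position topic; the result may still
+// contain false positives that have to be re-checked against the filter
+// criteria.
+//
+//	logs, err := GetPotentialMatches(ctx, backend, firstBlock, lastBlock,
+//		[]common.Address{addr}, [][]common.Hash{{topic0}})
+//	if err != nil {
+//		// handle error
+//	}
+//	// re-check logs against the original filter before returning them
+
+// getLogsFromMatches returns the list of potentially matching logs located at
+// the given list of matching log indices. Matches outside the firstIndex to
+// lastIndex range are not returned.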
+func getLogsFromMatches(ctx context.Context, backend MatcherBackend, firstIndex, lastIndex uint64, matches potentialMatches) ([]*types.Log, error) {
+	var logs []*types.Log
+	for _, match := range matches {
+		if match < firstIndex || match > lastIndex {
+			continue
+		}
+		log, err := backend.GetLogByLvIndex(ctx, match)
+		if err != nil {
+			return logs, err
+		}
+		if log != nil {
+			logs = append(logs, log)
+		}
+	}
+	return logs, nil
+}
+
+// matcher interface is defined so that individual address/topic matchers can be
+// combined into a pattern matcher (see matchAny and matchSequence).
+type matcher interface {
+	// getMatches takes a list of map indices and returns an equal number of
+	// potentialMatches, one for each corresponding map index.
+	// Note that the map index list is typically a list of the potentially
+	// interesting maps from an epoch, plus sometimes the first map of the next
+	// epoch if it is required for sequence matching.
+	getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error)
+}
+
+// singleMatcher implements matcher by returning matches for a single log value hash.
+type singleMatcher struct {
+	backend MatcherBackend
+	value   common.Hash
+}
+
+// getMatches implements matcher
+func (s *singleMatcher) getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) {
+	results := make([]potentialMatches, len(mapIndices))
+	for i, mapIndex := range mapIndices {
+		filterRow, err := s.backend.GetFilterMapRow(ctx, mapIndex, rowIndex(mapIndex>>logMapsPerEpoch, s.value))
+		if err != nil {
+			return nil, err
+		}
+		results[i] = filterRow.potentialMatches(mapIndex, s.value)
+	}
+	return results, nil
+}
+
+// matchAny combines a set of matchers and returns a match for every position
+// where any of the underlying matchers signaled a match. A zero-length matchAny
+// acts as a "wild card" that signals a potential match at every position.
+type matchAny []matcher
+
+// getMatches implements matcher
+func (m matchAny) getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) {
+	if len(m) == 0 {
+		// return "wild card" results (potentialMatches(nil) is interpreted as a
+		// potential match at every log value index of the map).
+		return make([]potentialMatches, len(mapIndices)), nil
+	}
+	if len(m) == 1 {
+		return m[0].getMatches(ctx, mapIndices)
+	}
+	matches := make([][]potentialMatches, len(m))
+	for i, matcher := range m {
+		var err error
+		if matches[i], err = matcher.getMatches(ctx, mapIndices); err != nil {
+			return nil, err
+		}
+	}
+	results := make([]potentialMatches, len(mapIndices))
+	merge := make([]potentialMatches, len(m))
+	for i := range results {
+		for j := range merge {
+			merge[j] = matches[j][i]
+		}
+		results[i] = mergeResults(merge)
+	}
+	return results, nil
+}
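+
+// Example (illustrative, not part of the original patch): merging the sorted
+// match lists [3, 7] and [3, 5] yields [3, 5, 7]; a nil input short-circuits
+// to nil because a wild card already covers every index:
+//
+//	mergeResults([]potentialMatches{{3, 7}, {3, 5}}) // -> {3, 5, 7}
+//	mergeResults([]potentialMatches{{3, 7}, nil})    // -> nil (wild card)
+
+// mergeResults merges multiple lists of matches into a single one, preserving
+// ascending order and filtering out any duplicates.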
+func mergeResults(results []potentialMatches) potentialMatches {
+	if len(results) == 0 {
+		return nil
+	}
+	var sumLen int
+	for _, res := range results {
+		if res == nil {
+			// nil is a wild card; all indices in map range are potential matches
+			return nil
+		}
+		sumLen += len(res)
+	}
+	merged := make(potentialMatches, 0, sumLen)
+	for {
+		best := -1
+		for i, res := range results {
+			if len(res) == 0 {
+				continue
+			}
+			if best < 0 || res[0] < results[best][0] {
+				best = i
+			}
+		}
+		if best < 0 {
+			return merged
+		}
+		if len(merged) == 0 || results[best][0] > merged[len(merged)-1] {
+			merged = append(merged, results[best][0])
+		}
+		results[best] = results[best][1:]
+	}
+}
+
+// matchSequence combines two matchers, a "base" and a "next" matcher with a
+// positive integer offset so that the resulting matcher signals a match at log
+// value index X when the base matcher returns a match at X and the next matcher
+// gives a match at X+offset. Note that matchSequence can be used recursively to
+// detect any log value sequence.
+type matchSequence struct {
+	base, next matcher
+	offset     uint64
+	// *EmptyRate == totalCount << 32 + emptyCount (atomically accessed)
+	baseEmptyRate, nextEmptyRate uint64
+}
+
+// newMatchSequence creates a recursive sequence matcher from a list of underlying
+// matchers. The resulting matcher signals a match at log value index X when each
+// underlying matcher matchers[i] returns a match at X+i.
+func newMatchSequence(matchers []matcher) matcher {
+	if len(matchers) == 0 {
+		panic("zero length sequence matchers are not allowed")
+	}
+	if len(matchers) == 1 {
+		return matchers[0]
+	}
+	return &matchSequence{
+		base:   newMatchSequence(matchers[:len(matchers)-1]),
+		next:   matchers[len(matchers)-1],
+		offset: uint64(len(matchers) - 1),
+	}
+}
+
+// getMatches implements matcher
+func (m *matchSequence) getMatches(ctx context.Context, mapIndices []uint32) ([]potentialMatches, error) {
+	// decide whether to evaluate base or next matcher first
+	baseEmptyRate := atomic.LoadUint64(&m.baseEmptyRate)
+	nextEmptyRate := atomic.LoadUint64(&m.nextEmptyRate)
+	baseTotal, baseEmpty := baseEmptyRate>>32, uint64(uint32(baseEmptyRate))
+	nextTotal, nextEmpty := nextEmptyRate>>32, uint64(uint32(nextEmptyRate))
+	baseFirst := baseEmpty*nextTotal >= nextEmpty*baseTotal/2
+
+	var (
+		baseRes, nextRes []potentialMatches
+		baseIndices      []uint32
+	)
+	if baseFirst {
+		// base first mode; request base matcher
+		baseIndices = mapIndices
+		var err error
+		baseRes, err = m.base.getMatches(ctx, baseIndices)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	// determine set of indices to request from next matcher
+	nextIndices := make([]uint32, 0, len(mapIndices)*3/2)
+	lastAdded := uint32(math.MaxUint32)
+	for i, mapIndex := range mapIndices {
+		if baseFirst && baseRes[i] != nil && len(baseRes[i]) == 0 {
+			// do not request map index from next matcher if no results from base matcher
+			continue
+		}
+		if lastAdded != mapIndex {
+			nextIndices = append(nextIndices, mapIndex)
+			lastAdded = mapIndex
+		}
+		if !baseFirst || baseRes[i] == nil || baseRes[i][len(baseRes[i])-1] >= (uint64(mapIndex+1)<<logValuesPerMap)-m.offset {
+			// also request the next map index because the last base matcher
+			// result may have its next matcher counterpart on the next map
+			nextIndices = append(nextIndices, mapIndex+1)
+			lastAdded = mapIndex + 1
+		}
+	}
+
+	// request next matcher
+	var err error
+	nextRes, err = m.next.getMatches(ctx, nextIndices)
+	if err != nil {
+		return nil, err
+	}
+
+	if !baseFirst {
+		// next first mode; determine set of indices to request from base matcher
+		baseIndices = make([]uint32, 0, len(mapIndices))
+		var nextPtr int
+		for _, mapIndex := range mapIndices {
+			// find the next matcher results for mapIndex and mapIndex+1
+			for nextPtr+1 < len(nextIndices) && nextIndices[nextPtr] < mapIndex {
+				nextPtr++
+			}
+			if nextPtr+1 >= len(nextIndices) {
+				break
+			}
+			if nextIndices[nextPtr] != mapIndex || nextIndices[nextPtr+1] != mapIndex+1 {
+				panic("invalid nextIndices")
+			}
+			next1, next2 := nextRes[nextPtr], nextRes[nextPtr+1]
+			if next1 == nil || (len(next1) > 0 && next1[len(next1)-1] >= (uint64(mapIndex)<<logValuesPerMap)+m.offset) ||
+				next2 == nil || (len(next2) > 0 && next2[0] < (uint64(mapIndex+1)<<logValuesPerMap)+m.offset) {
+				// only request base matcher results for map indices where the
+				// next matcher results do not already rule out a sequence match
+				baseIndices = append(baseIndices, mapIndex)
+			}
+		}
+		if len(baseIndices) > 0 {
+			// request base matcher
+			baseRes, err = m.base.getMatches(ctx, baseIndices)
+			if err != nil {
+				return nil, err
+			}
+		}
+	}
+
+	// update empty rate statistics
+	for _, res := range baseRes {
+		if res != nil && len(res) == 0 {
+			atomic.AddUint64(&m.baseEmptyRate, 0x100000001)
+		} else {
+			atomic.AddUint64(&m.baseEmptyRate, 0x100000000)
+		}
+	}
+	for _, res := range nextRes {
+		if res != nil && len(res) == 0 {
+			atomic.AddUint64(&m.nextEmptyRate, 0x100000001)
+		} else {
+			atomic.AddUint64(&m.nextEmptyRate, 0x100000000)
+		}
+	}
+
+	// join the base and next matcher results
+	results := make([]potentialMatches, len(mapIndices))
+	var basePtr, nextPtr int
+	for i, mapIndex := range mapIndices {
+		for basePtr < len(baseIndices) && baseIndices[basePtr] < mapIndex {
+			basePtr++
+		}
+		for nextPtr < len(nextIndices) && nextIndices[nextPtr] < mapIndex {
+			nextPtr++
+		}
+		results[i] = noMatches
+		if basePtr < len(baseIndices) && baseIndices[basePtr] == mapIndex &&
+			nextPtr < len(nextIndices) && nextIndices[nextPtr] == mapIndex {
+			nextNextRes := noMatches
+			if nextPtr+1 < len(nextIndices) && nextIndices[nextPtr+1] == mapIndex+1 {
+				nextNextRes = nextRes[nextPtr+1]
+			}
+			results[i] = matchResults(mapIndex, m.offset, baseRes[basePtr], nextRes[nextPtr], nextNextRes)
+		}
+	}
+	return results, nil
+}
+
+// matchResults returns the sequence match results for a single map based on the
+// base matcher results at mapIndex (baseRes), the next matcher results at
+// mapIndex (nextRes) and the next matcher results at mapIndex+1 (nextNextRes).
+func matchResults(mapIndex uint32, offset uint64, baseRes, nextRes, nextNextRes potentialMatches) potentialMatches {
+	if nextRes == nil || (baseRes != nil && len(baseRes) == 0) {
+		// if nextRes is a wild card or baseRes is empty then the sequence matcher
+		// results equal baseRes.
+		return baseRes
+	}
+	if len(nextRes) > 0 {
+		// discard items from nextRes whose corresponding base matcher results
+		// with the negative offset applied would be located at mapIndex-1.
+		start := 0
+		for start < len(nextRes) && nextRes[start] < uint64(mapIndex)<<logValuesPerMap+offset {
+			start++
+		}
+		nextRes = nextRes[start:]
+	}
+	if len(nextNextRes) > 0 {
+		// discard items from nextNextRes whose corresponding base matcher results
+		// with the negative offset applied would still be located at mapIndex+1.
+		stop := 0
+		for stop < len(nextNextRes) && nextNextRes[stop] < uint64(mapIndex+1)<<logValuesPerMap+offset {
+			stop++
+		}
+		nextRes = append(nextRes, nextNextRes[:stop]...)
+	}
+	// collect the base matcher results that have a next matcher counterpart at
+	// the required offset
+	matchedRes := make(potentialMatches, 0, len(nextRes))
+	if baseRes != nil {
+		// baseRes is not a wild card; iterate through both sorted lists and
+		// look for pairs where nextRes[j] == baseRes[i]+offset.
+		for len(nextRes) > 0 && len(baseRes) > 0 {
+			if nextRes[0] > baseRes[0]+offset {
+				baseRes = baseRes[1:]
+			} else if nextRes[0] < baseRes[0]+offset {
+				nextRes = nextRes[1:]
+			} else {
+				matchedRes = append(matchedRes, baseRes[0])
+				baseRes = baseRes[1:]
+				nextRes = nextRes[1:]
+			}
+		}
+	} else {
+		// baseRes is a wild card so just return next matcher results with
+		// negative offset.
+		for len(nextRes) > 0 {
+			matchedRes = append(matchedRes, nextRes[0]-offset)
+			nextRes = nextRes[1:]
+		}
+	}
+	return matchedRes
+}
diff --git a/core/rawdb/accessors_indexes.go b/core/rawdb/accessors_indexes.go
index 4f2ef0a88083..d1b0cf5053a6 100644
--- a/core/rawdb/accessors_indexes.go
+++ b/core/rawdb/accessors_indexes.go
@@ -18,6 +18,8 @@ package rawdb
 
 import (
 	"bytes"
+	"encoding/binary"
+	"errors"
 	"math/big"
 
 	"github.com/ethereum/go-ethereum/common"
@@ -179,3 +181,207 @@ func DeleteBloombits(db ethdb.Database, bit uint, from uint64, to uint64) {
 		log.Crit("Failed to delete bloom bits", "err", it.Error())
 	}
 }
+
+var emptyRow = []uint32{}
+
+// ReadFilterMapRow retrieves a filter map row at the given mapRowIndex
+// (see filtermaps.mapRowIndex for the storage index encoding).
+// Note that zero length rows are not stored in the database and therefore all
+// non-existent entries are interpreted as empty rows and return no error.
+// Also note that the mapRowIndex indexing scheme is the same as the one
+// proposed in EIP-7745 for tree-hashing the filter map structure and for the
+// same data proximity reasons it is also suitable for database representation.
+// See also:
+// https://eips.ethereum.org/EIPS/eip-7745#hash-tree-structure
+func ReadFilterMapRow(db ethdb.KeyValueReader, mapRowIndex uint64) ([]uint32, error) {
+	key := filterMapRowKey(mapRowIndex)
+	has, err := db.Has(key)
+	if err != nil {
+		return nil, err
+	}
+	if !has {
+		return emptyRow, nil
+	}
+	encRow, err := db.Get(key)
+	if err != nil {
+		return nil, err
+	}
+	if len(encRow)&3 != 0 {
+		return nil, errors.New("invalid encoded filter row length")
+	}
+	row := make([]uint32, len(encRow)/4)
+	for i := range row {
+		row[i] = binary.LittleEndian.Uint32(encRow[i*4 : (i+1)*4])
+	}
+	return row, nil
+}
+
+// WriteFilterMapRow stores a filter map row at the given mapRowIndex or deletes
+// any existing entry if the row is empty.
+func WriteFilterMapRow(db ethdb.KeyValueWriter, mapRowIndex uint64, row []uint32) {
+	var err error
+	if len(row) > 0 {
+		encRow := make([]byte, len(row)*4)
+		for i, c := range row {
+			binary.LittleEndian.PutUint32(encRow[i*4:(i+1)*4], c)
+		}
+		err = db.Put(filterMapRowKey(mapRowIndex), encRow)
+	} else {
+		err = db.Delete(filterMapRowKey(mapRowIndex))
+	}
+	if err != nil {
+		log.Crit("Failed to store filter map row", "err", err)
+	}
+}
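+
+// Example (illustrative, not part of the original patch): the row encoding is a
+// plain little-endian concatenation, so the row {1, 0x01020304} is stored as the
+// 8 bytes 01 00 00 00 04 03 02 01 and decodes back to the same two column
+// indices.
+
+// ReadFilterMapBlockPtr retrieves the number of the block that generated the
+// first log value entry of the given map.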
+
+// ReadFilterMapBlockPtr retrieves the number of the block that generated the
+// first log value entry of the given map.
+func ReadFilterMapBlockPtr(db ethdb.KeyValueReader, mapIndex uint32) (uint64, error) {
+	encPtr, err := db.Get(filterMapBlockPtrKey(mapIndex))
+	if err != nil {
+		return 0, err
+	}
+	if len(encPtr) != 8 {
+		return 0, errors.New("invalid block number encoding")
+	}
+	return binary.BigEndian.Uint64(encPtr), nil
+}
+
+// WriteFilterMapBlockPtr stores the number of the block that generated the
+// first log value entry of the given map.
+func WriteFilterMapBlockPtr(db ethdb.KeyValueWriter, mapIndex uint32, blockNumber uint64) {
+	var encPtr [8]byte
+	binary.BigEndian.PutUint64(encPtr[:], blockNumber)
+	if err := db.Put(filterMapBlockPtrKey(mapIndex), encPtr[:]); err != nil {
+		log.Crit("Failed to store filter map block pointer", "err", err)
+	}
+}
+
+// DeleteFilterMapBlockPtr deletes the number of the block that generated the
+// first log value entry of the given map.
+func DeleteFilterMapBlockPtr(db ethdb.KeyValueWriter, mapIndex uint32) {
+	if err := db.Delete(filterMapBlockPtrKey(mapIndex)); err != nil {
+		log.Crit("Failed to delete filter map block pointer", "err", err)
+	}
+}
+
+// ReadBlockLvPointer retrieves the starting log value index where the log values
+// generated by the given block are located.
+func ReadBlockLvPointer(db ethdb.KeyValueReader, blockNumber uint64) (uint64, error) {
+	encPtr, err := db.Get(blockLVKey(blockNumber))
+	if err != nil {
+		return 0, err
+	}
+	if len(encPtr) != 8 {
+		return 0, errors.New("invalid log value pointer encoding")
+	}
+	return binary.BigEndian.Uint64(encPtr), nil
+}
+
+// WriteBlockLvPointer stores the starting log value index where the log values
+// generated by the given block are located.
+func WriteBlockLvPointer(db ethdb.KeyValueWriter, blockNumber, lvPointer uint64) {
+	var encPtr [8]byte
+	binary.BigEndian.PutUint64(encPtr[:], lvPointer)
+	if err := db.Put(blockLVKey(blockNumber), encPtr[:]); err != nil {
+		log.Crit("Failed to store block log value pointer", "err", err)
+	}
+}
+
+// DeleteBlockLvPointer deletes the starting log value index where the log values
+// generated by the given block are located.
+func DeleteBlockLvPointer(db ethdb.KeyValueWriter, blockNumber uint64) {
+	if err := db.Delete(blockLVKey(blockNumber)); err != nil {
+		log.Crit("Failed to delete block log value pointer", "err", err)
+	}
+}
+
+// FilterMapsRange is a storage representation of the block range covered by the
+// filter maps structure and the corresponding log value index range.
+type FilterMapsRange struct {
+	Initialized                      bool
+	HeadLvPointer, TailLvPointer     uint64
+	HeadBlockNumber, TailBlockNumber uint64
+	HeadBlockHash, TailParentHash    common.Hash
+}
+
+// ReadFilterMapsRange retrieves the filter maps range data. Note that if the
+// database entry is not present, that is interpreted as a valid non-initialized
+// state and returns a blank range structure and no error.
+func ReadFilterMapsRange(db ethdb.KeyValueReader) (FilterMapsRange, error) {
+	if has, err := db.Has(filterMapsRangeKey); !has || err != nil {
+		return FilterMapsRange{}, err
+	}
+	encRange, err := db.Get(filterMapsRangeKey)
+	if err != nil {
+		return FilterMapsRange{}, err
+	}
+	var fmRange FilterMapsRange
+	if err := rlp.DecodeBytes(encRange, &fmRange); err != nil {
+		return FilterMapsRange{}, err
+	}
+	return fmRange, nil
+}
+
+// WriteFilterMapsRange stores the filter maps range data.
+func WriteFilterMapsRange(db ethdb.KeyValueWriter, fmRange FilterMapsRange) { + encRange, err := rlp.EncodeToBytes(&fmRange) + if err != nil { + log.Crit("Failed to encode filter maps range", "err", err) + } + if err := db.Put(filterMapsRangeKey, encRange); err != nil { + log.Crit("Failed to store filter maps range", "err", err) + } +} + +// DeleteFilterMapsRange deletes the filter maps range data which is interpreted +// as reverting to the un-initialized state. +func DeleteFilterMapsRange(db ethdb.KeyValueWriter) { + if err := db.Delete(filterMapsRangeKey); err != nil { + log.Crit("Failed to delete filter maps range", "err", err) + } +} + +// RevertPoint is the storage representation of a filter maps revert point. +type RevertPoint struct { + BlockHash common.Hash + MapIndex uint32 + RowLength []uint +} + +// ReadRevertPoint retrieves the revert point for the given block number if +// present. Note that revert points may or may not exist for any block number +// and a non-existent entry causes no error. +func ReadRevertPoint(db ethdb.KeyValueReader, blockNumber uint64) (*RevertPoint, error) { + key := revertPointKey(blockNumber) + if has, err := db.Has(key); !has || err != nil { + return nil, err + } + enc, err := db.Get(key) + if err != nil { + return nil, err + } + rp := new(RevertPoint) + if err := rlp.DecodeBytes(enc, rp); err != nil { + return nil, err + } + return rp, nil +} + +// WriteRevertPoint stores a revert point for the given block number. +func WriteRevertPoint(db ethdb.KeyValueWriter, blockNumber uint64, rp *RevertPoint) { + enc, err := rlp.EncodeToBytes(rp) + if err != nil { + log.Crit("Failed to encode revert point", "err", err) + } + if err := db.Put(revertPointKey(blockNumber), enc); err != nil { + log.Crit("Failed to store revert point", "err", err) + } +} + +// DeleteRevertPoint deletes the given revert point. +func DeleteRevertPoint(db ethdb.KeyValueWriter, blockNumber uint64) { + if err := db.Delete(revertPointKey(blockNumber)); err != nil { + log.Crit("Failed to delete revert point", "err", err) + } +} diff --git a/core/rawdb/schema.go b/core/rawdb/schema.go index 04b5d0d6d2c8..25c4d88a5463 100644 --- a/core/rawdb/schema.go +++ b/core/rawdb/schema.go @@ -145,6 +145,13 @@ var ( FixedCommitteeRootKey = []byte("fixedRoot-") // bigEndian64(syncPeriod) -> committee root hash SyncCommitteeKey = []byte("committee-") // bigEndian64(syncPeriod) -> serialized committee + FilterMapsPrefix = []byte("fT5-") //TODO fm- + filterMapsRangeKey = append(FilterMapsPrefix, byte('R')) + filterMapRowPrefix = append(FilterMapsPrefix, byte('r')) // filterMapRowPrefix + mapRowIndex (uint64 big endian) -> filter row + filterMapBlockPtrPrefix = append(FilterMapsPrefix, byte('b')) // filterMapBlockPtrPrefix + mapIndex (uint32 big endian) -> block number (uint64 big endian) + blockLVPrefix = append(FilterMapsPrefix, byte('p')) // blockLVPrefix + num (uint64 big endian) -> log value pointer (uint64 big endian) + revertPointPrefix = append(FilterMapsPrefix, byte('v')) // revertPointPrefix + num (uint64 big endian) -> revert data + preimageCounter = metrics.NewRegisteredCounter("db/preimage/total", nil) preimageHitCounter = metrics.NewRegisteredCounter("db/preimage/hits", nil) ) @@ -346,3 +353,27 @@ func IsStorageTrieNode(key []byte) bool { ok, _, _ := ResolveStorageTrieNode(key) return ok } + +// filterMapRowKey = filterMapRowPrefix + mapRowIndex (uint64 big endian) +func filterMapRowKey(mapRowIndex uint64) []byte { + key := append(filterMapRowPrefix, make([]byte, 8)...) 
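+	// The key is the full multi-byte prefix followed by the 8-byte big-endian
+	// row index, so the index is written at offset len(filterMapRowPrefix).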
+	binary.BigEndian.PutUint64(key[len(filterMapRowPrefix):], mapRowIndex)
+	return key
+}
+
+// filterMapBlockPtrKey = filterMapBlockPtrPrefix + mapIndex (uint32 big endian)
+func filterMapBlockPtrKey(mapIndex uint32) []byte {
+	key := append(filterMapBlockPtrPrefix, make([]byte, 4)...)
+	binary.BigEndian.PutUint32(key[len(filterMapBlockPtrPrefix):], mapIndex)
+	return key
+}
+
+// blockLVKey = blockLVPrefix + num (uint64 big endian)
+func blockLVKey(number uint64) []byte {
+	return append(blockLVPrefix, encodeBlockNumber(number)...)
+}
+
+// revertPointKey = revertPointPrefix + num (uint64 big endian)
+func revertPointKey(number uint64) []byte {
+	return append(revertPointPrefix, encodeBlockNumber(number)...)
+}
diff --git a/eth/api_backend.go b/eth/api_backend.go
index 8a9898b956f3..f00b12d40ec2 100644
--- a/eth/api_backend.go
+++ b/eth/api_backend.go
@@ -29,6 +29,7 @@ import (
 	"github.com/ethereum/go-ethereum/consensus/misc/eip4844"
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/bloombits"
+	"github.com/ethereum/go-ethereum/core/filtermaps"
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/state"
 	"github.com/ethereum/go-ethereum/core/txpool"
@@ -44,6 +45,7 @@ import (
 
 // EthAPIBackend implements ethapi.Backend and tracers.Backend for full nodes
 type EthAPIBackend struct {
+	*filtermaps.FilterMapsMatcherBackend
 	extRPCEnabled       bool
 	allowUnprotectedTxs bool
 	eth                 *Ethereum
diff --git a/eth/backend.go b/eth/backend.go
index f7b67be4dc7c..30472beb5f18 100644
--- a/eth/backend.go
+++ b/eth/backend.go
@@ -30,6 +30,7 @@ import (
 	"github.com/ethereum/go-ethereum/consensus"
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/bloombits"
+	"github.com/ethereum/go-ethereum/core/filtermaps"
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/state/pruner"
 	"github.com/ethereum/go-ethereum/core/txpool"
@@ -83,6 +84,8 @@ type Ethereum struct {
 	bloomIndexer      *core.ChainIndexer // Bloom indexer operating during block imports
 	closeBloomHandler chan struct{}
 
+	filterMaps *filtermaps.FilterMaps
+
 	APIBackend *EthAPIBackend
 
 	miner *miner.Miner
@@ -221,6 +224,7 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 		return nil, err
 	}
 	eth.bloomIndexer.Start(eth.blockchain)
+	eth.filterMaps = filtermaps.NewFilterMaps(chainDb, eth.blockchain)
 
 	if config.BlobPool.Datadir != "" {
 		config.BlobPool.Datadir = stack.ResolvePath(config.BlobPool.Datadir)
@@ -255,7 +259,13 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
 	eth.miner = miner.New(eth, config.Miner, eth.engine)
 	eth.miner.SetExtra(makeExtraData(config.Miner.ExtraData))
 
-	eth.APIBackend = &EthAPIBackend{stack.Config().ExtRPCEnabled(), stack.Config().AllowUnprotectedTxs, eth, nil}
+	eth.APIBackend = &EthAPIBackend{
+		FilterMapsMatcherBackend: (*filtermaps.FilterMapsMatcherBackend)(eth.filterMaps),
+		extRPCEnabled:            stack.Config().ExtRPCEnabled(),
+		allowUnprotectedTxs:      stack.Config().AllowUnprotectedTxs,
+		eth:                      eth,
+		gpo:                      nil,
+	}
 	if eth.APIBackend.allowUnprotectedTxs {
 		log.Info("Unprotected transactions allowed")
 	}
@@ -407,6 +417,7 @@ func (s *Ethereum) Stop() error {
 	// Then stop everything else.
s.bloomIndexer.Close() + s.filterMaps.Close() close(s.closeBloomHandler) s.txPool.Close() s.blockchain.Stop() diff --git a/eth/filters/filter.go b/eth/filters/filter.go index 09ccb939073a..e3d1adc5fec4 100644 --- a/eth/filters/filter.go +++ b/eth/filters/filter.go @@ -19,11 +19,16 @@ package filters import ( "context" "errors" + "fmt" "math/big" + + //"reflect" "slices" + "time" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" ) @@ -35,8 +40,9 @@ type Filter struct { addresses []common.Address topics [][]common.Hash - block *common.Hash // Block hash if filtering a single block - begin, end int64 // Range interval if filtering multiple blocks + block *common.Hash // Block hash if filtering a single block + begin, end int64 // Range interval if filtering multiple blocks + bbMatchCount uint64 matcher *bloombits.Matcher } @@ -148,16 +154,28 @@ func (f *Filter) Logs(ctx context.Context) ([]*types.Log, error) { return nil, err } - logChan, errChan := f.rangeLogsAsync(ctx) - var logs []*types.Log - for { - select { - case log := <-logChan: - logs = append(logs, log) - case err := <-errChan: - return logs, err + start := time.Now() + logs, err := filtermaps.GetPotentialMatches(ctx, f.sys.backend, uint64(f.begin), uint64(f.end), f.addresses, f.topics) + fmLogs := filterLogs(logs, nil, nil, f.addresses, f.topics) + fmt.Println("filtermaps (new) runtime", time.Since(start), "true matches", len(fmLogs), "false positives", len(logs)-len(fmLogs)) + + //TODO remove + /*f.bbMatchCount = 0 + start = time.Now() + logChan, errChan := f.rangeLogsAsync(ctx) + var bbLogs []*types.Log + loop: + for { + select { + case log := <-logChan: + bbLogs = append(bbLogs, log) + case <-errChan: + break loop + } } - } + fmt.Println("bloombits (old) runtime", time.Since(start), "true matches", len(bbLogs), "false positives", f.bbMatchCount-uint64(len(bbLogs))) + fmt.Println("DeepEqual", reflect.DeepEqual(fmLogs, bbLogs))*/ + return fmLogs, err } // rangeLogsAsync retrieves block-range logs that match the filter criteria asynchronously, @@ -218,6 +236,7 @@ func (f *Filter) indexedLogs(ctx context.Context, end uint64, logChan chan *type for { select { case number, ok := <-matches: + f.bbMatchCount++ // Abort if all matches have been fulfilled if !ok { err := session.Error() diff --git a/eth/filters/filter_system.go b/eth/filters/filter_system.go index a3a2787a4144..62f4833607b8 100644 --- a/eth/filters/filter_system.go +++ b/eth/filters/filter_system.go @@ -30,6 +30,7 @@ import ( "github.com/ethereum/go-ethereum/common/lru" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethdb" "github.com/ethereum/go-ethereum/event" @@ -71,6 +72,10 @@ type Backend interface { BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) + + GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) + GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (filtermaps.FilterRow, error) + GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) } // FilterSystem holds resources shared by all filters. 
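
[Editor's illustration, not part of the patch: the three methods added to the Backend interface above are the complete surface the filter-map matcher needs, so a test double can be very small. stubBackend and its package placement are hypothetical.]

	package filters_test

	import (
		"context"

		"github.com/ethereum/go-ethereum/core/filtermaps"
		"github.com/ethereum/go-ethereum/core/types"
	)

	// stubBackend is a hypothetical test double for the three new methods only.
	type stubBackend struct{}

	func (stubBackend) GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) {
		return blockNumber * 100, nil // pretend every block emitted exactly 100 log values
	}

	func (stubBackend) GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (filtermaps.FilterRow, error) {
		return filtermaps.FilterRow{}, nil // all rows empty: no potential matches anywhere
	}

	func (stubBackend) GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) {
		return nil, nil // no log stored at any log value index
	}
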
diff --git a/internal/ethapi/backend.go b/internal/ethapi/backend.go index 2a45ba09210f..2012f3d83520 100644 --- a/internal/ethapi/backend.go +++ b/internal/ethapi/backend.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/consensus" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/bloombits" + "github.com/ethereum/go-ethereum/core/filtermaps" "github.com/ethereum/go-ethereum/core/state" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" @@ -97,6 +98,10 @@ type Backend interface { SubscribeLogsEvent(ch chan<- []*types.Log) event.Subscription BloomStatus() (uint64, uint64) ServiceFilter(ctx context.Context, session *bloombits.MatcherSession) + + GetBlockLvPointer(ctx context.Context, blockNumber uint64) (uint64, error) + GetFilterMapRow(ctx context.Context, mapIndex, rowIndex uint32) (filtermaps.FilterRow, error) + GetLogByLvIndex(ctx context.Context, lvIndex uint64) (*types.Log, error) } func GetAPIs(apiBackend Backend) []rpc.API { From 950de72ed3c11da954179fc694fc52858fe77526 Mon Sep 17 00:00:00 2001 From: Zsolt Felfoldi Date: Tue, 17 Sep 2024 09:55:55 +0200 Subject: [PATCH 2/4] core/filtermaps: use rawdb.ReadRawReceipts --- core/filtermaps/filtermaps.go | 6 +++--- core/filtermaps/indexer.go | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go index a265696041fa..87fec8a0a3aa 100644 --- a/core/filtermaps/filtermaps.go +++ b/core/filtermaps/filtermaps.go @@ -36,7 +36,7 @@ const ( // https://eips.ethereum.org/EIPS/eip-7745 type FilterMaps struct { lock sync.RWMutex - db ethdb.KeyValueStore + db ethdb.Database closeCh chan chan struct{} filterMapsRange @@ -86,7 +86,7 @@ type filterMapsRange struct { // NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep // the structure in sync with the given blockchain. -func NewFilterMaps(db ethdb.KeyValueStore, chain *core.BlockChain) *FilterMaps { +func NewFilterMaps(db ethdb.Database, chain *core.BlockChain) *FilterMaps { rs, err := rawdb.ReadFilterMapsRange(db) if err != nil { log.Error("Error reading log index range", "error", err) @@ -298,7 +298,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) { } // get block receipts hash := f.chain.GetCanonicalHash(firstBlockNumber) - receipts := f.chain.GetReceiptsByHash(hash) //TODO small cache + receipts := rawdb.ReadRawReceipts(f.db, hash, firstBlockNumber) //TODO small cache if receipts == nil { return nil, errors.New("receipts not found") } diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go index 789bf4c4004d..0d050923ee3b 100644 --- a/core/filtermaps/indexer.go +++ b/core/filtermaps/indexer.go @@ -107,7 +107,7 @@ func (f *FilterMaps) getRange() filterMapsRange { // tryInit attempts to initialize the log index structure. 
 func (f *FilterMaps) tryInit(head *types.Header) {
-	receipts := f.chain.GetReceiptsByHash(head.Hash())
+	receipts := rawdb.ReadRawReceipts(f.db, head.Hash(), head.Number.Uint64())
 	if receipts == nil {
 		log.Error("Could not retrieve block receipts for init block", "number", head.Number, "hash", head.Hash())
 		return
@@ -176,7 +176,7 @@ func (f *FilterMaps) tryUpdateHead(newHead *types.Header) {
 	update := f.newUpdateBatch()
 	for i := len(newHeaders) - 1; i >= 0; i-- {
 		newHeader := newHeaders[i]
-		receipts := f.chain.GetReceiptsByHash(newHeader.Hash())
+		receipts := rawdb.ReadRawReceipts(f.db, newHeader.Hash(), newHeader.Number.Uint64())
 		if receipts == nil {
 			log.Error("Could not retrieve block receipts for new block", "number", newHeader.Number, "hash", newHeader.Hash())
 			break
@@ -218,7 +218,7 @@ func (f *FilterMaps) tryExtendTail(stopFn func() bool) {
 			log.Error("Tail header not found", "number", number-1, "hash", parentHash)
 			break
 		}
-		receipts := f.chain.GetReceiptsByHash(newTail.Hash())
+		receipts := rawdb.ReadRawReceipts(f.db, newTail.Hash(), newTail.Number.Uint64())
 		if receipts == nil {
 			log.Error("Could not retrieve block receipts for tail block", "number", newTail.Number, "hash", newTail.Hash())
 			break

From 3ca2602aa44137cf9eec226a28041ee394aa073a Mon Sep 17 00:00:00 2001
From: Zsolt Felfoldi
Date: Tue, 17 Sep 2024 09:56:48 +0200
Subject: [PATCH 3/4] core/filtermaps: add filtermaps tests

---
 core/filtermaps/filtermaps_test.go | 112 +++++++++++++++++++++++++++++
 1 file changed, 112 insertions(+)
 create mode 100644 core/filtermaps/filtermaps_test.go

diff --git a/core/filtermaps/filtermaps_test.go b/core/filtermaps/filtermaps_test.go
new file mode 100644
index 000000000000..70a4ce3b14ca
--- /dev/null
+++ b/core/filtermaps/filtermaps_test.go
@@ -0,0 +1,112 @@
+package filtermaps
+
+import (
+	"math/rand"
+	"testing"
+
+	"github.com/ethereum/go-ethereum/common"
+)
+
+func TestSingleMatch(t *testing.T) {
+	for count := 0; count < 100000; count++ {
+		// generate a row with a single random entry
+		mapIndex := rand.Uint32()
+		lvIndex := uint64(mapIndex)<<logValuesPerMap + uint64(rand.Intn(valuesPerMap))
+		var lvHash common.Hash
+		rand.Read(lvHash[:])
+		row := FilterRow{columnIndex(lvIndex, lvHash)}
+		// check that the single entry is found at the right log value index
+		matches := row.potentialMatches(mapIndex, lvHash)
+		if len(matches) != 1 {
+			t.Fatalf("Invalid length of matches (got %d, expected 1)", len(matches))
+		}
+		if matches[0] != lvIndex {
+			t.Fatalf("Incorrect match returned (got %d, expected %d)", matches[0], lvIndex)
+		}
+	}
+}
+
+const (
+	testPmCount = 100
+	testPmLen   = 1000
+)
+
+func TestPotentialMatches(t *testing.T) {
+	var falsePositives int
+	for count := 0; count < testPmCount; count++ {
+		mapIndex := rand.Uint32()
+		lvStart := uint64(mapIndex) << logValuesPerMap
+		var row FilterRow
+		lvIndices := make([]uint64, testPmLen)
+		lvHashes := make([]common.Hash, testPmLen+1)
+		for i := range lvIndices {
+			// add testPmLen single entries with random log value hashes at random indices
+			lvIndices[i] = lvStart + uint64(rand.Intn(valuesPerMap))
+			rand.Read(lvHashes[i][:])
+			row = append(row, columnIndex(lvIndices[i], lvHashes[i]))
+		}
+		// add a testPmLen long series of entries with the same log value hash
+		rand.Read(lvHashes[testPmLen][:])
+		for lvIndex := lvStart; lvIndex < lvStart+testPmLen; lvIndex++ {
+			row = append(row, columnIndex(lvIndex, lvHashes[testPmLen]))
+		}
+		// randomly shuffle the order of row entries
+		for i := len(row) - 1; i > 0; i-- {
+			j := rand.Intn(i)
+			row[i], row[j] = row[j], row[i]
+		}
+		// check retrieved matches while also counting false positives
+		for i, lvHash := range lvHashes {
+			matches := row.potentialMatches(mapIndex, lvHash)
+			if i < testPmLen {
+				// check single entry match
+				if len(matches) < 1 {
+					t.Fatalf("Invalid length of matches (got %d, expected >=1)", len(matches))
+				}
+				var found bool
+				for _, lvi := range matches {
+					if lvi == lvIndices[i] {
+						found = true
+					} else {
+						falsePositives++
+					}
+				}
+				if !found {
+					t.Fatalf("Expected match not found (got %v, expected %d)", matches, lvIndices[i])
+				}
+			} else {
+				// check "long series" match
+				if len(matches) < testPmLen {
+					t.Fatalf("Invalid length of matches (got %d, expected >=%d)", len(matches), testPmLen)
+				}
+				// since results are ordered, first testPmLen entries should always match exactly
+				for j := 0; j < testPmLen; j++ {
+					if matches[j] != lvStart+uint64(j) {
+						t.Fatalf("Incorrect match at index %d (got %d, expected %d)", j, matches[j], lvStart+uint64(j))
+					}
+				}
+				// the rest are false positives
+				falsePositives += len(matches) - testPmLen
+			}
+		}
+	}
+	// Whenever looking for a certain log value hash, each entry in the row that
+	// was generated by another log value hash (a "foreign entry") has a
+	// 1 / valuesPerMap chance of yielding a false positive.
+	// We have testPmLen unique hash entries and a testPmLen long series of entries
+	// for the same hash. For each of the testPmLen unique hash entries there are
+	// testPmLen*2-1 foreign entries while for the long series there are testPmLen
+	// foreign entries. This means that after performing all these filtering runs,
+	// we have processed 2*testPmLen^2 foreign entries, which gives us an estimate
+	// of how many false positives to expect.
+	expFalse := testPmCount * testPmLen * testPmLen * 2 / valuesPerMap
+	if falsePositives < expFalse/2 || falsePositives > expFalse*3/2 {
+		t.Fatalf("False positive rate out of expected range (got %d, expected %d +-50%%)", falsePositives, expFalse)
+	}
+}

From 5a12b94cc79170d8dadb3c74a4488e293ae57486 Mon Sep 17 00:00:00 2001
From: Zsolt Felfoldi
Date: Thu, 19 Sep 2024 17:14:27 +0200
Subject: [PATCH 4/4] 111

---
 core/filtermaps/filtermaps.go | 18 ++++++++--
 core/filtermaps/indexer.go    | 68 +++++++++++++++++++++++++----------
 2 files changed, 65 insertions(+), 21 deletions(-)

diff --git a/core/filtermaps/filtermaps.go b/core/filtermaps/filtermaps.go
index 87fec8a0a3aa..fbdc05be2d9a 100644
--- a/core/filtermaps/filtermaps.go
+++ b/core/filtermaps/filtermaps.go
@@ -14,6 +14,7 @@ import (
 	"github.com/ethereum/go-ethereum/core/rawdb"
 	"github.com/ethereum/go-ethereum/core/types"
 	"github.com/ethereum/go-ethereum/ethdb"
+	"github.com/ethereum/go-ethereum/event"
 	"github.com/ethereum/go-ethereum/log"
 )
 
@@ -28,6 +29,13 @@ const (
 	headCacheSize = 8 // maximum number of recent filter maps cached in memory
 )
 
+type blockchain interface {
+	CurrentBlock() *types.Header
+	SubscribeChainHeadEvent(ch chan<- core.ChainHeadEvent) event.Subscription
+	GetHeader(hash common.Hash, number uint64) *types.Header
+	GetCanonicalHash(number uint64) common.Hash
+}
+
 // FilterMaps is the in-memory representation of the log index structure that is
 // responsible for building and updating the index according to the canonical
 // chain.
@@ -40,7 +48,12 @@ type FilterMaps struct {
 	closeCh chan chan struct{}
 
 	filterMapsRange
-	chain *core.BlockChain
+	chain blockchain
+
+	chainHeadLock  sync.Mutex
+	chainHeadCh    chan *types.Header
+	chainHead      *types.Header
+	chainHeadCount uint64
 
 	// filterMapCache caches certain filter maps (headCacheSize most recent maps
 	// and one tail map) that are expected to be frequently accessed and modified
@@ -86,7 +99,7 @@ type filterMapsRange struct {
 
 // NewFilterMaps creates a new FilterMaps and starts the indexer in order to keep
 // the structure in sync with the given blockchain.
-func NewFilterMaps(db ethdb.Database, chain *core.BlockChain) *FilterMaps {
+func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps {
 	rs, err := rawdb.ReadFilterMapsRange(db)
 	if err != nil {
 		log.Error("Error reading log index range", "error", err)
@@ -104,6 +117,7 @@ func NewFilterMaps(db ethdb.Database, chain blockchain) *FilterMaps {
 			headBlockHash:   rs.HeadBlockHash,
 			tailParentHash:  rs.TailParentHash,
 		},
+		chainHeadCh:    make(chan *types.Header, 10),
 		filterMapCache: make(map[uint32]*filterMap),
 		blockPtrCache:  lru.NewCache[uint32, uint64](1000),
 		lvPointerCache: lru.NewCache[uint64, uint64](1000),
diff --git a/core/filtermaps/indexer.go b/core/filtermaps/indexer.go
index 0d050923ee3b..33936d9c0e11 100644
--- a/core/filtermaps/indexer.go
+++ b/core/filtermaps/indexer.go
@@ -19,42 +19,67 @@ const (
 	cachedRevertPoints = 64 // revert points for most recent blocks in memory
 )
 
-// updateLoop initializes and updates the log index structure according to the
-// canonical chain.
-func (f *FilterMaps) updateLoop() { - headEventCh := make(chan core.ChainHeadEvent) - sub := f.chain.SubscribeChainHeadEvent(headEventCh) - defer sub.Unsubscribe() +func (f *FilterMaps) UpdateHead() bool { + f.chainHeadLock.Lock() + defer f.chainHeadLock.Unlock() head := f.chain.CurrentBlock() if head == nil { + return false + } + if f.chainHead == nil || head.Hash() != f.chainHead.Hash() { select { - case ev := <-headEventCh: - head = ev.Block.Header() - case ch := <-f.closeCh: - close(ch) - return + case f.chainHeadCh <- head: + f.chainHead = head + f.chainHeadCount++ + return true + default: } } - fmr := f.getRange() + return false +} - var stop bool +// updateLoop initializes and updates the log index structure according to the +// canonical chain. +func (f *FilterMaps) updateLoop() { + headEventCh := make(chan core.ChainHeadEvent) + sub := f.chain.SubscribeChainHeadEvent(headEventCh) + defer sub.Unsubscribe() + + var ( + head *types.Header + stop bool + ) wait := func() { if stop { return } select { - case ev := <-headEventCh: - head = ev.Block.Header() + case <-headEventCh: + if f.UpdateHead() { + head = <-f.chainHeadCh + } + case head = <-f.chainHeadCh: case <-time.After(time.Second * 20): // keep updating log index during syncing - head = f.chain.CurrentBlock() + if f.UpdateHead() { + head = <-f.chainHeadCh + } case ch := <-f.closeCh: close(ch) stop = true } } + f.UpdateHead() + for head == nil { + wait() + if stop { + return + } + } + fmr := f.getRange() + for !stop { if !fmr.initialized { f.tryInit(head) @@ -77,14 +102,19 @@ func (f *FilterMaps) updateLoop() { f.tryExtendTail(func() bool { // return true if tail processing needs to be stopped select { - case ev := <-headEventCh: - head = ev.Block.Header() + case <-headEventCh: + if f.UpdateHead() { + head = <-f.chainHeadCh + } + case head = <-f.chainHeadCh: case ch := <-f.closeCh: close(ch) stop = true return true default: - head = f.chain.CurrentBlock() + if f.UpdateHead() { + head = <-f.chainHeadCh + } } // stop if there is a new chain head (always prioritize head updates) return fmr.headBlockHash != head.Hash()
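
[Editor's illustration, not part of the patch: the head-notification handshake above seen from the caller's side. UpdateHead never blocks because chainHeadCh is buffered and the send is guarded by select/default; updateLoop consumes the channel. trackHead is a hypothetical in-package helper showing the assumed wiring.]

	func trackHead(fm *FilterMaps, chain blockchain) {
		headCh := make(chan core.ChainHeadEvent, 16)
		sub := chain.SubscribeChainHeadEvent(headCh)
		defer sub.Unsubscribe()
		for range headCh {
			// hands the current head to updateLoop if it changed; returns
			// false (and drops nothing) if the head is unchanged or the
			// buffered channel is full
			fm.UpdateHead()
		}
	}
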