From b92ff142cf081a0f7cae6dfdf52c507dae218a19 Mon Sep 17 00:00:00 2001 From: tuyennhv Date: Thu, 4 Jan 2024 11:28:33 +0700 Subject: [PATCH] feat: implement new state caches (#6176) * feat: implement LRUBlockStateCache * feat: implement PersistentCheckpointStateCache * feat: implement findSeedStateToReload * fix: add missing catch() * fix: import path in state-transition * fix: model CacheItem and type in PersistentCheckpointStateCache * refactor: use for loop in PersistentCheckpointStateCache.processState * chore: move test code to beforeAll() in persistentCheckpointsCache.test.ts * feat: do not prune persisted state when reload * fix: fifo instead of lru BlockStateCache * fix: do not prune the last added item in FIFOBlockStateCache * fix: sync epochIndex and cache in PersistentCheckpointStateCache * chore: cleanup persistent checkpoint cache types * chore: tweak comments * chore: tweak more comments * chore: reword persistent apis * chore: add type to cpStateCache size metrics * fix: metrics labels after rebasing from unstable --------- Co-authored-by: Cayman --- .../beacon-node/src/chain/shufflingCache.ts | 17 + .../src/chain/stateCache/datastore/db.ts | 38 + .../src/chain/stateCache/datastore/index.ts | 2 + .../src/chain/stateCache/datastore/types.ts | 13 + .../chain/stateCache/fifoBlockStateCache.ts | 181 ++++ .../beacon-node/src/chain/stateCache/index.ts | 1 + .../stateCache/persistentCheckpointsCache.ts | 645 ++++++++++++ .../src/chain/stateCache/stateContextCache.ts | 9 +- .../stateContextCheckpointsCache.ts | 29 +- .../beacon-node/src/chain/stateCache/types.ts | 73 ++ packages/beacon-node/src/db/beacon.ts | 3 + packages/beacon-node/src/db/buckets.ts | 2 + packages/beacon-node/src/db/interface.ts | 3 + .../src/db/repositories/checkpointState.ts | 31 + .../src/metrics/metrics/lodestar.ts | 45 +- packages/beacon-node/src/util/array.ts | 86 ++ .../stateCache/fifoBlockStateCache.test.ts | 120 +++ .../persistentCheckpointsCache.test.ts | 954 ++++++++++++++++++ .../beacon-node/test/unit/util/array.test.ts | 124 +++ .../test/utils/chain/stateCache/datastore.ts | 26 + packages/beacon-node/test/utils/mocks/db.ts | 2 + .../state-transition/src/cache/stateCache.ts | 4 +- packages/state-transition/src/index.ts | 2 +- .../src/util/loadState/index.ts | 1 + .../test/unit/cachedBeaconState.test.ts | 6 +- 25 files changed, 2402 insertions(+), 15 deletions(-) create mode 100644 packages/beacon-node/src/chain/stateCache/datastore/db.ts create mode 100644 packages/beacon-node/src/chain/stateCache/datastore/index.ts create mode 100644 packages/beacon-node/src/chain/stateCache/datastore/types.ts create mode 100644 packages/beacon-node/src/chain/stateCache/fifoBlockStateCache.ts create mode 100644 packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts create mode 100644 packages/beacon-node/src/chain/stateCache/types.ts create mode 100644 packages/beacon-node/src/db/repositories/checkpointState.ts create mode 100644 packages/beacon-node/test/unit/chain/stateCache/fifoBlockStateCache.test.ts create mode 100644 packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts create mode 100644 packages/beacon-node/test/utils/chain/stateCache/datastore.ts create mode 100644 packages/state-transition/src/util/loadState/index.ts diff --git a/packages/beacon-node/src/chain/shufflingCache.ts b/packages/beacon-node/src/chain/shufflingCache.ts index c8468f3b6db5..23177142d846 100644 --- a/packages/beacon-node/src/chain/shufflingCache.ts +++ 
b/packages/beacon-node/src/chain/shufflingCache.ts
@@ -167,6 +167,23 @@ export class ShufflingCache {
     }
   }
 
+  /**
+   * Same as get() but synchronous.
+   */
+  getSync(shufflingEpoch: Epoch, decisionRootHex: RootHex): EpochShuffling | null {
+    const cacheItem = this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).get(decisionRootHex);
+    if (cacheItem === undefined) {
+      return null;
+    }
+
+    if (isShufflingCacheItem(cacheItem)) {
+      return cacheItem.shuffling;
+    }
+
+    // promise items are not resolved yet, ignore them
+    return null;
+  }
+
   private add(shufflingEpoch: Epoch, decisionBlock: RootHex, cacheItem: CacheItem): void {
     this.itemsByDecisionRootByEpoch.getOrDefault(shufflingEpoch).set(decisionBlock, cacheItem);
     pruneSetToMax(this.itemsByDecisionRootByEpoch, this.maxEpochs);
diff --git a/packages/beacon-node/src/chain/stateCache/datastore/db.ts b/packages/beacon-node/src/chain/stateCache/datastore/db.ts
new file mode 100644
index 000000000000..fef38a7f8dd2
--- /dev/null
+++ b/packages/beacon-node/src/chain/stateCache/datastore/db.ts
@@ -0,0 +1,38 @@
+import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
+import {phase0, ssz} from "@lodestar/types";
+import {IBeaconDb} from "../../../db/interface.js";
+import {CPStateDatastore, DatastoreKey} from "./types.js";
+
+/**
+ * Implementation of CPStateDatastore using db.
+ */
+export class DbCPStateDatastore implements CPStateDatastore {
+  constructor(private readonly db: IBeaconDb) {}
+
+  async write(cpKey: phase0.Checkpoint, state: CachedBeaconStateAllForks): Promise<DatastoreKey> {
+    const serializedCheckpoint = checkpointToDatastoreKey(cpKey);
+    const stateBytes = state.serialize();
+    await this.db.checkpointState.putBinary(serializedCheckpoint, stateBytes);
+    return serializedCheckpoint;
+  }
+
+  async remove(serializedCheckpoint: DatastoreKey): Promise<void> {
+    await this.db.checkpointState.delete(serializedCheckpoint);
+  }
+
+  async read(serializedCheckpoint: DatastoreKey): Promise<Uint8Array | null> {
+    return this.db.checkpointState.getBinary(serializedCheckpoint);
+  }
+
+  async readKeys(): Promise<DatastoreKey[]> {
+    return this.db.checkpointState.keys();
+  }
+}
+
+export function datastoreKeyToCheckpoint(key: DatastoreKey): phase0.Checkpoint {
+  return ssz.phase0.Checkpoint.deserialize(key);
+}
+
+export function checkpointToDatastoreKey(cp: phase0.Checkpoint): DatastoreKey {
+  return ssz.phase0.Checkpoint.serialize(cp);
+}
diff --git a/packages/beacon-node/src/chain/stateCache/datastore/index.ts b/packages/beacon-node/src/chain/stateCache/datastore/index.ts
new file mode 100644
index 000000000000..c37de5292a38
--- /dev/null
+++ b/packages/beacon-node/src/chain/stateCache/datastore/index.ts
@@ -0,0 +1,2 @@
+export * from "./types.js";
+export * from "./db.js";
diff --git a/packages/beacon-node/src/chain/stateCache/datastore/types.ts b/packages/beacon-node/src/chain/stateCache/datastore/types.ts
new file mode 100644
index 000000000000..66ea67f93500
--- /dev/null
+++ b/packages/beacon-node/src/chain/stateCache/datastore/types.ts
@@ -0,0 +1,13 @@
+import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
+import {phase0} from "@lodestar/types";
+
+// With the db implementation, persistedKey is the serialized data of a checkpoint
+export type DatastoreKey = Uint8Array;
+
+// Make this generic to support testing
+export interface CPStateDatastore {
+  write: (cpKey: phase0.Checkpoint, state: CachedBeaconStateAllForks) => Promise<DatastoreKey>;
+  remove: (key: DatastoreKey) => Promise<void>;
+  read: (key: DatastoreKey) => Promise<Uint8Array | null>;
+  readKeys: () => Promise<DatastoreKey[]>;
+}
diff --git
a/packages/beacon-node/src/chain/stateCache/fifoBlockStateCache.ts b/packages/beacon-node/src/chain/stateCache/fifoBlockStateCache.ts
new file mode 100644
index 000000000000..854983101c04
--- /dev/null
+++ b/packages/beacon-node/src/chain/stateCache/fifoBlockStateCache.ts
@@ -0,0 +1,181 @@
+import {toHexString} from "@chainsafe/ssz";
+import {RootHex} from "@lodestar/types";
+import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
+import {routes} from "@lodestar/api";
+import {Metrics} from "../../metrics/index.js";
+import {LinkedList} from "../../util/array.js";
+import {MapTracker} from "./mapMetrics.js";
+import {BlockStateCache} from "./types.js";
+
+export type FIFOBlockStateCacheOpts = {
+  maxBlockStates?: number;
+};
+
+/**
+ * With 32 cached states, state regen is only needed if a reorg distance is > 32 slots.
+ */
+export const DEFAULT_MAX_BLOCK_STATES = 32;
+
+/**
+ * New implementation of BlockStateCache that keeps the most recent n states consistently
+ * - Maintain a linked list (FIFO) with special handling for the head state, which is always the first item in the list
+ * - Prune per add() instead of per checkpoint so it only keeps n historical states consistently, pruning from the tail
+ * - No need to prune per finalized checkpoint
+ *
+ * Given this block tree with Block 11 as head:
+ * ```
+ *          Block 10
+ *            |
+ *      +-----+-----+
+ *      |           |
+ *   Block 11   Block 12
+ *      ^           |
+ *      |           |
+ *     head     Block 13
+ * ```
+ * The maintained key order would be: 11 -> 13 -> 12 -> 10, and state 10 will be pruned first.
+ */
+export class FIFOBlockStateCache implements BlockStateCache {
+  /**
+   * Max number of states allowed in the cache
+   */
+  readonly maxStates: number;
+
+  private readonly cache: MapTracker<string, CachedBeaconStateAllForks>;
+  /**
+   * Key order to implement the FIFO cache
+   */
+  private readonly keyOrder: LinkedList<string>;
+  private readonly metrics: Metrics["stateCache"] | null | undefined;
+
+  constructor(opts: FIFOBlockStateCacheOpts, {metrics}: {metrics?: Metrics | null}) {
+    this.maxStates = opts.maxBlockStates ?? DEFAULT_MAX_BLOCK_STATES;
+    this.cache = new MapTracker(metrics?.stateCache);
+    if (metrics) {
+      this.metrics = metrics.stateCache;
+      metrics.stateCache.size.addCollect(() => metrics.stateCache.size.set(this.cache.size));
+    }
+    this.keyOrder = new LinkedList();
+  }
+
+  /**
+   * Set a state as head, happens when importing a block and the head block is changed.
+   */
+  setHeadState(item: CachedBeaconStateAllForks | null): void {
+    if (item !== null) {
+      this.add(item, true);
+    }
+  }
+
+  /**
+   * Get a state from this cache given a state root hex.
+   */
+  get(rootHex: RootHex): CachedBeaconStateAllForks | null {
+    this.metrics?.lookups.inc();
+    const item = this.cache.get(rootHex);
+    if (!item) {
+      return null;
+    }
+
+    this.metrics?.hits.inc();
+    this.metrics?.stateClonedCount.observe(item.clonedCount);
+
+    return item;
+  }
+
+  /**
+   * Add a state to this cache.
+   * @param isHead if true, move it to the head of the list. Otherwise add to the 2nd position.
+   * In the importBlock() flow, add() is normally called with isHead = false first; setHeadState() then sets the head.
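+   *
+   * A usage sketch (illustrative only; `postState` is a post-block state assumed to exist):
+   * ```
+   * cache.add(postState);          // inserted at the 2nd position, after the current head
+   * cache.setHeadState(postState); // then promoted to the head of the key order
+   * ```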
+ */ + add(item: CachedBeaconStateAllForks, isHead = false): void { + const key = toHexString(item.hashTreeRoot()); + if (this.cache.get(key) != null) { + if (!this.keyOrder.has(key)) { + throw Error(`State exists but key not found in keyOrder: ${key}`); + } + if (isHead) { + this.keyOrder.moveToHead(key); + } else { + this.keyOrder.moveToSecond(key); + } + // same size, no prune + return; + } + + // new state + this.metrics?.adds.inc(); + this.cache.set(key, item); + if (isHead) { + this.keyOrder.unshift(key); + } else { + // insert after head + const head = this.keyOrder.first(); + if (head == null) { + // should not happen, however handle just in case + this.keyOrder.unshift(key); + } else { + this.keyOrder.insertAfter(head, key); + } + } + this.prune(key); + } + + get size(): number { + return this.cache.size; + } + + /** + * Prune the cache from tail to keep the most recent n states consistently. + * The tail of the list is the oldest state, in case regen adds back the same state, + * it should stay next to head so that it won't be pruned right away. + * The FIFO cache helps with this. + */ + prune(lastAddedKey: string): void { + while (this.keyOrder.length > this.maxStates) { + const key = this.keyOrder.last(); + // it does not make sense to prune the last added state + // this only happens when max state is 1 in a short period of time + if (key === lastAddedKey) { + break; + } + if (!key) { + // should not happen + throw new Error("No key"); + } + this.keyOrder.pop(); + this.cache.delete(key); + } + } + + /** + * No need for this implementation + * This is only to conform to the old api + */ + deleteAllBeforeEpoch(): void {} + + /** + * ONLY FOR DEBUGGING PURPOSES. For lodestar debug API. + */ + clear(): void { + this.cache.clear(); + } + + /** ONLY FOR DEBUGGING PURPOSES. For lodestar debug API */ + dumpSummary(): routes.lodestar.StateCacheItem[] { + return Array.from(this.cache.entries()).map(([key, state]) => ({ + slot: state.slot, + root: toHexString(state.hashTreeRoot()), + reads: this.cache.readCount.get(key) ?? 0, + lastRead: this.cache.lastRead.get(key) ?? 0, + checkpointState: false, + })); + } + + /** + * For unit test only. 
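+   *
+   * A sketch of what a test could assert (hypothetical states/keys, maxBlockStates = 2):
+   * ```
+   * cache.add(state1);          // keyOrder: [key1]
+   * cache.add(state2);          // keyOrder: [key1, key2]
+   * cache.setHeadState(state1); // key1 is already head, no-op
+   * cache.add(state3);          // inserted after head, then key2 is pruned from the tail
+   * expect(cache.dumpKeyOrder()).toEqual([key1, key3]);
+   * ```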
+   */
+  dumpKeyOrder(): string[] {
+    return this.keyOrder.toArray();
+  }
+}
diff --git a/packages/beacon-node/src/chain/stateCache/index.ts b/packages/beacon-node/src/chain/stateCache/index.ts
index 69fb34a77e4c..b16d87c3fa0d 100644
--- a/packages/beacon-node/src/chain/stateCache/index.ts
+++ b/packages/beacon-node/src/chain/stateCache/index.ts
@@ -1,2 +1,3 @@
 export * from "./stateContextCache.js";
 export * from "./stateContextCheckpointsCache.js";
+export * from "./fifoBlockStateCache.js";
diff --git a/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts
new file mode 100644
index 000000000000..8ad5c5098118
--- /dev/null
+++ b/packages/beacon-node/src/chain/stateCache/persistentCheckpointsCache.ts
@@ -0,0 +1,645 @@
+import {fromHexString, toHexString} from "@chainsafe/ssz";
+import {phase0, Epoch, RootHex} from "@lodestar/types";
+import {CachedBeaconStateAllForks, computeStartSlotAtEpoch, getBlockRootAtSlot} from "@lodestar/state-transition";
+import {Logger, MapDef} from "@lodestar/utils";
+import {routes} from "@lodestar/api";
+import {loadCachedBeaconState} from "@lodestar/state-transition";
+import {Metrics} from "../../metrics/index.js";
+import {IClock} from "../../util/clock.js";
+import {ShufflingCache} from "../shufflingCache.js";
+import {MapTracker} from "./mapMetrics.js";
+import {CheckpointHex, CheckpointStateCache, CacheItemType} from "./types.js";
+import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datastore/index.js";
+
+type GetHeadStateFn = () => CachedBeaconStateAllForks;
+
+type PersistentCheckpointStateCacheModules = {
+  metrics?: Metrics | null;
+  logger: Logger;
+  clock?: IClock | null;
+  shufflingCache: ShufflingCache;
+  datastore: CPStateDatastore;
+  getHeadState?: GetHeadStateFn;
+};
+
+type PersistentCheckpointStateCacheOpts = {
+  // Keep max n states in memory, persist the rest to disk
+  maxCPStateEpochsInMemory?: number;
+};
+
+/** checkpoint serialized as a string */
+type CacheKey = string;
+
+type InMemoryCacheItem = {
+  type: CacheItemType.inMemory;
+  state: CachedBeaconStateAllForks;
+  // if a cp state is reloaded from disk, it keeps track of persistedKey to allow us to remove it from disk later
+  // it also helps not to persist it again
+  persistedKey?: DatastoreKey;
+};
+
+type PersistedCacheItem = {
+  type: CacheItemType.persisted;
+  value: DatastoreKey;
+};
+
+type CacheItem = InMemoryCacheItem | PersistedCacheItem;
+
+type LoadedStateBytesData = {persistedKey: DatastoreKey; stateBytes: Uint8Array};
+
+/**
+ * Before n-historical states, lodestar kept mostly 3 states in memory, including 1 finalized state.
+ * Since Jan 2024, lodestar stores the finalized state on disk and keeps up to 2 epochs in memory.
+ */
+export const DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY = 2;
+
+/**
+ * An implementation of CheckpointStateCache that keeps up to n epochs of checkpoint states in memory and persists the rest to disk
+ * - If a checkpoint state is more than `maxEpochsInMemory` epochs old, it is persisted to disk based on the view of the block
+ * - Once a chain gets finalized we'll prune all states from memory and disk for epochs < finalizedEpoch
+ * - In get*() apis if shouldReload is true, it will reload from disk.
The reload() api is expensive and should only be called in some important flows: + * - Get state for block processing + * - updateHeadState + * - as with any cache, the state could be evicted from memory at any time, so we should always check if the state is in memory or not + * - Each time we process a state, we only persist exactly 1 checkpoint state per epoch based on the view of block and prune all others. The persisted + * checkpoint state could be finalized and used later in archive task, it's also used to regen states. + * - When we process multiple states in the same epoch, we could persist different checkpoint states of the same epoch because each block could have its + * own view. See unit test of this file `packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts` for more details. + * + * The below diagram shows Previous Root Checkpoint State is persisted for epoch (n-2) and Current Root Checkpoint State is persisted for epoch (n-1) + * while at epoch (n) and (n+1) we have both of them in memory + * + * ╔════════════════════════════════════╗═══════════════╗ + * ║ persisted to db or fs ║ in memory ║ + * ║ reload if needed ║ ║ + * ║ -----------------------------------║---------------║ + * ║ epoch: (n-2) (n-1) ║ n (n+1) ║ + * ║ |-------|-------|----║--|-------|----║ + * ║ ^ ^ ║ ^ ^ ║ + * ║ ║ ^ ^ ║ + * ╚════════════════════════════════════╝═══════════════╝ + * + * The "in memory" checkpoint states are similar to the old implementation: we have both Previous Root Checkpoint State and Current Root Checkpoint State per epoch. + * However in the "persisted to db or fs" part, we usually only persist 1 checkpoint state per epoch, the one that could potentially be justified/finalized later + * based on the view of blocks. 
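+ *
+ * A lifecycle sketch (illustrative; module wiring abbreviated, names as used in this file):
+ * ```
+ * const cache = new PersistentCheckpointStateCache(
+ *   {datastore, logger, shufflingCache},
+ *   {maxCPStateEpochsInMemory: 2}
+ * );
+ * await cache.init();                                // index checkpoint states persisted by the last run
+ * cache.add(cp, postEpochTransitionState);           // keep the checkpoint state in memory
+ * await cache.processState(blockRootHex, postState); // persist/prune epochs beyond maxCPStateEpochsInMemory
+ * const state = await cache.getOrReload(toCheckpointHex(cp)); // may reload from disk via a seed state
+ * ```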
+ */ +export class PersistentCheckpointStateCache implements CheckpointStateCache { + private readonly cache: MapTracker; + /** Epoch -> Set */ + private readonly epochIndex = new MapDef>(() => new Set()); + private readonly metrics: Metrics["cpStateCache"] | null | undefined; + private readonly logger: Logger; + private readonly clock: IClock | null | undefined; + private preComputedCheckpoint: string | null = null; + private preComputedCheckpointHits: number | null = null; + private readonly maxEpochsInMemory: number; + private readonly datastore: CPStateDatastore; + private readonly shufflingCache: ShufflingCache; + private readonly getHeadState?: GetHeadStateFn; + + constructor( + {metrics, logger, clock, shufflingCache, datastore, getHeadState}: PersistentCheckpointStateCacheModules, + opts: PersistentCheckpointStateCacheOpts + ) { + this.cache = new MapTracker(metrics?.cpStateCache); + if (metrics) { + this.metrics = metrics.cpStateCache; + metrics.cpStateCache.size.addCollect(() => { + let persistCount = 0; + let inMemoryCount = 0; + const memoryEpochs = new Set(); + const persistentEpochs = new Set(); + for (const [key, cacheItem] of this.cache.entries()) { + const {epoch} = fromCacheKey(key); + if (isPersistedCacheItem(cacheItem)) { + persistCount++; + persistentEpochs.add(epoch); + } else { + inMemoryCount++; + memoryEpochs.add(epoch); + } + } + metrics.cpStateCache.size.set({type: CacheItemType.persisted}, persistCount); + metrics.cpStateCache.size.set({type: CacheItemType.inMemory}, inMemoryCount); + metrics.cpStateCache.epochSize.set({type: CacheItemType.persisted}, persistentEpochs.size); + metrics.cpStateCache.epochSize.set({type: CacheItemType.inMemory}, memoryEpochs.size); + }); + } + this.logger = logger; + this.clock = clock; + if (opts.maxCPStateEpochsInMemory !== undefined && opts.maxCPStateEpochsInMemory < 0) { + throw new Error("maxEpochsInMemory must be >= 0"); + } + this.maxEpochsInMemory = opts.maxCPStateEpochsInMemory ?? DEFAULT_MAX_CP_STATE_EPOCHS_IN_MEMORY; + // Specify different datastore for testing + this.datastore = datastore; + this.shufflingCache = shufflingCache; + this.getHeadState = getHeadState; + } + + /** + * Reload checkpoint state keys from the last run. + */ + async init(): Promise { + const persistedKeys = await this.datastore.readKeys(); + for (const persistedKey of persistedKeys) { + const cp = datastoreKeyToCheckpoint(persistedKey); + this.cache.set(toCacheKey(cp), {type: CacheItemType.persisted, value: persistedKey}); + this.epochIndex.getOrDefault(cp.epoch).add(toHexString(cp.root)); + } + this.logger.info("Loaded persisted checkpoint states from the last run", { + count: persistedKeys.length, + maxEpochsInMemory: this.maxEpochsInMemory, + }); + } + + /** + * Get a state from cache, it may reload from disk. + * This is an expensive api, should only be called in some important flows: + * - Validate a gossip block + * - Get block for processing + * - Regen head state + */ + async getOrReload(cp: CheckpointHex): Promise { + const stateOrStateBytesData = await this.getStateOrLoadDb(cp); + if (stateOrStateBytesData === null || isCachedBeaconState(stateOrStateBytesData)) { + return stateOrStateBytesData; + } + const {persistedKey, stateBytes} = stateOrStateBytesData; + const logMeta = {persistedKey: toHexString(persistedKey)}; + this.logger.debug("Reload: read state successful", logMeta); + this.metrics?.stateReloadSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 
0); + const seedState = this.findSeedStateToReload(cp) ?? this.getHeadState?.(); + if (seedState == null) { + throw new Error("No seed state found for cp " + toCacheKey(cp)); + } + this.metrics?.stateReloadEpochDiff.observe(Math.abs(seedState.epochCtx.epoch - cp.epoch)); + this.logger.debug("Reload: found seed state", {...logMeta, seedSlot: seedState.slot}); + + try { + const timer = this.metrics?.stateReloadDuration.startTimer(); + const newCachedState = loadCachedBeaconState(seedState, stateBytes, { + shufflingGetter: this.shufflingCache.getSync.bind(this.shufflingCache), + }); + newCachedState.commit(); + const stateRoot = toHexString(newCachedState.hashTreeRoot()); + timer?.(); + this.logger.debug("Reload: cached state load successful", { + ...logMeta, + stateSlot: newCachedState.slot, + stateRoot, + seedSlot: seedState.slot, + }); + + // only remove persisted state once we reload successfully + const cpKey = toCacheKey(cp); + this.cache.set(cpKey, {type: CacheItemType.inMemory, state: newCachedState, persistedKey}); + this.epochIndex.getOrDefault(cp.epoch).add(cp.rootHex); + // don't prune from memory here, call it at the last 1/3 of slot 0 of an epoch + return newCachedState; + } catch (e) { + this.logger.debug("Reload: error loading cached state", logMeta, e as Error); + return null; + } + } + + /** + * Return either state or state bytes loaded from db. + */ + async getStateOrBytes(cp: CheckpointHex): Promise { + const stateOrLoadedState = await this.getStateOrLoadDb(cp); + if (stateOrLoadedState === null || isCachedBeaconState(stateOrLoadedState)) { + return stateOrLoadedState; + } + return stateOrLoadedState.stateBytes; + } + + /** + * Return either state or state bytes with persisted key loaded from db. + */ + async getStateOrLoadDb(cp: CheckpointHex): Promise { + const cpKey = toCacheKey(cp); + const inMemoryState = this.get(cpKey); + if (inMemoryState) { + return inMemoryState; + } + + const cacheItem = this.cache.get(cpKey); + if (cacheItem === undefined) { + return null; + } + + if (isInMemoryCacheItem(cacheItem)) { + // should not happen, in-memory state is handled above + throw new Error("Expected persistent key"); + } + + const persistedKey = cacheItem.value; + const dbReadTimer = this.metrics?.stateReloadDbReadTime.startTimer(); + const stateBytes = await this.datastore.read(persistedKey); + dbReadTimer?.(); + + if (stateBytes === null) { + return null; + } + return {persistedKey, stateBytes}; + } + + /** + * Similar to get() api without reloading from disk + */ + get(cpOrKey: CheckpointHex | string): CachedBeaconStateAllForks | null { + this.metrics?.lookups.inc(); + const cpKey = typeof cpOrKey === "string" ? cpOrKey : toCacheKey(cpOrKey); + const cacheItem = this.cache.get(cpKey); + + if (cacheItem === undefined) { + return null; + } + + this.metrics?.hits.inc(); + + if (cpKey === this.preComputedCheckpoint) { + this.preComputedCheckpointHits = (this.preComputedCheckpointHits ?? 0) + 1; + } + + if (isInMemoryCacheItem(cacheItem)) { + const {state} = cacheItem; + this.metrics?.stateClonedCount.observe(state.clonedCount); + return state; + } + + return null; + } + + /** + * Add a state of a checkpoint to this cache, prune from memory if necessary. 
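+   *
+   * For example (illustrative; `cp` and `postEpochTransitionState` are assumed):
+   * ```
+   * cache.add(cp, postEpochTransitionState);
+   * // if this checkpoint was persisted before, the new in-memory item keeps its
+   * // persistedKey so the disk copy can be removed or reused later
+   * ```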
+   */
+  add(cp: phase0.Checkpoint, state: CachedBeaconStateAllForks): void {
+    const cpHex = toCheckpointHex(cp);
+    const key = toCacheKey(cpHex);
+    const cacheItem = this.cache.get(key);
+    this.metrics?.adds.inc();
+    if (cacheItem !== undefined && isPersistedCacheItem(cacheItem)) {
+      const persistedKey = cacheItem.value;
+      // was persisted to disk, set back to memory
+      this.cache.set(key, {type: CacheItemType.inMemory, state, persistedKey});
+      this.logger.verbose("Added checkpoint state to memory but a persisted key existed", {
+        epoch: cp.epoch,
+        rootHex: cpHex.rootHex,
+        persistedKey: toHexString(persistedKey),
+      });
+    } else {
+      this.cache.set(key, {type: CacheItemType.inMemory, state});
+      this.logger.verbose("Added checkpoint state to memory", {epoch: cp.epoch, rootHex: cpHex.rootHex});
+    }
+    this.epochIndex.getOrDefault(cp.epoch).add(cpHex.rootHex);
+  }
+
+  /**
+   * Searches in-memory for the latest cached state with `root`, without reload, starting at `maxEpoch` and descending
+   */
+  getLatest(rootHex: RootHex, maxEpoch: Epoch): CachedBeaconStateAllForks | null {
+    // sort epochs in descending order, only consider epochs lte `maxEpoch`
+    const epochs = Array.from(this.epochIndex.keys())
+      .sort((a, b) => b - a)
+      .filter((e) => e <= maxEpoch);
+    for (const epoch of epochs) {
+      if (this.epochIndex.get(epoch)?.has(rootHex)) {
+        const inMemoryState = this.get({rootHex, epoch});
+        if (inMemoryState) {
+          return inMemoryState;
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Searches for the latest cached state with `root`, reloading if needed, starting at `maxEpoch` and descending.
+   * This is an expensive api, should only be called in some important flows:
+   * - Validate a gossip block
+   * - Get block for processing
+   * - Regen head state
+   */
+  async getOrReloadLatest(rootHex: RootHex, maxEpoch: Epoch): Promise<CachedBeaconStateAllForks | null> {
+    // sort epochs in descending order, only consider epochs lte `maxEpoch`
+    const epochs = Array.from(this.epochIndex.keys())
+      .sort((a, b) => b - a)
+      .filter((e) => e <= maxEpoch);
+    for (const epoch of epochs) {
+      if (this.epochIndex.get(epoch)?.has(rootHex)) {
+        try {
+          const state = await this.getOrReload({rootHex, epoch});
+          if (state) {
+            return state;
+          }
+        } catch (e) {
+          this.logger.debug("Error get or reload state", {epoch, rootHex}, e as Error);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Update the precomputed checkpoint and return the number of hits for the
+   * previous one (if any).
+   */
+  updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null {
+    const previousHits = this.preComputedCheckpointHits;
+    this.preComputedCheckpoint = toCacheKey({rootHex, epoch});
+    this.preComputedCheckpointHits = 0;
+    return previousHits;
+  }
+
+  /**
+   * This is just to conform to the old implementation
+   */
+  prune(): void {
+    // do nothing
+  }
+
+  /**
+   * Prune all checkpoint states before the provided finalized epoch.
+   */
+  pruneFinalized(finalizedEpoch: Epoch): void {
+    for (const epoch of this.epochIndex.keys()) {
+      if (epoch < finalizedEpoch) {
+        this.deleteAllEpochItems(epoch).catch((e) =>
+          this.logger.debug("Error delete all epoch items", {epoch, finalizedEpoch}, e as Error)
+        );
+      }
+    }
+  }
+
+  /**
+   * After processing a block, prune from memory based on the view of that block.
+   * This likely persists 1 state per epoch, at the last 1/3 of slot 0 of an epoch, although it is called at the last 1/3 of every slot.
+   * Given the following block b, whose ancestors b2, b1, b0 are in epochs (n-2), (n-1), n respectively:
+   *
+   *      epoch:       (n-2)       (n-1)         n         (n+1)
+   *             |-----------|-----------|-----------|-----------|
+   *                         ^           ^           ^           ^
+   *                         |           |           |           |
+   * block chain:           b2---------->b1--------->b0-->b
+   *
+   * After processing block b, if maxEpochsInMemory is:
+   * - 2 then we'll persist the {root: b2, epoch n-2} checkpoint state to disk
+   * - 1 then we'll persist the {root: b2, epoch n-2} and {root: b1, epoch n-1} checkpoint states to disk
+   * - 0 then we'll persist the {root: b2, epoch n-2} and {root: b1, epoch n-1} and {root: b0, epoch n} checkpoint states to disk
+   * - if any old epoch's checkpoint states are already persisted, there is no need to persist them again
+   *
+   * Note that for each epoch there could be multiple checkpoint states, usually 2: one for the Previous Root Checkpoint State and one for the Current Root Checkpoint State.
+   * We normally only persist 1 checkpoint state per epoch, the one that could potentially be justified/finalized later based on the view of the block.
+   * Other checkpoint states are pruned from memory.
+   *
+   * This design also covers the reorg scenario. Given block c in the same epoch n where c.slot > b.slot, c is not a descendant of b, and c is built on top of c0
+   * instead of b0 (epoch (n - 1)):
+   *
+   *      epoch:       (n-2)       (n-1)         n         (n+1)
+   *             |-----------|-----------|-----------|-----------|
+   *                         ^           ^     ^     ^     ^     ^
+   *                         |           |     |     |     |     |
+   * block chain:           b2---------->b1----->c0->b0-->b      |
+   *                                           ║                 |
+   *                                           ╚═══════════>c (reorg)
+   *
+   * After processing block c, if maxEpochsInMemory is:
+   * - 0 then we'll persist the {root: c0, epoch: n} checkpoint state to disk. Note that regen should populate the {root: c0, epoch: n} checkpoint state before.
+   *
+   *      epoch:                  (n-1)                            n                           (n+1)
+   *             |-------------------------------------------------------------|-------------------------------------------------------------|
+   *                        ^                               ^                  ^                           ^
+   *   _______              |                               |                  |                           |
+   *  |       |             |                               |                  |                           |
+   *  |  db   |====== reload ======> {root: b1, epoch: n-1} cp state ======> c0 block state ======> {root: c0, epoch: n} cp state =====> c block state
+   *  |_______|
+   *
+   *
+   *
+   * - 1 then we'll persist the {root: b1, epoch n-1} checkpoint state to disk. Note that at epoch n there are both {root: b0, epoch: n} and {root: c0, epoch: n} checkpoint states in memory
+   * - 2 then we'll persist the {root: b2, epoch n-2} checkpoint state to disk; there are also 2 checkpoint states in memory at epoch n, same as the above (maxEpochsInMemory=1)
+   *
+   * As of Nov 2023, it takes 1.3s to 1.5s to persist a state on holesky on a fast server. TODO:
+   * - improve state serialization time
+   * - or research how to only store a diff against the finalized state
+   */
+  async processState(blockRootHex: RootHex, state: CachedBeaconStateAllForks): Promise<number> {
+    let persistCount = 0;
+    // it's important to sort the epochs in ascending order, in case of a big reorg we always want to keep the most recent checkpoint states
+    const sortedEpochs = Array.from(this.epochIndex.keys()).sort((a, b) => a - b);
+    if (sortedEpochs.length <= this.maxEpochsInMemory) {
+      return 0;
+    }
+
+    for (const lowestEpoch of sortedEpochs.slice(0, sortedEpochs.length - this.maxEpochsInMemory)) {
+      const epochBoundarySlot = computeStartSlotAtEpoch(lowestEpoch);
+      const epochBoundaryRoot =
+        epochBoundarySlot === state.slot ?
fromHexString(blockRootHex) : getBlockRootAtSlot(state, epochBoundarySlot); + const epochBoundaryHex = toHexString(epochBoundaryRoot); + + // for each epoch, usually there are 2 rootHex respective to the 2 checkpoint states: Previous Root Checkpoint State and Current Root Checkpoint State + for (const rootHex of this.epochIndex.get(lowestEpoch) ?? []) { + const cpKey = toCacheKey({epoch: lowestEpoch, rootHex}); + const cacheItem = this.cache.get(cpKey); + + if (cacheItem !== undefined && isInMemoryCacheItem(cacheItem)) { + // this is state in memory, we don't care if the checkpoint state is already persisted + let {persistedKey} = cacheItem; + const {state} = cacheItem; + const logMeta = { + stateSlot: state.slot, + rootHex, + epochBoundaryHex, + persistedKey: persistedKey ? toHexString(persistedKey) : "", + }; + + if (rootHex === epochBoundaryHex) { + if (persistedKey) { + // no need to persist + this.logger.verbose("Pruned checkpoint state from memory but no need to persist", logMeta); + } else { + // persist and do not update epochIndex + this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0); + const timer = this.metrics?.statePersistDuration.startTimer(); + const cpPersist = {epoch: lowestEpoch, root: epochBoundaryRoot}; + persistedKey = await this.datastore.write(cpPersist, state); + timer?.(); + persistCount++; + this.logger.verbose("Pruned checkpoint state from memory and persisted to disk", { + ...logMeta, + persistedKey: toHexString(persistedKey), + }); + } + // overwrite cpKey, this means the state is deleted from memory + this.cache.set(cpKey, {type: CacheItemType.persisted, value: persistedKey}); + } else { + if (persistedKey) { + // persisted file will be eventually deleted by the archive task + // this also means the state is deleted from memory + this.cache.set(cpKey, {type: CacheItemType.persisted, value: persistedKey}); + // do not update epochIndex + } else { + // delete the state from memory + this.cache.delete(cpKey); + this.epochIndex.get(lowestEpoch)?.delete(rootHex); + } + this.metrics?.statePruneFromMemoryCount.inc(); + this.logger.verbose("Pruned checkpoint state from memory", logMeta); + } + } + } + } + + return persistCount; + } + + /** + * Find a seed state to reload the state of provided checkpoint. Based on the design of n-historical state: + * + * ╔════════════════════════════════════╗═══════════════╗ + * ║ persisted to db or fs ║ in memory ║ + * ║ reload if needed ║ ║ + * ║ -----------------------------------║---------------║ + * ║ epoch: (n-2) (n-1) ║ n (n+1) ║ + * ║ |-------|-------|----║--|-------|----║ + * ║ ^ ^ ║ ^ ^ ║ + * ║ ║ ^ ^ ║ + * ╚════════════════════════════════════╝═══════════════╝ + * + * we always reload an epoch in the past. We'll start with epoch n then (n+1) prioritizing ones with the same view of `reloadedCp`. + * + * This could return null and we should get head state in that case. 
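+   *
+   * A sketch of how the caller combines this with the head-state fallback, mirroring getOrReload() above:
+   * ```
+   * const seedState = cache.findSeedStateToReload(cp) ?? getHeadState?.();
+   * if (seedState == null) throw new Error("No seed state found for cp " + toCacheKey(cp));
+   * const reloaded = loadCachedBeaconState(seedState, stateBytes, {shufflingGetter});
+   * ```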
+   */
+  findSeedStateToReload(reloadedCp: CheckpointHex): CachedBeaconStateAllForks | null {
+    const maxEpoch = Math.max(...Array.from(this.epochIndex.keys()));
+    const reloadedCpSlot = computeStartSlotAtEpoch(reloadedCp.epoch);
+    let firstState: CachedBeaconStateAllForks | null = null;
+    // no need to check epochs before `maxEpoch - this.maxEpochsInMemory + 1` because they are all persisted
+    for (let epoch = maxEpoch - this.maxEpochsInMemory + 1; epoch <= maxEpoch; epoch++) {
+      // if there's at least 1 state in memory in an epoch, just return the 1st one
+      if (firstState !== null) {
+        return firstState;
+      }
+
+      for (const rootHex of this.epochIndex.get(epoch) || []) {
+        const cpKey = toCacheKey({rootHex, epoch});
+        const cacheItem = this.cache.get(cpKey);
+        if (cacheItem === undefined) {
+          // should not happen
+          continue;
+        }
+        if (isInMemoryCacheItem(cacheItem)) {
+          const {state} = cacheItem;
+          if (firstState === null) {
+            firstState = state;
+          }
+
+          // amongst states of the same epoch, choose the one with the same view of reloadedCp
+          if (
+            reloadedCpSlot < state.slot &&
+            toHexString(getBlockRootAtSlot(state, reloadedCpSlot)) === reloadedCp.rootHex
+          ) {
+            return state;
+          }
+        }
+      }
+    }
+
+    return firstState;
+  }
+
+  clear(): void {
+    this.cache.clear();
+    this.epochIndex.clear();
+  }
+
+  /** ONLY FOR DEBUGGING PURPOSES. For lodestar debug API */
+  dumpSummary(): routes.lodestar.StateCacheItem[] {
+    return Array.from(this.cache.keys()).map((key) => {
+      const cp = fromCacheKey(key);
+      // TODO: add checkpoint key and persistent key to the summary
+      return {
+        slot: computeStartSlotAtEpoch(cp.epoch),
+        root: cp.rootHex,
+        reads: this.cache.readCount.get(key) ?? 0,
+        lastRead: this.cache.lastRead.get(key) ?? 0,
+        checkpointState: true,
+      };
+    });
+  }
+
+  /** ONLY FOR DEBUGGING PURPOSES. For spec tests on error */
+  dumpCheckpointKeys(): string[] {
+    return Array.from(this.cache.keys());
+  }
+
+  /**
+   * Delete all items of an epoch from disk and memory
+   */
+  private async deleteAllEpochItems(epoch: Epoch): Promise<void> {
+    let persistCount = 0;
+    const rootHexes = this.epochIndex.get(epoch) || [];
+    for (const rootHex of rootHexes) {
+      const key = toCacheKey({rootHex, epoch});
+      const cacheItem = this.cache.get(key);
+
+      if (cacheItem) {
+        const persistedKey = isPersistedCacheItem(cacheItem) ?
cacheItem.value : cacheItem.persistedKey; + if (persistedKey) { + await this.datastore.remove(persistedKey); + persistCount++; + this.metrics?.persistedStateRemoveCount.inc(); + } + } + this.cache.delete(key); + } + this.epochIndex.delete(epoch); + this.logger.verbose("Pruned finalized checkpoints states for epoch", { + epoch, + persistCount, + rootHexes: Array.from(rootHexes).join(","), + }); + } +} + +function toCheckpointHex(checkpoint: phase0.Checkpoint): CheckpointHex { + return { + epoch: checkpoint.epoch, + rootHex: toHexString(checkpoint.root), + }; +} + +function toCacheKey(cp: CheckpointHex | phase0.Checkpoint): CacheKey { + if (isCheckpointHex(cp)) { + return `${cp.rootHex}_${cp.epoch}`; + } + return `${toHexString(cp.root)}_${cp.epoch}`; +} + +function fromCacheKey(key: CacheKey): CheckpointHex { + const [rootHex, epoch] = key.split("_"); + return { + rootHex, + epoch: Number(epoch), + }; +} + +function isCachedBeaconState( + stateOrBytes: CachedBeaconStateAllForks | LoadedStateBytesData +): stateOrBytes is CachedBeaconStateAllForks { + return (stateOrBytes as CachedBeaconStateAllForks).slot !== undefined; +} + +function isInMemoryCacheItem(cacheItem: CacheItem): cacheItem is InMemoryCacheItem { + return cacheItem.type === CacheItemType.inMemory; +} + +function isPersistedCacheItem(cacheItem: CacheItem): cacheItem is PersistedCacheItem { + return cacheItem.type === CacheItemType.persisted; +} + +function isCheckpointHex(cp: CheckpointHex | phase0.Checkpoint): cp is CheckpointHex { + return (cp as CheckpointHex).rootHex !== undefined; +} diff --git a/packages/beacon-node/src/chain/stateCache/stateContextCache.ts b/packages/beacon-node/src/chain/stateCache/stateContextCache.ts index 44523abf799c..3a04c4f4a258 100644 --- a/packages/beacon-node/src/chain/stateCache/stateContextCache.ts +++ b/packages/beacon-node/src/chain/stateCache/stateContextCache.ts @@ -4,15 +4,16 @@ import {CachedBeaconStateAllForks} from "@lodestar/state-transition"; import {routes} from "@lodestar/api"; import {Metrics} from "../../metrics/index.js"; import {MapTracker} from "./mapMetrics.js"; +import {BlockStateCache} from "./types.js"; const MAX_STATES = 3 * 32; /** - * In memory cache of CachedBeaconState - * - * Similar API to Repository + * Old implementation of StateCache + * - Prune per checkpoint so number of states ranges from 96 to 128 + * - Keep a separate head state to make sure it is always available */ -export class StateContextCache { +export class StateContextCache implements BlockStateCache { /** * Max number of states allowed in the cache */ diff --git a/packages/beacon-node/src/chain/stateCache/stateContextCheckpointsCache.ts b/packages/beacon-node/src/chain/stateCache/stateContextCheckpointsCache.ts index 0cb48f0e2ded..a177db9b7c87 100644 --- a/packages/beacon-node/src/chain/stateCache/stateContextCheckpointsCache.ts +++ b/packages/beacon-node/src/chain/stateCache/stateContextCheckpointsCache.ts @@ -5,6 +5,7 @@ import {MapDef} from "@lodestar/utils"; import {routes} from "@lodestar/api"; import {Metrics} from "../../metrics/index.js"; import {MapTracker} from "./mapMetrics.js"; +import {CheckpointStateCache as CheckpointStateCacheInterface, CacheItemType} from "./types.js"; export type CheckpointHex = {epoch: Epoch; rootHex: RootHex}; const MAX_EPOCHS = 10; @@ -14,8 +15,9 @@ const MAX_EPOCHS = 10; * belonging to checkpoint * * Similar API to Repository + * TODO: rename to MemoryCheckpointStateCache in the next PR of n-historical states */ -export class CheckpointStateCache { +export class 
CheckpointStateCache implements CheckpointStateCacheInterface {
   private readonly cache: MapTracker<string, CachedBeaconStateAllForks>;
   /** Epoch -> Set<RootHex> */
   private readonly epochIndex = new MapDef<Epoch, Set<RootHex>>(() => new Set());
@@ -27,11 +29,32 @@ export class CheckpointStateCache {
     this.cache = new MapTracker(metrics?.cpStateCache);
     if (metrics) {
       this.metrics = metrics.cpStateCache;
-      metrics.cpStateCache.size.addCollect(() => metrics.cpStateCache.size.set(this.cache.size));
-      metrics.cpStateCache.epochSize.addCollect(() => metrics.cpStateCache.epochSize.set(this.epochIndex.size));
+      metrics.cpStateCache.size.addCollect(() =>
+        metrics.cpStateCache.size.set({type: CacheItemType.inMemory}, this.cache.size)
+      );
+      metrics.cpStateCache.epochSize.addCollect(() =>
+        metrics.cpStateCache.epochSize.set({type: CacheItemType.inMemory}, this.epochIndex.size)
+      );
     }
   }
 
+  async getOrReload(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | null> {
+    return this.get(cp);
+  }
+
+  async getStateOrBytes(cp: CheckpointHex): Promise<CachedBeaconStateAllForks | Uint8Array | null> {
+    return this.get(cp);
+  }
+
+  async getOrReloadLatest(rootHex: string, maxEpoch: number): Promise<CachedBeaconStateAllForks | null> {
+    return this.getLatest(rootHex, maxEpoch);
+  }
+
+  async processState(): Promise<number> {
+    // do nothing, this class does not support pruning
+    return 0;
+  }
+
   get(cp: CheckpointHex): CachedBeaconStateAllForks | null {
     this.metrics?.lookups.inc();
     const cpKey = toCheckpointKey(cp);
diff --git a/packages/beacon-node/src/chain/stateCache/types.ts b/packages/beacon-node/src/chain/stateCache/types.ts
new file mode 100644
index 000000000000..5867d7d356c1
--- /dev/null
+++ b/packages/beacon-node/src/chain/stateCache/types.ts
@@ -0,0 +1,73 @@
+import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
+import {Epoch, RootHex, phase0} from "@lodestar/types";
+import {routes} from "@lodestar/api";
+
+export type CheckpointHex = {epoch: Epoch; rootHex: RootHex};
+
+/**
+ * Lodestar currently keeps two state caches around.
+ *
+ * 1. BlockStateCache is keyed by state root, and intended to keep extremely recent states around (eg: post states from the latest blocks).
+ * These states are most likely to be useful for state transition of new blocks.
+ *
+ * 2. CheckpointStateCache is keyed by checkpoint, and intended to keep states which have just undergone an epoch transition.
+ * These states are useful for gossip verification and for avoiding an epoch transition during state transition of first-in-epoch blocks.
+ */
+
+/**
+ * Store up to n recent block states.
+ *
+ * The cache key is the state root.
+ */
+export interface BlockStateCache {
+  get(rootHex: RootHex): CachedBeaconStateAllForks | null;
+  add(item: CachedBeaconStateAllForks): void;
+  setHeadState(item: CachedBeaconStateAllForks | null): void;
+  clear(): void;
+  size: number;
+  prune(headStateRootHex: RootHex): void;
+  deleteAllBeforeEpoch(finalizedEpoch: Epoch): void;
+  dumpSummary(): routes.lodestar.StateCacheItem[];
+}
+
+/**
+ * Store checkpoint states to preserve epoch transitions; this helps lodestar run exactly 1 epoch transition per epoch in normal network conditions.
+ * + * There are 2 types of checkpoint states: + * + * - Previous Root Checkpoint State: where root is from previous epoch, this is added when we prepare for next slot, + * or to validate gossip block + * ``` + * epoch: (n-2) (n-1) n (n+1) + * |-------|-------|-------|-------| + * root ---------------------^ + * ``` + * + * - Current Root Checkpoint State: this is added when we process block slot 0 of epoch n, note that this block could + * be skipped so we don't always have this checkpoint state + * ``` + * epoch: (n-2) (n-1) n (n+1) + * |-------|-------|-------|-------| + * root ---------------------^ + * ``` + */ +export interface CheckpointStateCache { + init?: () => Promise; + getOrReload(cp: CheckpointHex): Promise; + getStateOrBytes(cp: CheckpointHex): Promise; + get(cpOrKey: CheckpointHex | string): CachedBeaconStateAllForks | null; + add(cp: phase0.Checkpoint, state: CachedBeaconStateAllForks): void; + getLatest(rootHex: RootHex, maxEpoch: Epoch): CachedBeaconStateAllForks | null; + getOrReloadLatest(rootHex: RootHex, maxEpoch: Epoch): Promise; + updatePreComputedCheckpoint(rootHex: RootHex, epoch: Epoch): number | null; + prune(finalizedEpoch: Epoch, justifiedEpoch: Epoch): void; + pruneFinalized(finalizedEpoch: Epoch): void; + processState(blockRootHex: RootHex, state: CachedBeaconStateAllForks): Promise; + clear(): void; + dumpSummary(): routes.lodestar.StateCacheItem[]; +} + +export enum CacheItemType { + persisted = "persisted", + inMemory = "in-memory", +} diff --git a/packages/beacon-node/src/db/beacon.ts b/packages/beacon-node/src/db/beacon.ts index 58b99f2a37e0..07cc47fa54d8 100644 --- a/packages/beacon-node/src/db/beacon.ts +++ b/packages/beacon-node/src/db/beacon.ts @@ -21,6 +21,7 @@ import { BLSToExecutionChangeRepository, } from "./repositories/index.js"; import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single/index.js"; +import {CheckpointStateRepository} from "./repositories/checkpointState.js"; export type BeaconDbModules = { config: ChainForkConfig; @@ -35,6 +36,7 @@ export class BeaconDb implements IBeaconDb { blobSidecarsArchive: BlobSidecarsArchiveRepository; stateArchive: StateArchiveRepository; + checkpointState: CheckpointStateRepository; voluntaryExit: VoluntaryExitRepository; proposerSlashing: ProposerSlashingRepository; @@ -67,6 +69,7 @@ export class BeaconDb implements IBeaconDb { this.blobSidecarsArchive = new BlobSidecarsArchiveRepository(config, db); this.stateArchive = new StateArchiveRepository(config, db); + this.checkpointState = new CheckpointStateRepository(config, db); this.voluntaryExit = new VoluntaryExitRepository(config, db); this.blsToExecutionChange = new BLSToExecutionChangeRepository(config, db); this.proposerSlashing = new ProposerSlashingRepository(config, db); diff --git a/packages/beacon-node/src/db/buckets.ts b/packages/beacon-node/src/db/buckets.ts index 1a3abfa33623..9dffd0608d52 100644 --- a/packages/beacon-node/src/db/buckets.ts +++ b/packages/beacon-node/src/db/buckets.ts @@ -28,6 +28,8 @@ export enum Bucket { phase0_proposerSlashing = 14, // ValidatorIndex -> ProposerSlashing phase0_attesterSlashing = 15, // Root -> AttesterSlashing capella_blsToExecutionChange = 16, // ValidatorIndex -> SignedBLSToExecutionChange + // checkpoint states + allForks_checkpointState = 17, // Root -> allForks.BeaconState // allForks_pendingBlock = 25, // Root -> SignedBeaconBlock // DEPRECATED on v0.30.0 phase0_depositEvent = 19, // depositIndex -> DepositEvent diff --git a/packages/beacon-node/src/db/interface.ts 
b/packages/beacon-node/src/db/interface.ts index 58bf25c57aa7..6ffb8992f635 100644 --- a/packages/beacon-node/src/db/interface.ts +++ b/packages/beacon-node/src/db/interface.ts @@ -19,6 +19,7 @@ import { BLSToExecutionChangeRepository, } from "./repositories/index.js"; import {PreGenesisState, PreGenesisStateLastProcessedBlock} from "./single/index.js"; +import {CheckpointStateRepository} from "./repositories/checkpointState.js"; /** * The DB service manages the data layer of the beacon chain @@ -36,6 +37,8 @@ export interface IBeaconDb { // finalized states stateArchive: StateArchiveRepository; + // checkpoint states + checkpointState: CheckpointStateRepository; // op pool voluntaryExit: VoluntaryExitRepository; diff --git a/packages/beacon-node/src/db/repositories/checkpointState.ts b/packages/beacon-node/src/db/repositories/checkpointState.ts new file mode 100644 index 000000000000..8848f4d26d3a --- /dev/null +++ b/packages/beacon-node/src/db/repositories/checkpointState.ts @@ -0,0 +1,31 @@ +import {ChainForkConfig} from "@lodestar/config"; +import {Db, Repository} from "@lodestar/db"; +import {BeaconStateAllForks} from "@lodestar/state-transition"; +import {ssz} from "@lodestar/types"; +import {Bucket, getBucketNameByValue} from "../buckets.js"; + +/** + * Store temporary checkpoint states. + * We should only put/get binary data from this repository, consumer will load it into an existing state ViewDU object. + */ +export class CheckpointStateRepository extends Repository { + constructor(config: ChainForkConfig, db: Db) { + // Pick some type but won't be used. Casted to any because no type can match `BeaconStateAllForks` + // eslint-disable-next-line @typescript-eslint/no-unsafe-assignment, @typescript-eslint/no-explicit-any + const type = ssz.phase0.BeaconState as any; + const bucket = Bucket.allForks_checkpointState; + super(config, db, bucket, type, getBucketNameByValue(bucket)); + } + + getId(): Uint8Array { + throw Error("CheckpointStateRepository does not work with value"); + } + + encodeValue(): Uint8Array { + throw Error("CheckpointStateRepository does not work with value"); + } + + decodeValue(): BeaconStateAllForks { + throw Error("CheckpointStateRepository does not work with value"); + } +} diff --git a/packages/beacon-node/src/metrics/metrics/lodestar.ts b/packages/beacon-node/src/metrics/metrics/lodestar.ts index 6c000685556c..ea2251b3dce5 100644 --- a/packages/beacon-node/src/metrics/metrics/lodestar.ts +++ b/packages/beacon-node/src/metrics/metrics/lodestar.ts @@ -16,6 +16,7 @@ import {PeerSyncType, RangeSyncType} from "../../sync/utils/remoteSyncType.js"; import {LodestarMetadata} from "../options.js"; import {RegistryMetricCreator} from "../utils/registryMetricCreator.js"; import {OpSource} from "../validatorMonitor.js"; +import {CacheItemType} from "../../chain/stateCache/types.js"; export type LodestarMetrics = ReturnType; @@ -1098,13 +1099,15 @@ export function createLodestarMetrics( name: "lodestar_cp_state_cache_adds_total", help: "Total number of items added in checkpoint state cache", }), - size: register.gauge({ + size: register.gauge<{type: CacheItemType}>({ name: "lodestar_cp_state_cache_size", help: "Checkpoint state cache size", + labelNames: ["type"], }), - epochSize: register.gauge({ + epochSize: register.gauge<{type: CacheItemType}>({ name: "lodestar_cp_state_epoch_size", help: "Checkpoint state cache size", + labelNames: ["type"], }), reads: register.avgMinMax({ name: "lodestar_cp_state_epoch_reads", @@ -1119,6 +1122,44 @@ export function 
createLodestarMetrics( help: "Histogram of cloned count per state every time state.clone() is called", buckets: [1, 2, 5, 10, 50, 250], }), + statePersistDuration: register.histogram({ + name: "lodestar_cp_state_cache_state_persist_seconds", + help: "Histogram of time to persist state to db", + buckets: [0.1, 0.5, 1, 2, 3, 4], + }), + statePruneFromMemoryCount: register.gauge({ + name: "lodestar_cp_state_cache_state_prune_from_memory_count", + help: "Total number of states pruned from memory", + }), + statePersistSecFromSlot: register.histogram({ + name: "lodestar_cp_state_cache_state_persist_seconds_from_slot", + help: "Histogram of time to persist state to db since the clock slot", + buckets: [0, 2, 4, 6, 8, 10, 12], + }), + stateReloadDuration: register.histogram({ + name: "lodestar_cp_state_cache_state_reload_seconds", + help: "Histogram of time to load state from db", + buckets: [0, 2, 4, 6, 8, 10, 12], + }), + stateReloadEpochDiff: register.histogram({ + name: "lodestar_cp_state_cache_state_reload_epoch_diff", + help: "Histogram of epoch difference between seed state epoch and loaded state epoch", + buckets: [0, 1, 2, 4, 8, 16, 32], + }), + stateReloadSecFromSlot: register.histogram({ + name: "lodestar_cp_state_cache_state_reload_seconds_from_slot", + help: "Histogram of time to load state from db since the clock slot", + buckets: [0, 2, 4, 6, 8, 10, 12], + }), + stateReloadDbReadTime: register.histogram({ + name: "lodestar_cp_state_cache_state_reload_db_read_seconds", + help: "Histogram of time to load state bytes from db", + buckets: [0.01, 0.05, 0.1, 0.2, 0.5], + }), + persistedStateRemoveCount: register.gauge({ + name: "lodestar_cp_state_cache_persisted_state_remove_count", + help: "Total number of persisted states removed", + }), }, balancesCache: { diff --git a/packages/beacon-node/src/util/array.ts b/packages/beacon-node/src/util/array.ts index 72f81fbee72b..a154ee1bbf34 100644 --- a/packages/beacon-node/src/util/array.ts +++ b/packages/beacon-node/src/util/array.ts @@ -45,6 +45,9 @@ export class LinkedList { return this._length; } + /** + * Add to the end of the list + */ push(data: T): void { if (this._length === 0) { this.tail = this.head = new Node(data); @@ -64,6 +67,9 @@ export class LinkedList { this._length++; } + /** + * Add to the beginning of the list + */ unshift(data: T): void { if (this._length === 0) { this.tail = this.head = new Node(data); @@ -83,6 +89,25 @@ export class LinkedList { this._length++; } + insertAfter(after: T, data: T): void { + const node = this.findNode(after); + if (!node) { + return; + } + + if (node === this.tail) { + this.push(data); + return; + } + + const newNode = new Node(data); + newNode.next = node.next; + newNode.prev = node; + node.next = newNode; + if (newNode.next) newNode.next.prev = newNode; + this._length++; + } + pop(): T | null { const oldTail = this.tail; if (!oldTail) return null; @@ -173,6 +198,48 @@ export class LinkedList { return false; } + /** + * Move an existing item to the head of the list. + * If the item is not found, do nothing. + */ + moveToHead(item: T): void { + // if this is head, do nothing + if (this.head?.data === item) { + return; + } + + const found = this.deleteFirst(item); + if (found) { + this.unshift(item); + } + } + + /** + * Move an existing item to the second position of the list. + * If the item is not found, do nothing. 
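+   *
+   * For example (hypothetical list contents):
+   * ```
+   * const list = new LinkedList<string>();
+   * for (const key of ["a", "b", "c"]) list.push(key);
+   * list.moveToSecond("c"); // a, c, b
+   * list.moveToHead("b");   // b, a, c
+   * ```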
+ */ + moveToSecond(item: T): void { + // if this is head or second, do nothing + if (this.head?.data === item || this.head?.next?.data === item) { + return; + } + + const found = this.deleteFirst(item); + if (found) { + if (this.head?.next) { + const oldSecond = this.head.next; + const newSecond = new Node(item); + this.head.next = newSecond; + newSecond.next = oldSecond; + newSecond.prev = this.head; + oldSecond.prev = newSecond; + } else { + // only 1 item in the list + this.push(item); + } + } + } + next(): IteratorResult { if (!this.pointer) { return {done: true, value: undefined}; @@ -222,4 +289,23 @@ export class LinkedList { return arr; } + + /** + * Check if the item is in the list. + * @returns + */ + has(item: T): boolean { + return this.findNode(item) !== null; + } + + private findNode(item: T): Node | null { + let node = this.head; + while (node) { + if (node.data === item) { + return node; + } + node = node.next; + } + return null; + } } diff --git a/packages/beacon-node/test/unit/chain/stateCache/fifoBlockStateCache.test.ts b/packages/beacon-node/test/unit/chain/stateCache/fifoBlockStateCache.test.ts new file mode 100644 index 000000000000..62f2bff13d19 --- /dev/null +++ b/packages/beacon-node/test/unit/chain/stateCache/fifoBlockStateCache.test.ts @@ -0,0 +1,120 @@ +import {describe, it, expect, beforeEach} from "vitest"; +import {toHexString} from "@chainsafe/ssz"; +import {EpochShuffling} from "@lodestar/state-transition"; +import {SLOTS_PER_EPOCH} from "@lodestar/params"; +import {CachedBeaconStateAllForks} from "@lodestar/state-transition/src/types.js"; +import {FIFOBlockStateCache} from "../../../../src/chain/stateCache/index.js"; +import {generateCachedState} from "../../../utils/state.js"; + +describe("FIFOBlockStateCache", function () { + let cache: FIFOBlockStateCache; + const shuffling: EpochShuffling = { + epoch: 0, + activeIndices: [], + shuffling: [], + committees: [], + committeesPerSlot: 1, + }; + + const state1 = generateCachedState({slot: 0}); + const key1 = toHexString(state1.hashTreeRoot()); + state1.epochCtx.currentShuffling = {...shuffling, epoch: 0}; + + const state2 = generateCachedState({slot: 1 * SLOTS_PER_EPOCH}); + const key2 = toHexString(state2.hashTreeRoot()); + state2.epochCtx.currentShuffling = {...shuffling, epoch: 1}; + + const state3 = generateCachedState({slot: 2 * SLOTS_PER_EPOCH}); + const key3 = toHexString(state3.hashTreeRoot()); + state3.epochCtx.currentShuffling = {...shuffling, epoch: 2}; + + beforeEach(function () { + // max 2 items + cache = new FIFOBlockStateCache({maxBlockStates: 2}, {}); + cache.add(state1); + cache.add(state2); + }); + + const testCases: { + name: string; + headState: CachedBeaconStateAllForks; + addAsHeadArr: boolean[]; + keptStates: string[]; + prunedState: string; + }[] = [ + { + name: "add as head, prune key1", + headState: state2, + addAsHeadArr: [true], + keptStates: [key3, key2], + prunedState: key1, + }, + { + name: "add, prune key1", + headState: state2, + addAsHeadArr: [false], + keptStates: [key2, key3], + prunedState: key1, + }, + { + name: "add as head, prune key2", + headState: state1, + addAsHeadArr: [true], + keptStates: [key3, key1], + prunedState: key2, + }, + { + name: "add, prune key2", + headState: state1, + addAsHeadArr: [false], + keptStates: [key1, key3], + prunedState: key2, + }, + // same flow to importBlock + { + name: "add then set as head, prune key1", + headState: state2, + addAsHeadArr: [false, true], + keptStates: [key3, key2], + prunedState: key1, + }, + { + name: "add then set 
as head, prune key2",
      headState: state1,
      addAsHeadArr: [false, true],
      keptStates: [key3, key1],
      prunedState: key2,
    },
  ];

  for (const {name, headState, addAsHeadArr, keptStates, prunedState} of testCases) {
    it(name, () => {
      // move this state to head
      cache.setHeadState(headState);
      expect(cache.size).to.be.equal(2, "Size must be same as initial 2");
      for (const addAsHead of addAsHeadArr) {
        cache.add(state3, addAsHead);
      }
      expect(cache.size).to.be.equal(2, "Size should reduce to initial 2 after pruning");
      expect(cache.dumpKeyOrder()).toEqual(keptStates);
      expect(cache.get(prunedState)).toBeNull();
      for (const key of keptStates) {
        expect(cache.get(key), `must have key ${key}`).to.be.not.null;
      }
    });
  }

  it("Should not prune newly added state", () => {
    cache = new FIFOBlockStateCache({maxBlockStates: 1}, {});
    cache.setHeadState(state1);
    // Size must be same as initial 1
    expect(cache.size).toEqual(1);
    cache.add(state2);
    // Should not delete the newly added state
    expect(cache.size).toEqual(2);
    cache.add(state3);
    // Should delete 1 state
    expect(cache.size).toEqual(2);
    expect(cache.dumpKeyOrder()).toEqual([key1, key3]);
  });
});
diff --git a/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts b/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts
new file mode 100644
index 000000000000..83a2dddd65dd
--- /dev/null
+++ b/packages/beacon-node/test/unit/chain/stateCache/persistentCheckpointsCache.test.ts
@@ -0,0 +1,954 @@
+import {describe, it, expect, beforeAll, beforeEach} from "vitest";
+import {SLOTS_PER_EPOCH, SLOTS_PER_HISTORICAL_ROOT} from "@lodestar/params";
+import {CachedBeaconStateAllForks, computeEpochAtSlot, computeStartSlotAtEpoch} from "@lodestar/state-transition";
+import {RootHex, phase0} from "@lodestar/types";
+import {mapValues, toHexString} from "@lodestar/utils";
+import {PersistentCheckpointStateCache} from "../../../../src/chain/stateCache/persistentCheckpointsCache.js";
+import {checkpointToDatastoreKey} from "../../../../src/chain/stateCache/datastore/index.js";
+import {generateCachedState} from "../../../utils/state.js";
+import {ShufflingCache} from "../../../../src/chain/shufflingCache.js";
+import {testLogger} from "../../../utils/logger.js";
+import {getTestDatastore} from "../../../utils/chain/stateCache/datastore.js";
+import {CheckpointHex} from "../../../../src/chain/stateCache/types.js";
+import {toCheckpointHex} from "../../../../src/chain/index.js";
+
+describe("PersistentCheckpointStateCache", function () {
+  let root0a: Buffer, root0b: Buffer, root1: Buffer, root2: Buffer;
+  let cp0a: phase0.Checkpoint, cp0b: phase0.Checkpoint, cp1: phase0.Checkpoint, cp2: phase0.Checkpoint;
+  let cp0aHex: CheckpointHex, cp0bHex: CheckpointHex, cp1Hex: CheckpointHex, cp2Hex: CheckpointHex;
+  let persistent0bKey: RootHex;
+  const startSlotEpoch20 = computeStartSlotAtEpoch(20);
+  const startSlotEpoch21 = computeStartSlotAtEpoch(21);
+  const startSlotEpoch22 = computeStartSlotAtEpoch(22);
+  let cache: PersistentCheckpointStateCache;
+  let fileApisBuffer: Map<string, Uint8Array>;
+  let states: Record<"cp0a" | "cp0b" | "cp1" | "cp2", CachedBeaconStateAllForks>;
+  let stateBytes: Record<"cp0a" | "cp0b" | "cp1" | "cp2", Uint8Array>;
+
+  beforeAll(() => {
+    root0a = Buffer.alloc(32);
+    root0b = Buffer.alloc(32, 1);
+    root1 = Buffer.alloc(32, 2);
+    root2 = Buffer.alloc(32, 3);
+    root0b[31] = 1;
+    // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^
^ + // || | | + // |0b--------root1--------root2 + // | + // 0a + // root0a is of the last slot of epoch 19 + cp0a = {epoch: 20, root: root0a}; + // root0b is of the first slot of epoch 20 + cp0b = {epoch: 20, root: root0b}; + cp1 = {epoch: 21, root: root1}; + cp2 = {epoch: 22, root: root2}; + [cp0aHex, cp0bHex, cp1Hex, cp2Hex] = [cp0a, cp0b, cp1, cp2].map((cp) => toCheckpointHex(cp)); + persistent0bKey = toHexString(checkpointToDatastoreKey(cp0b)); + const allStates = [cp0a, cp0b, cp1, cp2] + .map((cp) => generateCachedState({slot: cp.epoch * SLOTS_PER_EPOCH})) + .map((state, i) => { + const stateEpoch = computeEpochAtSlot(state.slot); + if (stateEpoch === 20 && i === 0) { + // cp0a + state.blockRoots.set((startSlotEpoch20 - 1) % SLOTS_PER_HISTORICAL_ROOT, root0a); + state.blockRoots.set(startSlotEpoch20 % SLOTS_PER_HISTORICAL_ROOT, root0a); + return state; + } + + // other states based on cp0b + state.blockRoots.set((startSlotEpoch20 - 1) % SLOTS_PER_HISTORICAL_ROOT, root0a); + state.blockRoots.set(startSlotEpoch20 % SLOTS_PER_HISTORICAL_ROOT, root0b); + + if (stateEpoch >= 21) { + state.blockRoots.set(startSlotEpoch21 % SLOTS_PER_HISTORICAL_ROOT, root1); + } + if (stateEpoch >= 22) { + state.blockRoots.set(startSlotEpoch22 % SLOTS_PER_HISTORICAL_ROOT, root2); + } + return state; + }); + + states = { + // Previous Root Checkpoint State of epoch 20 + cp0a: allStates[0], + // Current Root Checkpoint State of epoch 20 + cp0b: allStates[1], + // Current Root Checkpoint State of epoch 21 + cp1: allStates[2], + // Current Root Checkpoint State of epoch 22 + cp2: allStates[3], + }; + stateBytes = mapValues(states, (state) => state.serialize()); + }); + + beforeEach(() => { + fileApisBuffer = new Map(); + const datastore = getTestDatastore(fileApisBuffer); + cache = new PersistentCheckpointStateCache( + {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, + {maxCPStateEpochsInMemory: 2} + ); + cache.add(cp0a, states["cp0a"]); + cache.add(cp0b, states["cp0b"]); + cache.add(cp1, states["cp1"]); + }); + + it("getLatest", () => { + // cp0 + expect(cache.getLatest(cp0aHex.rootHex, cp0a.epoch)?.hashTreeRoot()).toEqual(states["cp0a"].hashTreeRoot()); + expect(cache.getLatest(cp0aHex.rootHex, cp0a.epoch + 1)?.hashTreeRoot()).toEqual(states["cp0a"].hashTreeRoot()); + expect(cache.getLatest(cp0aHex.rootHex, cp0a.epoch - 1)?.hashTreeRoot()).to.be.undefined; + + // cp1 + expect(cache.getLatest(cp1Hex.rootHex, cp1.epoch)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + expect(cache.getLatest(cp1Hex.rootHex, cp1.epoch + 1)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + expect(cache.getLatest(cp1Hex.rootHex, cp1.epoch - 1)?.hashTreeRoot()).to.be.undefined; + + // cp2 + expect(cache.getLatest(cp2Hex.rootHex, cp2.epoch)?.hashTreeRoot()).to.be.undefined; + }); + + it("getOrReloadLatest", async () => { + cache.add(cp2, states["cp2"]); + expect(await cache.processState(toHexString(cp2.root), states["cp2"])).toEqual(1); + + // cp0b is persisted + expect(fileApisBuffer.size).toEqual(1); + expect(Array.from(fileApisBuffer.keys())).toEqual([persistent0bKey]); + + // getLatest() does not reload from disk + expect(cache.getLatest(cp0aHex.rootHex, cp0a.epoch)).to.be.null; + expect(cache.getLatest(cp0bHex.rootHex, cp0b.epoch)).to.be.null; + + // cp0a has the root from previous epoch so we only prune it from db + expect(await cache.getOrReloadLatest(cp0aHex.rootHex, cp0a.epoch)).to.be.null; + // but getOrReloadLatest() does for cp0b + expect((await 
cache.getOrReloadLatest(cp0bHex.rootHex, cp0b.epoch))?.serialize()).toEqual(stateBytes["cp0b"]); + expect((await cache.getOrReloadLatest(cp0bHex.rootHex, cp0b.epoch + 1))?.serialize()).toEqual(stateBytes["cp0b"]); + expect((await cache.getOrReloadLatest(cp0bHex.rootHex, cp0b.epoch - 1))?.serialize()).to.be.undefined; + }); + + it("pruneFinalized and getStateOrBytes", async function () { + cache.add(cp2, states["cp2"]); + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]); + expect(await cache.processState(toHexString(cp2.root), states["cp2"])).toEqual(1); + // cp0b is persisted + expect(fileApisBuffer.size).toEqual(1); + expect(Array.from(fileApisBuffer.keys())).toEqual([persistent0bKey]); + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + // cp1 is in memory + expect(cache.get(cp1Hex)).to.be.not.null; + // cp2 is in memory + expect(cache.get(cp2Hex)).to.be.not.null; + // finalize epoch cp2 + cache.pruneFinalized(cp2.epoch); + expect(fileApisBuffer.size).toEqual(0); + expect(cache.get(cp1Hex)).to.be.null; + expect(cache.get(cp2Hex)).to.be.not.null; + expect(await cache.getStateOrBytes(cp0bHex)).to.be.null; + }); + + describe("findSeedStateToReload", () => { + beforeEach(() => { + fileApisBuffer = new Map(); + const datastore = getTestDatastore(fileApisBuffer); + cache = new PersistentCheckpointStateCache( + {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, + {maxCPStateEpochsInMemory: 2} + ); + cache.add(cp0a, states["cp0a"]); + cache.add(cp0b, states["cp0b"]); + cache.add(cp1, states["cp1"]); + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ + // || | | + // |0b--------root1--------root2 + // | + // 0a + it("single state at lowest memory epoch", async function () { + cache.add(cp2, states["cp2"]); + expect(await cache.processState(toHexString(cp2.root), states["cp2"])).toEqual(1); + expect(cache.findSeedStateToReload(cp0aHex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + expect(cache.findSeedStateToReload(cp0bHex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ ^ + // || | | | + // |0b--------root1--------root2 | + // | | + // 0a------------------------------root3 + // ^ ^ + // cp1a={0a, 21} {0a, 22}=cp2a + it("multiple states at lowest memory epoch", async function () { + cache.add(cp2, states["cp2"]); + expect(await cache.processState(toHexString(cp2.root), states["cp2"])).toEqual(1); + + const cp1a = {epoch: 21, root: root0a}; + const cp1aState = states["cp0a"].clone(); + cp1aState.slot = 21 * SLOTS_PER_EPOCH; + cp1aState.blockRoots.set(startSlotEpoch21 % SLOTS_PER_HISTORICAL_ROOT, root0a); + cp1aState.commit(); + cache.add(cp1a, cp1aState); + + const cp2a = {epoch: 22, root: root0a}; + const cp2aState = cp1aState.clone(); + cp2aState.slot = 22 * SLOTS_PER_EPOCH; + cp2aState.blockRoots.set(startSlotEpoch22 % SLOTS_PER_HISTORICAL_ROOT, root0a); + cp2aState.commit(); + cache.add(cp2a, cp2aState); + + const root3 = Buffer.alloc(32, 100); + const state3 = cp2aState.clone(); + state3.slot = 22 * SLOTS_PER_EPOCH + 3; + state3.commit(); + await cache.processState(toHexString(root3), state3); + + // state of {0a, 21} is chosen because it was built from cp0a + expect(cache.findSeedStateToReload(cp0aHex)?.hashTreeRoot()).toEqual(cp1aState.hashTreeRoot()); + // cp1 is chosen for 0b because it was built from cp0b +
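// (editorial note, inferred from these assertions: findSeedStateToReload appears to prefer an in-memory + // state at the lowest cached epoch whose blockRoots still contain the requested checkpoint root, so + // each branch reloads from a seed state built on its own checkpoint) +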
expect(cache.findSeedStateToReload(cp0bHex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + const randomRoot = Buffer.alloc(32, 101); + // for any other root it will pick the first state of epoch 21, which is states["cp1"] + expect(cache.findSeedStateToReload({epoch: 20, rootHex: toHexString(randomRoot)})?.hashTreeRoot()).toEqual( + states["cp1"].hashTreeRoot() + ); + }); + }); + + describe("processState, maxEpochsInMemory = 2", () => { + beforeEach(() => { + fileApisBuffer = new Map(); + const datastore = getTestDatastore(fileApisBuffer); + cache = new PersistentCheckpointStateCache( + {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, + {maxCPStateEpochsInMemory: 2} + ); + cache.add(cp0a, states["cp0a"]); + cache.add(cp0b, states["cp0b"]); + cache.add(cp1, states["cp1"]); + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ ^ + // || | | | + // |0b--------root1--------root2-----root3 + // | + // 0a + it("no reorg", async function () { + expect(fileApisBuffer.size).toEqual(0); + cache.add(cp2, states["cp2"]); + expect(await cache.processState(toHexString(cp2.root), states["cp2"])).toEqual(1); + expect(cache.get(cp2Hex)?.hashTreeRoot()).toEqual(states["cp2"].hashTreeRoot()); + expect(fileApisBuffer.size).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + + const blockStateRoot3 = states["cp2"].clone(); + blockStateRoot3.slot = 22 * SLOTS_PER_EPOCH + 3; + const root3 = Buffer.alloc(32, 100); + // process state of root3 + await cache.processState(toHexString(root3), blockStateRoot3); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + + // epoch 22 has 1 checkpoint state + expect(cache.get(cp2Hex)).to.be.not.null; + // epoch 21 has 1 checkpoint state + expect(cache.get(cp1Hex)).to.be.not.null; + // epoch 20 has 0 checkpoint state + expect(cache.get(cp0bHex)).to.be.null; + // but cp0bHex is persisted + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + // while cp0aHex is not + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ ^ ^ + // || | | | | + // |0b--------root1--------root2-root3 | + // | | + // 0a |---------root4 + it("reorg in same epoch", async function () { + // mostly the same as the test above + expect(fileApisBuffer.size).toEqual(0); + cache.add(cp2, states["cp2"]); + expect(await cache.processState(toHexString(cp2.root), states["cp2"])).toEqual(1); + expect(cache.get(cp2Hex)?.hashTreeRoot()).toEqual(states["cp2"].hashTreeRoot()); + expect(fileApisBuffer.size).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + + const blockStateRoot3 = states["cp2"].clone(); + blockStateRoot3.slot = 22 * SLOTS_PER_EPOCH + 3; + const root3 = Buffer.alloc(32, 100); + // process state of root3 + await cache.processState(toHexString(root3), blockStateRoot3); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + + const blockStateRoot4 = states["cp2"].clone(); + blockStateRoot4.slot = 22 * SLOTS_PER_EPOCH + 4; + const root4 = Buffer.alloc(32, 101); + // process state of root4 + await cache.processState(toHexString(root4), blockStateRoot4); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + + // epoch 22 has 1 checkpoint state + expect(cache.get(cp2Hex)).to.be.not.null; + // epoch 21 has 1 checkpoint state + expect(cache.get(cp1Hex)).to.be.not.null; + // epoch 20 has
0 checkpoint state + expect(cache.get(cp0bHex)).to.be.null; + // but cp0bHex is persisted + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + // while cp0aHex is not + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^ ^ ^ ^ ^ + // | | | | | + // 0b---------root1-----|-root2 | + // | | + // |------root3 + // 1a ^ + // | + // {1a, 22}=cp2a + it("reorg 1 epoch", async function () { + // process root2 state + cache.add(cp2, states["cp2"]); + expect(await cache.processState(toHexString(cp2.root), states["cp2"])).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + + // regen generates cp2a + const root1a = Buffer.alloc(32, 100); + const cp2a = {epoch: 22, root: root1a}; + const cp2aState = states["cp1"].clone(); + cp2aState.slot = 22 * SLOTS_PER_EPOCH; + // assuming reorg block is at slot 5 of epoch 21 + cp2aState.blockRoots.set((startSlotEpoch21 + 5) % SLOTS_PER_HISTORICAL_ROOT, root1a); + cp2aState.blockRoots.set(startSlotEpoch22 % SLOTS_PER_HISTORICAL_ROOT, root1a); + cache.add(cp2a, cp2aState); + + // block state of root3 in epoch 22 is built on cp2a + const blockStateRoot3 = cp2aState.clone(); + blockStateRoot3.slot = 22 * SLOTS_PER_EPOCH + 3; + + const root3 = Buffer.alloc(32, 101); + // process state of root3 + await cache.processState(toHexString(root3), blockStateRoot3); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + // epoch 22 has 2 checkpoint states + expect(cache.get(cp2Hex)).to.be.not.null; + expect(cache.get(toCheckpointHex(cp2a))).to.be.not.null; + // epoch 21 has 1 checkpoint state + expect(cache.get(cp1Hex)).to.be.not.null; + // epoch 20 has 0 checkpoint state + expect(cache.get(cp0aHex)).to.be.null; + expect(cache.get(cp0bHex)).to.be.null; + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^ ^ ^ ^ ^ + // | | | | | + // 0b--------|root1-------root2 | + // | | + // |-----------------root3 + // 0a ^ ^ + // | | + // cp1a={0a, 21} {0a, 22}=cp2a + it("reorg 2 epochs", async function () { + // process root2 state + cache.add(cp2, states["cp2"]); + expect(await cache.processState(toHexString(cp2.root), states["cp2"])).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + + // reload cp0b from disk + expect((await cache.getOrReload(toCheckpointHex(cp0b)))?.serialize()).toStrictEqual(stateBytes["cp0b"]); + + // regen generates cp1a + const root0a = Buffer.alloc(32, 100); + const cp1a = {epoch: 21, root: root0a}; + const cp1aState = states["cp0b"].clone(); + cp1aState.slot = 21 * SLOTS_PER_EPOCH; + // assuming reorg block is at slot 5 of epoch 20 + cp1aState.blockRoots.set((startSlotEpoch20 + 5) % SLOTS_PER_HISTORICAL_ROOT, root0a); + cache.add(cp1a, cp1aState); + + // regen generates cp2a + const cp2a = {epoch: 22, root: root0a}; + const cp2aState = cp1aState.clone(); + cp2aState.slot = 22 * SLOTS_PER_EPOCH; + cp2aState.blockRoots.set(startSlotEpoch22 % SLOTS_PER_HISTORICAL_ROOT, root0a); + cache.add(cp2a, cp2aState); + + // block state of root3 in epoch 22 is built on cp2a + const blockStateRoot3 = cp2aState.clone(); + blockStateRoot3.slot = 22 * SLOTS_PER_EPOCH + 3; + + const root3 = Buffer.alloc(32, 101); + // process state of root3 + await cache.processState(toHexString(root3), blockStateRoot3); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + // epoch 21 and 22 have 2 checkpoint states + 
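// (editorial note, inferred from this suite: maxCPStateEpochsInMemory appears to bound how many epochs + // stay in memory, not how many checkpoint states per epoch, so both branches of the reorg remain cached below) +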
expect(cache.get(cp1Hex)).to.be.not.null; + expect(cache.get(toCheckpointHex(cp1a))).to.be.not.null; + expect(cache.get(cp2Hex)).to.be.not.null; + expect(cache.get(toCheckpointHex(cp2a))).to.be.not.null; + // epoch 20 has 0 checkpoint state + expect(cache.get(cp0aHex)).to.be.null; + expect(cache.get(cp0bHex)).to.be.null; + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ ^ + // || | | | + // |0b--------root1--------root2 | + // |/ | + // 0a---------------------------root3 + // ^ ^ + // | | + // cp1a={0a, 21} {0a, 22}=cp2a + it("reorg 3 epochs, persist cp 0a", async function () { + // process root2 state + cache.add(cp2, states["cp2"]); + expect(await cache.processState(toHexString(cp2.root), states["cp2"])).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + // cp0a was pruned from memory and is not on disk + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + + // regen needs to regenerate cp0a + cache.add(cp0a, states["cp0a"]); + + // regen generates cp1a + const cp1a = {epoch: 21, root: root0a}; + const cp1aState = generateCachedState({slot: 21 * SLOTS_PER_EPOCH}); + cp1aState.blockRoots.set((startSlotEpoch20 - 1) % SLOTS_PER_HISTORICAL_ROOT, root0a); + cp1aState.blockRoots.set(startSlotEpoch20 % SLOTS_PER_HISTORICAL_ROOT, root0a); + cache.add(cp1a, cp1aState); + + // regen generates cp2a + const cp2a = {epoch: 22, root: root0a}; + const cp2aState = cp1aState.clone(); + cp2aState.slot = 22 * SLOTS_PER_EPOCH; + cp2aState.blockRoots.set(startSlotEpoch21 % SLOTS_PER_HISTORICAL_ROOT, root0a); + cache.add(cp2a, cp2aState); + + // block state of root3 in epoch 22 is built on cp2a + const blockStateRoot3 = cp2aState.clone(); + blockStateRoot3.slot = 22 * SLOTS_PER_EPOCH + 3; + blockStateRoot3.blockRoots.set(startSlotEpoch22 % SLOTS_PER_HISTORICAL_ROOT, root0a); + + // regen populates cache when producing blockStateRoot3 + + const root3 = Buffer.alloc(32, 100); + // process state of root3 + expect(await cache.processState(toHexString(root3), blockStateRoot3)).toEqual(1); + await assertPersistedCheckpointState([cp0b, cp0a], [stateBytes["cp0b"], stateBytes["cp0a"]]); + // epoch 21 and 22 have 2 checkpoint states + expect(cache.get(cp1Hex)).to.be.not.null; + expect(cache.get(toCheckpointHex(cp1a))).to.be.not.null; + expect(cache.get(cp2Hex)).to.be.not.null; + expect(cache.get(toCheckpointHex(cp2a))).to.be.not.null; + // epoch 20 has 0 checkpoint state + expect(cache.get(cp0aHex)).to.be.null; + expect(cache.get(cp0bHex)).to.be.null; + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ ^ + // || | | | + // |0b--------root1--------root2 | + // || | + // ||---------------------------root3 + // 0a ^ ^ + // | | + // cp1b={0b, 21} {0b, 22}=cp2b + it("reorg 3 epochs, prune but no persist", async function () { + // process root2 state + cache.add(cp2, states["cp2"]); + expect(await cache.processState(toHexString(cp2.root), states["cp2"])).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + // cp0a was pruned from memory and is not on disk + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + + // regen needs to reload cp0b + cache.add(cp0b, states["cp0b"]); + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]); + + // regen generates cp1b + const cp1b = {epoch: 21, root: root0b}; + const cp1bState = states["cp0b"].clone(); + cp1bState.slot = 21 * SLOTS_PER_EPOCH; + cp1bState.blockRoots.set(startSlotEpoch21 %
SLOTS_PER_HISTORICAL_ROOT, root0b); + cache.add(cp1b, cp1bState); + + // regen generates cp2b + const cp2b = {epoch: 22, root: root0b}; + const cp2bState = cp1bState.clone(); + cp2bState.slot = 22 * SLOTS_PER_EPOCH; + cp2bState.blockRoots.set(startSlotEpoch22 % SLOTS_PER_HISTORICAL_ROOT, root0b); + cache.add(cp2b, cp2bState); + + // block state of root3 in epoch 22 is built on cp2b + const blockStateRoot3 = cp2bState.clone(); + blockStateRoot3.slot = 22 * SLOTS_PER_EPOCH + 3; + const root3 = Buffer.alloc(32, 100); + // process state of root3, nothing is persisted + expect(await cache.processState(toHexString(root3), blockStateRoot3)).toEqual(0); + // but state of cp0b is pruned from memory + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + + // epoch 21 and 22 have 2 checkpoint states + expect(cache.get(cp1Hex)).to.be.not.null; + expect(cache.get(toCheckpointHex(cp1b))).to.be.not.null; + expect(cache.get(cp2Hex)).to.be.not.null; + expect(cache.get(toCheckpointHex(cp2b))).to.be.not.null; + // epoch 20 has 0 checkpoint state + expect(cache.get(cp0aHex)).to.be.null; + expect(cache.get(cp0bHex)).to.be.null; + }); + }); + + describe("processState, maxEpochsInMemory = 1", () => { + beforeEach(() => { + fileApisBuffer = new Map(); + const datastore = getTestDatastore(fileApisBuffer); + cache = new PersistentCheckpointStateCache( + {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, + {maxCPStateEpochsInMemory: 1} + ); + cache.add(cp0a, states["cp0a"]); + cache.add(cp0b, states["cp0b"]); + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ + // || | | + // |0b--------root1--root2 + // | + // 0a + it("no reorg", async () => { + expect(fileApisBuffer.size).toEqual(0); + cache.add(cp1, states["cp1"]); + expect(await cache.processState(toHexString(cp1.root), states["cp1"])).toEqual(1); + expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + expect(fileApisBuffer.size).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + + const blockStateRoot2 = states["cp1"].clone(); + blockStateRoot2.slot = 21 * SLOTS_PER_EPOCH + 3; + const root2 = Buffer.alloc(32, 100); + // process state of root2 + await cache.processState(toHexString(root2), blockStateRoot2); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + + // epoch 21 has 1 checkpoint state + expect(cache.get(cp1Hex)).to.be.not.null; + // epoch 20 has 0 checkpoint state + expect(cache.get(cp0aHex)).to.be.null; + expect(cache.get(cp0bHex)).to.be.null; + // but cp0bHex is persisted + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + // while cp0aHex is not + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ ^ + // || | | | + // |0b--------root1--root2 | + // | |---------root3 + // 0a + it("reorg in same epoch", async () => { + // almost the same as the "no reorg" test + expect(fileApisBuffer.size).toEqual(0); + cache.add(cp1, states["cp1"]); + expect(await cache.processState(toHexString(cp1.root), states["cp1"])).toEqual(1); + expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + expect(fileApisBuffer.size).toEqual(1); + await assertPersistedCheckpointState([cp0b],
[stateBytes["cp0b"]]); + + const blockStateRoot2 = states["cp1"].clone(); + blockStateRoot2.slot = 21 * SLOTS_PER_EPOCH + 3; + const root2 = Buffer.alloc(32, 100); + // process state of root2 + await cache.processState(toHexString(root2), blockStateRoot2); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + + const blockStateRoot3 = states["cp1"].clone(); + blockStateRoot3.slot = 21 * SLOTS_PER_EPOCH + 4; + const root3 = Buffer.alloc(32, 101); + // process state of root3 + await cache.processState(toHexString(root3), blockStateRoot3); + + // epoch 21 has 1 checkpoint state + expect(cache.get(cp1Hex)).to.be.not.null; + // epoch 20 has 0 checkpoint state + expect(cache.get(cp0aHex)).to.be.null; + expect(cache.get(cp0bHex)).to.be.null; + // but cp0bHex is persisted + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + // while cp0aHex is not + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ ^ + // || | | | + // |0b----1a--root1 | + // | |----|-------root2 + // 0a | + // cp1a={1a, 21} + it("reorg 1 epoch, no persist 1a", async () => { + // root 1a + expect(fileApisBuffer.size).toEqual(0); + const root1a = Buffer.alloc(32, 100); + const state1a = states["cp0b"].clone(); + state1a.slot = 20 * SLOTS_PER_EPOCH + SLOTS_PER_EPOCH - 1; + state1a.blockRoots.set(state1a.slot % SLOTS_PER_HISTORICAL_ROOT, root1a); + expect(await cache.processState(toHexString(cp1.root), states["cp1"])).toEqual(0); + expect(fileApisBuffer.size).toEqual(0); + await assertPersistedCheckpointState([], []); + + // cp1 + cache.add(cp1, states["cp1"]); + expect(await cache.processState(toHexString(cp1.root), states["cp1"])).toEqual(1); + expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + expect(fileApisBuffer.size).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + + // root2, regen cp1a + const cp1aState = state1a.clone(); + cp1aState.slot = 21 * SLOTS_PER_EPOCH; + const cp1a = {epoch: 21, root: root1a}; + cache.add(cp1a, cp1aState); + const blockStateRoot2 = cp1aState.clone(); + blockStateRoot2.slot = 21 * SLOTS_PER_EPOCH + 3; + const root2 = Buffer.alloc(32, 100); + // process state of root2 + expect(await cache.processState(toHexString(root2), blockStateRoot2)).toEqual(0); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + // keep these 2 cp states at epoch 21 + expect(cache.get(toCheckpointHex(cp1a))).to.be.not.null; + expect(cache.get(toCheckpointHex(cp1))).to.be.not.null; + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ + // || | | + // |0b--------root1 | + // ||-----------|-------root2 + // 0a {21, 1b}=cp1b + it("reorg 1 epoch, no persist 0b", async () => { + expect(fileApisBuffer.size).toEqual(0); + // cp1 + cache.add(cp1, states["cp1"]); + expect(await cache.processState(toHexString(cp1.root), states["cp1"])).toEqual(1); + expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + expect(fileApisBuffer.size).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + + // simulate regen + cache.add(cp0b, states["cp0b"]); + expect(await 
cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]); + // root2, regen cp0b + const cp1bState = states["cp0b"].clone(); + cp1bState.slot = 21 * SLOTS_PER_EPOCH; + const cp1b = {epoch: 21, root: root0b}; + cache.add(cp1b, cp1bState); + const blockStateRoot2 = cp1bState.clone(); + blockStateRoot2.slot = 21 * SLOTS_PER_EPOCH + 3; + const root2 = Buffer.alloc(32, 100); + // process state of root2, nothing is persisted + expect(await cache.processState(toHexString(root2), blockStateRoot2)).toEqual(0); + + // but cp0b in-memory state is pruned + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + // keep these 2 cp states at epoch 21 + expect(cache.get(toCheckpointHex(cp1b))).to.be.not.null; + expect(cache.get(toCheckpointHex(cp1))).to.be.not.null; + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ ^ + // || | | | + // |0b-----|--root1 | + // | | | | + // 0a-----1a----|-------root2 + // | + // cp1a={1a, 21} + it("reorg 1 epoch, persist one more checkpoint state", async () => { + // root 1a + expect(fileApisBuffer.size).toEqual(0); + const root1a = Buffer.alloc(32, 100); + const state1a = states["cp0a"].clone(); + state1a.slot = 20 * SLOTS_PER_EPOCH + SLOTS_PER_EPOCH - 1; + state1a.blockRoots.set(state1a.slot % SLOTS_PER_HISTORICAL_ROOT, root1a); + expect(await cache.processState(toHexString(cp1.root), states["cp1"])).toEqual(0); + expect(fileApisBuffer.size).toEqual(0); + // at epoch 20, there should be 2 cps in memory + expect(cache.get(cp0aHex)).to.be.not.null; + expect(cache.get(cp0bHex)).to.be.not.null; + await assertPersistedCheckpointState([], []); + + // cp1 + cache.add(cp1, states["cp1"]); + expect(await cache.processState(toHexString(cp1.root), states["cp1"])).toEqual(1); + expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + expect(fileApisBuffer.size).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + // 2 checkpoint states at epoch 20 are pruned + expect(cache.get(cp0aHex)).to.be.null; + expect(cache.get(cp0bHex)).to.be.null; + // only cp0bHex is persisted + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + + // root2, regen cp0a + cache.add(cp0a, states["cp0a"]); + const cp1aState = state1a.clone(); + cp1aState.slot = 21 * SLOTS_PER_EPOCH; + const cp1a = {epoch: 21, root: root1a}; + cache.add(cp1a, cp1aState); + const blockStateRoot2 = cp1aState.clone(); + blockStateRoot2.slot = 21 * SLOTS_PER_EPOCH + 3; + const root2 = Buffer.alloc(32, 100); + // process state of root2, persist cp0a + expect(await cache.processState(toHexString(root2), blockStateRoot2)).toEqual(1); + await assertPersistedCheckpointState([cp0b, cp0a], [stateBytes["cp0b"], stateBytes["cp0a"]]); + expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + // keep these 2 cp states at epoch 21 + expect(cache.get(toCheckpointHex(cp1a))).to.be.not.null; + expect(cache.get(toCheckpointHex(cp1))).to.be.not.null; + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ + // || | | + // |0b--------root1 | + // | | | + // 0a-----------|-------root2 + // | + // cp1a={0a, 21} + it("reorg 2 epochs", async () => { + // cp1 + cache.add(cp1, states["cp1"]); + expect(await cache.processState(toHexString(cp1.root), states["cp1"])).toEqual(1); + 
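// (editorial note, assumed semantics: processState resolves to the number of checkpoint states + // persisted to the datastore, which is how the .toEqual(n) assertions in this suite read) +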
expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + expect(fileApisBuffer.size).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + // 2 checkpoint states at epoch 20 are pruned + expect(cache.get(cp0aHex)).to.be.null; + expect(cache.get(cp0bHex)).to.be.null; + // only cp0bHex is persisted + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + + // root2, regen cp0a + cache.add(cp0a, states["cp0a"]); + const cp1aState = states["cp0a"].clone(); + cp1aState.slot = 21 * SLOTS_PER_EPOCH; + const cp1a = {epoch: 21, root: root0a}; + cache.add(cp1a, cp1aState); + const blockStateRoot2 = cp1aState.clone(); + blockStateRoot2.slot = 21 * SLOTS_PER_EPOCH + 3; + const root2 = Buffer.alloc(32, 100); + // process state of root2, persist cp0a + expect(await cache.processState(toHexString(root2), blockStateRoot2)).toEqual(1); + await assertPersistedCheckpointState([cp0b, cp0a], [stateBytes["cp0b"], stateBytes["cp0a"]]); + expect(cache.get(cp1Hex)?.hashTreeRoot()).toEqual(states["cp1"].hashTreeRoot()); + // keep these 2 cp states at epoch 21 + expect(cache.get(toCheckpointHex(cp1a))).to.be.not.null; + expect(cache.get(toCheckpointHex(cp1))).to.be.not.null; + }); + + describe("processState, maxEpochsInMemory = 0", () => { + beforeEach(() => { + fileApisBuffer = new Map(); + const datastore = getTestDatastore(fileApisBuffer); + cache = new PersistentCheckpointStateCache( + {datastore, logger: testLogger(), shufflingCache: new ShufflingCache()}, + {maxCPStateEpochsInMemory: 0} + ); + cache.add(cp0a, states["cp0a"]); + cache.add(cp0b, states["cp0b"]); + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ + // || | + // |0b --root1a + // | + // 0a + it("no reorg", async () => { + expect(await cache.processState(toHexString(root0b), states["cp0b"])).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + + const root1a = Buffer.alloc(32, 100); + const state1a = states["cp0b"].clone(); + state1a.slot = 20 * SLOTS_PER_EPOCH + SLOTS_PER_EPOCH + 3; + state1a.blockRoots.set(state1a.slot % SLOTS_PER_HISTORICAL_ROOT, root1a); + expect(await cache.processState(toHexString(root1a), state1a)).toEqual(0); + + // nothing changes + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ + // || | | + // |0b --root1a| + // | \ | + // 0a \------root1b + it("reorg in same epoch", async () => { + expect(await cache.processState(toHexString(root0b), states["cp0b"])).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + + const root1a = Buffer.alloc(32, 100); + const state1a = states["cp0b"].clone(); + state1a.slot = 20 * SLOTS_PER_EPOCH + SLOTS_PER_EPOCH + 3; + state1a.blockRoots.set(state1a.slot % SLOTS_PER_HISTORICAL_ROOT, root1a); + expect(await cache.processState(toHexString(root1a), state1a)).toEqual(0); + + // nothing changes + expect(await
cache.getStateOrBytes(cp0aHex)).to.be.null; + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + + // simulate reload cp0b + cache.add(cp0b, states["cp0b"]); + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(states["cp0b"]); + const root1b = Buffer.alloc(32, 101); + const state1b = states["cp0b"].clone(); + state1b.slot = state1a.slot + 1; + state1b.blockRoots.set(state1b.slot % SLOTS_PER_HISTORICAL_ROOT, root1b); + // but no need to persist cp0b again + expect(await cache.processState(toHexString(root1b), state1b)).toEqual(0); + // although states["cp0b"] is pruned + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ + // || | | + // |0b --root1a| + // | | + // 0a---------root1b + it("reorg 1 epoch", async () => { + expect(await cache.processState(toHexString(root0b), states["cp0b"])).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + + const root1a = Buffer.alloc(32, 100); + const state1a = states["cp0b"].clone(); + state1a.slot = 20 * SLOTS_PER_EPOCH + SLOTS_PER_EPOCH + 3; + state1a.blockRoots.set(state1a.slot % SLOTS_PER_HISTORICAL_ROOT, root1a); + expect(await cache.processState(toHexString(root1a), state1a)).toEqual(0); + + // nothing changes + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + + const root1b = Buffer.alloc(32, 101); + const state1b = states["cp0a"].clone(); + state1b.slot = state1a.slot + 1; + state1b.blockRoots.set(state1b.slot % SLOTS_PER_HISTORICAL_ROOT, root1b); + // regen should reload cp0a from disk + cache.add(cp0a, states["cp0a"]); + expect(await cache.processState(toHexString(root1b), state1b)).toEqual(1); + await assertPersistedCheckpointState([cp0b, cp0a], [stateBytes["cp0b"], stateBytes["cp0a"]]); + + // both cp0a and cp0b are persisted + expect(await cache.getStateOrBytes(cp0aHex)).toEqual(stateBytes["cp0a"]); + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + }); + + // epoch: 19 20 21 22 23 + // |-----------|-----------|-----------|-----------| + // ^^ ^ ^ + // || | | + // |0b--------root1 | + // | | + // 0a-----------------root2 + // ^ + // {0a, 21}=cp1a + it("reorg 2 epochs", async () => { + expect(await cache.processState(toHexString(root0b), states["cp0b"])).toEqual(1); + await assertPersistedCheckpointState([cp0b], [stateBytes["cp0b"]]); + expect(await cache.getStateOrBytes(cp0aHex)).to.be.null; + expect(await cache.getStateOrBytes(cp0bHex)).toEqual(stateBytes["cp0b"]); + + cache.add(cp1, states["cp1"]); + expect(await cache.processState(toHexString(cp1.root), states["cp1"])).toEqual(1); + await assertPersistedCheckpointState([cp0b, cp1], [stateBytes["cp0b"], stateBytes["cp1"]]); + + // regen should populate cp0a and cp1a checkpoint states + cache.add(cp0a, states["cp0a"]); + const cp1a = {epoch: 21, root: root0a}; + const cp1aState = states["cp0a"].clone(); + cp1aState.blockRoots.set((20 * SLOTS_PER_EPOCH) % SLOTS_PER_HISTORICAL_ROOT, root0a); + cp1aState.blockRoots.set((21 * SLOTS_PER_EPOCH) % SLOTS_PER_HISTORICAL_ROOT, root0a); + cp1aState.slot = 21 * SLOTS_PER_EPOCH; + cache.add(cp1a, cp1aState); + + const root2 = Buffer.alloc(32, 100); + const state2 =
cp1aState.clone(); + state2.slot = 21 * SLOTS_PER_EPOCH + 3; + state2.blockRoots.set(state2.slot % SLOTS_PER_HISTORICAL_ROOT, root2); + expect(await cache.processState(toHexString(root2), state2)).toEqual(2); + // expect 4 cp states to be persisted + await assertPersistedCheckpointState( + [cp0b, cp1, cp0a, cp1a], + [stateBytes["cp0b"], stateBytes["cp1"], stateBytes["cp0a"], cp1aState.serialize()] + ); + }); + }); + }); + + async function assertPersistedCheckpointState(cps: phase0.Checkpoint[], stateBytesArr: Uint8Array[]): Promise<void> { + const persistedKeys = cps.map((cp) => toHexString(checkpointToDatastoreKey(cp))); + expect(Array.from(fileApisBuffer.keys())).toStrictEqual(persistedKeys); + for (const [i, persistedKey] of persistedKeys.entries()) { + expect(fileApisBuffer.get(persistedKey)).toStrictEqual(stateBytesArr[i]); + } + for (const [i, cp] of cps.entries()) { + const cpHex = toCheckpointHex(cp); + expect(await cache.getStateOrBytes(cpHex)).toStrictEqual(stateBytesArr[i]); + // simple get() does not reload from disk + expect(cache.get(cpHex)).to.be.null; + } + } +}); diff --git a/packages/beacon-node/test/unit/util/array.test.ts b/packages/beacon-node/test/unit/util/array.test.ts index 5ca275d5a278..d505d27c2e9f 100644 --- a/packages/beacon-node/test/unit/util/array.test.ts +++ b/packages/beacon-node/test/unit/util/array.test.ts @@ -102,6 +102,72 @@ describe("LinkedList", () => { expect(list.last()).toBe(98); }); + describe("moveToHead", () => { + let list: LinkedList<number>; + + beforeEach(() => { + list = new LinkedList<number>(); + list.push(1); + list.push(2); + list.push(3); + }); + + it("item is head", () => { + list.moveToHead(1); + expect(list.toArray()).toEqual([1, 2, 3]); + expect(list.first()).toBe(1); + }); + + it("item is middle", () => { + list.moveToHead(2); + expect(list.toArray()).toEqual([2, 1, 3]); + expect(list.first()).toBe(2); + }); + + it("item is tail", () => { + list.moveToHead(3); + expect(list.toArray()).toEqual([3, 1, 2]); + expect(list.first()).toBe(3); + }); + }); + + describe("moveToSecond", () => { + let list: LinkedList<number>; + + beforeEach(() => { + list = new LinkedList<number>(); + list.push(1); + list.push(2); + list.push(3); + list.push(4); + }); + + it("item is head", () => { + list.moveToSecond(1); + expect(list.toArray()).toEqual([1, 2, 3, 4]); + expect(list.first()).toBe(1); + }); + + it("item is second", () => { + list.moveToSecond(2); + expect(list.toArray()).toEqual([1, 2, 3, 4]); + expect(list.first()).toBe(1); + }); + + it("item is third", () => { + list.moveToSecond(3); + expect(list.toArray()).toEqual([1, 3, 2, 4]); + expect(list.first()).toBe(1); + }); + + it("item is tail", () => { + list.moveToSecond(4); + expect(list.toArray()).toEqual([1, 4, 2, 3]); + expect(list.first()).toBe(1); + expect(list.last()).toBe(3); + }); + }); + it("values", () => { expect(Array.from(list.values())).toEqual([]); const count = 100; @@ -165,6 +231,46 @@ describe("LinkedList", () => { }); }); + describe("insertAfter", () => { + let list: LinkedList<number>; + + beforeEach(() => { + list = new LinkedList<number>(); + list.push(1); + list.push(2); + list.push(3); + }); + + it("insert after 0", () => { + // should do nothing + list.insertAfter(0, 4); + expect(list.toArray()).toEqual([1, 2, 3]); + expect(list.first()).toBe(1); + expect(list.last()).toBe(3); + }); + + it("insert after 1", () => { + list.insertAfter(1, 4); + expect(list.toArray()).toEqual([1, 4, 2, 3]); + expect(list.first()).toBe(1); + expect(list.last()).toBe(3); + }); + + it("insert after 2", () => { + list.insertAfter(2, 4); +
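// (editorial note: insertAfter on the middle element should place 4 between 2 and 3, as asserted next) +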
expect(list.toArray()).toEqual([1, 2, 4, 3]); + expect(list.first()).toBe(1); + expect(list.last()).toBe(3); + }); + + it("insert after 3", () => { + list.insertAfter(3, 4); + expect(list.toArray()).toEqual([1, 2, 3, 4]); + expect(list.first()).toBe(1); + expect(list.last()).toBe(4); + }); + }); + it("toArray", () => { expect(list.toArray()).toEqual([]); @@ -205,4 +311,22 @@ }); } }); + + describe("has", () => { + let list: LinkedList<number>; + + beforeEach(() => { + list = new LinkedList<number>(); + list.push(1); + list.push(2); + list.push(3); + }); + + it("should return true if the item is in the list", () => { + expect(list.has(1)).toBe(true); + expect(list.has(2)).toBe(true); + expect(list.has(3)).toBe(true); + expect(list.has(4)).toBe(false); + }); + }); }); diff --git a/packages/beacon-node/test/utils/chain/stateCache/datastore.ts b/packages/beacon-node/test/utils/chain/stateCache/datastore.ts new file mode 100644 index 000000000000..8a944f4c2d88 --- /dev/null +++ b/packages/beacon-node/test/utils/chain/stateCache/datastore.ts @@ -0,0 +1,26 @@ +import {fromHexString, toHexString} from "@chainsafe/ssz"; +import {CPStateDatastore, checkpointToDatastoreKey} from "../../../../src/chain/stateCache/datastore/index.js"; + +export function getTestDatastore(fileApisBuffer: Map<string, Uint8Array>): CPStateDatastore { + const datastore: CPStateDatastore = { + write: (cp, state) => { + const persistentKey = checkpointToDatastoreKey(cp); + const stringKey = toHexString(persistentKey); + if (!fileApisBuffer.has(stringKey)) { + fileApisBuffer.set(stringKey, state.serialize()); + } + return Promise.resolve(persistentKey); + }, + remove: (persistentKey) => { + const stringKey = toHexString(persistentKey); + if (fileApisBuffer.has(stringKey)) { + fileApisBuffer.delete(stringKey); + } + return Promise.resolve(); + }, + read: (persistentKey) => Promise.resolve(fileApisBuffer.get(toHexString(persistentKey)) ??
null), + readKeys: () => Promise.resolve(Array.from(fileApisBuffer.keys()).map((key) => fromHexString(key))), + }; + + return datastore; +} diff --git a/packages/beacon-node/test/utils/mocks/db.ts b/packages/beacon-node/test/utils/mocks/db.ts index 731091bc8e6e..16d7b32a1bcc 100644 --- a/packages/beacon-node/test/utils/mocks/db.ts +++ b/packages/beacon-node/test/utils/mocks/db.ts @@ -1,4 +1,5 @@ import {IBeaconDb} from "../../../src/db/index.js"; +import {CheckpointStateRepository} from "../../../src/db/repositories/checkpointState.js"; import { AttesterSlashingRepository, BlockArchiveRepository, @@ -38,6 +39,7 @@ export function getStubbedBeaconDb(): IBeaconDb { // finalized states stateArchive: createStubInstance(StateArchiveRepository), + checkpointState: createStubInstance(CheckpointStateRepository), // op pool voluntaryExit: createStubInstance(VoluntaryExitRepository), diff --git a/packages/state-transition/src/cache/stateCache.ts b/packages/state-transition/src/cache/stateCache.ts index 140e3d04c155..b01ca0c409b2 100644 --- a/packages/state-transition/src/cache/stateCache.ts +++ b/packages/state-transition/src/cache/stateCache.ts @@ -159,9 +159,9 @@ export function createCachedBeaconState( * Create a CachedBeaconState given a cached seed state and state bytes * This guarantees that the returned state shares the same tree with the seed state * Check loadState() api for more details - * TODO: after EIP-6110 need to provide a pivotValidatorIndex to decide which comes to finalized validators cache, which comes to unfinalized cache + * // TODO: rename to loadUnfinalizedCachedBeaconState() due to EIP-6110 */ -export function loadUnfinalizedCachedBeaconState( +export function loadCachedBeaconState( cachedSeedState: T, stateBytes: Uint8Array, opts?: EpochCacheOpts diff --git a/packages/state-transition/src/index.ts b/packages/state-transition/src/index.ts index ff77aa75180b..8786c0f6e358 100644 --- a/packages/state-transition/src/index.ts +++ b/packages/state-transition/src/index.ts @@ -26,7 +26,7 @@ export type { // Main state caches export { createCachedBeaconState, - loadUnfinalizedCachedBeaconState, + loadCachedBeaconState, type BeaconStateCache, isCachedBeaconState, isStateBalancesNodesPopulated, diff --git a/packages/state-transition/src/util/loadState/index.ts b/packages/state-transition/src/util/loadState/index.ts new file mode 100644 index 000000000000..706de3c11540 --- /dev/null +++ b/packages/state-transition/src/util/loadState/index.ts @@ -0,0 +1 @@ +export {loadState} from "./loadState.js"; diff --git a/packages/state-transition/test/unit/cachedBeaconState.test.ts b/packages/state-transition/test/unit/cachedBeaconState.test.ts index cd32776d1045..2891cd3e6216 100644 --- a/packages/state-transition/test/unit/cachedBeaconState.test.ts +++ b/packages/state-transition/test/unit/cachedBeaconState.test.ts @@ -5,7 +5,7 @@ import {config as defaultConfig} from "@lodestar/config/default"; import {createBeaconConfig} from "@lodestar/config"; import {createCachedBeaconStateTest} from "../utils/state.js"; import {PubkeyIndexMap} from "../../src/cache/pubkeyCache.js"; -import {createCachedBeaconState, loadUnfinalizedCachedBeaconState} from "../../src/cache/stateCache.js"; +import {createCachedBeaconState, loadCachedBeaconState} from "../../src/cache/stateCache.js"; import {interopPubkeysCached} from "../utils/interop.js"; import {modifyStateSameValidator, newStateWithValidators} from "../utils/capella.js"; import {EpochShuffling, getShufflingDecisionBlock} from 
"../../src/util/epochShuffling.js"; @@ -129,7 +129,7 @@ describe("CachedBeaconState", () => { // confirm loadState() result const stateBytes = state.serialize(); - const newCachedState = loadUnfinalizedCachedBeaconState(seedState, stateBytes, {skipSyncCommitteeCache: true}); + const newCachedState = loadCachedBeaconState(seedState, stateBytes, {skipSyncCommitteeCache: true}); const newStateBytes = newCachedState.serialize(); expect(newStateBytes).toEqual(stateBytes); expect(newCachedState.hashTreeRoot()).toEqual(state.hashTreeRoot()); @@ -171,7 +171,7 @@ describe("CachedBeaconState", () => { expect(newCachedState.epochCtx).toEqual(cachedState.epochCtx); } - // confirm loadUnfinalizedCachedBeaconState() result + // confirm loadCachedBeaconState() result for (let i = 0; i < newCachedState.validators.length; i++) { expect(newCachedState.epochCtx.pubkey2index.get(newCachedState.validators.get(i).pubkey)).toBe(i); expect(newCachedState.epochCtx.index2pubkey[i].toBytes()).toEqual(pubkeys[i]);