Skip to content

Commit

Permalink
Merge branch 'unstable' into slot-intervals
Browse files Browse the repository at this point in the history
  • Loading branch information
ensi321 committed Sep 10, 2024
2 parents 88ea977 + cbc00c7 commit 1847a6e
Show file tree
Hide file tree
Showing 38 changed files with 614 additions and 216 deletions.
3 changes: 2 additions & 1 deletion packages/api/src/builder/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import {
} from "@lodestar/types";
import {ForkName, isForkBlobs} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";
import {toPubkeyHex} from "@lodestar/utils";

import {Endpoint, RouteDefinitions, Schema} from "../utils/index.js";
import {MetaHeader, VersionCodec, VersionMeta} from "../utils/metadata.js";
Expand Down Expand Up @@ -105,7 +106,7 @@ export function getDefinitions(config: ChainForkConfig): RouteDefinitions<Endpoi
method: "GET",
req: {
writeReq: ({slot, parentHash, proposerPubkey: proposerPubKey}) => ({
params: {slot, parent_hash: toHexString(parentHash), pubkey: toHexString(proposerPubKey)},
params: {slot, parent_hash: toHexString(parentHash), pubkey: toPubkeyHex(proposerPubKey)},
}),
parseReq: ({params}) => ({
slot: params.slot,
Expand Down
14 changes: 11 additions & 3 deletions packages/beacon-node/src/chain/archiver/archiveStates.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {IStateRegenerator} from "../regen/interface.js";
import {getStateSlotFromBytes} from "../../util/multifork.js";
import {serializeState} from "../serializeState.js";
import {AllocSource, BufferPool} from "../../util/bufferPool.js";

/**
* Minimum number of epochs between single temp archived states
Expand All @@ -30,7 +32,8 @@ export class StatesArchiver {
private readonly regen: IStateRegenerator,
private readonly db: IBeaconDb,
private readonly logger: Logger,
private readonly opts: StatesArchiverOpts
private readonly opts: StatesArchiverOpts,
private readonly bufferPool?: BufferPool | null
) {}

/**
Expand Down Expand Up @@ -95,8 +98,13 @@ export class StatesArchiver {
await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// state
await this.db.stateArchive.put(finalizedStateOrBytes.slot, finalizedStateOrBytes);
// serialize state using BufferPool if provided
await serializeState(
finalizedStateOrBytes,
AllocSource.ARCHIVE_STATE,
(stateBytes) => this.db.stateArchive.putBinary(finalizedStateOrBytes.slot, stateBytes),
this.bufferPool
);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {
epoch: finalized.epoch,
Expand Down
2 changes: 1 addition & 1 deletion packages/beacon-node/src/chain/archiver/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ export class Archiver {
opts: ArchiverOpts
) {
this.archiveBlobEpochs = opts.archiveBlobEpochs;
this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts);
this.statesArchiver = new StatesArchiver(chain.regen, db, logger, opts, chain.bufferPool);
this.prevFinalized = chain.forkChoice.getFinalizedCheckpoint();
this.jobQueue = new JobItemQueue<[CheckpointWithHex], void>(this.processFinalizedCheckpoint, {
maxLength: PROCESS_FINALIZED_CHECKPOINT_QUEUE_LEN,
Expand Down
6 changes: 5 additions & 1 deletion packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ export class BeaconChain implements IBeaconChain {
readonly config: BeaconConfig;
readonly logger: Logger;
readonly metrics: Metrics | null;
readonly bufferPool: BufferPool | null;

readonly anchorStateLatestBlockSlot: Slot;

Expand Down Expand Up @@ -275,6 +276,9 @@ export class BeaconChain implements IBeaconChain {
const blockStateCache = this.opts.nHistoricalStates
? new FIFOBlockStateCache(this.opts, {metrics})
: new BlockStateCacheImpl({metrics});
this.bufferPool = this.opts.nHistoricalStates
? new BufferPool(anchorState.type.tree_serializedSize(anchorState.node), metrics)
: null;
const checkpointStateCache = this.opts.nHistoricalStates
? new PersistentCheckpointStateCache(
{
Expand All @@ -283,7 +287,7 @@ export class BeaconChain implements IBeaconChain {
clock,
shufflingCache: this.shufflingCache,
blockStateCache,
bufferPool: new BufferPool(anchorState.type.tree_serializedSize(anchorState.node), metrics),
bufferPool: this.bufferPool,
datastore: fileDataStore
? // debug option if we want to investigate any issues with the DB
new FileCPStateDatastore()
Expand Down
2 changes: 2 additions & 0 deletions packages/beacon-node/src/chain/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {IEth1ForBlockProduction} from "../eth1/index.js";
import {IExecutionEngine, IExecutionBuilder} from "../execution/index.js";
import {Metrics} from "../metrics/metrics.js";
import {IClock} from "../util/clock.js";
import {BufferPool} from "../util/bufferPool.js";
import {ChainEventEmitter} from "./emitter.js";
import {IStateRegenerator, RegenCaller} from "./regen/index.js";
import {IBlsVerifier} from "./bls/index.js";
Expand Down Expand Up @@ -86,6 +87,7 @@ export interface IBeaconChain {
readonly config: BeaconConfig;
readonly logger: Logger;
readonly metrics: Metrics | null;
readonly bufferPool: BufferPool | null;

/** The initial slot that the chain is started with */
readonly anchorStateLatestBlockSlot: Slot;
Expand Down
70 changes: 58 additions & 12 deletions packages/beacon-node/src/chain/regen/regen.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import {fromHexString} from "@chainsafe/ssz";
import {phase0, Slot, RootHex, BeaconBlock} from "@lodestar/types";
import {phase0, Slot, RootHex, BeaconBlock, SignedBeaconBlock} from "@lodestar/types";
import {
CachedBeaconStateAllForks,
computeEpochAtSlot,
Expand All @@ -8,6 +8,7 @@ import {
DataAvailableStatus,
processSlots,
stateTransition,
StateHashTreeRootSource,
} from "@lodestar/state-transition";
import {IForkChoice, ProtoBlock} from "@lodestar/fork-choice";
import {Logger, toRootHex} from "@lodestar/utils";
Expand Down Expand Up @@ -145,7 +146,7 @@ export class StateRegenerator implements IStateRegeneratorInternal {
*/
async getState(
stateRoot: RootHex,
_rCaller: RegenCaller,
caller: RegenCaller,
opts?: StateCloneOpts,
// internal option, don't want to expose to external caller
allowDiskReload = false
Expand All @@ -156,6 +157,13 @@ export class StateRegenerator implements IStateRegeneratorInternal {
return cachedStateCtx;
}

// in block gossip validation (getPreState() call), dontTransferCache is specified as true because we only want to transfer cache in verifyBlocksStateTransitionOnly()
// but here we want to process blocks as fast as possible so force to transfer cache in this case
if (opts && allowDiskReload) {
// if there is no `opts` specified, it already means "false"
opts.dontTransferCache = false;
}

// Otherwise we have to use the fork choice to traverse backwards, block by block,
// searching the state caches
// then replay blocks forward to the desired stateRoot
Expand All @@ -166,6 +174,8 @@ export class StateRegenerator implements IStateRegeneratorInternal {
const blocksToReplay = [block];
let state: CachedBeaconStateAllForks | null = null;
const {checkpointStateCache} = this.modules;

const getSeedStateTimer = this.modules.metrics?.regenGetState.getSeedState.startTimer({caller});
// iterateAncestorBlocks only returns ancestor blocks, not the block itself
for (const b of this.modules.forkChoice.iterateAncestorBlocks(block.blockRoot)) {
state = this.modules.blockStateCache.get(b.stateRoot, opts);
Expand All @@ -181,26 +191,58 @@ export class StateRegenerator implements IStateRegeneratorInternal {
}
blocksToReplay.push(b);
}
getSeedStateTimer?.();

if (state === null) {
throw new RegenError({
code: RegenErrorCode.NO_SEED_STATE,
});
}

const blockCount = blocksToReplay.length;
const MAX_EPOCH_TO_PROCESS = 5;
if (blocksToReplay.length > MAX_EPOCH_TO_PROCESS * SLOTS_PER_EPOCH) {
if (blockCount > MAX_EPOCH_TO_PROCESS * SLOTS_PER_EPOCH) {
throw new RegenError({
code: RegenErrorCode.TOO_MANY_BLOCK_PROCESSED,
stateRoot,
});
}

const replaySlots = blocksToReplay.map((b) => b.slot).join(",");
this.modules.logger.debug("Replaying blocks to get state", {stateRoot, replaySlots});
for (const b of blocksToReplay.reverse()) {
const block = await this.modules.db.block.get(fromHexString(b.blockRoot));
if (!block) {
this.modules.metrics?.regenGetState.blockCount.observe({caller}, blockCount);

const replaySlots = new Array<Slot>(blockCount);
const blockPromises = new Array<Promise<SignedBeaconBlock | null>>(blockCount);

const protoBlocksAsc = blocksToReplay.reverse();
for (const [i, protoBlock] of protoBlocksAsc.entries()) {
replaySlots[i] = protoBlock.slot;
blockPromises[i] = this.modules.db.block.get(fromHexString(protoBlock.blockRoot));
}

const logCtx = {stateRoot, replaySlots: replaySlots.join(",")};
this.modules.logger.debug("Replaying blocks to get state", logCtx);

const loadBlocksTimer = this.modules.metrics?.regenGetState.loadBlocks.startTimer({caller});
const blockOrNulls = await Promise.all(blockPromises);
loadBlocksTimer?.();

const blocksByRoot = new Map<RootHex, SignedBeaconBlock>();
for (const [i, blockOrNull] of blockOrNulls.entries()) {
// checking early here helps prevent unneccessary state transition below
if (blockOrNull === null) {
throw new RegenError({
code: RegenErrorCode.BLOCK_NOT_IN_DB,
blockRoot: protoBlocksAsc[i].blockRoot,
});
}
blocksByRoot.set(protoBlocksAsc[i].blockRoot, blockOrNull);
}

const stateTransitionTimer = this.modules.metrics?.regenGetState.stateTransition.startTimer({caller});
for (const b of protoBlocksAsc) {
const block = blocksByRoot.get(b.blockRoot);
// just to make compiler happy, we checked in the above for loop already
if (block === undefined) {
throw new RegenError({
code: RegenErrorCode.BLOCK_NOT_IN_DB,
blockRoot: b.blockRoot,
Expand All @@ -224,7 +266,12 @@ export class StateRegenerator implements IStateRegeneratorInternal {
this.modules.metrics
);

const hashTreeRootTimer = this.modules.metrics?.stateHashTreeRootTime.startTimer({
source: StateHashTreeRootSource.regenState,
});
const stateRoot = toRootHex(state.hashTreeRoot());
hashTreeRootTimer?.();

if (b.stateRoot !== stateRoot) {
throw new RegenError({
slot: b.slot,
Expand All @@ -238,17 +285,16 @@ export class StateRegenerator implements IStateRegeneratorInternal {
// also with allowDiskReload flag, we "reload" it to the state cache too
this.modules.blockStateCache.add(state);
}

// this avoids keeping our node busy processing blocks
await nextEventLoop();
} catch (e) {
throw new RegenError({
code: RegenErrorCode.STATE_TRANSITION_ERROR,
error: e as Error,
});
}
}
this.modules.logger.debug("Replayed blocks to get state", {stateRoot, replaySlots});
stateTransitionTimer?.();

this.modules.logger.debug("Replayed blocks to get state", {...logCtx, stateSlot: state.slot});

return state;
}
Expand Down
33 changes: 33 additions & 0 deletions packages/beacon-node/src/chain/serializeState.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import {CachedBeaconStateAllForks} from "@lodestar/state-transition";
import {AllocSource, BufferPool} from "../util/bufferPool.js";

type ProcessStateBytesFn<T> = (stateBytes: Uint8Array) => Promise<T>;

/*
* Serialize state using the BufferPool if provided.
*/
export async function serializeState<T>(
state: CachedBeaconStateAllForks,
source: AllocSource,
processFn: ProcessStateBytesFn<T>,
bufferPool?: BufferPool | null
): Promise<T> {
const size = state.type.tree_serializedSize(state.node);
let stateBytes: Uint8Array | null = null;
if (bufferPool) {
const bufferWithKey = bufferPool.alloc(size, source);
if (bufferWithKey) {
stateBytes = bufferWithKey.buffer;
const dataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength);
state.serializeToBytes({uint8Array: stateBytes, dataView}, 0);
}
}

if (!stateBytes) {
// we already have metrics in BufferPool so no need to do it here
stateBytes = state.serialize();
}

return processFn(stateBytes);
// release the buffer back to the pool automatically
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,9 @@ import {loadCachedBeaconState} from "@lodestar/state-transition";
import {Metrics} from "../../metrics/index.js";
import {IClock} from "../../util/clock.js";
import {ShufflingCache} from "../shufflingCache.js";
import {BufferPool, BufferWithKey} from "../../util/bufferPool.js";
import {AllocSource, BufferPool, BufferWithKey} from "../../util/bufferPool.js";
import {StateCloneOpts} from "../regen/interface.js";
import {serializeState} from "../serializeState.js";
import {MapTracker} from "./mapMetrics.js";
import {CPStateDatastore, DatastoreKey, datastoreKeyToCheckpoint} from "./datastore/index.js";
import {CheckpointHex, CacheItemType, CheckpointStateCache, BlockStateCache} from "./types.js";
Expand All @@ -33,7 +34,7 @@ type PersistentCheckpointStateCacheModules = {
shufflingCache: ShufflingCache;
datastore: CPStateDatastore;
blockStateCache: BlockStateCache;
bufferPool?: BufferPool;
bufferPool?: BufferPool | null;
};

/** checkpoint serialized as a string */
Expand Down Expand Up @@ -110,7 +111,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
private readonly datastore: CPStateDatastore;
private readonly shufflingCache: ShufflingCache;
private readonly blockStateCache: BlockStateCache;
private readonly bufferPool?: BufferPool;
private readonly bufferPool?: BufferPool | null;

constructor(
{
Expand Down Expand Up @@ -233,6 +234,10 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
newCachedState.commit();
const stateRoot = toRootHex(newCachedState.hashTreeRoot());
timer?.();

// load all cache in order for consumers (usually regen.getState()) to process blocks faster
newCachedState.validators.getAllReadonlyValues();
newCachedState.balances.getAll();
this.logger.debug("Reload: cached state load successful", {
...logMeta,
stateSlot: newCachedState.slot,
Expand Down Expand Up @@ -698,19 +703,20 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
// persist and do not update epochIndex
this.metrics?.statePersistSecFromSlot.observe(this.clock?.secFromSlot(this.clock?.currentSlot ?? 0) ?? 0);
const cpPersist = {epoch: epoch, root: fromHexString(rootHex)};
{
const timer = this.metrics?.stateSerializeDuration.startTimer();
// automatically free the buffer pool after this scope
using stateBytesWithKey = this.serializeState(state);
let stateBytes = stateBytesWithKey?.buffer;
if (stateBytes == null) {
// fallback logic to use regular way to get state ssz bytes
this.metrics?.persistedStateAllocCount.inc();
stateBytes = state.serialize();
}
timer?.();
persistedKey = await this.datastore.write(cpPersist, stateBytes);
}
// It's not sustainable to allocate ~240MB for each state every epoch, so we use buffer pool to reuse the memory.
// As monitored on holesky as of Jan 2024:
// - This does not increase heap allocation while gc time is the same
// - It helps stabilize persist time and save ~300ms in average (1.5s vs 1.2s)
// - It also helps the state reload to save ~500ms in average (4.3s vs 3.8s)
// - Also `serializeState.test.ts` perf test shows a lot of differences allocating ~240MB once vs per state serialization
const timer = this.metrics?.stateSerializeDuration.startTimer();
persistedKey = await serializeState(
state,
AllocSource.PERSISTENT_CHECKPOINTS_CACHE_STATE,
(stateBytes) => this.datastore.write(cpPersist, stateBytes),
this.bufferPool
);
timer?.();
persistCount++;
this.logger.verbose("Pruned checkpoint state from memory and persisted to disk", {
...logMeta,
Expand Down Expand Up @@ -767,29 +773,6 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
});
}

/*
* It's not sustainable to allocate ~240MB for each state every epoch, so we use buffer pool to reuse the memory.
* As monitored on holesky as of Jan 2024:
* - This does not increase heap allocation while gc time is the same
* - It helps stabilize persist time and save ~300ms in average (1.5s vs 1.2s)
* - It also helps the state reload to save ~500ms in average (4.3s vs 3.8s)
* - Also `serializeState.test.ts` perf test shows a lot of differences allocating ~240MB once vs per state serialization
*/
private serializeState(state: CachedBeaconStateAllForks): BufferWithKey | null {
const size = state.type.tree_serializedSize(state.node);
if (this.bufferPool) {
const bufferWithKey = this.bufferPool.alloc(size);
if (bufferWithKey) {
const stateBytes = bufferWithKey.buffer;
const dataView = new DataView(stateBytes.buffer, stateBytes.byteOffset, stateBytes.byteLength);
state.serializeToBytes({uint8Array: stateBytes, dataView}, 0);
return bufferWithKey;
}
}

return null;
}

/**
* Serialize validators to bytes leveraging the buffer pool to save memory allocation.
* - As monitored on holesky as of Jan 2024, it helps save ~500ms state reload time (4.3s vs 3.8s)
Expand All @@ -800,7 +783,7 @@ export class PersistentCheckpointStateCache implements CheckpointStateCache {
const type = state.type.fields.validators;
const size = type.tree_serializedSize(state.validators.node);
if (this.bufferPool) {
const bufferWithKey = this.bufferPool.alloc(size);
const bufferWithKey = this.bufferPool.alloc(size, AllocSource.PERSISTENT_CHECKPOINTS_CACHE_VALIDATORS);
if (bufferWithKey) {
const validatorsBytes = bufferWithKey.buffer;
const dataView = new DataView(validatorsBytes.buffer, validatorsBytes.byteOffset, validatorsBytes.byteLength);
Expand Down
Loading

0 comments on commit 1847a6e

Please sign in to comment.