diff --git a/db/slots.go b/db/slots.go index fb6ec06e..c96b1688 100644 --- a/db/slots.go +++ b/db/slots.go @@ -24,7 +24,8 @@ func InsertSlot(slot *dbtypes.Slot, tx *sqlx.Tx) error { ON CONFLICT (slot, root) DO UPDATE SET status = excluded.status, eth_block_extra = excluded.eth_block_extra, - eth_block_extra_text = excluded.eth_block_extra_text`, + eth_block_extra_text = excluded.eth_block_extra_text, + fork_id = excluded.fork_id`, dbtypes.DBEngineSqlite: ` INSERT OR REPLACE INTO slots ( slot, proposer, status, root, parent_root, state_root, graffiti, graffiti_text, diff --git a/db/unfinalized_epochs.go b/db/unfinalized_epochs.go index 87285fba..cc4230c8 100644 --- a/db/unfinalized_epochs.go +++ b/db/unfinalized_epochs.go @@ -96,8 +96,8 @@ func GetUnfinalizedEpoch(epoch uint64, headRoot []byte) *dbtypes.UnfinalizedEpoc return &unfinalizedEpoch } -func DeleteUnfinalizedEpochsIn(epoch uint64, tx *sqlx.Tx) error { - _, err := tx.Exec(`DELETE FROM unfinalized_epochs WHERE epoch = $1`, epoch) +func DeleteUnfinalizedEpochsBefore(epoch uint64, tx *sqlx.Tx) error { + _, err := tx.Exec(`DELETE FROM unfinalized_epochs WHERE epoch < $1`, epoch) if err != nil { return err } diff --git a/db/withdrawal_request_txs.go b/db/withdrawal_request_txs.go index be0a2852..9da94427 100644 --- a/db/withdrawal_request_txs.go +++ b/db/withdrawal_request_txs.go @@ -52,7 +52,7 @@ func InsertWithdrawalRequestTxs(withdrawalTxs []*dbtypes.WithdrawalRequestTx, tx argIdx += fieldCount } fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{ - dbtypes.DBEnginePgsql: " ON CONFLICT (block_root, block_index) DO UPDATE SET fork_id = excluded.fork_id", + dbtypes.DBEnginePgsql: " ON CONFLICT (block_root, block_index) DO UPDATE SET validator_index = excluded.validator_index, fork_id = excluded.fork_id", dbtypes.DBEngineSqlite: "", })) diff --git a/indexer/beacon/canonical.go b/indexer/beacon/canonical.go index fb8fda26..b28890c6 100644 --- a/indexer/beacon/canonical.go +++ b/indexer/beacon/canonical.go @@ -5,7 +5,6 @@ import ( "fmt" "math" "slices" - "sort" "strings" "time" @@ -28,47 +27,14 @@ func (indexer *Indexer) GetCanonicalHead(overrideForkId *ForkKey) *Block { if overrideForkId != nil && indexer.canonicalHead != nil && indexer.canonicalHead.forkId != *overrideForkId { chainHeads := indexer.cachedChainHeads - chainHeadCandidates := []*ChainHead{} for _, chainHead := range chainHeads { parentForkIds := indexer.forkCache.getParentForkIds(chainHead.HeadBlock.forkId) - isInParentIds := false - for _, parentForkId := range parentForkIds { - if parentForkId == *overrideForkId { - isInParentIds = true - break - } - } - if !isInParentIds { + if !slices.Contains(parentForkIds, *overrideForkId) { continue } - chainHeadCandidates = append(chainHeadCandidates, chainHead) - } - - if len(chainHeadCandidates) > 0 { - sort.Slice(chainHeadCandidates, func(i, j int) bool { - percentagesI := float64(0) - percentagesJ := float64(0) - for k := range chainHeadCandidates[i].PerEpochVotingPercent { - factor := float64(1) - if k == len(chainHeadCandidates[i].PerEpochVotingPercent)-1 { - factor = 0.5 - } - percentagesI += chainHeadCandidates[i].PerEpochVotingPercent[k] * factor - if len(chainHeadCandidates[j].PerEpochVotingPercent) > k { - percentagesJ += chainHeadCandidates[j].PerEpochVotingPercent[k] * factor - } - } - - if percentagesI != percentagesJ { - return percentagesI > percentagesJ - } - - return chainHeadCandidates[i].HeadBlock.Slot > chainHeadCandidates[j].HeadBlock.Slot - }) - - return chainHeadCandidates[0].HeadBlock + return 
chainHead.HeadBlock } } @@ -81,24 +47,6 @@ func (indexer *Indexer) GetChainHeads() []*ChainHead { heads := make([]*ChainHead, len(indexer.cachedChainHeads)) copy(heads, indexer.cachedChainHeads) - sort.Slice(heads, func(i, j int) bool { - percentagesI := float64(0) - percentagesJ := float64(0) - for k := range heads[i].PerEpochVotingPercent { - factor := float64(1) - if k == len(heads[i].PerEpochVotingPercent)-1 { - factor = 0.5 - } - percentagesI += heads[i].PerEpochVotingPercent[k] * factor - percentagesJ += heads[j].PerEpochVotingPercent[k] * factor - } - - if percentagesI != percentagesJ { - return percentagesI > percentagesJ - } - - return heads[i].HeadBlock.Slot > heads[j].HeadBlock.Slot - }) return heads } @@ -233,6 +181,27 @@ func (indexer *Indexer) computeCanonicalChain() bool { } } + slices.SortFunc(chainHeads, func(headA, headB *ChainHead) int { + percentagesA := float64(0) + percentagesB := float64(0) + for k := range headA.PerEpochVotingPercent { + factor := float64(1) + if k == len(headA.PerEpochVotingPercent)-1 { + factor = 0.5 + } + percentagesA += headA.PerEpochVotingPercent[k] * factor + if len(headB.PerEpochVotingPercent) > k { + percentagesB += headB.PerEpochVotingPercent[k] * factor + } + } + + if percentagesA != percentagesB { + return int((percentagesB - percentagesA) * 100) + } + + return int(headB.HeadBlock.Slot - headA.HeadBlock.Slot) + }) + return true } diff --git a/indexer/beacon/debug.go b/indexer/beacon/debug.go index 4fa749d8..c1cea890 100644 --- a/indexer/beacon/debug.go +++ b/indexer/beacon/debug.go @@ -30,10 +30,13 @@ type CacheDebugStats struct { VotesCacheMiss uint64 } ForkCache struct { - ForkMap CacheDebugMapSize - ParentIdCacheLen uint64 - ParentIdCacheHit uint64 - ParentIdCacheMiss uint64 + ForkMap CacheDebugMapSize + ParentIdCacheLen uint64 + ParentIdCacheHit uint64 + ParentIdCacheMiss uint64 + ParentIdsCacheLen uint64 + ParentIdsCacheHit uint64 + ParentIdsCacheMiss uint64 } ValidatorCache struct { Validators uint64 @@ -142,6 +145,10 @@ func (indexer *Indexer) getForkCacheDebugStats(cacheStats *CacheDebugStats) { cacheStats.ForkCache.ParentIdCacheLen = uint64(indexer.forkCache.parentIdCache.Len()) cacheStats.ForkCache.ParentIdCacheHit = indexer.forkCache.parentIdCacheHit cacheStats.ForkCache.ParentIdCacheMiss = indexer.forkCache.parentIdCacheMiss + + cacheStats.ForkCache.ParentIdsCacheLen = uint64(indexer.forkCache.parentIdsCache.Len()) + cacheStats.ForkCache.ParentIdsCacheHit = indexer.forkCache.parentIdsCacheHit + cacheStats.ForkCache.ParentIdsCacheMiss = indexer.forkCache.parentIdsCacheMiss } func (indexer *Indexer) getValidatorCacheDebugStats(cacheStats *CacheDebugStats) { diff --git a/indexer/beacon/epochstats.go b/indexer/beacon/epochstats.go index 08f8c764..77f27189 100644 --- a/indexer/beacon/epochstats.go +++ b/indexer/beacon/epochstats.go @@ -61,6 +61,7 @@ type EpochStatsValues struct { // EpochStatsPacked holds the packed values for the epoch-specific information. 
type EpochStatsPacked struct { ActiveValidators []EpochStatsPackedValidator + ProposerDuties []phase0.ValidatorIndex SyncCommitteeDuties []phase0.ValidatorIndex RandaoMix phase0.Hash32 NextRandaoMix phase0.Hash32 @@ -159,6 +160,7 @@ func (es *EpochStats) buildPackedSSZ(dynSsz *dynssz.DynSsz) ([]byte, error) { packedValues := &EpochStatsPacked{ ActiveValidators: make([]EpochStatsPackedValidator, es.values.ActiveValidators), + ProposerDuties: es.values.ProposerDuties, SyncCommitteeDuties: es.values.SyncCommitteeDuties, RandaoMix: es.values.RandaoMix, NextRandaoMix: es.values.NextRandaoMix, @@ -188,7 +190,7 @@ func (es *EpochStats) buildPackedSSZ(dynSsz *dynssz.DynSsz) ([]byte, error) { // unmarshalSSZ unmarshals the EpochStats values using the provided SSZ bytes. // skips computing attester duties if withCommittees is false to speed up the process. -func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensus.ChainState, ssz []byte, withCommittees bool) (*EpochStatsValues, error) { +func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensus.ChainState, ssz []byte, withDuties bool) (*EpochStatsValues, error) { if dynSsz == nil { dynSsz = dynssz.NewDynSsz(nil) } @@ -213,6 +215,7 @@ func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensu NextRandaoMix: packedValues.NextRandaoMix, ActiveIndices: make([]phase0.ValidatorIndex, len(packedValues.ActiveValidators)), EffectiveBalances: make([]uint16, len(packedValues.ActiveValidators)), + ProposerDuties: packedValues.ProposerDuties, SyncCommitteeDuties: packedValues.SyncCommitteeDuties, TotalBalance: packedValues.TotalBalance, ActiveBalance: packedValues.ActiveBalance, @@ -232,35 +235,35 @@ func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensu values.ActiveValidators = uint64(len(packedValues.ActiveValidators)) - beaconState := &duties.BeaconState{ - RandaoMix: &values.RandaoMix, - GetActiveCount: func() uint64 { - return values.ActiveValidators - }, - GetEffectiveBalance: func(index duties.ActiveIndiceIndex) phase0.Gwei { - return phase0.Gwei(values.EffectiveBalances[index]) * EtherGweiFactor - }, - } - - // compute proposers - proposerDuties := []phase0.ValidatorIndex{} - for slot := chainState.EpochToSlot(es.epoch); slot < chainState.EpochToSlot(es.epoch+1); slot++ { - proposer, err := duties.GetProposerIndex(chainState.GetSpecs(), beaconState, slot) - proposerIndex := phase0.ValidatorIndex(math.MaxInt64) - if err == nil { - proposerIndex = values.ActiveIndices[proposer] + if withDuties { + beaconState := &duties.BeaconState{ + RandaoMix: &values.RandaoMix, + GetActiveCount: func() uint64 { + return values.ActiveValidators + }, + GetEffectiveBalance: func(index duties.ActiveIndiceIndex) phase0.Gwei { + return phase0.Gwei(values.EffectiveBalances[index]) * EtherGweiFactor + }, } - proposerDuties = append(proposerDuties, proposerIndex) - } + // compute proposers + proposerDuties := []phase0.ValidatorIndex{} + for slot := chainState.EpochToSlot(es.epoch); slot < chainState.EpochToSlot(es.epoch+1); slot++ { + proposer, err := duties.GetProposerIndex(chainState.GetSpecs(), beaconState, slot) + proposerIndex := phase0.ValidatorIndex(math.MaxInt64) + if err == nil { + proposerIndex = values.ActiveIndices[proposer] + } - values.ProposerDuties = proposerDuties - if beaconState.RandaoMix != nil { - values.RandaoMix = *beaconState.RandaoMix - } + proposerDuties = append(proposerDuties, proposerIndex) + } - // compute committees - if withCommittees { + 
values.ProposerDuties = proposerDuties + if beaconState.RandaoMix != nil { + values.RandaoMix = *beaconState.RandaoMix + } + + // compute committees attesterDuties, _ := duties.GetAttesterDuties(chainState.GetSpecs(), beaconState, es.epoch) values.AttesterDuties = attesterDuties } diff --git a/indexer/beacon/finalization.go b/indexer/beacon/finalization.go index b6c7b6da..9aae391c 100644 --- a/indexer/beacon/finalization.go +++ b/indexer/beacon/finalization.go @@ -147,10 +147,6 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R } canonicalBlocks = append(canonicalBlocks, block) } else { - if block.isInFinalizedDb { - // orphaned block which is already in db, ignore - continue - } if block.block == nil { indexer.logger.Warnf("missing block body for orphaned block %v (%v)", block.Slot, block.Root.String()) continue @@ -353,16 +349,18 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R } // delete unfinalized epoch aggregations in epoch - if err := db.DeleteUnfinalizedEpochsIn(uint64(epoch), tx); err != nil { - return fmt.Errorf("failed deleting unfinalized epoch aggregations of epoch %v: %v", epoch, err) + if err := db.DeleteUnfinalizedEpochsBefore(uint64(epoch+1), tx); err != nil { + return fmt.Errorf("failed deleting unfinalized epoch aggregations <= epoch %v: %v", epoch, err) } // delete unfinalized forks for canonical roots - if err := db.UpdateFinalizedForkParents(canonicalRoots, tx); err != nil { - return fmt.Errorf("failed updating finalized fork parents: %v", err) - } - if err := db.DeleteFinalizedForks(canonicalRoots, tx); err != nil { - return fmt.Errorf("failed deleting finalized forks: %v", err) + if len(canonicalRoots) > 0 { + if err := db.UpdateFinalizedForkParents(canonicalRoots, tx); err != nil { + return fmt.Errorf("failed updating finalized fork parents: %v", err) + } + if err := db.DeleteFinalizedForks(canonicalRoots, tx); err != nil { + return fmt.Errorf("failed deleting finalized forks: %v", err) + } } return nil diff --git a/indexer/beacon/forkcache.go b/indexer/beacon/forkcache.go index 4a7bfc9b..195e8734 100644 --- a/indexer/beacon/forkcache.go +++ b/indexer/beacon/forkcache.go @@ -15,23 +15,27 @@ import ( // forkCache is a struct that represents the fork cache in the indexer. type forkCache struct { - indexer *Indexer - cacheMutex sync.RWMutex - forkMap map[ForkKey]*Fork - finalizedForkId ForkKey - lastForkId ForkKey - parentIdCache *lru.Cache[ForkKey, ForkKey] - parentIdCacheHit uint64 - parentIdCacheMiss uint64 - forkProcessLock sync.Mutex + indexer *Indexer + cacheMutex sync.RWMutex + forkMap map[ForkKey]*Fork + finalizedForkId ForkKey + lastForkId ForkKey + parentIdCache *lru.Cache[ForkKey, ForkKey] + parentIdCacheHit uint64 + parentIdCacheMiss uint64 + parentIdsCache *lru.Cache[ForkKey, []ForkKey] + parentIdsCacheHit uint64 + parentIdsCacheMiss uint64 + forkProcessLock sync.Mutex } // newForkCache creates a new instance of the forkCache struct. func newForkCache(indexer *Indexer) *forkCache { return &forkCache{ - indexer: indexer, - forkMap: make(map[ForkKey]*Fork), - parentIdCache: lru.NewCache[ForkKey, ForkKey](1000), + indexer: indexer, + forkMap: make(map[ForkKey]*Fork), + parentIdCache: lru.NewCache[ForkKey, ForkKey](1000), + parentIdsCache: lru.NewCache[ForkKey, []ForkKey](30), } } @@ -120,30 +124,37 @@ func (cache *forkCache) removeFork(forkId ForkKey) { // getParentForkIds returns the parent fork ids of the given fork. 
func (cache *forkCache) getParentForkIds(forkId ForkKey) []ForkKey { - parentForks := []ForkKey{forkId} + parentForks, isCached := cache.parentIdsCache.Get(forkId) + if isCached { + cache.parentIdsCacheHit++ + return parentForks + } + + parentForks = []ForkKey{forkId} parentForkId := forkId - thisFork := cache.getForkById(forkId) for parentForkId > 1 { - if thisFork != nil { - parentForkId = thisFork.parentFork - } else if cachedParent, isCached := cache.parentIdCache.Get(parentForkId); isCached { + if cachedParent, isCached := cache.parentIdCache.Get(parentForkId); isCached { cache.parentIdCacheHit++ parentForkId = cachedParent + } else if parentFork := cache.getForkById(parentForkId); parentFork != nil { + parentForkId = parentFork.parentFork } else if dbFork := db.GetForkById(uint64(parentForkId)); dbFork != nil { + cache.parentIdCache.Add(ForkKey(parentForkId), ForkKey(dbFork.ParentFork)) parentForkId = ForkKey(dbFork.ParentFork) - cache.parentIdCache.Add(ForkKey(dbFork.ForkId), ForkKey(dbFork.ParentFork)) cache.parentIdCacheMiss++ } else { - parentForkId = 0 cache.parentIdCache.Add(ForkKey(parentForkId), ForkKey(0)) + parentForkId = 0 cache.parentIdCacheMiss++ } - thisFork = cache.getForkById(parentForkId) parentForks = append(parentForks, parentForkId) } + cache.parentIdsCache.Add(forkId, parentForks) + cache.parentIdsCacheMiss++ + return parentForks } @@ -247,6 +258,7 @@ func (cache *forkCache) setFinalizedEpoch(finalizedSlot phase0.Slot, justifiedRo } cache.finalizedForkId = finalizedForkId + cache.parentIdsCache.Purge() err := db.RunDBTransaction(func(tx *sqlx.Tx) error { return cache.updateForkState(tx) diff --git a/indexer/beacon/indexer_getter.go b/indexer/beacon/indexer_getter.go index 00c8ff24..c595e9d0 100644 --- a/indexer/beacon/indexer_getter.go +++ b/indexer/beacon/indexer_getter.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "math/rand/v2" + "slices" "sort" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -208,17 +209,32 @@ func (indexer *Indexer) GetEpochStats(epoch phase0.Epoch, overrideForkId *ForkKe canonicalHead := indexer.GetCanonicalHead(overrideForkId) var bestEpochStats *EpochStats - var bestDistance uint64 + var bestDistance phase0.Slot if canonicalHead != nil { + canonicalForkIds := indexer.forkCache.getParentForkIds(canonicalHead.forkId) + for _, stats := range epochStats { if !stats.ready { continue } - if isInChain, distance := indexer.blockCache.getCanonicalDistance(stats.dependentRoot, canonicalHead.Root, 0); isInChain { - if bestEpochStats == nil || distance < bestDistance { + + dependentBlock := indexer.blockCache.getBlockByRoot(stats.dependentRoot) + if dependentBlock == nil { + blockHead := db.GetBlockHeadByRoot(stats.dependentRoot[:]) + if blockHead != nil { + dependentBlock = newBlock(indexer.dynSsz, phase0.Root(blockHead.Root), phase0.Slot(blockHead.Slot)) + dependentBlock.isInFinalizedDb = true + parentRootVal := phase0.Root(blockHead.ParentRoot) + dependentBlock.parentRoot = &parentRootVal + dependentBlock.forkId = ForkKey(blockHead.ForkId) + dependentBlock.forkChecked = true + } + } + if dependentBlock != nil && slices.Contains(canonicalForkIds, dependentBlock.forkId) { + if bestEpochStats == nil || dependentBlock.Slot > bestDistance { bestEpochStats = stats - bestDistance = distance + bestDistance = dependentBlock.Slot } } } @@ -229,10 +245,12 @@ func (indexer *Indexer) GetEpochStats(epoch phase0.Epoch, overrideForkId *ForkKe if stats.ready { continue } - if isInChain, distance := indexer.blockCache.getCanonicalDistance(stats.dependentRoot, 
canonicalHead.Root, 0); isInChain { - if bestEpochStats == nil || distance < bestDistance { + + dependentBlock := indexer.blockCache.getBlockByRoot(stats.dependentRoot) + if dependentBlock != nil && slices.Contains(canonicalForkIds, dependentBlock.forkId) { + if bestEpochStats == nil || dependentBlock.Slot > bestDistance { bestEpochStats = stats - bestDistance = distance + bestDistance = dependentBlock.Slot } } } diff --git a/indexer/beacon/pruning.go b/indexer/beacon/pruning.go index 39a58e87..65033d7f 100644 --- a/indexer/beacon/pruning.go +++ b/indexer/beacon/pruning.go @@ -199,7 +199,8 @@ func (indexer *Indexer) processEpochPruning(pruneEpoch phase0.Epoch) (uint64, ui return } - _, err := indexer.dbWriter.persistBlockData(tx, block, epochData.epochStats, depositIndex, false, nil) + // persist pruned block data as orphaned here, the canonical blocks will be updated by the finalization or synchronization process later + _, err := indexer.dbWriter.persistBlockData(tx, block, epochData.epochStats, depositIndex, true, nil) if err != nil { indexer.logger.Errorf("error persisting pruned slot %v: %v", block.Root.String(), err) } diff --git a/indexer/beacon/synchronizer.go b/indexer/beacon/synchronizer.go index fc87b921..6d6e7e5e 100644 --- a/indexer/beacon/synchronizer.go +++ b/indexer/beacon/synchronizer.go @@ -408,16 +408,18 @@ func (sync *synchronizer) syncEpoch(syncEpoch phase0.Epoch, client *Client, last } // delete unfinalized epoch aggregations in epoch - if err := db.DeleteUnfinalizedEpochsIn(uint64(syncEpoch), tx); err != nil { - return fmt.Errorf("failed deleting unfinalized epoch aggregations of epoch %v: %v", syncEpoch, err) + if err := db.DeleteUnfinalizedEpochsBefore(uint64(syncEpoch+1), tx); err != nil { + return fmt.Errorf("failed deleting unfinalized epoch aggregations <= epoch %v: %v", syncEpoch, err) } // delete unfinalized forks for canonical roots - if err := db.UpdateFinalizedForkParents(canonicalBlockRoots, tx); err != nil { - return fmt.Errorf("failed updating finalized fork parents: %v", err) - } - if err := db.DeleteFinalizedForks(canonicalBlockRoots, tx); err != nil { - return fmt.Errorf("failed deleting finalized forks: %v", err) + if len(canonicalBlockRoots) > 0 { + if err := db.UpdateFinalizedForkParents(canonicalBlockRoots, tx); err != nil { + return fmt.Errorf("failed updating finalized fork parents: %v", err) + } + if err := db.DeleteFinalizedForks(canonicalBlockRoots, tx); err != nil { + return fmt.Errorf("failed deleting finalized forks: %v", err) + } } err = db.SetExplorerState("indexer.syncstate", &dbtypes.IndexerSyncState{
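
Note on the forkcache.go hunk: getParentForkIds now memoizes the full ancestor chain per fork in the new parentIdsCache and purges that cache on finalization. Below is a minimal, self-contained sketch of that memoization pattern, not the actual implementation — plain maps stand in for the LRU caches, the fork-map and db.GetForkById fallbacks are collapsed into a single "unknown link" branch, and all type and helper names are illustrative only.

package main

import "fmt"

type ForkKey uint64

// forkChainCache is a simplified stand-in for the indexer's forkCache.
type forkChainCache struct {
	parentOf   map[ForkKey]ForkKey   // per-link cache (parentIdCache in the diff)
	ancestorOf map[ForkKey][]ForkKey // full-chain cache (parentIdsCache in the diff)
}

// parentForkIds walks the parent links of forkId and returns the whole chain,
// memoizing the result so repeated lookups for the same head fork are a single hit.
func (c *forkChainCache) parentForkIds(forkId ForkKey) []ForkKey {
	if chain, ok := c.ancestorOf[forkId]; ok {
		return chain // whole-chain cache hit
	}

	chain := []ForkKey{forkId}
	parent := forkId
	for parent > 1 {
		next, ok := c.parentOf[parent]
		if !ok {
			// unknown link: the real code consults the in-memory fork map,
			// then the database, before falling back to 0 (finalized root)
			next = 0
			c.parentOf[parent] = next
		}
		parent = next
		chain = append(chain, parent)
	}

	c.ancestorOf[forkId] = chain
	return chain
}

// purge drops the memoized chains, mirroring the parentIdsCache.Purge() call in
// setFinalizedEpoch: finalization rewrites fork parents, so cached chains go stale.
func (c *forkChainCache) purge() {
	c.ancestorOf = map[ForkKey][]ForkKey{}
}

func main() {
	c := &forkChainCache{
		parentOf:   map[ForkKey]ForkKey{5: 3, 3: 2, 2: 0},
		ancestorOf: map[ForkKey][]ForkKey{},
	}
	fmt.Println(c.parentForkIds(5)) // [5 3 2 0], computed by walking the links
	fmt.Println(c.parentForkIds(5)) // served from the full-chain cache
	c.purge()                       // invalidate after finalization-style rewrites
}

Memoizing the full chain keeps the hot paths (GetCanonicalHead and GetEpochStats resolving fork ancestry) at one lookup per call, at the cost of having to invalidate whenever fork parents are rewritten — hence the purge in setFinalizedEpoch.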
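
Note on the canonical.go hunk: the weighted-vote ranking previously duplicated in GetCanonicalHead and GetChainHeads now runs once in computeCanonicalChain via slices.SortFunc. The sketch below is a reduced illustration of that ranking idea under stated simplifications: the real comparator walks both heads' per-epoch lists in lockstep, while here each head's own list is summed, and cmp.Compare stands in for the casted subtractions; the ChainHead shape is trimmed to the two fields the comparator reads.

package main

import (
	"cmp"
	"fmt"
	"slices"
)

type ChainHead struct {
	Slot                  uint64
	PerEpochVotingPercent []float64
}

// weightedVotes sums the per-epoch voting percentages, counting the most
// recent (still incomplete) epoch at half weight, as in the diff.
func weightedVotes(h *ChainHead) float64 {
	sum := 0.0
	for k, p := range h.PerEpochVotingPercent {
		factor := 1.0
		if k == len(h.PerEpochVotingPercent)-1 {
			factor = 0.5
		}
		sum += p * factor
	}
	return sum
}

func main() {
	heads := []*ChainHead{
		{Slot: 100, PerEpochVotingPercent: []float64{80, 40}},
		{Slot: 101, PerEpochVotingPercent: []float64{95, 60}},
	}

	// Best-attested head first; ties broken by the higher head slot.
	slices.SortFunc(heads, func(a, b *ChainHead) int {
		if c := cmp.Compare(weightedVotes(b), weightedVotes(a)); c != 0 {
			return c
		}
		return cmp.Compare(b.Slot, a.Slot)
	})

	fmt.Println(heads[0].Slot) // 101: higher weighted voting percentage wins
}

Sorting once during chain computation also means GetCanonicalHead with an overrideForkId can simply return the first cached head whose ancestor fork ids (via getParentForkIds) contain the requested fork, as the remaining diff in GetCanonicalHead does.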