Skip to content

Commit

Permalink
Merge pull request #224 from ethpandaops/pk910/unfinality-performance…
Browse files Browse the repository at this point in the history
…-fixes

improve performance during long unfinality periods
  • Loading branch information
pk910 authored Jan 27, 2025
2 parents d170a55 + 247c6f1 commit bade6c9
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 134 deletions.
3 changes: 2 additions & 1 deletion db/slots.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func InsertSlot(slot *dbtypes.Slot, tx *sqlx.Tx) error {
ON CONFLICT (slot, root) DO UPDATE SET
status = excluded.status,
eth_block_extra = excluded.eth_block_extra,
eth_block_extra_text = excluded.eth_block_extra_text`,
eth_block_extra_text = excluded.eth_block_extra_text,
fork_id = excluded.fork_id`,
dbtypes.DBEngineSqlite: `
INSERT OR REPLACE INTO slots (
slot, proposer, status, root, parent_root, state_root, graffiti, graffiti_text,
Expand Down
4 changes: 2 additions & 2 deletions db/unfinalized_epochs.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ func GetUnfinalizedEpoch(epoch uint64, headRoot []byte) *dbtypes.UnfinalizedEpoc
return &unfinalizedEpoch
}

func DeleteUnfinalizedEpochsIn(epoch uint64, tx *sqlx.Tx) error {
_, err := tx.Exec(`DELETE FROM unfinalized_epochs WHERE epoch = $1`, epoch)
func DeleteUnfinalizedEpochsBefore(epoch uint64, tx *sqlx.Tx) error {
_, err := tx.Exec(`DELETE FROM unfinalized_epochs WHERE epoch < $1`, epoch)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion db/withdrawal_request_txs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func InsertWithdrawalRequestTxs(withdrawalTxs []*dbtypes.WithdrawalRequestTx, tx
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: " ON CONFLICT (block_root, block_index) DO UPDATE SET fork_id = excluded.fork_id",
dbtypes.DBEnginePgsql: " ON CONFLICT (block_root, block_index) DO UPDATE SET validator_index = excluded.validator_index, fork_id = excluded.fork_id",
dbtypes.DBEngineSqlite: "",
}))

Expand Down
77 changes: 23 additions & 54 deletions indexer/beacon/canonical.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"math"
"slices"
"sort"
"strings"
"time"

Expand All @@ -28,47 +27,14 @@ func (indexer *Indexer) GetCanonicalHead(overrideForkId *ForkKey) *Block {

if overrideForkId != nil && indexer.canonicalHead != nil && indexer.canonicalHead.forkId != *overrideForkId {
chainHeads := indexer.cachedChainHeads
chainHeadCandidates := []*ChainHead{}

for _, chainHead := range chainHeads {
parentForkIds := indexer.forkCache.getParentForkIds(chainHead.HeadBlock.forkId)
isInParentIds := false
for _, parentForkId := range parentForkIds {
if parentForkId == *overrideForkId {
isInParentIds = true
break
}
}
if !isInParentIds {
if !slices.Contains(parentForkIds, *overrideForkId) {
continue
}

chainHeadCandidates = append(chainHeadCandidates, chainHead)
}

if len(chainHeadCandidates) > 0 {
sort.Slice(chainHeadCandidates, func(i, j int) bool {
percentagesI := float64(0)
percentagesJ := float64(0)
for k := range chainHeadCandidates[i].PerEpochVotingPercent {
factor := float64(1)
if k == len(chainHeadCandidates[i].PerEpochVotingPercent)-1 {
factor = 0.5
}
percentagesI += chainHeadCandidates[i].PerEpochVotingPercent[k] * factor
if len(chainHeadCandidates[j].PerEpochVotingPercent) > k {
percentagesJ += chainHeadCandidates[j].PerEpochVotingPercent[k] * factor
}
}

if percentagesI != percentagesJ {
return percentagesI > percentagesJ
}

return chainHeadCandidates[i].HeadBlock.Slot > chainHeadCandidates[j].HeadBlock.Slot
})

return chainHeadCandidates[0].HeadBlock
return chainHead.HeadBlock
}
}

Expand All @@ -81,24 +47,6 @@ func (indexer *Indexer) GetChainHeads() []*ChainHead {

heads := make([]*ChainHead, len(indexer.cachedChainHeads))
copy(heads, indexer.cachedChainHeads)
sort.Slice(heads, func(i, j int) bool {
percentagesI := float64(0)
percentagesJ := float64(0)
for k := range heads[i].PerEpochVotingPercent {
factor := float64(1)
if k == len(heads[i].PerEpochVotingPercent)-1 {
factor = 0.5
}
percentagesI += heads[i].PerEpochVotingPercent[k] * factor
percentagesJ += heads[j].PerEpochVotingPercent[k] * factor
}

if percentagesI != percentagesJ {
return percentagesI > percentagesJ
}

return heads[i].HeadBlock.Slot > heads[j].HeadBlock.Slot
})

return heads
}
Expand Down Expand Up @@ -233,6 +181,27 @@ func (indexer *Indexer) computeCanonicalChain() bool {
}
}

slices.SortFunc(chainHeads, func(headA, headB *ChainHead) int {
percentagesA := float64(0)
percentagesB := float64(0)
for k := range headA.PerEpochVotingPercent {
factor := float64(1)
if k == len(headA.PerEpochVotingPercent)-1 {
factor = 0.5
}
percentagesA += headA.PerEpochVotingPercent[k] * factor
if len(headB.PerEpochVotingPercent) > k {
percentagesB += headB.PerEpochVotingPercent[k] * factor
}
}

if percentagesA != percentagesB {
return int((percentagesB - percentagesA) * 100)
}

return int(headB.HeadBlock.Slot - headA.HeadBlock.Slot)
})

return true
}

Expand Down
15 changes: 11 additions & 4 deletions indexer/beacon/debug.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ type CacheDebugStats struct {
VotesCacheMiss uint64
}
ForkCache struct {
ForkMap CacheDebugMapSize
ParentIdCacheLen uint64
ParentIdCacheHit uint64
ParentIdCacheMiss uint64
ForkMap CacheDebugMapSize
ParentIdCacheLen uint64
ParentIdCacheHit uint64
ParentIdCacheMiss uint64
ParentIdsCacheLen uint64
ParentIdsCacheHit uint64
ParentIdsCacheMiss uint64
}
ValidatorCache struct {
Validators uint64
Expand Down Expand Up @@ -142,6 +145,10 @@ func (indexer *Indexer) getForkCacheDebugStats(cacheStats *CacheDebugStats) {
cacheStats.ForkCache.ParentIdCacheLen = uint64(indexer.forkCache.parentIdCache.Len())
cacheStats.ForkCache.ParentIdCacheHit = indexer.forkCache.parentIdCacheHit
cacheStats.ForkCache.ParentIdCacheMiss = indexer.forkCache.parentIdCacheMiss

cacheStats.ForkCache.ParentIdsCacheLen = uint64(indexer.forkCache.parentIdsCache.Len())
cacheStats.ForkCache.ParentIdsCacheHit = indexer.forkCache.parentIdsCacheHit
cacheStats.ForkCache.ParentIdsCacheMiss = indexer.forkCache.parentIdsCacheMiss
}

func (indexer *Indexer) getValidatorCacheDebugStats(cacheStats *CacheDebugStats) {
Expand Down
55 changes: 29 additions & 26 deletions indexer/beacon/epochstats.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type EpochStatsValues struct {
// EpochStatsPacked holds the packed values for the epoch-specific information.
type EpochStatsPacked struct {
ActiveValidators []EpochStatsPackedValidator
ProposerDuties []phase0.ValidatorIndex
SyncCommitteeDuties []phase0.ValidatorIndex
RandaoMix phase0.Hash32
NextRandaoMix phase0.Hash32
Expand Down Expand Up @@ -159,6 +160,7 @@ func (es *EpochStats) buildPackedSSZ(dynSsz *dynssz.DynSsz) ([]byte, error) {

packedValues := &EpochStatsPacked{
ActiveValidators: make([]EpochStatsPackedValidator, es.values.ActiveValidators),
ProposerDuties: es.values.ProposerDuties,
SyncCommitteeDuties: es.values.SyncCommitteeDuties,
RandaoMix: es.values.RandaoMix,
NextRandaoMix: es.values.NextRandaoMix,
Expand Down Expand Up @@ -188,7 +190,7 @@ func (es *EpochStats) buildPackedSSZ(dynSsz *dynssz.DynSsz) ([]byte, error) {

// unmarshalSSZ unmarshals the EpochStats values using the provided SSZ bytes.
// skips computing attester duties if withCommittees is false to speed up the process.
func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensus.ChainState, ssz []byte, withCommittees bool) (*EpochStatsValues, error) {
func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensus.ChainState, ssz []byte, withDuties bool) (*EpochStatsValues, error) {
if dynSsz == nil {
dynSsz = dynssz.NewDynSsz(nil)
}
Expand All @@ -213,6 +215,7 @@ func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensu
NextRandaoMix: packedValues.NextRandaoMix,
ActiveIndices: make([]phase0.ValidatorIndex, len(packedValues.ActiveValidators)),
EffectiveBalances: make([]uint16, len(packedValues.ActiveValidators)),
ProposerDuties: packedValues.ProposerDuties,
SyncCommitteeDuties: packedValues.SyncCommitteeDuties,
TotalBalance: packedValues.TotalBalance,
ActiveBalance: packedValues.ActiveBalance,
Expand All @@ -232,35 +235,35 @@ func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensu

values.ActiveValidators = uint64(len(packedValues.ActiveValidators))

beaconState := &duties.BeaconState{
RandaoMix: &values.RandaoMix,
GetActiveCount: func() uint64 {
return values.ActiveValidators
},
GetEffectiveBalance: func(index duties.ActiveIndiceIndex) phase0.Gwei {
return phase0.Gwei(values.EffectiveBalances[index]) * EtherGweiFactor
},
}

// compute proposers
proposerDuties := []phase0.ValidatorIndex{}
for slot := chainState.EpochToSlot(es.epoch); slot < chainState.EpochToSlot(es.epoch+1); slot++ {
proposer, err := duties.GetProposerIndex(chainState.GetSpecs(), beaconState, slot)
proposerIndex := phase0.ValidatorIndex(math.MaxInt64)
if err == nil {
proposerIndex = values.ActiveIndices[proposer]
if withDuties {
beaconState := &duties.BeaconState{
RandaoMix: &values.RandaoMix,
GetActiveCount: func() uint64 {
return values.ActiveValidators
},
GetEffectiveBalance: func(index duties.ActiveIndiceIndex) phase0.Gwei {
return phase0.Gwei(values.EffectiveBalances[index]) * EtherGweiFactor
},
}

proposerDuties = append(proposerDuties, proposerIndex)
}
// compute proposers
proposerDuties := []phase0.ValidatorIndex{}
for slot := chainState.EpochToSlot(es.epoch); slot < chainState.EpochToSlot(es.epoch+1); slot++ {
proposer, err := duties.GetProposerIndex(chainState.GetSpecs(), beaconState, slot)
proposerIndex := phase0.ValidatorIndex(math.MaxInt64)
if err == nil {
proposerIndex = values.ActiveIndices[proposer]
}

values.ProposerDuties = proposerDuties
if beaconState.RandaoMix != nil {
values.RandaoMix = *beaconState.RandaoMix
}
proposerDuties = append(proposerDuties, proposerIndex)
}

// compute committees
if withCommittees {
values.ProposerDuties = proposerDuties
if beaconState.RandaoMix != nil {
values.RandaoMix = *beaconState.RandaoMix
}

// compute committees
attesterDuties, _ := duties.GetAttesterDuties(chainState.GetSpecs(), beaconState, es.epoch)
values.AttesterDuties = attesterDuties
}
Expand Down
20 changes: 9 additions & 11 deletions indexer/beacon/finalization.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,10 +147,6 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R
}
canonicalBlocks = append(canonicalBlocks, block)
} else {
if block.isInFinalizedDb {
// orphaned block which is already in db, ignore
continue
}
if block.block == nil {
indexer.logger.Warnf("missing block body for orphaned block %v (%v)", block.Slot, block.Root.String())
continue
Expand Down Expand Up @@ -353,16 +349,18 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R
}

// delete unfinalized epoch aggregations in epoch
if err := db.DeleteUnfinalizedEpochsIn(uint64(epoch), tx); err != nil {
return fmt.Errorf("failed deleting unfinalized epoch aggregations of epoch %v: %v", epoch, err)
if err := db.DeleteUnfinalizedEpochsBefore(uint64(epoch+1), tx); err != nil {
return fmt.Errorf("failed deleting unfinalized epoch aggregations <= epoch %v: %v", epoch, err)
}

// delete unfinalized forks for canonical roots
if err := db.UpdateFinalizedForkParents(canonicalRoots, tx); err != nil {
return fmt.Errorf("failed updating finalized fork parents: %v", err)
}
if err := db.DeleteFinalizedForks(canonicalRoots, tx); err != nil {
return fmt.Errorf("failed deleting finalized forks: %v", err)
if len(canonicalRoots) > 0 {
if err := db.UpdateFinalizedForkParents(canonicalRoots, tx); err != nil {
return fmt.Errorf("failed updating finalized fork parents: %v", err)
}
if err := db.DeleteFinalizedForks(canonicalRoots, tx); err != nil {
return fmt.Errorf("failed deleting finalized forks: %v", err)
}
}

return nil
Expand Down
52 changes: 32 additions & 20 deletions indexer/beacon/forkcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,27 @@ import (

// forkCache is a struct that represents the fork cache in the indexer.
type forkCache struct {
indexer *Indexer
cacheMutex sync.RWMutex
forkMap map[ForkKey]*Fork
finalizedForkId ForkKey
lastForkId ForkKey
parentIdCache *lru.Cache[ForkKey, ForkKey]
parentIdCacheHit uint64
parentIdCacheMiss uint64
forkProcessLock sync.Mutex
indexer *Indexer
cacheMutex sync.RWMutex
forkMap map[ForkKey]*Fork
finalizedForkId ForkKey
lastForkId ForkKey
parentIdCache *lru.Cache[ForkKey, ForkKey]
parentIdCacheHit uint64
parentIdCacheMiss uint64
parentIdsCache *lru.Cache[ForkKey, []ForkKey]
parentIdsCacheHit uint64
parentIdsCacheMiss uint64
forkProcessLock sync.Mutex
}

// newForkCache creates a new instance of the forkCache struct.
func newForkCache(indexer *Indexer) *forkCache {
return &forkCache{
indexer: indexer,
forkMap: make(map[ForkKey]*Fork),
parentIdCache: lru.NewCache[ForkKey, ForkKey](1000),
indexer: indexer,
forkMap: make(map[ForkKey]*Fork),
parentIdCache: lru.NewCache[ForkKey, ForkKey](1000),
parentIdsCache: lru.NewCache[ForkKey, []ForkKey](30),
}
}

Expand Down Expand Up @@ -120,30 +124,37 @@ func (cache *forkCache) removeFork(forkId ForkKey) {

// getParentForkIds returns the parent fork ids of the given fork.
func (cache *forkCache) getParentForkIds(forkId ForkKey) []ForkKey {
parentForks := []ForkKey{forkId}
parentForks, isCached := cache.parentIdsCache.Get(forkId)
if isCached {
cache.parentIdsCacheHit++
return parentForks
}

parentForks = []ForkKey{forkId}
parentForkId := forkId
thisFork := cache.getForkById(forkId)

for parentForkId > 1 {
if thisFork != nil {
parentForkId = thisFork.parentFork
} else if cachedParent, isCached := cache.parentIdCache.Get(parentForkId); isCached {
if cachedParent, isCached := cache.parentIdCache.Get(parentForkId); isCached {
cache.parentIdCacheHit++
parentForkId = cachedParent
} else if parentFork := cache.getForkById(parentForkId); parentFork != nil {
parentForkId = parentFork.parentFork
} else if dbFork := db.GetForkById(uint64(parentForkId)); dbFork != nil {
cache.parentIdCache.Add(ForkKey(parentForkId), ForkKey(dbFork.ParentFork))
parentForkId = ForkKey(dbFork.ParentFork)
cache.parentIdCache.Add(ForkKey(dbFork.ForkId), ForkKey(dbFork.ParentFork))
cache.parentIdCacheMiss++
} else {
parentForkId = 0
cache.parentIdCache.Add(ForkKey(parentForkId), ForkKey(0))
parentForkId = 0
cache.parentIdCacheMiss++
}

thisFork = cache.getForkById(parentForkId)
parentForks = append(parentForks, parentForkId)
}

cache.parentIdsCache.Add(forkId, parentForks)
cache.parentIdsCacheMiss++

return parentForks
}

Expand Down Expand Up @@ -247,6 +258,7 @@ func (cache *forkCache) setFinalizedEpoch(finalizedSlot phase0.Slot, justifiedRo
}

cache.finalizedForkId = finalizedForkId
cache.parentIdsCache.Purge()

err := db.RunDBTransaction(func(tx *sqlx.Tx) error {
return cache.updateForkState(tx)
Expand Down
Loading

0 comments on commit bade6c9

Please sign in to comment.