improve performance during long unfinality periods #224

Merged: 15 commits, Jan 27, 2025
3 changes: 2 additions & 1 deletion db/slots.go
@@ -24,7 +24,8 @@ func InsertSlot(slot *dbtypes.Slot, tx *sqlx.Tx) error {
ON CONFLICT (slot, root) DO UPDATE SET
status = excluded.status,
eth_block_extra = excluded.eth_block_extra,
eth_block_extra_text = excluded.eth_block_extra_text`,
eth_block_extra_text = excluded.eth_block_extra_text,
fork_id = excluded.fork_id`,
dbtypes.DBEngineSqlite: `
INSERT OR REPLACE INTO slots (
slot, proposer, status, root, parent_root, state_root, graffiti, graffiti_text,
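Note on the db/slots.go change: the Postgres upsert now carries fork_id through the conflict clause, so re-writing a known (slot, root) row after a fork is detected updates its fork assignment instead of leaving the stale one in place (the SQLite path already replaces the whole row via INSERT OR REPLACE). A trimmed-down sketch of the resulting conflict handling, with the column set reduced to the ones relevant here:

// Illustrative only - the real statement in InsertSlot carries the full column set.
const upsertSlotPg = `
	INSERT INTO slots (slot, root, status, fork_id)
	VALUES ($1, $2, $3, $4)
	ON CONFLICT (slot, root) DO UPDATE SET
		status  = excluded.status,
		fork_id = excluded.fork_id`
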
4 changes: 2 additions & 2 deletions db/unfinalized_epochs.go
@@ -96,8 +96,8 @@ func GetUnfinalizedEpoch(epoch uint64, headRoot []byte) *dbtypes.UnfinalizedEpoc
return &unfinalizedEpoch
}

func DeleteUnfinalizedEpochsIn(epoch uint64, tx *sqlx.Tx) error {
_, err := tx.Exec(`DELETE FROM unfinalized_epochs WHERE epoch = $1`, epoch)
func DeleteUnfinalizedEpochsBefore(epoch uint64, tx *sqlx.Tx) error {
_, err := tx.Exec(`DELETE FROM unfinalized_epochs WHERE epoch < $1`, epoch)
if err != nil {
return err
}
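Note on the db/unfinalized_epochs.go change: DeleteUnfinalizedEpochsIn (exact-epoch delete) becomes DeleteUnfinalizedEpochsBefore (range delete). After a long stretch without finality, finalization can advance by many epochs at once, and a single epoch < $1 delete clears every stale aggregation in one statement instead of one call per epoch. A sketch of the intended call from the finalizer; the finalizedEpoch variable and error wrapping are assumptions, while the transaction helper matches the one used in forkcache.go below:

// Passing epoch+1 as the exclusive upper bound removes all unfinalized epoch
// aggregations up to and including the epoch that was just finalized.
err := db.RunDBTransaction(func(tx *sqlx.Tx) error {
	return db.DeleteUnfinalizedEpochsBefore(uint64(finalizedEpoch)+1, tx)
})
if err != nil {
	return fmt.Errorf("failed pruning unfinalized epoch aggregations: %v", err)
}
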
2 changes: 1 addition & 1 deletion db/withdrawal_request_txs.go
@@ -52,7 +52,7 @@ func InsertWithdrawalRequestTxs(withdrawalTxs []*dbtypes.WithdrawalRequestTx, tx
argIdx += fieldCount
}
fmt.Fprint(&sql, EngineQuery(map[dbtypes.DBEngineType]string{
dbtypes.DBEnginePgsql: " ON CONFLICT (block_root, block_index) DO UPDATE SET fork_id = excluded.fork_id",
dbtypes.DBEnginePgsql: " ON CONFLICT (block_root, block_index) DO UPDATE SET validator_index = excluded.validator_index, fork_id = excluded.fork_id",
dbtypes.DBEngineSqlite: "",
}))

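Note on the db/withdrawal_request_txs.go change: the conflict clause now also updates validator_index, presumably so a later re-index can backfill an index that was not yet resolvable when the request transaction was first stored. A hypothetical re-index pass illustrating the effect; the pointer field and surrounding helper names are assumptions:

// The same (block_root, block_index) row is written again once the validator
// index has been resolved; the upsert now carries the index along instead of
// keeping the originally stored value.
reqTx.ValidatorIndex = &resolvedIndex
if err := db.InsertWithdrawalRequestTxs([]*dbtypes.WithdrawalRequestTx{reqTx}, tx); err != nil {
	return fmt.Errorf("failed re-inserting withdrawal request tx: %v", err)
}
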
77 changes: 23 additions & 54 deletions indexer/beacon/canonical.go
@@ -5,7 +5,6 @@ import (
"fmt"
"math"
"slices"
"sort"
"strings"
"time"

@@ -28,47 +27,14 @@ func (indexer *Indexer) GetCanonicalHead(overrideForkId *ForkKey) *Block {

if overrideForkId != nil && indexer.canonicalHead != nil && indexer.canonicalHead.forkId != *overrideForkId {
chainHeads := indexer.cachedChainHeads
chainHeadCandidates := []*ChainHead{}

for _, chainHead := range chainHeads {
parentForkIds := indexer.forkCache.getParentForkIds(chainHead.HeadBlock.forkId)
isInParentIds := false
for _, parentForkId := range parentForkIds {
if parentForkId == *overrideForkId {
isInParentIds = true
break
}
}
if !isInParentIds {
if !slices.Contains(parentForkIds, *overrideForkId) {
continue
}

chainHeadCandidates = append(chainHeadCandidates, chainHead)
}

if len(chainHeadCandidates) > 0 {
sort.Slice(chainHeadCandidates, func(i, j int) bool {
percentagesI := float64(0)
percentagesJ := float64(0)
for k := range chainHeadCandidates[i].PerEpochVotingPercent {
factor := float64(1)
if k == len(chainHeadCandidates[i].PerEpochVotingPercent)-1 {
factor = 0.5
}
percentagesI += chainHeadCandidates[i].PerEpochVotingPercent[k] * factor
if len(chainHeadCandidates[j].PerEpochVotingPercent) > k {
percentagesJ += chainHeadCandidates[j].PerEpochVotingPercent[k] * factor
}
}

if percentagesI != percentagesJ {
return percentagesI > percentagesJ
}

return chainHeadCandidates[i].HeadBlock.Slot > chainHeadCandidates[j].HeadBlock.Slot
})

return chainHeadCandidates[0].HeadBlock
return chainHead.HeadBlock
}
}

Expand All @@ -81,24 +47,6 @@ func (indexer *Indexer) GetChainHeads() []*ChainHead {

heads := make([]*ChainHead, len(indexer.cachedChainHeads))
copy(heads, indexer.cachedChainHeads)
sort.Slice(heads, func(i, j int) bool {
percentagesI := float64(0)
percentagesJ := float64(0)
for k := range heads[i].PerEpochVotingPercent {
factor := float64(1)
if k == len(heads[i].PerEpochVotingPercent)-1 {
factor = 0.5
}
percentagesI += heads[i].PerEpochVotingPercent[k] * factor
percentagesJ += heads[j].PerEpochVotingPercent[k] * factor
}

if percentagesI != percentagesJ {
return percentagesI > percentagesJ
}

return heads[i].HeadBlock.Slot > heads[j].HeadBlock.Slot
})

return heads
}
@@ -233,6 +181,27 @@ func (indexer *Indexer) computeCanonicalChain() bool {
}
}

slices.SortFunc(chainHeads, func(headA, headB *ChainHead) int {
percentagesA := float64(0)
percentagesB := float64(0)
for k := range headA.PerEpochVotingPercent {
factor := float64(1)
if k == len(headA.PerEpochVotingPercent)-1 {
factor = 0.5
}
percentagesA += headA.PerEpochVotingPercent[k] * factor
if len(headB.PerEpochVotingPercent) > k {
percentagesB += headB.PerEpochVotingPercent[k] * factor
}
}

if percentagesA != percentagesB {
return int((percentagesB - percentagesA) * 100)
}

return int(headB.HeadBlock.Slot - headA.HeadBlock.Slot)
})

return true
}

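Note on the indexer/beacon/canonical.go change: the per-call sort.Slice in GetCanonicalHead and GetChainHeads is dropped; the cached chain heads are ordered once in computeCanonicalChain via slices.SortFunc, and GetCanonicalHead now simply returns the first cached head whose parent fork chain contains the requested fork. The ordering weights each head by its per-epoch voting percentages, counting the most recent epoch only half because its attestations are still incomplete, and breaks ties by the higher head slot. A minimal scoring sketch with a worked example; headScore is a hypothetical helper, not part of the change:

// Rough equivalent of the weight each head gets in the comparator above.
// Example: PerEpochVotingPercent [80, 60] scores 80*1.0 + 60*0.5 = 110, while
// [70, 90] scores 70*1.0 + 90*0.5 = 115, so the second head sorts first even
// though its older epoch gathered fewer votes.
func headScore(head *ChainHead) float64 {
	score := 0.0
	for k, pct := range head.PerEpochVotingPercent {
		factor := 1.0
		if k == len(head.PerEpochVotingPercent)-1 {
			factor = 0.5 // newest epoch: votes still arriving
		}
		score += pct * factor
	}
	return score
}
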
15 changes: 11 additions & 4 deletions indexer/beacon/debug.go
@@ -30,10 +30,13 @@ type CacheDebugStats struct {
VotesCacheMiss uint64
}
ForkCache struct {
ForkMap CacheDebugMapSize
ParentIdCacheLen uint64
ParentIdCacheHit uint64
ParentIdCacheMiss uint64
ForkMap CacheDebugMapSize
ParentIdCacheLen uint64
ParentIdCacheHit uint64
ParentIdCacheMiss uint64
ParentIdsCacheLen uint64
ParentIdsCacheHit uint64
ParentIdsCacheMiss uint64
}
ValidatorCache struct {
Validators uint64
@@ -142,6 +145,10 @@ func (indexer *Indexer) getForkCacheDebugStats(cacheStats *CacheDebugStats) {
cacheStats.ForkCache.ParentIdCacheLen = uint64(indexer.forkCache.parentIdCache.Len())
cacheStats.ForkCache.ParentIdCacheHit = indexer.forkCache.parentIdCacheHit
cacheStats.ForkCache.ParentIdCacheMiss = indexer.forkCache.parentIdCacheMiss

cacheStats.ForkCache.ParentIdsCacheLen = uint64(indexer.forkCache.parentIdsCache.Len())
cacheStats.ForkCache.ParentIdsCacheHit = indexer.forkCache.parentIdsCacheHit
cacheStats.ForkCache.ParentIdsCacheMiss = indexer.forkCache.parentIdsCacheMiss
}

func (indexer *Indexer) getValidatorCacheDebugStats(cacheStats *CacheDebugStats) {
55 changes: 29 additions & 26 deletions indexer/beacon/epochstats.go
@@ -61,6 +61,7 @@ type EpochStatsValues struct {
// EpochStatsPacked holds the packed values for the epoch-specific information.
type EpochStatsPacked struct {
ActiveValidators []EpochStatsPackedValidator
ProposerDuties []phase0.ValidatorIndex
SyncCommitteeDuties []phase0.ValidatorIndex
RandaoMix phase0.Hash32
NextRandaoMix phase0.Hash32
@@ -159,6 +160,7 @@ func (es *EpochStats) buildPackedSSZ(dynSsz *dynssz.DynSsz) ([]byte, error) {

packedValues := &EpochStatsPacked{
ActiveValidators: make([]EpochStatsPackedValidator, es.values.ActiveValidators),
ProposerDuties: es.values.ProposerDuties,
SyncCommitteeDuties: es.values.SyncCommitteeDuties,
RandaoMix: es.values.RandaoMix,
NextRandaoMix: es.values.NextRandaoMix,
@@ -188,7 +190,7 @@ func (es *EpochStats) buildPackedSSZ(dynSsz *dynssz.DynSsz) ([]byte, error) {

// unmarshalSSZ unmarshals the EpochStats values using the provided SSZ bytes.
// skips computing attester duties if withCommittees is false to speed up the process.
func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensus.ChainState, ssz []byte, withCommittees bool) (*EpochStatsValues, error) {
func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensus.ChainState, ssz []byte, withDuties bool) (*EpochStatsValues, error) {
if dynSsz == nil {
dynSsz = dynssz.NewDynSsz(nil)
}
@@ -213,6 +215,7 @@ func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensu
NextRandaoMix: packedValues.NextRandaoMix,
ActiveIndices: make([]phase0.ValidatorIndex, len(packedValues.ActiveValidators)),
EffectiveBalances: make([]uint16, len(packedValues.ActiveValidators)),
ProposerDuties: packedValues.ProposerDuties,
SyncCommitteeDuties: packedValues.SyncCommitteeDuties,
TotalBalance: packedValues.TotalBalance,
ActiveBalance: packedValues.ActiveBalance,
@@ -232,35 +235,35 @@ func (es *EpochStats) parsePackedSSZ(dynSsz *dynssz.DynSsz, chainState *consensu

values.ActiveValidators = uint64(len(packedValues.ActiveValidators))

beaconState := &duties.BeaconState{
RandaoMix: &values.RandaoMix,
GetActiveCount: func() uint64 {
return values.ActiveValidators
},
GetEffectiveBalance: func(index duties.ActiveIndiceIndex) phase0.Gwei {
return phase0.Gwei(values.EffectiveBalances[index]) * EtherGweiFactor
},
}

// compute proposers
proposerDuties := []phase0.ValidatorIndex{}
for slot := chainState.EpochToSlot(es.epoch); slot < chainState.EpochToSlot(es.epoch+1); slot++ {
proposer, err := duties.GetProposerIndex(chainState.GetSpecs(), beaconState, slot)
proposerIndex := phase0.ValidatorIndex(math.MaxInt64)
if err == nil {
proposerIndex = values.ActiveIndices[proposer]
if withDuties {
beaconState := &duties.BeaconState{
RandaoMix: &values.RandaoMix,
GetActiveCount: func() uint64 {
return values.ActiveValidators
},
GetEffectiveBalance: func(index duties.ActiveIndiceIndex) phase0.Gwei {
return phase0.Gwei(values.EffectiveBalances[index]) * EtherGweiFactor
},
}

proposerDuties = append(proposerDuties, proposerIndex)
}
// compute proposers
proposerDuties := []phase0.ValidatorIndex{}
for slot := chainState.EpochToSlot(es.epoch); slot < chainState.EpochToSlot(es.epoch+1); slot++ {
proposer, err := duties.GetProposerIndex(chainState.GetSpecs(), beaconState, slot)
proposerIndex := phase0.ValidatorIndex(math.MaxInt64)
if err == nil {
proposerIndex = values.ActiveIndices[proposer]
}

values.ProposerDuties = proposerDuties
if beaconState.RandaoMix != nil {
values.RandaoMix = *beaconState.RandaoMix
}
proposerDuties = append(proposerDuties, proposerIndex)
}

// compute committees
if withCommittees {
values.ProposerDuties = proposerDuties
if beaconState.RandaoMix != nil {
values.RandaoMix = *beaconState.RandaoMix
}

// compute committees
attesterDuties, _ := duties.GetAttesterDuties(chainState.GetSpecs(), beaconState, es.epoch)
values.AttesterDuties = attesterDuties
}
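Note on the indexer/beacon/epochstats.go change: proposer duties are now stored in EpochStatsPacked, so the packed SSZ blob written for pruned epochs already carries them, and parsePackedSSZ (withCommittees renamed to withDuties) only rebuilds the duties.BeaconState and recomputes proposer and attester duties when the caller actually needs them. A sketch of the two call paths, assuming a packedSsz blob loaded from the database:

// Fast path: proposer duties and balances come straight from the packed blob,
// no duty recomputation - this is the cheap path when many unfinalized epochs
// have to be restored.
values, err := es.parsePackedSSZ(dynSsz, chainState, packedSsz, false)
if err == nil {
	firstProposer := values.ProposerDuties[0] // read from the blob, not recomputed
	_ = firstProposer
}

// Full path: attester committees are actually needed, so duties are recomputed
// from the stored randao mix and active validator set.
valuesFull, err := es.parsePackedSSZ(dynSsz, chainState, packedSsz, true)
if err == nil {
	_ = valuesFull.AttesterDuties
}
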
20 changes: 9 additions & 11 deletions indexer/beacon/finalization.go
@@ -147,10 +147,6 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R
}
canonicalBlocks = append(canonicalBlocks, block)
} else {
if block.isInFinalizedDb {
// orphaned block which is already in db, ignore
continue
}
if block.block == nil {
indexer.logger.Warnf("missing block body for orphaned block %v (%v)", block.Slot, block.Root.String())
continue
@@ -353,16 +349,18 @@ func (indexer *Indexer) finalizeEpoch(epoch phase0.Epoch, justifiedRoot phase0.R
}

// delete unfinalized epoch aggregations in epoch
if err := db.DeleteUnfinalizedEpochsIn(uint64(epoch), tx); err != nil {
return fmt.Errorf("failed deleting unfinalized epoch aggregations of epoch %v: %v", epoch, err)
if err := db.DeleteUnfinalizedEpochsBefore(uint64(epoch+1), tx); err != nil {
return fmt.Errorf("failed deleting unfinalized epoch aggregations <= epoch %v: %v", epoch, err)
}

// delete unfinalized forks for canonical roots
if err := db.UpdateFinalizedForkParents(canonicalRoots, tx); err != nil {
return fmt.Errorf("failed updating finalized fork parents: %v", err)
}
if err := db.DeleteFinalizedForks(canonicalRoots, tx); err != nil {
return fmt.Errorf("failed deleting finalized forks: %v", err)
if len(canonicalRoots) > 0 {
if err := db.UpdateFinalizedForkParents(canonicalRoots, tx); err != nil {
return fmt.Errorf("failed updating finalized fork parents: %v", err)
}
if err := db.DeleteFinalizedForks(canonicalRoots, tx); err != nil {
return fmt.Errorf("failed deleting finalized forks: %v", err)
}
}

return nil
52 changes: 32 additions & 20 deletions indexer/beacon/forkcache.go
@@ -15,23 +15,27 @@ import (

// forkCache is a struct that represents the fork cache in the indexer.
type forkCache struct {
indexer *Indexer
cacheMutex sync.RWMutex
forkMap map[ForkKey]*Fork
finalizedForkId ForkKey
lastForkId ForkKey
parentIdCache *lru.Cache[ForkKey, ForkKey]
parentIdCacheHit uint64
parentIdCacheMiss uint64
forkProcessLock sync.Mutex
indexer *Indexer
cacheMutex sync.RWMutex
forkMap map[ForkKey]*Fork
finalizedForkId ForkKey
lastForkId ForkKey
parentIdCache *lru.Cache[ForkKey, ForkKey]
parentIdCacheHit uint64
parentIdCacheMiss uint64
parentIdsCache *lru.Cache[ForkKey, []ForkKey]
parentIdsCacheHit uint64
parentIdsCacheMiss uint64
forkProcessLock sync.Mutex
}

// newForkCache creates a new instance of the forkCache struct.
func newForkCache(indexer *Indexer) *forkCache {
return &forkCache{
indexer: indexer,
forkMap: make(map[ForkKey]*Fork),
parentIdCache: lru.NewCache[ForkKey, ForkKey](1000),
indexer: indexer,
forkMap: make(map[ForkKey]*Fork),
parentIdCache: lru.NewCache[ForkKey, ForkKey](1000),
parentIdsCache: lru.NewCache[ForkKey, []ForkKey](30),
}
}

@@ -120,30 +124,37 @@ func (cache *forkCache) removeFork(forkId ForkKey) {

// getParentForkIds returns the parent fork ids of the given fork.
func (cache *forkCache) getParentForkIds(forkId ForkKey) []ForkKey {
parentForks := []ForkKey{forkId}
parentForks, isCached := cache.parentIdsCache.Get(forkId)
if isCached {
cache.parentIdsCacheHit++
return parentForks
}

parentForks = []ForkKey{forkId}
parentForkId := forkId
thisFork := cache.getForkById(forkId)

for parentForkId > 1 {
if thisFork != nil {
parentForkId = thisFork.parentFork
} else if cachedParent, isCached := cache.parentIdCache.Get(parentForkId); isCached {
if cachedParent, isCached := cache.parentIdCache.Get(parentForkId); isCached {
cache.parentIdCacheHit++
parentForkId = cachedParent
} else if parentFork := cache.getForkById(parentForkId); parentFork != nil {
parentForkId = parentFork.parentFork
} else if dbFork := db.GetForkById(uint64(parentForkId)); dbFork != nil {
cache.parentIdCache.Add(ForkKey(parentForkId), ForkKey(dbFork.ParentFork))
parentForkId = ForkKey(dbFork.ParentFork)
cache.parentIdCache.Add(ForkKey(dbFork.ForkId), ForkKey(dbFork.ParentFork))
cache.parentIdCacheMiss++
} else {
parentForkId = 0
cache.parentIdCache.Add(ForkKey(parentForkId), ForkKey(0))
parentForkId = 0
cache.parentIdCacheMiss++
}

thisFork = cache.getForkById(parentForkId)
parentForks = append(parentForks, parentForkId)
}

cache.parentIdsCache.Add(forkId, parentForks)
cache.parentIdsCacheMiss++

return parentForks
}

@@ -247,6 +258,7 @@ func (cache *forkCache) setFinalizedEpoch(finalizedSlot phase0.Slot, justifiedRo
}

cache.finalizedForkId = finalizedForkId
cache.parentIdsCache.Purge()

err := db.RunDBTransaction(func(tx *sqlx.Tx) error {
return cache.updateForkState(tx)
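Note on the indexer/beacon/forkcache.go change: getParentForkIds previously walked the ancestor chain on every call, taking one parentIdCache lookup (and possibly a database query) per hop; the new parentIdsCache memoizes the fully resolved chain per fork id, and setFinalizedEpoch purges it because finalization rewires fork parent links. A compact sketch of the memoization, assuming a resolveParentFork helper that stands in for the cache -> fork map -> database lookup chain:

// Hypothetical condensed form of getParentForkIds after this change.
func (cache *forkCache) parentForkIdsMemoized(forkId ForkKey) []ForkKey {
	if chain, isCached := cache.parentIdsCache.Get(forkId); isCached {
		cache.parentIdsCacheHit++
		return chain // full ancestor chain was resolved on an earlier call
	}

	chain := []ForkKey{forkId}
	parentForkId := forkId
	for parentForkId > 1 {
		// stands in for: parentIdCache -> in-memory fork map -> db.GetForkById
		parentForkId = cache.resolveParentFork(parentForkId)
		chain = append(chain, parentForkId)
	}

	cache.parentIdsCache.Add(forkId, chain)
	cache.parentIdsCacheMiss++
	return chain
}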