refactor(exporter): make el payload exporter less stingy, report unseen blocks in bigtable, fix el rewards finalizer #1279

Merged · 3 commits · Jan 22, 2025
Changes from all commits
6 changes: 2 additions & 4 deletions backend/pkg/api/data_access/healthz.go
@@ -10,6 +10,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/gobitfly/beaconchain/pkg/monitoring/constants"
monitoringServices "github.com/gobitfly/beaconchain/pkg/monitoring/services"
)

type HealthzRepository interface {
@@ -128,10 +129,7 @@ func (d *DataAccessService) GetHealthz(ctx context.Context, showAll bool) types.
for _, result := range results {
response.Reports[result.EventId] = append(response.Reports[result.EventId], result)
}
requiredEvents := constants.RequiredEvents
if utils.Config.DeploymentType == "production" {
requiredEvents = append(requiredEvents, constants.ProductionRequiredEvents...)
}
requiredEvents := monitoringServices.GetRequiredEvents()
for _, id := range requiredEvents {
if _, ok := response.Reports[string(id)]; !ok {
response.Reports[string(id)] = []types.HealthzResult{
19 changes: 16 additions & 3 deletions backend/pkg/exporter/modules/execution_payloads_exporter.go
@@ -5,7 +5,9 @@ import (
"context"
"database/sql"
"fmt"
"maps"
"math/big"
"slices"
"sync"
"time"

@@ -120,7 +122,6 @@ func (d *executionPayloadsExporter) maintainTable() (err error) {
ctx, abortProcessing := context.WithCancel(context.Background())
defer abortProcessing() // to kill the errgroup if something goes wrong
group, _ := errgroup.WithContext(ctx)

// coroutine to process the blocks
group.Go(func() error {
var block *types.Eth1BlockIndexed
@@ -171,6 +172,10 @@ func (d *executionPayloadsExporter) maintainTable() (err error) {
// sanity checks: check if any block hashes are 0x0000000000000000000000000000000000000000000000000000000000000000 or duplicate, check if count matches expected
seen := make(map[string]bool)
emptyBlockHash := bytes.Repeat([]byte{0}, 32)
unseenBlockNumbers := make(map[uint64]bool)
for i := minBlock; i <= maxBlock; i++ {
unseenBlockNumbers[i] = true
}
err = error(nil)
counter := 0
for _, r := range resData {
@@ -195,13 +200,21 @@
counter++
}
seen[string(r.BlockHash)] = true
if _, ok := unseenBlockNumbers[r.BlockNumber]; !ok {
err = fmt.Errorf("error processing blocks: block number %v is not in the unseen map", r.BlockNumber)
log.Error(err, "error processing blocks", 0)
counter++
}
delete(unseenBlockNumbers, r.BlockNumber) // noop in case it doesn't exist
}

if err != nil {
return err
}

if uint64(len(resData)) != maxBlock-minBlock+1 {
return fmt.Errorf("error processing blocks: expected %v blocks, got %v", maxBlock-minBlock+1, len(resData))
u := slices.Collect(maps.Keys(unseenBlockNumbers))
if len(u) > 0 && slices.Min(u) < maxBlock-10 { // we are fine with the last 10 blocks being missing, the indexer might not have caught up yet
return fmt.Errorf("error processing blocks: expected %v blocks, got %v, unseen map: %v", maxBlock-minBlock+1, len(resData), maps.Keys(unseenBlockNumbers))
}

// update the execution_payloads table
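For reference, here is a minimal standalone sketch of the tolerance check added above. The function name, block numbers, and the `main` driver are illustrative only, and it assumes Go 1.23+ for the `maps.Keys`/`slices.Collect` iterator API that the diff relies on:

```go
package main

import (
	"fmt"
	"maps"
	"slices"
)

// checkUnseenBlocks mirrors the exporter's sanity check: every block number in
// [minBlock, maxBlock] starts out "unseen", each block returned by bigtable
// removes its number from the set, and only gaps older than the 10 most recent
// blocks count as an error (the indexer may simply not have caught up yet).
func checkUnseenBlocks(minBlock, maxBlock uint64, returned []uint64) error {
	unseen := make(map[uint64]bool)
	for i := minBlock; i <= maxBlock; i++ {
		unseen[i] = true
	}
	for _, n := range returned {
		delete(unseen, n) // noop if n was never expected
	}
	u := slices.Collect(maps.Keys(unseen))
	if len(u) > 0 && slices.Min(u) < maxBlock-10 {
		return fmt.Errorf("missing %d block(s), oldest missing: %d", len(u), slices.Min(u))
	}
	return nil
}

func main() {
	// block 99 is missing but falls within the 10 most recent blocks -> tolerated
	fmt.Println(checkUnseenBlocks(95, 100, []uint64{95, 96, 97, 98, 100}))

	// block 80 is missing and is older than maxBlock-10 -> reported
	returned := make([]uint64, 0, 20)
	for i := uint64(81); i <= 100; i++ {
		returned = append(returned, i)
	}
	fmt.Println(checkUnseenBlocks(80, 100, returned))
}
```

As in the diff, `maxBlock-10` is unsigned arithmetic, so the check presumes `maxBlock` is comfortably above 10.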
@@ -110,7 +110,7 @@ func (d *executionRewardsFinalizer) maintainTable() (err error) {
gc := goqu.Dialect("postgres").From("blocks").
Select(
goqu.Func("count", goqu.Star()).As("total"),
goqu.Func("count", goqu.I("ep.fee_recipient_reward")).As("non_null"),
goqu.Func("count", goqu.I("ep.fee_recipient_reward").IsNot(nil)).As("non_null"),
).
LeftJoin(
goqu.T("execution_payloads").As("ep"),
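The one-line finalizer change above swaps the argument of the second `count()` from the raw column to an `IS NOT NULL` predicate. A quick way to see what that means at the SQL level is to render both variants with goqu. This is a sketch that assumes goqu v9 (`github.com/doug-martin/goqu/v9`), which the query builder here appears to be, and omits the `LEFT JOIN` for brevity; it only prints the generated SQL so the difference is visible, and does not assert which variant the finalizer should use:

```go
package main

import (
	"fmt"

	"github.com/doug-martin/goqu/v9"
	_ "github.com/doug-martin/goqu/v9/dialect/postgres" // registers the "postgres" dialect
)

func main() {
	// render a stripped-down version of the finalizer's count query with the
	// given expression in the "non_null" slot
	build := func(nonNullArg interface{}) string {
		sql, _, err := goqu.Dialect("postgres").From("blocks").
			Select(
				goqu.Func("count", goqu.Star()).As("total"),
				goqu.Func("count", nonNullArg).As("non_null"),
			).ToSQL()
		if err != nil {
			return err.Error()
		}
		return sql
	}

	// before: count over the column itself (counts non-NULL rewards)
	fmt.Println(build(goqu.I("ep.fee_recipient_reward")))
	// after: count over the IS NOT NULL predicate, as in this PR
	fmt.Println(build(goqu.I("ep.fee_recipient_reward").IsNot(nil)))
}
```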
5 changes: 3 additions & 2 deletions backend/pkg/exporter/modules/pubkey_tags.go
@@ -12,9 +12,10 @@ import (

func UpdatePubkeyTag() {
log.Infof("Started Pubkey Tags Updater")
delay := time.Minute * 10
for {
start := time.Now()
r := monitoringServices.NewStatusReport(constants.Event_ExporterLegacyPubkeyTags, constants.Default, time.Second*12)
r := monitoringServices.NewStatusReport(constants.Event_ExporterLegacyPubkeyTags, delay, time.Second*12)
r(constants.Running, nil)
tx, err := db.WriterDb.Beginx()
if err != nil {
@@ -45,6 +46,6 @@ func UpdatePubkeyTag() {
r(constants.Success, map[string]string{"took": time.Since(start).String(), "took_raw": time.Since(start).String()})
metrics.TaskDuration.WithLabelValues("validator_pubkey_tag_updater").Observe(time.Since(start).Seconds())

time.Sleep(time.Minute * 10)
time.Sleep(delay)
}
}
2 changes: 1 addition & 1 deletion backend/pkg/exporter/modules/rocketpool.go
@@ -191,7 +191,7 @@ func (rp *RocketpoolExporter) Run() error {

for {
t0 := time.Now()
r := monitoringServices.NewStatusReport(constants.Event_ExporterLegacyRocketPool, constants.Default, time.Second*12)
r := monitoringServices.NewStatusReport(constants.Event_ExporterLegacyRocketPool, time.Hour*4, rp.UpdateInterval) // currently takes 2h40m on mainnet...
r(constants.Running, nil)
var err error
err = rp.Update(count)
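Both legacy exporters above now derive the status-report timeout from their own loop cadence instead of `constants.Default`, presumably so a slow-but-healthy run (the Rocket Pool update reportedly takes about 2h40m on mainnet) is not treated as overdue. A minimal sketch of that pattern follows, with stand-ins for the monitoring package (the real `constants.StatusType` values and `NewStatusReport` implementation live in `pkg/monitoring`); only the timeout-vs-interval relationship is the point here:

```go
package main

import "time"

// Stand-ins for pkg/monitoring/constants and pkg/monitoring/services; this
// only illustrates tying the report timeout to the exporter's own interval.
type StatusType string

const (
	Running StatusType = "running"
	Success StatusType = "success"
	Failure StatusType = "failure"
)

func NewStatusReport(id string, timeout, checkInterval time.Duration) func(StatusType, map[string]string) {
	return func(status StatusType, meta map[string]string) {
		// the real implementation persists the report for the healthz endpoint; omitted here
	}
}

func runExporterLoop(id string, interval time.Duration, doWork func() error) {
	for {
		start := time.Now()
		// the timeout follows the loop's own interval instead of a global default
		r := NewStatusReport(id, interval, 12*time.Second)
		r(Running, nil)
		if err := doWork(); err != nil {
			r(Failure, map[string]string{"error": err.Error()})
		} else {
			r(Success, map[string]string{"took": time.Since(start).String()})
		}
		time.Sleep(interval)
	}
}

func main() {
	// runs forever, mirroring the UpdatePubkeyTag loop with its 10-minute delay
	runExporterLoop("example_exporter", 10*time.Minute, func() error { return nil })
}
```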
6 changes: 3 additions & 3 deletions backend/pkg/monitoring/constants/main.go
@@ -1,6 +1,8 @@
package constants

import "time"
import (
"time"
)

// status enum
type StatusType string
@@ -69,8 +71,6 @@ var ProductionRequiredEvents = []Event{
Event_ExporterLegacyNetworkLiveness,
Event_ExporterLegacySyncCommittees,
Event_ExporterLegacySyncCommitteesCount,
Event_ExporterLegacyRocketPool,
Event_ExporterLegacyPubkeyTags,
Event_ExporterModuleELRewardsFinalizer,
Event_ExporterModuleELPayloadExporter,
Event_ExporterModuleELDepositsExporter,
15 changes: 15 additions & 0 deletions backend/pkg/monitoring/services/base.go
@@ -109,3 +109,18 @@ func NewStatusReport(id constants.Event, timeout time.Duration, check_interval t
}()
}
}

func GetRequiredEvents() []constants.Event {
// i would hope this simple of a function doesnt need caching
requiredEvents := constants.RequiredEvents
if utils.Config.DeploymentType == "production" {
requiredEvents = append(requiredEvents, constants.ProductionRequiredEvents...)
}
if utils.Config.RocketpoolExporter.Enabled {
requiredEvents = append(requiredEvents, constants.Event_ExporterLegacyRocketPool)
}
if utils.Config.Indexer.PubKeyTagsExporter.Enabled {
requiredEvents = append(requiredEvents, constants.Event_ExporterLegacyPubkeyTags)
}
return requiredEvents
}