diff --git a/backend/pkg/exporter/modules/execution_payloads_exporter.go b/backend/pkg/exporter/modules/execution_payloads_exporter.go index 7266821cc..db7440fab 100644 --- a/backend/pkg/exporter/modules/execution_payloads_exporter.go +++ b/backend/pkg/exporter/modules/execution_payloads_exporter.go @@ -1,6 +1,7 @@ package modules import ( + "bytes" "context" "database/sql" "fmt" @@ -99,6 +100,7 @@ func (d *executionPayloadsExporter) maintainTable() (err error) { blockChan := make(chan *types.Eth1BlockIndexed, 1000) type Result struct { BlockHash []byte + BlockNumber uint64 FeeRecipientReward decimal.Decimal } resData := make([]Result, 0, maxBlock-minBlock+1) @@ -137,7 +139,7 @@ func (d *executionPayloadsExporter) maintainTable() (err error) { if err != nil { return fmt.Errorf("error converting tx reward to decimal for block %v: %w", block.Number, err) } - resData = append(resData, Result{BlockHash: hash, FeeRecipientReward: dec}) + resData = append(resData, Result{BlockHash: hash, FeeRecipientReward: dec, BlockNumber: block.Number}) } }) @@ -154,9 +156,43 @@ func (d *executionPayloadsExporter) maintainTable() (err error) { if err != nil { return fmt.Errorf("error processing blocks: %w", err) } + // sanity checks: check if any block hashes are 0x0000000000000000000000000000000000000000000000000000000000000000 or duplicate, check if count matches expected + seen := make(map[string]bool) + emptyBlockHash := bytes.Repeat([]byte{0}, 32) + err = error(nil) + counter := 0 + for _, r := range resData { + if counter > 25 { + err = fmt.Errorf("too many errors, aborting") + log.Error(err, "error processing blocks", 0) + break + } + if len(r.BlockHash) == 0 { + err = fmt.Errorf("error processing blocks: block hash is empty, block number: %v", r.BlockNumber) + log.Error(err, "error processing blocks", 0) + counter++ + } + if bytes.Equal(r.BlockHash, emptyBlockHash) { + err = fmt.Errorf("error processing blocks: block hash is all zeros, block number: %v", r.BlockNumber) + log.Error(err, "error processing blocks", 0) + counter++ + } + if _, ok := seen[string(r.BlockHash)]; ok { + err = fmt.Errorf("error processing blocks: duplicate block hash, block number: %v", r.BlockNumber) + log.Error(err, "error processing blocks", 0) + counter++ + } + seen[string(r.BlockHash)] = true + } + if err != nil { + return err + } - // update the execution_payloads table + if uint64(len(resData)) != maxBlock-minBlock+1 { + return fmt.Errorf("error processing blocks: expected %v blocks, got %v", maxBlock-minBlock+1, len(resData)) + } + // update the execution_payloads table log.Infof("preparing copy update to temp table") // load data into temp table diff --git a/backend/pkg/exporter/modules/execution_rewards_finalizer.go b/backend/pkg/exporter/modules/execution_rewards_finalizer.go index 13a6529af..c1fd78ded 100644 --- a/backend/pkg/exporter/modules/execution_rewards_finalizer.go +++ b/backend/pkg/exporter/modules/execution_rewards_finalizer.go @@ -8,12 +8,14 @@ import ( "github.com/doug-martin/goqu/v9" "github.com/gobitfly/beaconchain/pkg/commons/db" "github.com/gobitfly/beaconchain/pkg/commons/log" + "github.com/gobitfly/beaconchain/pkg/commons/utils" constypes "github.com/gobitfly/beaconchain/pkg/consapi/types" ) type executionRewardsFinalizer struct { ModuleContext ExportMutex *sync.Mutex + CooldownTs time.Time } func NewExecutionRewardFinalizer(moduleContext ModuleContext) ModuleInterface { @@ -40,6 +42,9 @@ func (d *executionRewardsFinalizer) OnFinalizedCheckpoint(event *constypes.Stand } func (d *executionRewardsFinalizer) OnHead(event *constypes.StandardEventHeadResponse) (err error) { + if time.Now().Before(d.CooldownTs) { + log.Warnf("execution rewards finalizer is on cooldown till %s", d.CooldownTs) + } // if mutex is locked, return early if !d.ExportMutex.TryLock() { log.Infof("execution rewards finalizer is already running") @@ -48,6 +53,7 @@ func (d *executionRewardsFinalizer) OnHead(event *constypes.StandardEventHeadRes defer d.ExportMutex.Unlock() err = d.maintainTable() if err != nil { + d.CooldownTs = time.Now().Add(1 * time.Minute) return fmt.Errorf("error maintaining table: %w", err) } return nil @@ -80,16 +86,47 @@ func (d *executionRewardsFinalizer) maintainTable() (err error) { } // limit to prevent overloading - if latestFinalizedSlot-lastExportedSlot > 250_000 { - latestFinalizedSlot = lastExportedSlot + 250_000 + // gnosis has a 5 second slot window, so to prevent hammering the db scale the batch size by the slot time + batch := int64(10_000 * utils.Config.Chain.ClConfig.SlotsPerEpoch) + if latestFinalizedSlot-lastExportedSlot > batch { + latestFinalizedSlot = lastExportedSlot + batch } if latestFinalizedSlot <= lastExportedSlot { log.Debugf("no new finalized slots to export") return nil } + // sanity check, check if any non-missed block has a fee_recipient_reward that is NULL + var count struct { + Total int64 `db:"total"` + NonNull int64 `db:"non_null"` + } + gc := goqu.Dialect("postgres").From("blocks"). + Select( + goqu.Func("count", goqu.Star()).As("total"), + goqu.Func("count", goqu.I("ep.fee_recipient_reward")).As("non_null"), + ). + LeftJoin( + goqu.T("execution_payloads").As("ep"), + goqu.On(goqu.I("ep.block_hash").Eq(goqu.I("blocks.exec_block_hash"))), + ). + Where( + goqu.I("slot").Gt(lastExportedSlot), + goqu.I("slot").Lte(latestFinalizedSlot), + goqu.I("status").Eq("1"), + ) + query, args, err := gc.Prepared(true).ToSQL() + if err != nil { + return fmt.Errorf("error preparing query: %w", err) + } + err = db.ReaderDb.Get(&count, query, args...) + if err != nil { + return fmt.Errorf("error getting count of non-missed blocks: %w", err) + } + if count.Total != count.NonNull { + return fmt.Errorf("only %v out of %v blocks have non-null fee_recipient_reward", count.NonNull, count.Total) + } log.Infof("finalized rewards = last exported slot: %v, latest finalized slot: %v", lastExportedSlot, latestFinalizedSlot) - start := time.Now() ds := goqu.Dialect("postgres").Insert("execution_rewards_finalized").FromQuery( goqu.From(goqu.T("blocks").As("b")). @@ -122,12 +159,11 @@ func (d *executionRewardsFinalizer) maintainTable() (err error) { log.Debugf("writing execution rewards finalized data") - query, args, err := ds.Prepared(true).ToSQL() + query, args, err = ds.Prepared(true).ToSQL() if err != nil { return fmt.Errorf("error preparing query: %w", err) } _, err = db.WriterDb.Exec(query, args...) - if err != nil { return fmt.Errorf("error inserting data: %w", err) }