Skip to content

Commit

Permalink
refactor(execution_deposits_exporter): improve getting head and final…
Browse files Browse the repository at this point in the history
…ized blocks
  • Loading branch information
guybrush committed Oct 16, 2024
1 parent 20cc367 commit 5ee4eaa
Showing 1 changed file with 41 additions and 31 deletions.
72 changes: 41 additions & 31 deletions backend/pkg/exporter/modules/execution_deposits_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/go-redis/redis/v8"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"github.com/gobitfly/beaconchain/pkg/commons/contracts/deposit_contract"
"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
"github.com/gobitfly/beaconchain/pkg/commons/rpc"
"github.com/gobitfly/beaconchain/pkg/commons/services"
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
Expand Down Expand Up @@ -163,59 +165,67 @@ func (d *executionDepositsExporter) OnFinalizedCheckpoint(event *constypes.Stand
}

func (d *executionDepositsExporter) OnHead(event *constypes.StandardEventHeadResponse) (err error) {
var nearestELBlock sql.NullInt64
err = db.ReaderDb.Get(&nearestELBlock, "select exec_block_number from blocks where exec_block_number > 0 order by slot desc limit 1")
if err != nil {
return err
}
if !nearestELBlock.Valid {
return fmt.Errorf("no block found for head slot %v", event.Slot)
}
log.Debugf("setting nearestELBlock for execution_deposits_exporter: %v", nearestELBlock.Int64)
d.CurrentHeadBlock.Store(uint64(nearestELBlock.Int64))
return nil
return nil // nop
}

func (d *executionDepositsExporter) exportLoop() {
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for ; true; <-ticker.C {
currentHeadBlock := d.CurrentHeadBlock.Load()
if currentHeadBlock == 0 {
continue
}
if currentHeadBlock <= d.LastExportedBlock {
continue
}
err := d.exportTillBlock(currentHeadBlock)
err := d.export()
if err != nil {
log.Error(err, fmt.Sprintf("error during export to currentHeadBlock %d", currentHeadBlock), 0)
log.Error(err, "error during export", 0)
services.ReportStatus("execution_deposits_exporter", err.Error(), nil)
} else {
services.ReportStatus("execution_deposits_exporter", "Running", nil)
}
}
}

func (d *executionDepositsExporter) exportTillBlock(block uint64) (err error) {
// important: have to fetch the actual finalized epoch because even tho its called on finalized checkpoint it actually emits for each justified epoch
res, err := d.CL.GetFinalityCheckpoints("head")
func (d *executionDepositsExporter) export() (err error) {
var headBlock, finBlock uint64
var g errgroup.Group
g.Go(func() error {
headSlot, err := d.CL.GetSlot("head")
if err != nil {
return fmt.Errorf("error getting head-slot: %w", err)
}
headBlock = headSlot.Data.Message.Body.ExecutionPayload.BlockNumber
return nil
})
g.Go(func() error {
finSlot, err := d.CL.GetSlot("finalized")
if err != nil {
return fmt.Errorf("error getting finalized-slot: %w", err)
}
finBlock = finSlot.Data.Message.Body.ExecutionPayload.BlockNumber
return nil
})
err = g.Wait()
if err != nil {
return fmt.Errorf("failed calling GetFinalityCheckpoints(head): %w", err)
return err
}

// note: nextFinalizedSlot can actually be in the next epoch, but for this exporter we do not care about such details
nextFinalizedSlot := res.Data.Finalized.Epoch * utils.Config.Chain.ClConfig.SlotsPerEpoch
nextFinalizedBlock, err := db.GetBlockNumber(nextFinalizedSlot)
if err != nil {
return fmt.Errorf("error getting next finalized block: %w", err)
if d.LastExportedBlock >= headBlock && d.LastExportedFinalizedBlock >= finBlock {
log.Debugf("skip exporting execution layer deposits: last exported block/finalizedBlock: %v/%v, headBlock/finalizedBlock: %v/%v", d.LastExportedBlock, d.LastExportedFinalizedBlock, headBlock, finBlock)
return nil
}

nextFinalizedBlock := finBlock

blockOffset := d.LastExportedBlock + 1
// make sure to reexport every block since last exported finalized blocks to handle reorgs
if blockOffset > d.LastExportedFinalizedBlock {
blockOffset = d.LastExportedFinalizedBlock + 1
}
blockTarget := block
blockTarget := headBlock

log.Infof("exporting execution layer deposits from %v to %v", blockOffset, blockTarget)
log.InfoWithFields(log.Fields{
"nextHeadBlock": headBlock,
"nextFinBlock": nextFinalizedBlock,
"lastHeadBlock": d.LastExportedBlock,
"lastFinBlock": d.LastExportedFinalizedBlock,
}, fmt.Sprintf("exporting execution layer deposits from %d to %d", blockOffset, blockTarget))

depositsToSaveBatchSize := 10_000 // limit how much deposits we save in one go
blockBatchSize := uint64(10_000) // limit how much blocks we fetch until updating the redis-key
Expand Down

0 comments on commit 5ee4eaa

Please sign in to comment.