Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(execution_deposits_exporter): export deposits up to head and… #939

Merged
merged 3 commits into from
Oct 16, 2024
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 41 additions & 31 deletions backend/pkg/exporter/modules/execution_deposits_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@ import (
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/go-redis/redis/v8"
"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"github.com/gobitfly/beaconchain/pkg/commons/contracts/deposit_contract"
"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
"github.com/gobitfly/beaconchain/pkg/commons/rpc"
"github.com/gobitfly/beaconchain/pkg/commons/services"
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
Expand Down Expand Up @@ -163,59 +165,67 @@ func (d *executionDepositsExporter) OnFinalizedCheckpoint(event *constypes.Stand
}

func (d *executionDepositsExporter) OnHead(event *constypes.StandardEventHeadResponse) (err error) {
var nearestELBlock sql.NullInt64
err = db.ReaderDb.Get(&nearestELBlock, "select exec_block_number from blocks where exec_block_number > 0 order by slot desc limit 1")
if err != nil {
return err
}
if !nearestELBlock.Valid {
return fmt.Errorf("no block found for head slot %v", event.Slot)
}
log.Debugf("setting nearestELBlock for execution_deposits_exporter: %v", nearestELBlock.Int64)
d.CurrentHeadBlock.Store(uint64(nearestELBlock.Int64))
return nil
return nil // nop
}

func (d *executionDepositsExporter) exportLoop() {
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for ; true; <-ticker.C {
currentHeadBlock := d.CurrentHeadBlock.Load()
if currentHeadBlock == 0 {
continue
}
if currentHeadBlock <= d.LastExportedBlock {
continue
}
err := d.exportTillBlock(currentHeadBlock)
err := d.export()
if err != nil {
log.Error(err, fmt.Sprintf("error during export to currentHeadBlock %d", currentHeadBlock), 0)
log.Error(err, "error during export", 0)
services.ReportStatus("execution_deposits_exporter", err.Error(), nil)
} else {
services.ReportStatus("execution_deposits_exporter", "Running", nil)
}
}
}

func (d *executionDepositsExporter) exportTillBlock(block uint64) (err error) {
// important: have to fetch the actual finalized epoch because even tho its called on finalized checkpoint it actually emits for each justified epoch
res, err := d.CL.GetFinalityCheckpoints("head")
func (d *executionDepositsExporter) export() (err error) {
var headBlock, finBlock uint64
var g errgroup.Group
g.Go(func() error {
headSlot, err := d.CL.GetSlot("head")
if err != nil {
return fmt.Errorf("error getting head-slot: %w", err)
}
headBlock = headSlot.Data.Message.Body.ExecutionPayload.BlockNumber
return nil
})
g.Go(func() error {
finSlot, err := d.CL.GetSlot("finalized")
if err != nil {
return fmt.Errorf("error getting finalized-slot: %w", err)
}
finBlock = finSlot.Data.Message.Body.ExecutionPayload.BlockNumber
return nil
})
Comment on lines +188 to +203
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this could be done in onHead & onFinalizedCheckpoint to follow the event driven approach

but since export() is in a fixed loop i would not bother with it rn since it would yield no benefit

err = g.Wait()
if err != nil {
return fmt.Errorf("failed calling GetFinalityCheckpoints(head): %w", err)
return err
}

// note: nextFinalizedSlot can actually be in the next epoch, but for this exporter we do not care about such details
nextFinalizedSlot := res.Data.Finalized.Epoch * utils.Config.Chain.ClConfig.SlotsPerEpoch
nextFinalizedBlock, err := db.GetBlockNumber(nextFinalizedSlot)
if err != nil {
return fmt.Errorf("error getting next finalized block: %w", err)
if d.LastExportedBlock >= headBlock && d.LastExportedFinalizedBlock >= finBlock {
log.Debugf("skip exporting execution layer deposits: last exported block/finalizedBlock: %v/%v, headBlock/finalizedBlock: %v/%v", d.LastExportedBlock, d.LastExportedFinalizedBlock, headBlock, finBlock)
return nil
}

nextFinalizedBlock := finBlock

blockOffset := d.LastExportedBlock + 1
// make sure to reexport every block since last exported finalized blocks to handle reorgs
if blockOffset > d.LastExportedFinalizedBlock {
blockOffset = d.LastExportedFinalizedBlock + 1
}
blockTarget := block
blockTarget := headBlock

log.Infof("exporting execution layer deposits from %v to %v", blockOffset, blockTarget)
log.InfoWithFields(log.Fields{
"nextHeadBlock": headBlock,
"nextFinBlock": nextFinalizedBlock,
"lastHeadBlock": d.LastExportedBlock,
"lastFinBlock": d.LastExportedFinalizedBlock,
}, fmt.Sprintf("exporting execution layer deposits from %d to %d", blockOffset, blockTarget))

depositsToSaveBatchSize := 10_000 // limit how much deposits we save in one go
blockBatchSize := uint64(10_000) // limit how much blocks we fetch until updating the redis-key
Expand Down
Loading