Skip to content

Commit

Permalink
refactor(execution_deposits_exporter): export deposits up to head
Browse files Browse the repository at this point in the history
* export deposits up to head
* handle reorgs by reexporting blocks since lastExportedFinalizedBlock
* persist lastExportedFinalizedBlock in redis
* refactor export-logic from multiple event-based goroutines with canceling to single loop
* aggregated deposits and cached view are only updated when export is on head
* avoid fetching old blocks if no deposits are on the chain

BEDS-585
  • Loading branch information
guybrush committed Oct 10, 2024
1 parent 5869627 commit 0575c81
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 84 deletions.
1 change: 1 addition & 0 deletions backend/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ require (
github.com/go-openapi/jsonpointer v0.20.2 // indirect
github.com/go-openapi/jsonreference v0.20.4 // indirect
github.com/go-openapi/swag v0.22.9 // indirect
github.com/go-redis/redis v6.15.9+incompatible // indirect
github.com/gobwas/glob v0.2.3 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/goccy/go-yaml v1.9.5 // indirect
Expand Down
2 changes: 2 additions & 0 deletions backend/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ github.com/go-playground/universal-translator v0.18.0/go.mod h1:UvRDBj+xPUEGrFYl
github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4=
github.com/go-playground/validator/v10 v10.10.0 h1:I7mrTYv78z8k8VXa/qJlOlEXn/nBh+BF8dHX5nt/dr0=
github.com/go-playground/validator/v10 v10.10.0/go.mod h1:74x4gJWsvQexRdW8Pn3dXSGrTK4nAUsbPlLADvpJkos=
github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg=
github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI=
github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand Down
211 changes: 127 additions & 84 deletions backend/pkg/exporter/modules/execution_deposits_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"encoding/hex"
"fmt"
"math/big"
"sync"
"sync/atomic"
"time"

"github.com/attestantio/go-eth2-client/spec/phase0"
Expand All @@ -19,6 +19,7 @@ import (
gethtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethclient"
gethrpc "github.com/ethereum/go-ethereum/rpc"
"github.com/go-redis/redis/v8"
"golang.org/x/exp/maps"

"github.com/gobitfly/beaconchain/pkg/commons/contracts/deposit_contract"
Expand All @@ -38,37 +39,34 @@ import (

type executionDepositsExporter struct {
ModuleContext
Client rpc.Client
ErigonClient *gethrpc.Client
GethClient *gethrpc.Client
LogClient *ethclient.Client
LogFilterer *deposit_contract.DepositContractFilterer
DepositContractAddress common.Address
LastExportedBlock uint64
ExportMutex *sync.Mutex
StopEarlyMutex *sync.Mutex
StopEarly context.CancelFunc
Signer gethtypes.Signer
DepositMethod abi.Method
Client rpc.Client
ErigonClient *gethrpc.Client
GethClient *gethrpc.Client
LogClient *ethclient.Client
LogFilterer *deposit_contract.DepositContractFilterer
DepositContractAddress common.Address
LastExportedBlock uint64
LastExportedFinalizedBlock uint64
LastExportedFinalizedBlockRedisKey string
CurrentHeadBlock atomic.Uint64
Signer gethtypes.Signer
DepositMethod abi.Method
}

func NewExecutionDepositsExporter(moduleContext ModuleContext) ModuleInterface {
return &executionDepositsExporter{
ModuleContext: moduleContext,
Client: moduleContext.ConsClient,
DepositContractAddress: common.HexToAddress(utils.Config.Chain.ClConfig.DepositContractAddress),
LastExportedBlock: 0,
ExportMutex: &sync.Mutex{},
StopEarlyMutex: &sync.Mutex{},
ModuleContext: moduleContext,
Client: moduleContext.ConsClient,
DepositContractAddress: common.HexToAddress(utils.Config.Chain.ClConfig.DepositContractAddress),
LastExportedBlock: 0,
LastExportedFinalizedBlock: 0,
}
}

func (d *executionDepositsExporter) OnHead(event *constypes.StandardEventHeadResponse) (err error) {
return nil // nop
}

func (d *executionDepositsExporter) Init() error {
d.Signer = gethtypes.NewCancunSigner(big.NewInt(int64(utils.Config.Chain.ClConfig.DepositChainID)))
d.Signer = gethtypes.NewCancunSigner(big.NewInt(0).SetUint64(utils.Config.Chain.ClConfig.DepositChainID))

d.LastExportedFinalizedBlockRedisKey = fmt.Sprintf("%d:execution_deposits_exporter:last_exported_finalized_block", utils.Config.Chain.ClConfig.DepositChainID)

rpcClient, err := gethrpc.Dial(utils.Config.Eth1GethEndpoint)
if err != nil {
Expand Down Expand Up @@ -124,14 +122,29 @@ func (d *executionDepositsExporter) Init() error {
d.LastExportedBlock = utils.Config.Indexer.ELDepositContractFirstBlock
}

log.Infof("initialized execution deposits exporter with last exported block: %v", d.LastExportedBlock)
val, err := db.PersistentRedisDbClient.Get(context.Background(), d.LastExportedFinalizedBlockRedisKey).Uint64()
switch {
case err == redis.Nil:
log.Warnf("%v missing in redis, exporting from beginning", d.LastExportedFinalizedBlockRedisKey)
case err != nil:
log.Fatal(err, "error getting last exported finalized block from redis", 0)
}

d.LastExportedFinalizedBlock = val
// avoid fetching old bocks on a chain without deposits
if d.LastExportedFinalizedBlock > d.LastExportedBlock {
d.LastExportedBlock = d.LastExportedFinalizedBlock
}

log.Infof("initialized execution deposits exporter with last exported block/finalizedBlock: %v/%v", d.LastExportedBlock, d.LastExportedFinalizedBlock)

// quick kick-start
go func() {
err := d.OnFinalizedCheckpoint(nil)
// quick kick-start
err = d.OnHead(nil)
if err != nil {
log.Error(err, "error during kick-start", 0)
log.Error(err, "error kick-starting executionDepositsExporter", 0)
}
d.exportLoop()
}()

return nil
Expand All @@ -145,89 +158,115 @@ func (d *executionDepositsExporter) OnChainReorg(event *constypes.StandardEventC
return nil // nop
}

// can take however long it wants to run, is run in a separate goroutine, so no need to worry about blocking
func (d *executionDepositsExporter) OnFinalizedCheckpoint(event *constypes.StandardFinalizedCheckpointResponse) (err error) {
// important: have to fetch the actual finalized epoch because even tho its called on finalized checkpoint it actually emits for each justified epoch
// so we have to do an extra request to get the actual latest finalized epoch
res, err := d.CL.GetFinalityCheckpoints("head")
if err != nil {
return err
}
return nil // nop
}

func (d *executionDepositsExporter) OnHead(event *constypes.StandardEventHeadResponse) (err error) {
var nearestELBlock sql.NullInt64
err = db.ReaderDb.Get(&nearestELBlock, "select exec_block_number from blocks where slot <= $1 and exec_block_number > 0 order by slot desc limit 1", res.Data.Finalized.Epoch*utils.Config.Chain.ClConfig.SlotsPerEpoch)
err = db.ReaderDb.Get(&nearestELBlock, "select exec_block_number from blocks where exec_block_number > 0 order by slot desc limit 1")
if err != nil {
return err
}
if !nearestELBlock.Valid {
return fmt.Errorf("no block found for finalized epoch %v", res.Data.Finalized.Epoch)
return fmt.Errorf("no block found for head slot %v", event.Slot)
}
log.Debugf("exporting execution layer deposits till block %v", nearestELBlock.Int64)
log.Debugf("setting nearestELBlock for execution_deposits_exporter: %v", nearestELBlock.Int64)
d.CurrentHeadBlock.Store(uint64(nearestELBlock.Int64))
return nil
}

err = d.exportTillBlock(uint64(nearestELBlock.Int64))
if err != nil {
return err
func (d *executionDepositsExporter) exportLoop() {
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for ; true; <-ticker.C {
currentHeadBlock := d.CurrentHeadBlock.Load()
if currentHeadBlock == 0 {
continue
}
if currentHeadBlock <= d.LastExportedBlock {
continue
}
err := d.exportTillBlock(currentHeadBlock)
if err != nil {
log.Error(err, fmt.Sprintf("error during export to currentHeadBlock %d", currentHeadBlock), 0)
}
}

return nil
}

// this is basically synchronous, each time it gets called it will kill the previous export and replace it with itself
func (d *executionDepositsExporter) exportTillBlock(block uint64) (err error) {
// following blocks if a previous function call is still waiting for an export to stop early
d.StopEarlyMutex.Lock()
if d.StopEarly != nil {
// this will run even if the previous export has already finished
// preventing this would require an overly complex solution
log.Debugf("asking potentially running export to stop early")
d.StopEarly()
}

// following blocks as long as the running export hasn't finished yet
d.ExportMutex.Lock()
ctx, cancel := context.WithCancel(context.Background())
d.StopEarly = cancel
// we have over taken and allow potentially newer function calls to signal us to stop early
d.StopEarlyMutex.Unlock()
// important: have to fetch the actual finalized epoch because even tho its called on finalized checkpoint it actually emits for each justified epoch
res, err := d.CL.GetFinalityCheckpoints("head")
if err != nil {
return fmt.Errorf("failed calling GetFinalityCheckpoints(head): %w", err)
}

// note: nextFinalizedSlot can actually be in the next epoch, but for this exporter we do not care about such details
nextFinalizedSlot := res.Data.Finalized.Epoch * utils.Config.Chain.ClConfig.SlotsPerEpoch
nextFinalizedBlock, err := db.GetBlockNumber(nextFinalizedSlot)
if err != nil {
return fmt.Errorf("error getting next finalized block: %w", err)
}

blockOffset := d.LastExportedBlock + 1
// make sure to reexport every block since last exported finalized blocks to handle reorgs
if blockOffset > d.LastExportedFinalizedBlock {
blockOffset = d.LastExportedFinalizedBlock + 1
}
blockTarget := block

defer d.ExportMutex.Unlock()

log.Infof("exporting execution layer deposits from %v to %v", blockOffset, blockTarget)

depositsToSave := make([]*types.ELDeposit, 0)

depositsToSaveBatchSize := 10_000 // limit how much deposits we save in one go
blockBatchSize := uint64(10_000) // limit how much blocks we fetch until updating the redis-key
depositsToSave := make([]*types.ELDeposit, 0, depositsToSaveBatchSize)
for blockOffset < blockTarget {
tmpBlockTarget := blockOffset + 1000
if tmpBlockTarget > blockTarget {
tmpBlockTarget = blockTarget
depositsToSave = depositsToSave[:0]
blockBatchSizeEnd := blockOffset + blockBatchSize
tmpBlockTarget := blockOffset
for blockOffset < blockTarget && len(depositsToSave) <= depositsToSaveBatchSize && blockOffset < blockBatchSizeEnd {
tmpBlockTarget = blockOffset + 1000
if tmpBlockTarget > blockTarget {
tmpBlockTarget = blockTarget
}
log.Debugf("fetching deposits from %v to %v", blockOffset, tmpBlockTarget)
tmp, err := d.fetchDeposits(blockOffset, tmpBlockTarget)
if err != nil {
return err
}
depositsToSave = append(depositsToSave, tmp...)
blockOffset = tmpBlockTarget
}
log.Debugf("fetching deposits from %v to %v", blockOffset, tmpBlockTarget)
tmp, err := d.fetchDeposits(blockOffset, tmpBlockTarget)

log.Debugf("saving %v deposits", len(depositsToSave))
err = d.saveDeposits(depositsToSave)
if err != nil {
return err
}
depositsToSave = append(depositsToSave, tmp...)
blockOffset = tmpBlockTarget

select {
case <-ctx.Done(): // a newer function call has asked us to stop early
log.Warnf("stop early signal received, stopping export early")
blockTarget = tmpBlockTarget
default:
continue
d.LastExportedBlock = tmpBlockTarget

prevLastExportedFinalizedBlock := d.LastExportedFinalizedBlock
if nextFinalizedBlock > d.LastExportedBlock && d.LastExportedBlock > d.LastExportedFinalizedBlock {
d.LastExportedFinalizedBlock = d.LastExportedBlock
} else if nextFinalizedBlock > d.LastExportedFinalizedBlock {
d.LastExportedFinalizedBlock = nextFinalizedBlock
}
}

log.Debugf("saving %v deposits", len(depositsToSave))
err = d.saveDeposits(depositsToSave)
if err != nil {
return err
}
if len(depositsToSave) > 0 {
log.Infof("exported deposits for blocks %d-%d: %d", blockOffset, tmpBlockTarget, len(depositsToSave))
}

d.LastExportedBlock = blockTarget
// update redis to keep track of last exported finalized block persistently
if prevLastExportedFinalizedBlock != d.LastExportedFinalizedBlock {
log.Infof("updating %v: %v", d.LastExportedFinalizedBlockRedisKey, d.LastExportedFinalizedBlock)
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()
err := db.PersistentRedisDbClient.Set(ctx, d.LastExportedFinalizedBlockRedisKey, d.LastExportedFinalizedBlock, 0).Err()
if err != nil {
log.Error(err, fmt.Sprintf("error setting redis %v = %v", d.LastExportedFinalizedBlockRedisKey, d.LastExportedFinalizedBlock), 0)
}
}
}

start := time.Now()
// update cached view
Expand Down Expand Up @@ -387,6 +426,10 @@ func (d *executionDepositsExporter) fetchDeposits(fromBlock, toBlock uint64) (de
}

func (d *executionDepositsExporter) saveDeposits(depositsToSave []*types.ELDeposit) error {
if len(depositsToSave) == 0 {
return nil
}

tx, err := db.WriterDb.Beginx()
if err != nil {
return err
Expand Down

0 comments on commit 0575c81

Please sign in to comment.