diff --git a/backend/pkg/exporter/modules/execution_deposits_exporter.go b/backend/pkg/exporter/modules/execution_deposits_exporter.go index 41fab0f25..ef7d2129f 100644 --- a/backend/pkg/exporter/modules/execution_deposits_exporter.go +++ b/backend/pkg/exporter/modules/execution_deposits_exporter.go @@ -8,7 +8,7 @@ import ( "encoding/hex" "fmt" "math/big" - "sync" + "sync/atomic" "time" "github.com/attestantio/go-eth2-client/spec/phase0" @@ -19,13 +19,16 @@ import ( gethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" gethrpc "github.com/ethereum/go-ethereum/rpc" + "github.com/go-redis/redis/v8" "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" "github.com/gobitfly/beaconchain/pkg/commons/contracts/deposit_contract" "github.com/gobitfly/beaconchain/pkg/commons/db" "github.com/gobitfly/beaconchain/pkg/commons/log" "github.com/gobitfly/beaconchain/pkg/commons/metrics" "github.com/gobitfly/beaconchain/pkg/commons/rpc" + "github.com/gobitfly/beaconchain/pkg/commons/services" "github.com/gobitfly/beaconchain/pkg/commons/types" "github.com/gobitfly/beaconchain/pkg/commons/utils" constypes "github.com/gobitfly/beaconchain/pkg/consapi/types" @@ -38,37 +41,34 @@ import ( type executionDepositsExporter struct { ModuleContext - Client rpc.Client - ErigonClient *gethrpc.Client - GethClient *gethrpc.Client - LogClient *ethclient.Client - LogFilterer *deposit_contract.DepositContractFilterer - DepositContractAddress common.Address - LastExportedBlock uint64 - ExportMutex *sync.Mutex - StopEarlyMutex *sync.Mutex - StopEarly context.CancelFunc - Signer gethtypes.Signer - DepositMethod abi.Method + Client rpc.Client + ErigonClient *gethrpc.Client + GethClient *gethrpc.Client + LogClient *ethclient.Client + LogFilterer *deposit_contract.DepositContractFilterer + DepositContractAddress common.Address + LastExportedBlock uint64 + LastExportedFinalizedBlock uint64 + LastExportedFinalizedBlockRedisKey string + CurrentHeadBlock atomic.Uint64 + Signer gethtypes.Signer + DepositMethod abi.Method } func NewExecutionDepositsExporter(moduleContext ModuleContext) ModuleInterface { return &executionDepositsExporter{ - ModuleContext: moduleContext, - Client: moduleContext.ConsClient, - DepositContractAddress: common.HexToAddress(utils.Config.Chain.ClConfig.DepositContractAddress), - LastExportedBlock: 0, - ExportMutex: &sync.Mutex{}, - StopEarlyMutex: &sync.Mutex{}, + ModuleContext: moduleContext, + Client: moduleContext.ConsClient, + DepositContractAddress: common.HexToAddress(utils.Config.Chain.ClConfig.DepositContractAddress), + LastExportedBlock: 0, + LastExportedFinalizedBlock: 0, } } -func (d *executionDepositsExporter) OnHead(event *constypes.StandardEventHeadResponse) (err error) { - return nil // nop -} - func (d *executionDepositsExporter) Init() error { - d.Signer = gethtypes.NewCancunSigner(big.NewInt(int64(utils.Config.Chain.ClConfig.DepositChainID))) + d.Signer = gethtypes.NewCancunSigner(big.NewInt(0).SetUint64(utils.Config.Chain.ClConfig.DepositChainID)) + + d.LastExportedFinalizedBlockRedisKey = fmt.Sprintf("%d:execution_deposits_exporter:last_exported_finalized_block", utils.Config.Chain.ClConfig.DepositChainID) rpcClient, err := gethrpc.Dial(utils.Config.Eth1GethEndpoint) if err != nil { @@ -124,14 +124,29 @@ func (d *executionDepositsExporter) Init() error { d.LastExportedBlock = utils.Config.Indexer.ELDepositContractFirstBlock } - log.Infof("initialized execution deposits exporter with last exported block: %v", d.LastExportedBlock) + val, err := db.PersistentRedisDbClient.Get(context.Background(), d.LastExportedFinalizedBlockRedisKey).Uint64() + switch { + case err == redis.Nil: + log.Warnf("%v missing in redis, exporting from beginning", d.LastExportedFinalizedBlockRedisKey) + case err != nil: + log.Fatal(err, "error getting last exported finalized block from redis", 0) + } + + d.LastExportedFinalizedBlock = val + // avoid fetching old bocks on a chain without deposits + if d.LastExportedFinalizedBlock > d.LastExportedBlock { + d.LastExportedBlock = d.LastExportedFinalizedBlock + } + + log.Infof("initialized execution deposits exporter with last exported block/finalizedBlock: %v/%v", d.LastExportedBlock, d.LastExportedFinalizedBlock) - // quick kick-start go func() { - err := d.OnFinalizedCheckpoint(nil) + // quick kick-start + err = d.OnHead(nil) if err != nil { - log.Error(err, "error during kick-start", 0) + log.Error(err, "error kick-starting executionDepositsExporter", 0) } + d.exportLoop() }() return nil @@ -145,90 +160,119 @@ func (d *executionDepositsExporter) OnChainReorg(event *constypes.StandardEventC return nil // nop } -// can take however long it wants to run, is run in a separate goroutine, so no need to worry about blocking func (d *executionDepositsExporter) OnFinalizedCheckpoint(event *constypes.StandardFinalizedCheckpointResponse) (err error) { - // important: have to fetch the actual finalized epoch because even tho its called on finalized checkpoint it actually emits for each justified epoch - // so we have to do an extra request to get the actual latest finalized epoch - res, err := d.CL.GetFinalityCheckpoints("head") - if err != nil { - return err - } + return nil // nop +} - var nearestELBlock sql.NullInt64 - err = db.ReaderDb.Get(&nearestELBlock, "select exec_block_number from blocks where slot <= $1 and exec_block_number > 0 order by slot desc limit 1", res.Data.Finalized.Epoch*utils.Config.Chain.ClConfig.SlotsPerEpoch) - if err != nil { - return err - } - if !nearestELBlock.Valid { - return fmt.Errorf("no block found for finalized epoch %v", res.Data.Finalized.Epoch) +func (d *executionDepositsExporter) OnHead(event *constypes.StandardEventHeadResponse) (err error) { + return nil // nop +} + +func (d *executionDepositsExporter) exportLoop() { + ticker := time.NewTicker(time.Second * 10) + defer ticker.Stop() + for ; true; <-ticker.C { + err := d.export() + if err != nil { + log.Error(err, "error during export", 0) + services.ReportStatus("execution_deposits_exporter", err.Error(), nil) + } else { + services.ReportStatus("execution_deposits_exporter", "Running", nil) + } } - log.Debugf("exporting execution layer deposits till block %v", nearestELBlock.Int64) +} - err = d.exportTillBlock(uint64(nearestELBlock.Int64)) +func (d *executionDepositsExporter) export() (err error) { + var headBlock, finBlock uint64 + var g errgroup.Group + g.Go(func() error { + headSlot, err := d.CL.GetSlot("head") + if err != nil { + return fmt.Errorf("error getting head-slot: %w", err) + } + headBlock = headSlot.Data.Message.Body.ExecutionPayload.BlockNumber + return nil + }) + g.Go(func() error { + finSlot, err := d.CL.GetSlot("finalized") + if err != nil { + return fmt.Errorf("error getting finalized-slot: %w", err) + } + finBlock = finSlot.Data.Message.Body.ExecutionPayload.BlockNumber + return nil + }) + err = g.Wait() if err != nil { return err } - return nil -} - -// this is basically synchronous, each time it gets called it will kill the previous export and replace it with itself -func (d *executionDepositsExporter) exportTillBlock(block uint64) (err error) { - // following blocks if a previous function call is still waiting for an export to stop early - d.StopEarlyMutex.Lock() - if d.StopEarly != nil { - // this will run even if the previous export has already finished - // preventing this would require an overly complex solution - log.Debugf("asking potentially running export to stop early") - d.StopEarly() + if d.LastExportedBlock >= headBlock && d.LastExportedFinalizedBlock >= finBlock { + log.Debugf("skip exporting execution layer deposits: last exported block/finalizedBlock: %v/%v, headBlock/finalizedBlock: %v/%v", d.LastExportedBlock, d.LastExportedFinalizedBlock, headBlock, finBlock) + return nil } - // following blocks as long as the running export hasn't finished yet - d.ExportMutex.Lock() - ctx, cancel := context.WithCancel(context.Background()) - d.StopEarly = cancel - // we have over taken and allow potentially newer function calls to signal us to stop early - d.StopEarlyMutex.Unlock() + nextFinalizedBlock := finBlock blockOffset := d.LastExportedBlock + 1 - blockTarget := block - - defer d.ExportMutex.Unlock() - - log.Infof("exporting execution layer deposits from %v to %v", blockOffset, blockTarget) - - depositsToSave := make([]*types.ELDeposit, 0) - + // make sure to reexport every block since last exported finalized blocks to handle reorgs + if blockOffset > d.LastExportedFinalizedBlock { + blockOffset = d.LastExportedFinalizedBlock + 1 + } + blockTarget := headBlock + + log.InfoWithFields(log.Fields{ + "nextHeadBlock": headBlock, + "nextFinBlock": nextFinalizedBlock, + "lastHeadBlock": d.LastExportedBlock, + "lastFinBlock": d.LastExportedFinalizedBlock, + }, fmt.Sprintf("exporting execution layer deposits from %d to %d", blockOffset, blockTarget)) + + depositsToSaveBatchSize := 10_000 // limit how much deposits we save in one go + blockBatchSize := uint64(10_000) // limit how much blocks we fetch until updating the redis-key + depositsToSave := make([]*types.ELDeposit, 0, depositsToSaveBatchSize) for blockOffset < blockTarget { - tmpBlockTarget := blockOffset + 1000 - if tmpBlockTarget > blockTarget { - tmpBlockTarget = blockTarget + depositsToSave = depositsToSave[:0] + blockBatchStart := blockOffset + for blockOffset < blockTarget && len(depositsToSave) <= depositsToSaveBatchSize && blockOffset < blockBatchStart+blockBatchSize { + tmpBlockTarget := blockOffset + 1000 + if tmpBlockTarget > blockTarget { + tmpBlockTarget = blockTarget + } + log.Debugf("fetching deposits from %v to %v", blockOffset, tmpBlockTarget) + tmp, err := d.fetchDeposits(blockOffset, tmpBlockTarget) + if err != nil { + return err + } + depositsToSave = append(depositsToSave, tmp...) + blockOffset = tmpBlockTarget } - log.Debugf("fetching deposits from %v to %v", blockOffset, tmpBlockTarget) - tmp, err := d.fetchDeposits(blockOffset, tmpBlockTarget) + + log.Debugf("saving %v deposits", len(depositsToSave)) + err = d.saveDeposits(depositsToSave) if err != nil { return err } - depositsToSave = append(depositsToSave, tmp...) - blockOffset = tmpBlockTarget - - select { - case <-ctx.Done(): // a newer function call has asked us to stop early - log.Warnf("stop early signal received, stopping export early") - blockTarget = tmpBlockTarget - default: - continue + d.LastExportedBlock = blockOffset + + prevLastExportedFinalizedBlock := d.LastExportedFinalizedBlock + if nextFinalizedBlock > d.LastExportedBlock && d.LastExportedBlock > d.LastExportedFinalizedBlock { + d.LastExportedFinalizedBlock = d.LastExportedBlock + } else if nextFinalizedBlock > d.LastExportedFinalizedBlock { + d.LastExportedFinalizedBlock = nextFinalizedBlock } - } - log.Debugf("saving %v deposits", len(depositsToSave)) - err = d.saveDeposits(depositsToSave) - if err != nil { - return err + // update redis to keep track of last exported finalized block persistently + if prevLastExportedFinalizedBlock != d.LastExportedFinalizedBlock { + log.Infof("updating %v: %v", d.LastExportedFinalizedBlockRedisKey, d.LastExportedFinalizedBlock) + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + err := db.PersistentRedisDbClient.Set(ctx, d.LastExportedFinalizedBlockRedisKey, d.LastExportedFinalizedBlock, 0).Err() + if err != nil { + log.Error(err, fmt.Sprintf("error setting redis %v = %v", d.LastExportedFinalizedBlockRedisKey, d.LastExportedFinalizedBlock), 0) + } + } } - d.LastExportedBlock = blockTarget - start := time.Now() // update cached view err = d.updateCachedView() @@ -271,8 +315,8 @@ func (d *executionDepositsExporter) fetchDeposits(fromBlock, toBlock uint64) (de return nil, fmt.Errorf("nil deposit-log") } - depositLog := - depositLogIterator.Event + depositLog := depositLogIterator.Event + err = utils.VerifyDepositSignature(&phase0.DepositData{ PublicKey: phase0.BLSPubKey(depositLog.Pubkey), WithdrawalCredentials: depositLog.WithdrawalCredentials, @@ -387,6 +431,10 @@ func (d *executionDepositsExporter) fetchDeposits(fromBlock, toBlock uint64) (de } func (d *executionDepositsExporter) saveDeposits(depositsToSave []*types.ELDeposit) error { + if len(depositsToSave) == 0 { + return nil + } + tx, err := db.WriterDb.Beginx() if err != nil { return err