Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beds 1115/faster el rewards #1246

Merged
merged 9 commits into from
Jan 9, 2025
1 change: 1 addition & 0 deletions backend/cmd/exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ func Run() {
modules.NewSlotExporter(context),
modules.NewExecutionDepositsExporter(context),
modules.NewExecutionPayloadsExporter(context),
modules.NewExecutionRewardFinalizer(context),
)
}

Expand Down
65 changes: 11 additions & 54 deletions backend/pkg/api/data_access/vdb_rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,21 +100,11 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da
elDs := goqu.Dialect("postgres").
Select(
goqu.L("b.epoch"),
goqu.L("SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)) AS el_rewards")).
goqu.COALESCE(goqu.SUM(goqu.I("value")), 0).As("el_rewards")).
From(goqu.L("users_val_dashboards_validators v")).
Where(goqu.L("b.epoch >= ?", startEpoch)).
LeftJoin(goqu.L("blocks b"), goqu.On(goqu.L("v.validator_index = b.proposer AND b.status = '1'"))).
LeftJoin(goqu.L("execution_payloads ep"), goqu.On(goqu.L("ep.block_hash = b.exec_block_hash"))).
LeftJoin(
goqu.Lateral(goqu.Dialect("postgres").
From("relays_blocks").
Select(
goqu.L("exec_block_hash"),
goqu.MAX("value").As("value")).
Where(goqu.L("relays_blocks.exec_block_hash = b.exec_block_hash")).
GroupBy("exec_block_hash")).As("rb"),
goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash")),
)
LeftJoin(goqu.I("execution_rewards_finalized").As("b"), goqu.On(goqu.L("v.validator_index = b.proposer"))).
GroupBy(goqu.L("b.epoch"))

if dashboardId.Validators == nil {
rewardsDs = rewardsDs.
Expand Down Expand Up @@ -557,20 +547,9 @@ func (d *DataAccessService) GetValidatorDashboardGroupRewards(ctx context.Contex

elDs := goqu.Dialect("postgres").
Select(
goqu.L("COALESCE(SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)), 0) AS blocks_el_reward")).
goqu.COALESCE(goqu.SUM(goqu.I("value")), 0).As("blocks_el_rewards")).
From(goqu.L("users_val_dashboards_validators v")).
LeftJoin(goqu.L("blocks b"), goqu.On(goqu.L("v.validator_index = b.proposer AND b.status = '1'"))).
LeftJoin(goqu.L("execution_payloads ep"), goqu.On(goqu.L("ep.block_hash = b.exec_block_hash"))).
LeftJoin(
goqu.Lateral(goqu.Dialect("postgres").
From("relays_blocks").
Select(
goqu.L("exec_block_hash"),
goqu.MAX("value").As("value")).
Where(goqu.L("relays_blocks.exec_block_hash = b.exec_block_hash")).
GroupBy("exec_block_hash")).As("rb"),
goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash")),
).
LeftJoin(goqu.I("execution_rewards_finalized").As("b"), goqu.On(goqu.L("v.validator_index = b.proposer"))).
Where(goqu.L("b.epoch = ?", epoch))

// handle the case when we have a list of validators
Expand Down Expand Up @@ -733,21 +712,11 @@ func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Contex
elDs := goqu.Dialect("postgres").
Select(
goqu.L("b.epoch"),
goqu.L("SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)) AS el_rewards")).
goqu.COALESCE(goqu.SUM(goqu.I("value")), 0).As("el_rewards")).
From(goqu.L("users_val_dashboards_validators v")).
LeftJoin(goqu.L("blocks b"), goqu.On(goqu.L("v.validator_index = b.proposer AND b.status = '1'"))).
LeftJoin(goqu.L("execution_payloads ep"), goqu.On(goqu.L("ep.block_hash = b.exec_block_hash"))).
LeftJoin(
goqu.Lateral(goqu.Dialect("postgres").
From("relays_blocks").
Select(
goqu.L("exec_block_hash"),
goqu.MAX("value").As("value")).
Where(goqu.L("relays_blocks.exec_block_hash = b.exec_block_hash")).
GroupBy("exec_block_hash")).As("rb"),
goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash")),
).
Where(goqu.L("b.epoch >= ?", startEpoch))
Where(goqu.L("b.epoch >= ?", startEpoch)).
LeftJoin(goqu.I("execution_rewards_finalized").As("b"), goqu.On(goqu.L("v.validator_index = b.proposer"))).
GroupBy(goqu.L("b.epoch"))

if dashboardId.Validators == nil {
rewardsDs = rewardsDs.
Expand Down Expand Up @@ -986,21 +955,9 @@ func (d *DataAccessService) GetValidatorDashboardDuties(ctx context.Context, das
elDs := goqu.Dialect("postgres").
Select(
goqu.L("b.proposer"),
goqu.L("SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)) AS el_rewards")).
From(goqu.L("blocks b")).
LeftJoin(goqu.L("execution_payloads ep"), goqu.On(goqu.L("ep.block_hash = b.exec_block_hash"))).
LeftJoin(
goqu.Lateral(goqu.Dialect("postgres").
From("relays_blocks").
Select(
goqu.L("exec_block_hash"),
goqu.MAX("value").As("value")).
Where(goqu.L("relays_blocks.exec_block_hash = b.exec_block_hash")).
GroupBy("exec_block_hash")).As("rb"),
goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash")),
).
goqu.COALESCE(goqu.SUM(goqu.I("value")), 0).As("el_rewards")).
From(goqu.I("execution_rewards_finalized").As("b")).
Where(goqu.L("b.epoch = ?", epoch)).
Where(goqu.L("b.status = '1'")).
GroupBy(goqu.L("b.proposer"))

// ------------------------------------------------------------------------------------------------------------------
Expand Down
33 changes: 5 additions & 28 deletions backend/pkg/api/data_access/vdb_summary.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,20 +195,9 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da
elRewards := make(map[int64]decimal.Decimal)
ds = goqu.Dialect("postgres").
Select(
goqu.L("SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)) AS el_rewards")).
From(goqu.L("blocks b")).
LeftJoin(goqu.L("execution_payloads ep"), goqu.On(goqu.L("ep.block_hash = b.exec_block_hash"))).
LeftJoin(
goqu.Lateral(goqu.Dialect("postgres").
From("relays_blocks").
Select(
goqu.L("exec_block_hash"),
goqu.MAX("value").As("value")).
Where(goqu.L("relays_blocks.exec_block_hash = b.exec_block_hash")).
GroupBy("exec_block_hash")).As("rb"),
goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash")),
).
Where(goqu.L("b.epoch >= ? AND b.epoch <= ? AND b.status = '1'", epochMin, epochMax)).
goqu.COALESCE(goqu.SUM(goqu.I("value")), 0).As("el_rewards")).
From(goqu.I("execution_rewards_finalized").As("b")).
Where(goqu.L("b.epoch >= ? AND b.epoch <= ?", epochMin, epochMax)).
GroupBy(goqu.L("result_group_id"))

if len(validators) > 0 {
Expand Down Expand Up @@ -929,20 +918,8 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId
}

elDs := goqu.Dialect("postgres").
Select(goqu.L("COALESCE(SUM(COALESCE(rb.value / 1e18, fee_recipient_reward)), 0) AS el_reward")).
From(goqu.L("blocks AS b")).
LeftJoin(goqu.L("execution_payloads AS ep"), goqu.On(goqu.L("b.exec_block_hash = ep.block_hash"))).
LeftJoin(
goqu.Lateral(goqu.Dialect("postgres").
From("relays_blocks").
Select(
goqu.L("exec_block_hash"),
goqu.MAX("value").As("value")).
Where(goqu.L("relays_blocks.exec_block_hash = b.exec_block_hash")).
GroupBy("exec_block_hash")).As("rb"),
goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash")),
).
Where(goqu.L("b.status = '1'"))
Select(goqu.COALESCE(goqu.SUM(goqu.L("value / 1e18")), 0)).
From(goqu.I("execution_rewards_finalized").As("b"))

if len(dashboardId.Validators) > 0 {
elDs = elDs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ func (s *Services) startEfficiencyDataService(wg *sync.WaitGroup) {
startTime := time.Now()
delay := time.Duration(utils.Config.Chain.ClConfig.SlotsPerEpoch*utils.Config.Chain.ClConfig.SecondsPerSlot) * time.Second
r := services.NewStatusReport("api_service_avg_efficiency", constants.Default, delay)
err := s.updateEfficiencyData() // TODO: only update data if something has changed (new head epoch)
r(constants.Running, nil)
err := s.updateEfficiencyData() // TODO: only update data if something has changed (new head epoch)
if err != nil {
log.Error(err, "error updating average network efficiency data", 0)
r(constants.Failure, map[string]string{"error": err.Error()})
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
-- +goose Up
-- +goose StatementBegin
CREATE TABLE execution_rewards_finalized (
epoch int4 NOT NULL,
slot int4 NOT NULL,
proposer int4 NOT NULL,
value numeric NOT NULL,
CONSTRAINT finalized_execution_rewards_pk PRIMARY KEY (slot)
);
-- +goose StatementEnd
-- +goose StatementBegin
CREATE INDEX finalized_execution_rewards_epoch_idx ON execution_rewards_finalized USING btree (epoch);
-- +goose StatementEnd
-- +goose StatementBegin
CREATE UNIQUE INDEX finalized_execution_rewards_proposer_idx ON execution_rewards_finalized USING btree (proposer, slot);
-- +goose StatementEnd

-- +goose Down
-- +goose StatementBegin
DROP TABLE execution_rewards_finalized;
-- +goose StatementEnd
53 changes: 4 additions & 49 deletions backend/pkg/exporter/modules/execution_payloads_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"math/big"
"sync"
"time"

"github.com/gobitfly/beaconchain/pkg/commons/db"
"github.com/gobitfly/beaconchain/pkg/commons/log"
Expand All @@ -18,16 +17,14 @@ import (
)

type executionPayloadsExporter struct {
ModuleContext ModuleContext
ExportMutex *sync.Mutex
CachedViewMutex *sync.Mutex
ModuleContext ModuleContext
ExportMutex *sync.Mutex
}

func NewExecutionPayloadsExporter(moduleContext ModuleContext) ModuleInterface {
return &executionPayloadsExporter{
ModuleContext: moduleContext,
ExportMutex: &sync.Mutex{},
CachedViewMutex: &sync.Mutex{},
ModuleContext: moduleContext,
ExportMutex: &sync.Mutex{},
}
}

Expand Down Expand Up @@ -59,51 +56,9 @@ func (d *executionPayloadsExporter) OnChainReorg(event *constypes.StandardEventC

// can take however long it wants to run, is run in a separate goroutine, so no need to worry about blocking
func (d *executionPayloadsExporter) OnFinalizedCheckpoint(event *constypes.StandardFinalizedCheckpointResponse) (err error) {
// if mutex is locked, return early
if !d.CachedViewMutex.TryLock() {
log.Infof("execution payloads exporter is already running")
return nil
}
defer d.CachedViewMutex.Unlock()

start := time.Now()
// update cached view
err = d.updateCachedView()
if err != nil {
return err
}

log.Infof("updating execution payloads cached view took %v", time.Since(start))
return nil
}

func (d *executionPayloadsExporter) updateCachedView() (err error) {
err = db.CacheQuery(`
SELECT DISTINCT ON (uvdv.dashboard_id, uvdv.group_id, b.slot)
uvdv.dashboard_id,
uvdv.group_id,
b.slot,
coalesce(cp.cl_attestations_reward / 1e9, 0) + coalesce(cp.cl_sync_aggregate_reward / 1e9, 0) + coalesce(cp.cl_slashing_inclusion_reward / 1e9, 0) + coalesce(rb.value / 1e18, ep.fee_recipient_reward) as reward,
coalesce(rb.proposer_fee_recipient, b.exec_fee_recipient) as fee_recipient,
rb.value IS NOT NULL AS is_mev
FROM
blocks b
INNER JOIN execution_payloads ep ON ep.block_hash = b.exec_block_hash
INNER JOIN consensus_payloads cp ON cp.slot = b.slot
INNER JOIN users_val_dashboards_validators uvdv ON b.proposer = uvdv.validator_index
LEFT JOIN relays_blocks rb ON rb.exec_block_hash = b.exec_block_hash
WHERE
b.status = '1'
AND b.exec_block_hash IS NOT NULL AND ep.fee_recipient_reward IS NOT NULL
ORDER BY
dashboard_id,
group_id,
slot DESC,
rb.value DESC;
`, "cached_proposal_rewards", []string{"dashboard_id", "slot"}, []string{"dashboard_id", "reward"}, []string{"dashboard_id"})
return err
}

func (d *executionPayloadsExporter) maintainTable() (err error) {
blocks := struct {
MinBlock sql.NullInt64 `db:"min"`
Expand Down
Loading
Loading