From 989033f7aa552d0686a38653d0ab8b58dc2982fc Mon Sep 17 00:00:00 2001 From: Stefan Pletka <124689083+Eisei24@users.noreply.github.com> Date: Wed, 11 Sep 2024 17:28:45 +0200 Subject: [PATCH] feat(BEDS-151): Implement val dashboard RPL protocol mode --- backend/pkg/api/data_access/mobile.go | 10 +- backend/pkg/api/data_access/vdb_blocks.go | 52 +- backend/pkg/api/data_access/vdb_helpers.go | 136 ++++ backend/pkg/api/data_access/vdb_management.go | 69 +- backend/pkg/api/data_access/vdb_rewards.go | 660 +++++++++++++----- backend/pkg/api/data_access/vdb_summary.go | 351 +++++++--- .../pkg/api/data_access/vdb_withdrawals.go | 185 +++-- backend/pkg/api/types/archiver.go | 4 +- backend/pkg/api/types/rocketpool.go | 12 + backend/pkg/api/types/validator_dashboard.go | 5 - frontend/types/api/validator_dashboard.ts | 4 - 11 files changed, 1077 insertions(+), 411 deletions(-) diff --git a/backend/pkg/api/data_access/mobile.go b/backend/pkg/api/data_access/mobile.go index 89b2b7b03..1acc486a3 100644 --- a/backend/pkg/api/data_access/mobile.go +++ b/backend/pkg/api/data_access/mobile.go @@ -177,6 +177,12 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex data.NetworkEfficiency = utils.CalculateTotalEfficiency( efficiency.AttestationEfficiency[enums.AllTime], efficiency.ProposalEfficiency[enums.AllTime], efficiency.SyncEfficiency[enums.AllTime]) + protocolModes := t.VDBProtocolModes{RocketPool: true} + rpInfos, err := d.getRocketPoolInfos(ctx, wrappedDashboardId, t.AllGroups) + if err != nil { + return nil, fmt.Errorf("error retrieving rocketpool infos: %w", err) + } + // Validator status eg.Go(func() error { validatorMapping, err := d.services.GetCurrentValidatorMapping() @@ -262,7 +268,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex retrieveApr := func(hours int, apr *float64) { eg.Go(func() error { - _, elApr, _, clApr, err := d.internal_getElClAPR(ctx, wrappedDashboardId, -1, hours) + _, elApr, _, clApr, err := d.internal_getElClAPR(ctx, wrappedDashboardId, t.AllGroups, protocolModes, rpInfos, hours) if err != nil { return err } @@ -273,7 +279,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex retrieveRewards := func(hours int, rewards *decimal.Decimal) { eg.Go(func() error { - clRewards, _, elRewards, _, err := d.internal_getElClAPR(ctx, wrappedDashboardId, -1, hours) + clRewards, _, elRewards, _, err := d.internal_getElClAPR(ctx, wrappedDashboardId, t.AllGroups, protocolModes, rpInfos, hours) if err != nil { return err } diff --git a/backend/pkg/api/data_access/vdb_blocks.go b/backend/pkg/api/data_access/vdb_blocks.go index 4616991f3..d27baa5a2 100644 --- a/backend/pkg/api/data_access/vdb_blocks.go +++ b/backend/pkg/api/data_access/vdb_blocks.go @@ -24,8 +24,6 @@ import ( ) func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, dashboardId t.VDBId, cursor string, colSort t.Sort[enums.VDBBlocksColumn], search string, limit uint64, protocolModes t.VDBProtocolModes) ([]t.VDBBlocksTableRow, *t.Paging, error) { - // @DATA-ACCESS incorporate protocolModes - // ------------------------------------- // Setup var err error @@ -42,6 +40,15 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das } } + // Get the rocketpool minipool infos + var rpInfos *t.RPInfo + if protocolModes.RocketPool { + rpInfos, err = d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups) + if err != nil { + return nil, nil, err + } + } + searchPubkey := regexp.MustCompile(`^0x[0-9a-fA-F]{96}$`).MatchString(search) searchGroup := regexp.MustCompile(`^[a-zA-Z0-9_\-.\ ]+$`).MatchString(search) searchIndex := regexp.MustCompile(`^[0-9]+$`).MatchString(search) @@ -175,6 +182,11 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das goqu.COALESCE(goqu.L("rb.value / 1e18"), goqu.I("ep.fee_recipient_reward")).As("el_reward"), ) + if rpInfos != nil && protocolModes.RocketPool { + blocksDs = blocksDs. + SelectAppend(goqu.L("(blocks.exec_fee_recipient = ? AND (rb.proposer_fee_recipient IS NULL OR rb.proposer_fee_recipient = ?)) AS is_smoothing_pool", rpInfos.SmoothingPoolAddress, rpInfos.SmoothingPoolAddress)) + } + // 3. Sorting and pagination defaultColumns := []t.SortColumn{ {Column: enums.VDBBlocksColumns.Slot.ToExpr(), Desc: true, Offset: currentCursor.Slot}, @@ -272,6 +284,11 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das goqu.L("NULL::NUMERIC").As("el_reward"), ) + if rpInfos != nil && protocolModes.RocketPool { + scheduledDs = scheduledDs. + SelectAppend(goqu.L("NULL::BOOL").As("is_smoothing_pool")) + } + // We don't have access to exec_block_number and status for a WHERE without wrapping the query so if we sort by those get all the data if colSort.Column == enums.VDBBlocksColumns.Proposer || colSort.Column == enums.VDBBlocksColumns.Slot { scheduledDs = scheduledDs. @@ -307,16 +324,17 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das // ------------------------------------- // Execute query var proposals []struct { - Proposer t.VDBValidator `db:"validator_index"` - Group uint64 `db:"group_id"` - Epoch uint64 `db:"epoch"` - Slot uint64 `db:"slot"` - Status uint64 `db:"status"` - Block sql.NullInt64 `db:"exec_block_number"` - FeeRecipient []byte `db:"fee_recipient"` - ElReward decimal.NullDecimal `db:"el_reward"` - ClReward decimal.NullDecimal `db:"cl_reward"` - GraffitiText sql.NullString `db:"graffiti_text"` + Proposer t.VDBValidator `db:"validator_index"` + Group uint64 `db:"group_id"` + Epoch uint64 `db:"epoch"` + Slot uint64 `db:"slot"` + Status uint64 `db:"status"` + Block sql.NullInt64 `db:"exec_block_number"` + FeeRecipient []byte `db:"fee_recipient"` + ElReward decimal.NullDecimal `db:"el_reward"` + ClReward decimal.NullDecimal `db:"cl_reward"` + GraffitiText sql.NullString `db:"graffiti_text"` + IsSmoothingPool sql.NullBool `db:"is_smoothing_pool"` // for cursor only Reward decimal.Decimal @@ -444,9 +462,19 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das TraceIdx: -1, }) reward.El = proposal.ElReward.Decimal.Mul(decimal.NewFromInt(1e18)) + if rpInfos != nil && protocolModes.RocketPool && !(proposal.IsSmoothingPool.Valid && proposal.IsSmoothingPool.Bool) { + if rpValidator, ok := rpInfos.Minipool[proposal.Proposer]; ok { + reward.El = reward.El.Mul(d.getRocketPoolOperatorFactor(rpValidator)) + } + } } if clReward, ok := clRewards[proposal.Slot]; ok && clReward.Valid { reward.Cl = clReward.Decimal.Mul(decimal.NewFromInt(1e18)) + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[proposal.Proposer]; ok { + reward.Cl = reward.Cl.Mul(d.getRocketPoolOperatorFactor(rpValidator)) + } + } } proposals[i].Reward = proposal.ElReward.Decimal.Add(proposal.ClReward.Decimal) data[i].Reward = &reward diff --git a/backend/pkg/api/data_access/vdb_helpers.go b/backend/pkg/api/data_access/vdb_helpers.go index 30fd72f61..b336059c6 100644 --- a/backend/pkg/api/data_access/vdb_helpers.go +++ b/backend/pkg/api/data_access/vdb_helpers.go @@ -7,12 +7,15 @@ import ( "time" "github.com/doug-martin/goqu/v9" + "github.com/ethereum/go-ethereum/common/hexutil" "github.com/gobitfly/beaconchain/pkg/api/enums" t "github.com/gobitfly/beaconchain/pkg/api/types" "github.com/gobitfly/beaconchain/pkg/commons/cache" "github.com/gobitfly/beaconchain/pkg/commons/utils" "github.com/lib/pq" "github.com/pkg/errors" + "github.com/shopspring/decimal" + "golang.org/x/sync/errgroup" ) //////////////////// Helper functions (must be used by more than one VDB endpoint!) @@ -130,3 +133,136 @@ func (d *DataAccessService) getTimeToNextWithdrawal(distance uint64) time.Time { return timeToWithdrawal } + +func (d *DataAccessService) getRocketPoolInfos(ctx context.Context, dashboardId t.VDBId, groupId int64) (*t.RPInfo, error) { + wg := errgroup.Group{} + + queryResult := []struct { + ValidatorIndex uint64 `db:"validatorindex"` + NodeAddress []byte `db:"node_address"` + NodeFee float64 `db:"node_fee"` + NodeDepositBalance decimal.Decimal `db:"node_deposit_balance"` + UserDepositBalance decimal.Decimal `db:"user_deposit_balance"` + EndTime sql.NullTime `db:"end_time"` + SmoothingPoolAddress []byte `db:"smoothing_pool_address"` + SmoothingPoolEth *decimal.Decimal `db:"smoothing_pool_eth"` + }{} + + wg.Go(func() error { + ds := goqu.Dialect("postgres"). + Select( + goqu.L("v.validatorindex"), + goqu.L("rplm.node_address"), + goqu.L("rplm.node_fee"), + goqu.L("rplm.node_deposit_balance"), + goqu.L("rplm.user_deposit_balance"), + goqu.L("rplrs.end_time"), + goqu.L("rploc.smoothing_pool_address"), + goqu.L("rplrs.smoothing_pool_eth"), + ). + From(goqu.L("rocketpool_minipools AS rplm")). + LeftJoin(goqu.L("validators AS v"), goqu.On(goqu.L("rplm.pubkey = v.pubkey"))). + LeftJoin(goqu.L("rocketpool_rewards_summary AS rplrs"), goqu.On(goqu.L("rplm.node_address = rplrs.node_address"))). + LeftJoin(goqu.L("rocketpool_onchain_configs AS rploc"), goqu.On(goqu.L("rplm.rocketpool_storage_address = rploc.rocketpool_storage_address"))). + Where(goqu.L("rplm.node_deposit_balance IS NOT NULL")). + Where(goqu.L("rplm.user_deposit_balance IS NOT NULL")) + + if len(dashboardId.Validators) == 0 { + ds = ds. + LeftJoin(goqu.L("users_val_dashboards_validators uvdv"), goqu.On(goqu.L("uvdv.validator_index = v.validatorindex"))). + Where(goqu.L("uvdv.dashboard_id = ?", dashboardId.Id)) + + if groupId != t.AllGroups { + ds = ds. + Where(goqu.L("uvdv.group_id = ?", groupId)) + } + } else { + ds = ds. + Where(goqu.L("v.validatorindex = ANY(?)", pq.Array(dashboardId.Validators))) + } + + query, args, err := ds.Prepared(true).ToSQL() + if err != nil { + return fmt.Errorf("error preparing query: %w", err) + } + + err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) + if err != nil { + return fmt.Errorf("error retrieving rocketpool validators data: %w", err) + } + + return nil + }) + + nodeMinipoolCount := make(map[string]uint64) + wg.Go(func() error { + queryResult := []struct { + NodeAddress []byte `db:"node_address"` + MinipoolCount uint64 `db:"minipool_count"` + }{} + + err := d.alloyReader.SelectContext(ctx, &queryResult, ` + SELECT + node_address, + COUNT(node_address) AS minipool_count + FROM rocketpool_minipools + GROUP BY node_address`) + if err != nil { + return fmt.Errorf("error retrieving rocketpool node deposits data: %w", err) + } + + for _, res := range queryResult { + node := hexutil.Encode(res.NodeAddress) + nodeMinipoolCount[node] = res.MinipoolCount + } + + return nil + }) + + err := wg.Wait() + if err != nil { + return nil, err + } + + if len(queryResult) == 0 { + return nil, nil + } + + rpInfo := t.RPInfo{ + Minipool: make(map[uint64]t.RPMinipoolInfo), + // Smoothing pool address is the same for all nodes on the network so take the first result + SmoothingPoolAddress: queryResult[0].SmoothingPoolAddress, + } + + for _, res := range queryResult { + if _, ok := rpInfo.Minipool[res.ValidatorIndex]; !ok { + rpInfo.Minipool[res.ValidatorIndex] = t.RPMinipoolInfo{ + NodeFee: res.NodeFee, + NodeDepositBalance: res.NodeDepositBalance, + UserDepositBalance: res.UserDepositBalance, + SmoothingPoolRewards: make(map[uint64]decimal.Decimal), + } + } + + node := hexutil.Encode(res.NodeAddress) + if res.EndTime.Valid && res.SmoothingPoolEth != nil { + epoch := uint64(utils.TimeToEpoch(res.EndTime.Time)) + splitReward := res.SmoothingPoolEth.Div(decimal.NewFromUint64(nodeMinipoolCount[node])) + rpInfo.Minipool[res.ValidatorIndex].SmoothingPoolRewards[epoch] = + rpInfo.Minipool[res.ValidatorIndex].SmoothingPoolRewards[epoch].Add(splitReward) + } + } + + return &rpInfo, nil +} + +func (d *DataAccessService) getRocketPoolOperatorFactor(minipool t.RPMinipoolInfo) decimal.Decimal { + fullDeposit := minipool.UserDepositBalance.Add(minipool.NodeDepositBalance) + operatorShare := minipool.NodeDepositBalance.Div(fullDeposit) + invOperatorShare := decimal.NewFromInt(1).Sub(operatorShare) + + commissionReward := invOperatorShare.Mul(decimal.NewFromFloat(minipool.NodeFee)) + operatorFactor := operatorShare.Add(commissionReward) + + return operatorFactor +} diff --git a/backend/pkg/api/data_access/vdb_management.go b/backend/pkg/api/data_access/vdb_management.go index db4037818..a50ab703f 100644 --- a/backend/pkg/api/data_access/vdb_management.go +++ b/backend/pkg/api/data_access/vdb_management.go @@ -330,6 +330,11 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d }) } + rpInfos, err := d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups) + if err != nil { + return nil, fmt.Errorf("error retrieving rocketpool validators: %w", err) + } + // Validator status and balance eg.Go(func() error { validatorMapping, err := d.services.GetCurrentValidatorMapping() @@ -346,10 +351,13 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d data.Groups = append(data.Groups, t.VDBOverviewGroup{Id: t.DefaultGroupId, Name: t.DefaultGroupName, Count: uint64(len(validators))}) } - // Status + // Create a new sub-dashboard to get the total cl deposits for non-rocketpool validators + var nonRpDashboardId t.VDBId + for _, validator := range validators { metadata := validatorMapping.ValidatorMetadata[validator] + // Status switch constypes.ValidatorDbStatus(metadata.Status) { case constypes.DbExitingOnline, constypes.DbSlashingOnline, constypes.DbActiveOnline: data.Validators.Online++ @@ -362,61 +370,16 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d case constypes.DbExited: data.Validators.Exited++ } - } - - // Find rocketpool validators - type RpOperatorInfo struct { - ValidatorIndex uint64 `db:"validatorindex"` - NodeFee float64 `db:"node_fee"` - NodeDepositBalance decimal.Decimal `db:"node_deposit_balance"` - UserDepositBalance decimal.Decimal `db:"user_deposit_balance"` - } - var queryResult []RpOperatorInfo - - ds := goqu.Dialect("postgres"). - Select( - goqu.L("v.validatorindex"), - goqu.L("rplm.node_fee"), - goqu.L("rplm.node_deposit_balance"), - goqu.L("rplm.user_deposit_balance")). - From(goqu.L("rocketpool_minipools AS rplm")). - LeftJoin(goqu.L("validators AS v"), goqu.On(goqu.L("rplm.pubkey = v.pubkey"))). - Where(goqu.L("node_deposit_balance IS NOT NULL")). - Where(goqu.L("user_deposit_balance IS NOT NULL")) - - if len(dashboardId.Validators) == 0 { - ds = ds. - LeftJoin(goqu.L("users_val_dashboards_validators uvdv"), goqu.On(goqu.L("uvdv.validator_index = v.validatorindex"))). - Where(goqu.L("uvdv.dashboard_id = ?", dashboardId.Id)) - } else { - ds = ds. - Where(goqu.L("v.validatorindex = ANY(?)", pq.Array(dashboardId.Validators))) - } - - query, args, err := ds.Prepared(true).ToSQL() - if err != nil { - return fmt.Errorf("error preparing query: %w", err) - } - err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) - if err != nil { - return fmt.Errorf("error retrieving rocketpool validators data: %w", err) - } - - rpValidators := make(map[uint64]RpOperatorInfo) - for _, res := range queryResult { - rpValidators[res.ValidatorIndex] = res - } - - // Create a new sub-dashboard to get the total cl deposits for non-rocketpool validators - var nonRpDashboardId t.VDBId - - for _, validator := range validators { - metadata := validatorMapping.ValidatorMetadata[validator] + // Balance validatorBalance := utils.GWeiToWei(big.NewInt(int64(metadata.Balance))) effectiveBalance := utils.GWeiToWei(big.NewInt(int64(metadata.EffectiveBalance))) - if rpValidator, ok := rpValidators[validator]; ok { + if rpInfos == nil { + data.Balances.Total = data.Balances.Total.Add(validatorBalance) + + nonRpDashboardId.Validators = append(nonRpDashboardId.Validators, validator) + } else if rpValidator, ok := rpInfos.Minipool[validator]; ok { if protocolModes.RocketPool { // Calculate the balance of the operator fullDeposit := rpValidator.UserDepositBalance.Add(rpValidator.NodeDepositBalance) @@ -457,7 +420,7 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d retrieveRewardsAndEfficiency := func(table string, hours int, rewards *t.ClElValue[decimal.Decimal], apr *t.ClElValue[float64], efficiency *float64) { // Rewards + APR eg.Go(func() error { - (*rewards).El, (*apr).El, (*rewards).Cl, (*apr).Cl, err = d.internal_getElClAPR(ctx, dashboardId, -1, hours) + (*rewards).El, (*apr).El, (*rewards).Cl, (*apr).Cl, err = d.internal_getElClAPR(ctx, dashboardId, t.AllGroups, protocolModes, rpInfos, hours) if err != nil { return err } diff --git a/backend/pkg/api/data_access/vdb_rewards.go b/backend/pkg/api/data_access/vdb_rewards.go index 1221f9596..134143691 100644 --- a/backend/pkg/api/data_access/vdb_rewards.go +++ b/backend/pkg/api/data_access/vdb_rewards.go @@ -2,7 +2,6 @@ package dataaccess import ( "context" - "database/sql" "fmt" "math/big" "slices" @@ -17,13 +16,11 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/cache" "github.com/gobitfly/beaconchain/pkg/commons/utils" "github.com/lib/pq" - "github.com/pkg/errors" "github.com/shopspring/decimal" "golang.org/x/sync/errgroup" ) func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, dashboardId t.VDBId, cursor string, colSort t.Sort[enums.VDBRewardsColumn], search string, limit uint64, protocolModes t.VDBProtocolModes) ([]t.VDBRewardsTableRow, *t.Paging, error) { - // @DATA-ACCESS incorporate protocolModes result := make([]t.VDBRewardsTableRow, 0) var paging t.Paging @@ -53,11 +50,11 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da if strings.HasPrefix(search, "0x") && utils.IsHash(search) { search = strings.ToLower(search) - // Get the current validator state to convert pubkey to index validatorMapping, err := d.services.GetCurrentValidatorMapping() if err != nil { return nil, nil, err } + if index, ok := validatorMapping.ValidatorIndices[search]; ok { indexSearch = int64(index) } else { @@ -80,26 +77,38 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da groupIdSearchMap := make(map[uint64]bool, 0) // ------------------------------------------------------------------------------------------------------------------ - // Build the query that serves as base for both the main and EL rewards queries + // Get rocketpool minipool infos if needed + var rpInfos *t.RPInfo + if protocolModes.RocketPool { + rpInfos, err = d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups) + if err != nil { + return nil, nil, err + } + } + + // ------------------------------------------------------------------------------------------------------------------ + // Build the main and EL rewards queries rewardsDs := goqu.Dialect("postgres"). From(goqu.L("validator_dashboard_data_epoch e")). With("validators", goqu.L("(SELECT validator_index as validator_index, group_id FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). Select( goqu.L("e.epoch"), - goqu.L(`SUM(COALESCE(e.attestations_reward, 0) + COALESCE(e.blocks_cl_reward, 0) + COALESCE(e.sync_rewards, 0)) AS cl_rewards`), - goqu.L("SUM(COALESCE(e.attestations_scheduled, 0)) AS attestations_scheduled"), - goqu.L("SUM(COALESCE(e.attestations_executed, 0)) AS attestations_executed"), - goqu.L("SUM(COALESCE(e.blocks_scheduled, 0)) AS blocks_scheduled"), - goqu.L("SUM(COALESCE(e.blocks_proposed, 0)) AS blocks_proposed"), - goqu.L("SUM(COALESCE(e.sync_scheduled, 0)) AS sync_scheduled"), - goqu.L("SUM(COALESCE(e.sync_executed, 0)) AS sync_executed"), - goqu.L("SUM(CASE WHEN e.slashed THEN 1 ELSE 0 END) AS slashed_in_epoch"), - goqu.L("SUM(COALESCE(e.blocks_slashing_count, 0)) AS slashed_amount")). + goqu.L("e.validator_index"), + goqu.L(`(e.attestations_reward + e.blocks_cl_reward + e.sync_rewards) AS cl_rewards`), + goqu.L("e.attestations_scheduled"), + goqu.L("e.attestations_executed"), + goqu.L("e.blocks_scheduled"), + goqu.L("e.blocks_proposed"), + goqu.L("e.sync_scheduled"), + goqu.L("e.sync_executed"), + goqu.L("e.slashed"), + goqu.L("e.blocks_slashing_count")). Where(goqu.L("e.epoch_timestamp >= fromUnixTimestamp(?)", utils.EpochToTime(startEpoch).Unix())) elDs := goqu.Dialect("postgres"). Select( goqu.L("b.epoch"), + goqu.L("b.proposer"), goqu.L("SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)) AS el_rewards")). From(goqu.L("users_val_dashboards_validators v")). Where(goqu.L("b.epoch >= ?", startEpoch)). @@ -110,11 +119,19 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da From("relays_blocks"). Select( goqu.L("exec_block_hash"), + goqu.L("proposer_fee_recipient"), goqu.MAX("value").As("value")). Where(goqu.L("relays_blocks.exec_block_hash = b.exec_block_hash")). - GroupBy("exec_block_hash")).As("rb"), + GroupBy("exec_block_hash", "proposer_fee_recipient")).As("rb"), goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash")), - ) + ). + GroupBy(goqu.L("b.epoch"), goqu.L("b.proposer")) + + if rpInfos != nil && protocolModes.RocketPool { + // Exclude rewards that went to the smoothing pool + elDs = elDs. + Where(goqu.L("(b.exec_fee_recipient != ? OR (rb.proposer_fee_recipient IS NOT NULL AND rb.proposer_fee_recipient != ?))", rpInfos.SmoothingPoolAddress, rpInfos.SmoothingPoolAddress)) + } if dashboardId.Validators == nil { rewardsDs = rewardsDs. @@ -221,12 +238,10 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da if dashboardId.AggregateGroups { rewardsDs = rewardsDs. - SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). - GroupBy(goqu.L("e.epoch")) + SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)) elDs = elDs. - SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). - GroupBy(goqu.L("b.epoch")) + SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)) if isReverseDirection { rewardsDs = rewardsDs.Order(goqu.L("e.epoch").Desc()) @@ -237,11 +252,10 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da } } else { rewardsDs = rewardsDs. - SelectAppend(goqu.L("v.group_id AS result_group_id")). - GroupBy(goqu.L("e.epoch"), goqu.L("result_group_id")) + SelectAppend(goqu.L("v.group_id AS result_group_id")) elDs = elDs. SelectAppend(goqu.L("v.group_id AS result_group_id")). - GroupBy(goqu.L("b.epoch"), goqu.L("result_group_id")) + GroupByAppend(goqu.L("result_group_id")) if isReverseDirection { rewardsDs = rewardsDs.Order(goqu.L("e.epoch").Desc(), goqu.L("result_group_id").Desc()) @@ -255,12 +269,10 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da // In case a list of validators is provided set the group to the default id rewardsDs = rewardsDs. SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). - Where(goqu.L("e.validator_index IN ?", dashboardId.Validators)). - GroupBy(goqu.L("e.epoch")) + Where(goqu.L("e.validator_index IN ?", dashboardId.Validators)) elDs = elDs. SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). - Where(goqu.L("b.proposer = ANY(?)", pq.Array(dashboardId.Validators))). - GroupBy(goqu.L("b.epoch")) + Where(goqu.L("b.proposer = ANY(?)", pq.Array(dashboardId.Validators))) if currentCursor.IsValid() { rewardsDs = rewardsDs.Where(goqu.L(fmt.Sprintf("e.epoch_timestamp %s fromUnixTimestamp(?)", sortSearchDirection), utils.EpochToTime(currentCursor.Epoch).Unix())) @@ -295,21 +307,40 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da // ------------------------------------------------------------------------------------------------------------------ // Build the main query and get the data - queryResult := []struct { - Epoch uint64 `db:"epoch"` - GroupId int64 `db:"result_group_id"` - ClRewards int64 `db:"cl_rewards"` - AttestationsScheduled uint64 `db:"attestations_scheduled"` - AttestationsExecuted uint64 `db:"attestations_executed"` - BlocksScheduled uint64 `db:"blocks_scheduled"` - BlocksProposed uint64 `db:"blocks_proposed"` - SyncScheduled uint64 `db:"sync_scheduled"` - SyncExecuted uint64 `db:"sync_executed"` - SlashedInEpoch uint64 `db:"slashed_in_epoch"` - SlashedAmount uint64 `db:"slashed_amount"` - }{} + + type QueryResultSum struct { + Epoch uint64 + GroupId int64 + ClRewards decimal.Decimal + AttestationsScheduled uint64 + AttestationsExecuted uint64 + BlocksScheduled uint64 + BlocksProposed uint64 + SyncScheduled uint64 + SyncExecuted uint64 + Slashed uint64 + BlocksSlashingCount uint64 + } + var queryResultSum []QueryResultSum + smoothingPoolRewards := make(map[uint64]map[int64]decimal.Decimal, 0) // epoch -> group -> reward wg.Go(func() error { + type QueryResult struct { + Epoch uint64 `db:"epoch"` + GroupId int64 `db:"result_group_id"` + ValidatorIndex uint64 `db:"validator_index"` + ClRewards int64 `db:"cl_rewards"` + AttestationsScheduled uint64 `db:"attestations_scheduled"` + AttestationsExecuted uint64 `db:"attestations_executed"` + BlocksScheduled uint64 `db:"blocks_scheduled"` + BlocksProposed uint64 `db:"blocks_proposed"` + SyncScheduled uint64 `db:"sync_scheduled"` + SyncExecuted uint64 `db:"sync_executed"` + Slashed bool `db:"slashed"` + BlocksSlashingCount uint64 `db:"blocks_slashing_count"` + } + var queryResult []QueryResult + query, args, err := rewardsDs.Prepared(true).ToSQL() if err != nil { return fmt.Errorf("error preparing query: %w", err) @@ -317,8 +348,57 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da err = d.clickhouseReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { - return fmt.Errorf("error retrieving rewards data: %w", err) + return fmt.Errorf("error retrieving rewards data: %v", err) + } + + validatorGroupMap := make(map[uint64]int64) + for _, row := range queryResult { + if len(queryResultSum) == 0 || + queryResultSum[len(queryResultSum)-1].Epoch != row.Epoch || + queryResultSum[len(queryResultSum)-1].GroupId != row.GroupId { + queryResultSum = append(queryResultSum, QueryResultSum{ + Epoch: row.Epoch, + GroupId: row.GroupId, + }) + } + + validatorGroupMap[row.ValidatorIndex] = row.GroupId + + current := &queryResultSum[len(queryResultSum)-1] + + current.AttestationsScheduled += row.AttestationsScheduled + current.AttestationsExecuted += row.AttestationsExecuted + current.BlocksScheduled += row.BlocksScheduled + current.BlocksProposed += row.BlocksProposed + current.SyncScheduled += row.SyncScheduled + current.SyncExecuted += row.SyncExecuted + if row.Slashed { + current.Slashed++ + } + current.BlocksSlashingCount += row.BlocksSlashingCount + + reward := utils.GWeiToWei(big.NewInt(row.ClRewards)) + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[row.ValidatorIndex]; ok { + reward = reward.Mul(d.getRocketPoolOperatorFactor(rpValidator)) + } + } + current.ClRewards = current.ClRewards.Add(reward) + } + + // Calculate smoothing pool rewards + // Has to be done here in the cl and not el part because here we have the list of all relevant validators + if rpInfos != nil && protocolModes.RocketPool { + for validatorIndex, groupId := range validatorGroupMap { + for epoch, reward := range rpInfos.Minipool[validatorIndex].SmoothingPoolRewards { + if _, ok := smoothingPoolRewards[epoch]; !ok { + smoothingPoolRewards[epoch] = make(map[int64]decimal.Decimal) + } + smoothingPoolRewards[epoch][groupId] = smoothingPoolRewards[epoch][groupId].Add(reward) + } + } } + return nil }) @@ -327,6 +407,7 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da elRewards := make(map[uint64]map[int64]decimal.Decimal) wg.Go(func() error { elQueryResult := []struct { + Proposer uint64 `db:"proposer"` Epoch uint64 `db:"epoch"` GroupId int64 `db:"result_group_id"` ElRewards decimal.Decimal `db:"el_rewards"` @@ -346,7 +427,14 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da if _, ok := elRewards[entry.Epoch]; !ok { elRewards[entry.Epoch] = make(map[int64]decimal.Decimal) } - elRewards[entry.Epoch][entry.GroupId] = entry.ElRewards + + reward := entry.ElRewards + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[entry.Proposer]; ok { + reward = reward.Mul(d.getRocketPoolOperatorFactor(rpValidator)) + } + } + elRewards[entry.Epoch][entry.GroupId] = elRewards[entry.Epoch][entry.GroupId].Add(reward) } return nil }) @@ -356,13 +444,25 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da return nil, nil, fmt.Errorf("error retrieving validator dashboard rewards data: %w", err) } + // Add smoothing pool rewards to el rewards + if rpInfos != nil && protocolModes.RocketPool { + for epoch, groupRewards := range smoothingPoolRewards { + for groupId, reward := range groupRewards { + if _, ok := elRewards[epoch]; !ok { + elRewards[epoch] = make(map[int64]decimal.Decimal) + } + elRewards[epoch][groupId] = elRewards[epoch][groupId].Add(reward) + } + } + } + // ------------------------------------------------------------------------------------------------------------------ // Create the result without the total rewards first resultWoTotal := make([]t.VDBRewardsTableRow, 0) type TotalEpochInfo struct { Groups []uint64 - ClRewards int64 + ClRewards decimal.Decimal ElRewards decimal.Decimal AttestationsScheduled uint64 AttestationsExecuted uint64 @@ -372,9 +472,9 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da SyncExecuted uint64 Slashings uint64 } - totalEpochInfo := make(map[uint64]*TotalEpochInfo, 0) - for _, res := range queryResult { + totalEpochInfo := make(map[uint64]*TotalEpochInfo, 0) + for _, res := range queryResultSum { duty := t.VDBRewardsTableDuty{} if res.AttestationsScheduled > 0 { attestationPercentage := (float64(res.AttestationsExecuted) / float64(res.AttestationsScheduled)) * 100.0 @@ -389,7 +489,7 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da duty.Sync = &SyncPercentage } - slashings := res.SlashedInEpoch + res.SlashedAmount + slashings := res.Slashed + res.BlocksSlashingCount if slashings > 0 { duty.Slashing = &slashings } @@ -402,7 +502,7 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da GroupId: res.GroupId, Reward: t.ClElValue[decimal.Decimal]{ El: elRewards[res.Epoch][res.GroupId], - Cl: utils.GWeiToWei(big.NewInt(res.ClRewards)), + Cl: res.ClRewards, }, }) @@ -411,7 +511,7 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da totalEpochInfo[res.Epoch] = &TotalEpochInfo{} } totalEpochInfo[res.Epoch].Groups = append(totalEpochInfo[res.Epoch].Groups, uint64(res.GroupId)) - totalEpochInfo[res.Epoch].ClRewards += res.ClRewards + totalEpochInfo[res.Epoch].ClRewards = totalEpochInfo[res.Epoch].ClRewards.Add(res.ClRewards) totalEpochInfo[res.Epoch].ElRewards = totalEpochInfo[res.Epoch].ElRewards.Add(elRewards[res.Epoch][res.GroupId]) totalEpochInfo[res.Epoch].AttestationsScheduled += res.AttestationsScheduled totalEpochInfo[res.Epoch].AttestationsExecuted += res.AttestationsExecuted @@ -458,7 +558,7 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da GroupId: t.AllGroups, Reward: t.ClElValue[decimal.Decimal]{ El: totalInfo.ElRewards, - Cl: utils.GWeiToWei(big.NewInt(totalInfo.ClRewards)), + Cl: totalInfo.ClRewards, }, } } @@ -517,10 +617,10 @@ func (d *DataAccessService) GetValidatorDashboardRewards(ctx context.Context, da } func (d *DataAccessService) GetValidatorDashboardGroupRewards(ctx context.Context, dashboardId t.VDBId, groupId int64, epoch uint64, protocolModes t.VDBProtocolModes) (*t.VDBGroupRewardsData, error) { - // @DATA-ACCESS incorporate protocolModes ret := &t.VDBGroupRewardsData{} wg := errgroup.Group{} + var err error if dashboardId.AggregateGroups { // If we are aggregating groups then ignore the group id and sum up everything @@ -528,35 +628,47 @@ func (d *DataAccessService) GetValidatorDashboardGroupRewards(ctx context.Contex } // ------------------------------------------------------------------------------------------------------------------ - // Build the query that serves as base for both the main and EL rewards queries + // Get rocketpool minipool infos if needed + var rpInfos *t.RPInfo + if protocolModes.RocketPool { + rpInfos, err = d.getRocketPoolInfos(ctx, dashboardId, groupId) + if err != nil { + return nil, err + } + } + + // ------------------------------------------------------------------------------------------------------------------ + // Build the main and EL rewards queries rewardsDs := goqu.Dialect("postgres"). From(goqu.L("validator_dashboard_data_epoch e")). With("validators", goqu.L("(SELECT validator_index as validator_index, group_id FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). Select( - goqu.L("COALESCE(e.attestations_source_reward, 0) AS attestations_source_reward"), - goqu.L("COALESCE(e.attestations_target_reward, 0) AS attestations_target_reward"), - goqu.L("COALESCE(e.attestations_head_reward, 0) AS attestations_head_reward"), - goqu.L("COALESCE(e.attestations_inactivity_reward, 0) AS attestations_inactivity_reward"), - goqu.L("COALESCE(e.attestations_inclusion_reward, 0) AS attestations_inclusion_reward"), - goqu.L("COALESCE(e.attestations_scheduled, 0) AS attestations_scheduled"), - goqu.L("COALESCE(e.attestation_head_executed, 0) AS attestation_head_executed"), - goqu.L("COALESCE(e.attestation_source_executed, 0) AS attestation_source_executed"), - goqu.L("COALESCE(e.attestation_target_executed, 0) AS attestation_target_executed"), - goqu.L("COALESCE(e.blocks_scheduled, 0) AS blocks_scheduled"), - goqu.L("COALESCE(e.blocks_proposed, 0) AS blocks_proposed"), - goqu.L("COALESCE(e.blocks_cl_reward, 0) AS blocks_cl_reward"), - goqu.L("COALESCE(e.sync_scheduled, 0) AS sync_scheduled"), - goqu.L("COALESCE(e.sync_executed, 0) AS sync_executed"), - goqu.L("COALESCE(e.sync_rewards, 0) AS sync_rewards"), - goqu.L("(CASE WHEN e.slashed THEN 1 ELSE 0 END) AS slashed_in_epoch"), - goqu.L("COALESCE(e.blocks_slashing_count, 0) AS slashed_amount"), - goqu.L("COALESCE(e.blocks_cl_slasher_reward, 0) AS slasher_reward"), - goqu.L("COALESCE(e.blocks_cl_attestations_reward, 0) AS blocks_cl_attestations_reward"), - goqu.L("COALESCE(e.blocks_cl_sync_aggregate_reward, 0) AS blocks_cl_sync_aggregate_reward")). + goqu.L("e.validator_index"), + goqu.L("e.attestations_source_reward"), + goqu.L("e.attestations_target_reward"), + goqu.L("e.attestations_head_reward"), + goqu.L("e.attestations_inactivity_reward"), + goqu.L("e.attestations_inclusion_reward"), + goqu.L("e.attestations_scheduled"), + goqu.L("e.attestation_head_executed"), + goqu.L("e.attestation_source_executed"), + goqu.L("e.attestation_target_executed"), + goqu.L("e.blocks_scheduled"), + goqu.L("e.blocks_proposed"), + goqu.L("e.blocks_cl_reward"), + goqu.L("e.sync_scheduled"), + goqu.L("e.sync_executed"), + goqu.L("e.sync_rewards"), + goqu.L("e.slashed"), + goqu.L("e.blocks_slashing_count"), + goqu.L("e.blocks_cl_slasher_reward"), + goqu.L("e.blocks_cl_attestations_reward"), + goqu.L("e.blocks_cl_sync_aggregate_reward")). Where(goqu.L("e.epoch_timestamp = fromUnixTimestamp(?)", utils.EpochToTime(epoch).Unix())) elDs := goqu.Dialect("postgres"). Select( + goqu.L("b.proposer"), goqu.L("COALESCE(SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)), 0) AS blocks_el_reward")). From(goqu.L("users_val_dashboards_validators v")). LeftJoin(goqu.L("blocks b"), goqu.On(goqu.L("v.validator_index = b.proposer AND b.status = '1'"))). @@ -566,16 +678,23 @@ func (d *DataAccessService) GetValidatorDashboardGroupRewards(ctx context.Contex From("relays_blocks"). Select( goqu.L("exec_block_hash"), + goqu.L("proposer_fee_recipient"), goqu.MAX("value").As("value")). Where(goqu.L("relays_blocks.exec_block_hash = b.exec_block_hash")). - GroupBy("exec_block_hash")).As("rb"), + GroupBy("exec_block_hash", "proposer_fee_recipient")).As("rb"), goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash")), ). - Where(goqu.L("b.epoch = ?", epoch)) + Where(goqu.L("b.epoch = ?", epoch)). + GroupBy(goqu.L("b.proposer")) - // handle the case when we have a list of validators + if rpInfos != nil && protocolModes.RocketPool { + // Exclude rewards that went to the smoothing pool + elDs = elDs. + Where(goqu.L("(b.exec_fee_recipient != ? OR (rb.proposer_fee_recipient IS NOT NULL AND rb.proposer_fee_recipient != ?))", rpInfos.SmoothingPoolAddress, rpInfos.SmoothingPoolAddress)) + } if dashboardId.Validators == nil { + // handle the case when we have a dashboard id and an optional group id rewardsDs = rewardsDs. InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("e.validator_index = v.validator_index"))). Where(goqu.L("e.validator_index IN (SELECT validator_index FROM validators)")) @@ -585,7 +704,8 @@ func (d *DataAccessService) GetValidatorDashboardGroupRewards(ctx context.Contex rewardsDs = rewardsDs.Where(goqu.L("v.group_id = ?", groupId)) elDs = elDs.Where(goqu.L("v.group_id = ?", groupId)) } - } else { // handle the case when we have a dashboard id and an optional group id + } else { + // handle the case when we have a list of validators rewardsDs = rewardsDs. Where(goqu.L("e.validator_index IN ?", dashboardId.Validators)) elDs = elDs. @@ -595,6 +715,8 @@ func (d *DataAccessService) GetValidatorDashboardGroupRewards(ctx context.Contex // ------------------------------------------------------------------------------------------------------------------ // Build the main query and get the data queryResult := []struct { + ValidatorIndex uint64 `db:"validator_index"` + AttestationSourceReward decimal.Decimal `db:"attestations_source_reward"` AttestationTargetReward decimal.Decimal `db:"attestations_target_reward"` AttestationHeadReward decimal.Decimal `db:"attestations_head_reward"` @@ -614,10 +736,10 @@ func (d *DataAccessService) GetValidatorDashboardGroupRewards(ctx context.Contex SyncExecuted uint32 `db:"sync_executed"` SyncRewards decimal.Decimal `db:"sync_rewards"` - SlashedInEpoch bool `db:"slashed_in_epoch"` - SlashedAmount uint32 `db:"slashed_amount"` - SlasherRewards decimal.Decimal `db:"slasher_reward"` + Slashed bool `db:"slashed"` + BlocksSlashingCount uint32 `db:"blocks_slashing_count"` + BlocksClSlasherReward decimal.Decimal `db:"blocks_cl_slasher_reward"` BlocksClAttestationsReward decimal.Decimal `db:"blocks_cl_attestations_reward"` BlockClSyncAggregateReward decimal.Decimal `db:"blocks_cl_sync_aggregate_reward"` }{} @@ -637,21 +759,30 @@ func (d *DataAccessService) GetValidatorDashboardGroupRewards(ctx context.Contex // ------------------------------------------------------------------------------------------------------------------ // Get the EL rewards - var elRewards decimal.Decimal + elRewards := make(map[uint64]decimal.Decimal) wg.Go(func() error { + elQueryResult := []struct { + Proposer uint64 `db:"proposer"` + ElRewards decimal.Decimal `db:"blocks_el_reward"` + }{} + query, args, err := elDs.Prepared(true).ToSQL() if err != nil { return fmt.Errorf("error preparing query: %w", err) } - err = d.readerDb.GetContext(ctx, &elRewards, query, args...) - if err != nil && !errors.Is(err, sql.ErrNoRows) { + err = d.readerDb.SelectContext(ctx, &elQueryResult, query, args...) + if err != nil { return fmt.Errorf("error retrieving el rewards data for group rewards: %w", err) } + + for _, entry := range elQueryResult { + elRewards[entry.Proposer] = entry.ElRewards + } return nil }) - err := wg.Wait() + err = wg.Wait() if err != nil { return nil, fmt.Errorf("error retrieving validator dashboard group rewards data: %w", err) } @@ -661,57 +792,70 @@ func (d *DataAccessService) GetValidatorDashboardGroupRewards(ctx context.Contex gWei := decimal.NewFromInt(1e9) for _, entry := range queryResult { - ret.AttestationsHead.Income = ret.AttestationsHead.Income.Add(entry.AttestationHeadReward.Mul(gWei)) + rpFactor := decimal.NewFromInt(1) + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[entry.ValidatorIndex]; ok { + rpFactor = d.getRocketPoolOperatorFactor(rpValidator) + } + } + + ret.AttestationsHead.Income = ret.AttestationsHead.Income.Add(entry.AttestationHeadReward.Mul(gWei).Mul(rpFactor)) ret.AttestationsHead.StatusCount.Success += uint64(entry.AttestationHeadExecuted) ret.AttestationsHead.StatusCount.Failed += uint64(entry.AttestationsScheduled) - uint64(entry.AttestationHeadExecuted) - ret.AttestationsSource.Income = ret.AttestationsSource.Income.Add(entry.AttestationSourceReward.Mul(gWei)) + ret.AttestationsSource.Income = ret.AttestationsSource.Income.Add(entry.AttestationSourceReward.Mul(gWei).Mul(rpFactor)) ret.AttestationsSource.StatusCount.Success += uint64(entry.AttestationSourceExecuted) ret.AttestationsSource.StatusCount.Failed += uint64(entry.AttestationsScheduled) - uint64(entry.AttestationSourceExecuted) - ret.AttestationsTarget.Income = ret.AttestationsTarget.Income.Add(entry.AttestationTargetReward.Mul(gWei)) + ret.AttestationsTarget.Income = ret.AttestationsTarget.Income.Add(entry.AttestationTargetReward.Mul(gWei).Mul(rpFactor)) ret.AttestationsTarget.StatusCount.Success += uint64(entry.AttestationTargetExecuted) ret.AttestationsTarget.StatusCount.Failed += uint64(entry.AttestationsScheduled) - uint64(entry.AttestationTargetExecuted) - ret.Inactivity.Income = ret.Inactivity.Income.Add(entry.AttestationInactivitytReward.Mul(gWei)) + ret.Inactivity.Income = ret.Inactivity.Income.Add(entry.AttestationInactivitytReward.Mul(gWei).Mul(rpFactor)) if entry.AttestationInactivitytReward.LessThan(decimal.Zero) { ret.Inactivity.StatusCount.Failed++ } else { ret.Inactivity.StatusCount.Success++ } - ret.Proposal.Income = ret.Proposal.Income.Add(entry.BlocksClReward.Mul(gWei)) + elReward := elRewards[entry.ValidatorIndex].Mul(rpFactor) + if rpInfos != nil && protocolModes.RocketPool { + if _, ok := rpInfos.Minipool[entry.ValidatorIndex]; ok { + if _, ok := rpInfos.Minipool[entry.ValidatorIndex].SmoothingPoolRewards[epoch]; ok { + elReward = elReward.Add(rpInfos.Minipool[entry.ValidatorIndex].SmoothingPoolRewards[epoch]) + } + } + } + + ret.Proposal.Income = ret.Proposal.Income.Add(entry.BlocksClReward.Mul(gWei).Mul(rpFactor).Add(elReward)) + ret.ProposalElReward = ret.ProposalElReward.Add(elReward) ret.Proposal.StatusCount.Success += uint64(entry.BlocksProposed) ret.Proposal.StatusCount.Failed += uint64(entry.BlocksScheduled) - uint64(entry.BlocksProposed) - ret.Sync.Income = ret.Sync.Income.Add(entry.SyncRewards.Mul(gWei)) + ret.Sync.Income = ret.Sync.Income.Add(entry.SyncRewards.Mul(gWei).Mul(rpFactor)) ret.Sync.StatusCount.Success += uint64(entry.SyncExecuted) ret.Sync.StatusCount.Failed += uint64(entry.SyncScheduled) - uint64(entry.SyncExecuted) - ret.Slashing.Income = ret.Slashing.Income.Add(entry.SlasherRewards.Mul(gWei)) - ret.Slashing.StatusCount.Success += uint64(entry.SlashedAmount) - if entry.SlashedInEpoch { + ret.Slashing.StatusCount.Success += uint64(entry.BlocksSlashingCount) + if entry.Slashed { ret.Slashing.StatusCount.Failed++ } - ret.ProposalClAttIncReward = ret.ProposalClAttIncReward.Add(entry.BlocksClAttestationsReward.Mul(gWei)) - ret.ProposalClSyncIncReward = ret.ProposalClSyncIncReward.Add(entry.BlockClSyncAggregateReward.Mul(gWei)) - ret.ProposalClSlashingIncReward = ret.ProposalClSlashingIncReward.Add(entry.SlasherRewards.Mul(gWei)) + ret.ProposalClAttIncReward = ret.ProposalClAttIncReward.Add(entry.BlocksClAttestationsReward.Mul(gWei).Mul(rpFactor)) + ret.ProposalClSyncIncReward = ret.ProposalClSyncIncReward.Add(entry.BlockClSyncAggregateReward.Mul(gWei).Mul(rpFactor)) + ret.ProposalClSlashingIncReward = ret.ProposalClSlashingIncReward.Add(entry.BlocksClSlasherReward.Mul(gWei).Mul(rpFactor)) } - ret.Proposal.Income = ret.Proposal.Income.Add(elRewards) - ret.ProposalElReward = elRewards - return ret, nil } func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Context, dashboardId t.VDBId, protocolModes t.VDBProtocolModes) (*t.ChartData[int, decimal.Decimal], error) { - // @DATA-ACCESS incorporate protocolModes // bar chart for the CL and EL rewards for each group for each epoch. // NO series for all groups combined except if AggregateGroups is true. // series id is group id, series property is 'cl' or 'el' wg := errgroup.Group{} + var err error latestFinalizedEpoch := cache.LatestFinalizedEpoch.Get() const epochLookBack = 224 @@ -721,17 +865,29 @@ func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Contex } // ------------------------------------------------------------------------------------------------------------------ - // Build the query that serves as base for both the main and EL rewards queries + // Get rocketpool minipool infos if needed + var rpInfos *t.RPInfo + if protocolModes.RocketPool { + rpInfos, err = d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups) + if err != nil { + return nil, err + } + } + + // ------------------------------------------------------------------------------------------------------------------ + // Build the main and EL rewards queries rewardsDs := goqu.Dialect("postgres"). Select( + goqu.L("e.validator_index"), goqu.L("e.epoch"), - goqu.L(`SUM(COALESCE(e.attestations_reward, 0) + COALESCE(e.blocks_cl_reward, 0) + COALESCE(e.sync_rewards, 0)) AS cl_rewards`)). + goqu.L(`(e.attestations_reward + e.blocks_cl_reward + e.sync_rewards) AS cl_rewards`)). From(goqu.L("validator_dashboard_data_epoch e")). With("validators", goqu.L("(SELECT validator_index as validator_index, group_id FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). Where(goqu.L("e.epoch_timestamp >= fromUnixTimestamp(?)", utils.EpochToTime(startEpoch).Unix())) elDs := goqu.Dialect("postgres"). Select( + goqu.L("b.proposer"), goqu.L("b.epoch"), goqu.L("SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)) AS el_rewards")). From(goqu.L("users_val_dashboards_validators v")). @@ -742,12 +898,20 @@ func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Contex From("relays_blocks"). Select( goqu.L("exec_block_hash"), + goqu.L("proposer_fee_recipient"), goqu.MAX("value").As("value")). Where(goqu.L("relays_blocks.exec_block_hash = b.exec_block_hash")). - GroupBy("exec_block_hash")).As("rb"), + GroupBy("exec_block_hash", "proposer_fee_recipient")).As("rb"), goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash")), ). - Where(goqu.L("b.epoch >= ?", startEpoch)) + Where(goqu.L("b.epoch >= ?", startEpoch)). + GroupBy(goqu.L("b.epoch"), goqu.L("b.proposer")) + + if rpInfos != nil && protocolModes.RocketPool { + // Exclude rewards that went to the smoothing pool + elDs = elDs. + Where(goqu.L("(b.exec_fee_recipient != ? OR (rb.proposer_fee_recipient IS NOT NULL AND rb.proposer_fee_recipient != ?))", rpInfos.SmoothingPoolAddress, rpInfos.SmoothingPoolAddress)) + } if dashboardId.Validators == nil { rewardsDs = rewardsDs. @@ -759,20 +923,17 @@ func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Contex if dashboardId.AggregateGroups { rewardsDs = rewardsDs. SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). - GroupBy(goqu.L("e.epoch")). Order(goqu.L("e.epoch").Asc()) elDs = elDs. SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). - GroupBy(goqu.L("b.epoch")). Order(goqu.L("b.epoch").Asc()) } else { rewardsDs = rewardsDs. SelectAppend(goqu.L("v.group_id AS result_group_id")). - GroupBy(goqu.L("e.epoch"), goqu.L("result_group_id")). Order(goqu.L("e.epoch").Asc(), goqu.L("result_group_id").Asc()) elDs = elDs. SelectAppend(goqu.L("v.group_id AS result_group_id")). - GroupBy(goqu.L("b.epoch"), goqu.L("result_group_id")). + GroupByAppend(goqu.L("result_group_id")). Order(goqu.L("b.epoch").Asc(), goqu.L("result_group_id").Asc()) } } else { @@ -780,24 +941,31 @@ func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Contex rewardsDs = rewardsDs. SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). Where(goqu.L("e.validator_index IN ?", dashboardId.Validators)). - GroupBy(goqu.L("e.epoch")). Order(goqu.L("e.epoch").Asc()) elDs = elDs. SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). Where(goqu.L("b.proposer = ANY(?)", pq.Array(dashboardId.Validators))). - GroupBy(goqu.L("b.epoch")). Order(goqu.L("b.epoch").Asc()) } // ------------------------------------------------------------------------------------------------------------------ // Build the main query and get the data - queryResult := []struct { - Epoch uint64 `db:"epoch"` - GroupId uint64 `db:"result_group_id"` - ClRewards int64 `db:"cl_rewards"` - }{} + type QueryResultSum struct { + Epoch uint64 + GroupId uint64 + ClRewards decimal.Decimal + } + var queryResultSum []QueryResultSum + smoothingPoolRewards := make(map[uint64]map[uint64]decimal.Decimal, 0) // epoch -> group -> reward wg.Go(func() error { + queryResult := []struct { + ValidatorIndex uint64 `db:"validator_index"` + Epoch uint64 `db:"epoch"` + GroupId uint64 `db:"result_group_id"` + ClRewards int64 `db:"cl_rewards"` + }{} + query, args, err := rewardsDs.Prepared(true).ToSQL() if err != nil { return fmt.Errorf("error preparing query: %w", err) @@ -805,8 +973,45 @@ func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Contex err = d.clickhouseReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { - return fmt.Errorf("error retrieving rewards chart data: %w", err) + return fmt.Errorf("error retrieving rewards chart data: %v", err) + } + + validatorGroupMap := make(map[uint64]uint64) + for _, entry := range queryResult { + if len(queryResultSum) == 0 || + queryResultSum[len(queryResultSum)-1].Epoch != entry.Epoch || + queryResultSum[len(queryResultSum)-1].GroupId != entry.GroupId { + queryResultSum = append(queryResultSum, QueryResultSum{ + Epoch: entry.Epoch, + GroupId: entry.GroupId, + }) + } + + validatorGroupMap[entry.ValidatorIndex] = entry.GroupId + + current := &queryResultSum[len(queryResultSum)-1] + reward := utils.GWeiToWei(big.NewInt(entry.ClRewards)) + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[entry.ValidatorIndex]; ok { + reward = reward.Mul(d.getRocketPoolOperatorFactor(rpValidator)) + } + } + current.ClRewards = current.ClRewards.Add(reward) + } + + // Calculate smoothing pool rewards + // Has to be done here in the cl and not el part because here we have the list of all relevant validators + if rpInfos != nil && protocolModes.RocketPool { + for validatorIndex, groupId := range validatorGroupMap { + for epoch, reward := range rpInfos.Minipool[validatorIndex].SmoothingPoolRewards { + if _, ok := smoothingPoolRewards[epoch]; !ok { + smoothingPoolRewards[epoch] = make(map[uint64]decimal.Decimal) + } + smoothingPoolRewards[epoch][groupId] = smoothingPoolRewards[epoch][groupId].Add(reward) + } + } } + return nil }) @@ -815,6 +1020,7 @@ func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Contex elRewards := make(map[uint64]map[uint64]decimal.Decimal) wg.Go(func() error { elQueryResult := []struct { + Proposer uint64 `db:"proposer"` Epoch uint64 `db:"epoch"` GroupId uint64 `db:"result_group_id"` ElRewards decimal.Decimal `db:"el_rewards"` @@ -834,22 +1040,42 @@ func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Contex if _, ok := elRewards[entry.Epoch]; !ok { elRewards[entry.Epoch] = make(map[uint64]decimal.Decimal) } - elRewards[entry.Epoch][entry.GroupId] = entry.ElRewards + + reward := entry.ElRewards + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[entry.Proposer]; ok { + reward = reward.Mul(d.getRocketPoolOperatorFactor(rpValidator)) + } + } + elRewards[entry.Epoch][entry.GroupId] = elRewards[entry.Epoch][entry.GroupId].Add(reward) } + return nil }) - err := wg.Wait() + err = wg.Wait() if err != nil { return nil, fmt.Errorf("error retrieving validator dashboard rewards chart data: %w", err) } + // Add smoothing pool rewards to el rewards + if rpInfos != nil && protocolModes.RocketPool { + for epoch, groupRewards := range smoothingPoolRewards { + for groupId, reward := range groupRewards { + if _, ok := elRewards[epoch]; !ok { + elRewards[epoch] = make(map[uint64]decimal.Decimal) + } + elRewards[epoch][groupId] = elRewards[epoch][groupId].Add(reward) + } + } + } + // ------------------------------------------------------------------------------------------------------------------ // Create a map structure to store the data epochData := make(map[uint64]map[uint64]t.ClElValue[decimal.Decimal]) epochList := make([]uint64, 0) - for _, res := range queryResult { + for _, res := range queryResultSum { if _, ok := epochData[res.Epoch]; !ok { epochData[res.Epoch] = make(map[uint64]t.ClElValue[decimal.Decimal]) epochList = append(epochList, res.Epoch) @@ -857,7 +1083,7 @@ func (d *DataAccessService) GetValidatorDashboardRewardsChart(ctx context.Contex epochData[res.Epoch][res.GroupId] = t.ClElValue[decimal.Decimal]{ El: elRewards[res.Epoch][res.GroupId], - Cl: utils.GWeiToWei(big.NewInt(res.ClRewards)), + Cl: res.ClRewards, } } @@ -952,36 +1178,46 @@ func (d *DataAccessService) GetValidatorDashboardDuties(ctx context.Context, das } } + // ------------------------------------------------------------------------------------------------------------------ + // Get rocketpool minipool infos if needed + var rpInfos *t.RPInfo + if protocolModes.RocketPool { + rpInfos, err = d.getRocketPoolInfos(ctx, dashboardId, groupId) + if err != nil { + return nil, nil, err + } + } + // ------------------------------------------------------------------------------------------------------------------ // Build the main and EL rewards queries rewardsDs := goqu.Dialect("postgres"). Select( goqu.L("e.validator_index"), - goqu.L("COALESCE(e.attestations_scheduled, 0) AS attestations_scheduled"), - goqu.L("COALESCE(e.attestation_source_executed, 0) AS attestation_source_executed"), - goqu.L("COALESCE(e.attestations_source_reward, 0) AS attestations_source_reward"), - goqu.L("COALESCE(e.attestation_target_executed, 0) AS attestation_target_executed"), - goqu.L("COALESCE(e.attestations_target_reward, 0) AS attestations_target_reward"), - goqu.L("COALESCE(e.attestation_head_executed, 0) AS attestation_head_executed"), - goqu.L("COALESCE(e.attestations_head_reward, 0) AS attestations_head_reward"), - goqu.L("COALESCE(e.sync_scheduled, 0) AS sync_scheduled"), - goqu.L("COALESCE(e.sync_executed, 0) AS sync_executed"), - goqu.L("COALESCE(e.sync_rewards, 0) AS sync_rewards"), - goqu.L("e.slashed AS slashed_in_epoch"), - goqu.L("COALESCE(e.blocks_slashing_count, 0) AS slashed_amount"), - goqu.L("COALESCE(e.blocks_cl_slasher_reward, 0) AS slasher_reward"), - goqu.L("COALESCE(e.blocks_scheduled, 0) AS blocks_scheduled"), - goqu.L("COALESCE(e.blocks_proposed, 0) AS blocks_proposed"), - goqu.L("COALESCE(e.blocks_cl_attestations_reward, 0) AS blocks_cl_attestations_reward"), - goqu.L("COALESCE(e.blocks_cl_sync_aggregate_reward, 0) AS blocks_cl_sync_aggregate_reward")). + goqu.L("e.attestations_scheduled"), + goqu.L("e.attestation_source_executed"), + goqu.L("e.attestations_source_reward"), + goqu.L("e.attestation_target_executed"), + goqu.L("e.attestations_target_reward"), + goqu.L("e.attestation_head_executed"), + goqu.L("e.attestations_head_reward"), + goqu.L("e.sync_scheduled"), + goqu.L("e.sync_executed"), + goqu.L("e.sync_rewards"), + goqu.L("e.slashed"), + goqu.L("e.blocks_slashing_count"), + goqu.L("e.blocks_cl_slasher_reward"), + goqu.L("e.blocks_scheduled"), + goqu.L("e.blocks_proposed"), + goqu.L("e.blocks_cl_attestations_reward"), + goqu.L("e.blocks_cl_sync_aggregate_reward")). From(goqu.L("validator_dashboard_data_epoch e")). Where(goqu.L("e.epoch_timestamp = fromUnixTimestamp(?)", utils.EpochToTime(epoch).Unix())). Where(goqu.L(` - (COALESCE(e.attestations_scheduled, 0) + - COALESCE(e.sync_scheduled,0) + - COALESCE(e.blocks_scheduled,0) + + (e.attestations_scheduled + + e.sync_scheduled + + e.blocks_scheduled + CASE WHEN e.slashed THEN 1 ELSE 0 END + - COALESCE(e.blocks_slashing_count, 0)) > 0`)) + e.blocks_slashing_count) > 0`)) elDs := goqu.Dialect("postgres"). Select( @@ -994,15 +1230,22 @@ func (d *DataAccessService) GetValidatorDashboardDuties(ctx context.Context, das From("relays_blocks"). Select( goqu.L("exec_block_hash"), + goqu.L("proposer_fee_recipient"), goqu.MAX("value").As("value")). Where(goqu.L("relays_blocks.exec_block_hash = b.exec_block_hash")). - GroupBy("exec_block_hash")).As("rb"), + GroupBy("exec_block_hash", "proposer_fee_recipient")).As("rb"), goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash")), ). Where(goqu.L("b.epoch = ?", epoch)). Where(goqu.L("b.status = '1'")). GroupBy(goqu.L("b.proposer")) + if rpInfos != nil && protocolModes.RocketPool { + // Exclude rewards that went to the smoothing pool + elDs = elDs. + Where(goqu.L("(b.exec_fee_recipient != ? OR (rb.proposer_fee_recipient IS NOT NULL AND rb.proposer_fee_recipient != ?))", rpInfos.SmoothingPoolAddress, rpInfos.SmoothingPoolAddress)) + } + // ------------------------------------------------------------------------------------------------------------------ // Add further conditions if dashboardId.Validators == nil { @@ -1041,28 +1284,46 @@ func (d *DataAccessService) GetValidatorDashboardDuties(ctx context.Context, das // ------------------------------------------------------------------------------------------------------------------ // Get the main data - queryResult := []struct { - ValidatorIndex uint64 `db:"validator_index"` - AttestationsScheduled uint64 `db:"attestations_scheduled"` - AttestationsSourceExecuted uint64 `db:"attestation_source_executed"` - AttestationsSourceReward int64 `db:"attestations_source_reward"` - AttestationsTargetExecuted uint64 `db:"attestation_target_executed"` - AttestationsTargetReward int64 `db:"attestations_target_reward"` - AttestationsHeadExecuted uint64 `db:"attestation_head_executed"` - AttestationsHeadReward int64 `db:"attestations_head_reward"` - SyncScheduled uint64 `db:"sync_scheduled"` - SyncExecuted uint64 `db:"sync_executed"` - SyncRewards int64 `db:"sync_rewards"` - SlashedInEpoch bool `db:"slashed_in_epoch"` - SlashedAmount uint64 `db:"slashed_amount"` - SlasherReward int64 `db:"slasher_reward"` - BlocksScheduled uint64 `db:"blocks_scheduled"` - BlocksProposed uint64 `db:"blocks_proposed"` - BlocksClAttestationsReward int64 `db:"blocks_cl_attestations_reward"` - BlocksClSyncAggregateReward int64 `db:"blocks_cl_sync_aggregate_reward"` - }{} + type QueryResultBase struct { + ValidatorIndex uint64 `db:"validator_index"` + AttestationsScheduled uint64 `db:"attestations_scheduled"` + AttestationsSourceExecuted uint64 `db:"attestation_source_executed"` + AttestationsTargetExecuted uint64 `db:"attestation_target_executed"` + AttestationsHeadExecuted uint64 `db:"attestation_head_executed"` + SyncScheduled uint64 `db:"sync_scheduled"` + SyncExecuted uint64 `db:"sync_executed"` + Slashed bool `db:"slashed"` + BlocksSlashingCount uint64 `db:"blocks_slashing_count"` + BlocksScheduled uint64 `db:"blocks_scheduled"` + BlocksProposed uint64 `db:"blocks_proposed"` + } + + type QueryResult struct { + QueryResultBase + AttestationsSourceReward int64 `db:"attestations_source_reward"` + AttestationsTargetReward int64 `db:"attestations_target_reward"` + AttestationsHeadReward int64 `db:"attestations_head_reward"` + SyncRewards int64 `db:"sync_rewards"` + BlocksClSlasherReward int64 `db:"blocks_cl_slasher_reward"` + BlocksClAttestationsReward int64 `db:"blocks_cl_attestations_reward"` + BlocksClSyncAggregateReward int64 `db:"blocks_cl_sync_aggregate_reward"` + } + type QueryResultAdjusted struct { + QueryResultBase + AttestationsSourceReward decimal.Decimal + AttestationsTargetReward decimal.Decimal + AttestationsHeadReward decimal.Decimal + SyncRewards decimal.Decimal + BlocksClSlasherReward decimal.Decimal + BlocksClAttestationsReward decimal.Decimal + BlocksClSyncAggregateReward decimal.Decimal + } + + var queryResultAdjusted []QueryResultAdjusted wg.Go(func() error { + var queryResult []QueryResult + query, args, err := rewardsDs.Prepared(true).ToSQL() if err != nil { return fmt.Errorf("error preparing query: %w", err) @@ -1070,8 +1331,44 @@ func (d *DataAccessService) GetValidatorDashboardDuties(ctx context.Context, das err = d.clickhouseReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { - return fmt.Errorf("error retrieving validator rewards data: %w", err) + return fmt.Errorf("error retrieving validator rewards data: %v", err) + } + + for _, entry := range queryResult { + queryResultAdjusted = append(queryResultAdjusted, QueryResultAdjusted{ + QueryResultBase: QueryResultBase{ + ValidatorIndex: entry.ValidatorIndex, + AttestationsScheduled: entry.AttestationsScheduled, + AttestationsSourceExecuted: entry.AttestationsSourceExecuted, + AttestationsTargetExecuted: entry.AttestationsTargetExecuted, + AttestationsHeadExecuted: entry.AttestationsHeadExecuted, + SyncScheduled: entry.SyncScheduled, + SyncExecuted: entry.SyncExecuted, + Slashed: entry.Slashed, + BlocksSlashingCount: entry.BlocksSlashingCount, + BlocksScheduled: entry.BlocksScheduled, + BlocksProposed: entry.BlocksProposed, + }, + }) + + current := &queryResultAdjusted[len(queryResultAdjusted)-1] + + rpFactor := decimal.NewFromInt(1) + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[entry.ValidatorIndex]; ok { + rpFactor = d.getRocketPoolOperatorFactor(rpValidator) + } + } + + current.AttestationsSourceReward = utils.GWeiToWei(big.NewInt(entry.AttestationsSourceReward)).Mul(rpFactor) + current.AttestationsTargetReward = utils.GWeiToWei(big.NewInt(entry.AttestationsTargetReward)).Mul(rpFactor) + current.AttestationsHeadReward = utils.GWeiToWei(big.NewInt(entry.AttestationsHeadReward)).Mul(rpFactor) + current.SyncRewards = utils.GWeiToWei(big.NewInt(entry.SyncRewards)).Mul(rpFactor) + current.BlocksClSlasherReward = utils.GWeiToWei(big.NewInt(entry.BlocksClSlasherReward)).Mul(rpFactor) + current.BlocksClAttestationsReward = utils.GWeiToWei(big.NewInt(entry.BlocksClAttestationsReward)).Mul(rpFactor) + current.BlocksClSyncAggregateReward = utils.GWeiToWei(big.NewInt(entry.BlocksClSyncAggregateReward)).Mul(rpFactor) } + return nil }) @@ -1080,8 +1377,8 @@ func (d *DataAccessService) GetValidatorDashboardDuties(ctx context.Context, das elRewards := make(map[uint64]decimal.Decimal) wg.Go(func() error { elQueryResult := []struct { - ValidatorIndex uint64 `db:"proposer"` - ElRewards decimal.Decimal `db:"el_rewards"` + Proposer uint64 `db:"proposer"` + ElRewards decimal.Decimal `db:"el_rewards"` }{} query, args, err := elDs.Prepared(true).ToSQL() @@ -1095,7 +1392,13 @@ func (d *DataAccessService) GetValidatorDashboardDuties(ctx context.Context, das } for _, entry := range elQueryResult { - elRewards[entry.ValidatorIndex] = entry.ElRewards + reward := entry.ElRewards + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[entry.Proposer]; ok { + reward = reward.Mul(d.getRocketPoolOperatorFactor(rpValidator)) + } + } + elRewards[entry.Proposer] = reward } return nil }) @@ -1108,11 +1411,10 @@ func (d *DataAccessService) GetValidatorDashboardDuties(ctx context.Context, das // ------------------------------------------------------------------------------------------------------------------ // Create the result cursorData := make([]t.ValidatorDutiesCursor, 0) - for _, res := range queryResult { - clReward := utils.GWeiToWei(big.NewInt( - res.AttestationsHeadReward + res.AttestationsSourceReward + res.AttestationsTargetReward + - res.SyncRewards + - res.BlocksClAttestationsReward + res.BlocksClSyncAggregateReward + res.SlasherReward)) + for _, res := range queryResultAdjusted { + clReward := res.AttestationsHeadReward.Add(res.AttestationsSourceReward).Add(res.AttestationsTargetReward). + Add(res.SyncRewards). + Add(res.BlocksClAttestationsReward).Add(res.BlocksClSyncAggregateReward).Add(res.BlocksClSlasherReward) totalReward := clReward.Add(elRewards[res.ValidatorIndex]) row := t.VDBEpochDutiesTableRow{ @@ -1129,16 +1431,16 @@ func (d *DataAccessService) GetValidatorDashboardDuties(ctx context.Context, das row.Duties.SyncCount = res.SyncExecuted // Get slashing data - if res.SlashedInEpoch || res.SlashedAmount > 0 { + if res.Slashed || res.BlocksSlashingCount > 0 { slashedEvent := t.ValidatorHistoryEvent{ - Income: utils.GWeiToWei(big.NewInt(res.SlasherReward)), + Income: res.BlocksClSlasherReward, } - if res.SlashedInEpoch { - if res.SlashedAmount > 0 { + if res.Slashed { + if res.BlocksSlashingCount > 0 { slashedEvent.Status = "partial" } slashedEvent.Status = "failed" - } else if res.SlashedAmount > 0 { + } else if res.BlocksSlashingCount > 0 { slashedEvent.Status = "success" } row.Duties.Slashing = &slashedEvent @@ -1148,9 +1450,9 @@ func (d *DataAccessService) GetValidatorDashboardDuties(ctx context.Context, das if res.BlocksScheduled > 0 { proposalEvent := t.ValidatorHistoryProposal{ ElIncome: elRewards[res.ValidatorIndex], - ClAttestationInclusionIncome: utils.GWeiToWei(big.NewInt(res.BlocksClAttestationsReward)), - ClSyncInclusionIncome: utils.GWeiToWei(big.NewInt(res.BlocksClSyncAggregateReward)), - ClSlashingInclusionIncome: utils.GWeiToWei(big.NewInt(res.SlasherReward)), + ClAttestationInclusionIncome: res.BlocksClAttestationsReward, + ClSyncInclusionIncome: res.BlocksClSyncAggregateReward, + ClSlashingInclusionIncome: res.BlocksClSlasherReward, } if res.BlocksProposed == 0 { @@ -1278,10 +1580,10 @@ func (d *DataAccessService) GetValidatorDashboardDuties(ctx context.Context, das return result, p, nil } -func (d *DataAccessService) getValidatorHistoryEvent(income int64, scheduledEvents, executedEvents uint64) *t.ValidatorHistoryEvent { +func (d *DataAccessService) getValidatorHistoryEvent(income decimal.Decimal, scheduledEvents, executedEvents uint64) *t.ValidatorHistoryEvent { if scheduledEvents > 0 { validatorHistoryEvent := t.ValidatorHistoryEvent{ - Income: utils.GWeiToWei(big.NewInt(income)), + Income: income, } if executedEvents == 0 { validatorHistoryEvent.Status = "failed" diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index 87c45bcf8..d49a9bb9c 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "maps" "math" "math/big" "slices" @@ -29,7 +30,6 @@ import ( ) func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, dashboardId t.VDBId, period enums.TimePeriod, cursor string, colSort t.Sort[enums.VDBSummaryColumn], search string, limit uint64, protocolModes t.VDBProtocolModes) ([]t.VDBSummaryTableRow, *t.Paging, error) { - // @DATA-ACCESS incorporate protocolModes result := make([]t.VDBSummaryTableRow, 0) var paging t.Paging @@ -99,7 +99,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da var queryResult []struct { GroupId int64 `db:"result_group_id"` GroupName string `db:"group_name"` - ValidatorIndices []uint64 `db:"validator_indices"` + ValidatorIndex uint64 `db:"validator_index"` ClRewards int64 `db:"cl_rewards"` AttestationReward decimal.Decimal `db:"attestations_reward"` AttestationIdealReward decimal.Decimal `db:"attestations_ideal_reward"` @@ -109,27 +109,26 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da BlocksScheduled uint64 `db:"blocks_scheduled"` SyncExecuted uint64 `db:"sync_executed"` SyncScheduled uint64 `db:"sync_scheduled"` - MinEpochStart int64 `db:"min_epoch_start"` - MaxEpochEnd int64 `db:"max_epoch_end"` + EpochStart int64 `db:"epoch_start"` + EpochEnd int64 `db:"epoch_end"` } ds := goqu.Dialect("postgres"). From(goqu.L(fmt.Sprintf(`%s AS r FINAL`, clickhouseTable))). With("validators", goqu.L("(SELECT dashboard_id, group_id, validator_index FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). Select( - goqu.L("ARRAY_AGG(r.validator_index) AS validator_indices"), - goqu.L("(SUM(COALESCE(r.balance_end,0)) + SUM(COALESCE(r.withdrawals_amount,0)) - SUM(COALESCE(r.deposits_amount,0)) - SUM(COALESCE(r.balance_start,0))) AS cl_rewards"), - goqu.L("COALESCE(SUM(r.attestations_reward)::decimal, 0) AS attestations_reward"), - goqu.L("COALESCE(SUM(r.attestations_ideal_reward)::decimal, 0) AS attestations_ideal_reward"), - goqu.L("COALESCE(SUM(r.attestations_executed), 0) AS attestations_executed"), - goqu.L("COALESCE(SUM(r.attestations_scheduled), 0) AS attestations_scheduled"), - goqu.L("COALESCE(SUM(r.blocks_proposed), 0) AS blocks_proposed"), - goqu.L("COALESCE(SUM(r.blocks_scheduled), 0) AS blocks_scheduled"), - goqu.L("COALESCE(SUM(r.sync_executed), 0) AS sync_executed"), - goqu.L("COALESCE(SUM(r.sync_scheduled), 0) AS sync_scheduled"), - goqu.L("COALESCE(MIN(r.epoch_start), 0) AS min_epoch_start"), - goqu.L("COALESCE(MAX(r.epoch_end), 0) AS max_epoch_end")). - GroupBy(goqu.L("result_group_id")) + goqu.L("r.validator_index AS validator_index"), + goqu.L("(r.balance_end + r.withdrawals_amount - r.deposits_amount - r.balance_start) AS cl_rewards"), + goqu.L("r.attestations_reward::decimal AS attestations_reward"), + goqu.L("r.attestations_ideal_reward::decimal AS attestations_ideal_reward"), + goqu.L("r.attestations_executed AS attestations_executed"), + goqu.L("r.attestations_scheduled AS attestations_scheduled"), + goqu.L("r.blocks_proposed AS blocks_proposed"), + goqu.L("r.blocks_scheduled AS blocks_scheduled"), + goqu.L("r.sync_executed AS sync_executed"), + goqu.L("r.sync_scheduled AS sync_scheduled"), + goqu.L("r.epoch_start AS epoch_start"), + goqu.L("r.epoch_end AS epoch_end")) if len(validators) > 0 { ds = ds. @@ -152,8 +151,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da // Get the group names since we can filter and/or sort for them ds = ds. SelectAppend(goqu.L("g.name AS group_name")). - InnerJoin(goqu.L("users_val_dashboards_groups g"), goqu.On(goqu.L("v.group_id = g.id AND v.dashboard_id = g.dashboard_id"))). - GroupByAppend(goqu.L("group_name")) + InnerJoin(goqu.L("users_val_dashboards_groups g"), goqu.On(goqu.L("v.group_id = g.id AND v.dashboard_id = g.dashboard_id"))) } } @@ -172,22 +170,80 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da return result, &paging, nil } - epochMin := int64(math.MaxInt32) - epochMax := int64(0) + var rpInfos *t.RPInfo + if protocolModes.RocketPool { + rpInfos, err = d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups) + if err != nil { + return nil, nil, err + } + } + + type QueryResultSum struct { + GroupId int64 + GroupName string + ValidatorIndices []uint64 + ClRewards decimal.Decimal + AttestationReward decimal.Decimal + AttestationIdealReward decimal.Decimal + AttestationsExecuted uint64 + AttestationsScheduled uint64 + BlocksProposed uint64 + BlocksScheduled uint64 + SyncExecuted uint64 + SyncScheduled uint64 + } + + epochStart := int64(math.MaxInt32) + epochEnd := int64(0) + + queryResultSumMap := make(map[int64]QueryResultSum) + validatorGroupMap := make(map[uint64]int64) for _, row := range queryResult { - if row.MinEpochStart < epochMin { - epochMin = row.MinEpochStart + if row.EpochStart < epochStart { + epochStart = row.EpochStart + } + if row.EpochEnd > epochEnd { + epochEnd = row.EpochEnd } - if row.MaxEpochEnd > epochMax { - epochMax = row.MaxEpochEnd + + if _, ok := queryResultSumMap[row.GroupId]; !ok { + queryResultSumMap[row.GroupId] = QueryResultSum{ + GroupId: row.GroupId, + GroupName: row.GroupName, + } + } + + validatorGroupMap[row.ValidatorIndex] = row.GroupId + + groupSum := queryResultSumMap[row.GroupId] + groupSum.ValidatorIndices = append(groupSum.ValidatorIndices, row.ValidatorIndex) + groupSum.AttestationReward = groupSum.AttestationReward.Add(row.AttestationReward) + groupSum.AttestationIdealReward = groupSum.AttestationIdealReward.Add(row.AttestationIdealReward) + groupSum.AttestationsExecuted += row.AttestationsExecuted + groupSum.AttestationsScheduled += row.AttestationsScheduled + groupSum.BlocksProposed += row.BlocksProposed + groupSum.BlocksScheduled += row.BlocksScheduled + groupSum.SyncExecuted += row.SyncExecuted + groupSum.SyncScheduled += row.SyncScheduled + + clRewardWei := utils.GWeiToWei(big.NewInt(row.ClRewards)) + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[row.ValidatorIndex]; ok { + clRewardWei = clRewardWei.Mul(d.getRocketPoolOperatorFactor(rpValidator)) + } } + groupSum.ClRewards = groupSum.ClRewards.Add(clRewardWei) + queryResultSumMap[row.GroupId] = groupSum } + queryResultSum := slices.Collect(maps.Values(queryResultSumMap)) + // ------------------------------------------------------------------------------------------------------------------ // Get the EL rewards elRewards := make(map[int64]decimal.Decimal) ds = goqu.Dialect("postgres"). Select( + goqu.L("b.proposer"), goqu.L("SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)) AS el_rewards")). From(goqu.L("blocks b")). LeftJoin(goqu.L("execution_payloads ep"), goqu.On(goqu.L("ep.block_hash = b.exec_block_hash"))). @@ -196,13 +252,20 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da From("relays_blocks"). Select( goqu.L("exec_block_hash"), + goqu.L("proposer_fee_recipient"), goqu.MAX("value").As("value")). Where(goqu.L("relays_blocks.exec_block_hash = b.exec_block_hash")). - GroupBy("exec_block_hash")).As("rb"), + GroupBy("exec_block_hash", "proposer_fee_recipient")).As("rb"), goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash")), ). - Where(goqu.L("b.epoch >= ? AND b.epoch <= ? AND b.status = '1'", epochMin, epochMax)). - GroupBy(goqu.L("result_group_id")) + Where(goqu.L("b.epoch >= ? AND b.epoch <= ? AND b.status = '1'", epochStart, epochEnd)). + GroupBy(goqu.L("b.proposer")) + + if rpInfos != nil && protocolModes.RocketPool { + // Exclude rewards that went to the smoothing pool + ds = ds. + Where(goqu.L("(b.exec_fee_recipient != ? OR (rb.proposer_fee_recipient IS NOT NULL AND rb.proposer_fee_recipient != ?))", rpInfos.SmoothingPoolAddress, rpInfos.SmoothingPoolAddress)) + } if len(validators) > 0 { ds = ds. @@ -214,7 +277,8 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)) } else { ds = ds. - SelectAppend(goqu.L("v.group_id AS result_group_id")) + SelectAppend(goqu.L("v.group_id AS result_group_id")). + GroupByAppend(goqu.L("result_group_id")) } ds = ds. @@ -223,6 +287,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da } var elRewardsQueryResult []struct { + Proposer uint64 `db:"proposer"` GroupId int64 `db:"result_group_id"` ElRewards decimal.Decimal `db:"el_rewards"` } @@ -237,8 +302,26 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da return nil, nil, fmt.Errorf("error retrieving data from table blocks: %w", err) } + // Add up EL rewards for _, entry := range elRewardsQueryResult { - elRewards[entry.GroupId] = entry.ElRewards + elReward := entry.ElRewards + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[entry.Proposer]; ok { + elReward = elReward.Mul(d.getRocketPoolOperatorFactor(rpValidator)) + } + } + elRewards[entry.GroupId] = elRewards[entry.GroupId].Add(elReward) + } + + // Add up smoothing pool rewards + if rpInfos != nil && protocolModes.RocketPool { + for validatorIndex, groupId := range validatorGroupMap { + for epoch, reward := range rpInfos.Minipool[validatorIndex].SmoothingPoolRewards { + if epoch >= uint64(epochStart) && epoch <= uint64(epochEnd) { + elRewards[groupId] = elRewards[groupId].Add(reward) + } + } + } } // ------------------------------------------------------------------------------------------------------------------ @@ -260,11 +343,11 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da // ------------------------------------------------------------------------------------------------------------------ // Sort by group name, after this the name is no longer relevant if groupNameSearchEnabled && colSort.Column == enums.VDBSummaryColumns.Group { - sort.Slice(queryResult, func(i, j int) bool { + sort.Slice(queryResultSum, func(i, j int) bool { if colSort.Desc { - return queryResult[i].GroupName > queryResult[j].GroupName + return queryResultSum[i].GroupName > queryResultSum[j].GroupName } else { - return queryResult[i].GroupName < queryResult[j].GroupName + return queryResultSum[i].GroupName < queryResultSum[j].GroupName } }) } @@ -288,7 +371,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da GroupId: t.AllGroups, } - for _, queryEntry := range queryResult { + for _, queryEntry := range queryResultSum { resultEntry := t.VDBSummaryTableRow{ GroupId: queryEntry.GroupId, AverageNetworkEfficiency: averageNetworkEfficiency, @@ -346,7 +429,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da resultEntry.Proposals.Failed = queryEntry.BlocksScheduled - queryEntry.BlocksProposed // Rewards - resultEntry.Reward.Cl = utils.GWeiToWei(big.NewInt(queryEntry.ClRewards)) + resultEntry.Reward.Cl = queryEntry.ClRewards if _, ok := elRewards[queryEntry.GroupId]; ok { resultEntry.Reward.El = elRewards[queryEntry.GroupId] } @@ -455,7 +538,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da // ------------------------------------------------------------------------------------------------------------------ // Calculate the total - if len(queryResult) > 1 && len(result) > 0 { + if len(queryResultSum) > 1 && len(result) > 0 { // We have more than one group and at least one group remains after the filtering so we need to show the total row totalEntry := t.VDBSummaryTableRow{ GroupId: total.GroupId, @@ -501,7 +584,6 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex // TODO: implement data retrieval for the following new field // Fetch validator list for user dashboard from the dashboard table when querying the past sync committees as the rolling table might miss exited validators // TotalMissedRewards - // @DATA-ACCESS incorporate protocolModes // @DATA-ACCESS implement data retrieval for Rocket Pool stats (if present) var err error @@ -736,15 +818,23 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex } } - _, ret.Apr.El, _, ret.Apr.Cl, err = d.internal_getElClAPR(ctx, dashboardId, groupId, hours) - if err != nil { - return nil, err - } - if len(validators) > 0 { validatorArr = validators } + var rpInfos *t.RPInfo + if protocolModes.RocketPool { + rpInfos, err = d.getRocketPoolInfos(ctx, dashboardId, groupId) + if err != nil { + return nil, fmt.Errorf("error retrieving rocketpool validators: %w", err) + } + } + + _, ret.Apr.El, _, ret.Apr.Cl, err = d.internal_getElClAPR(ctx, dashboardId, groupId, protocolModes, rpInfos, hours) + if err != nil { + return nil, err + } + pastSyncPeriodCutoff := utils.SyncPeriodOfEpoch(rows[0].EpochStart) currentSyncPeriod := utils.SyncPeriodOfEpoch(latestEpoch) err = d.readerDb.GetContext(ctx, &ret.SyncCommitteeCount.PastPeriods, `SELECT COUNT(*) FROM sync_committees WHERE period >= $1 AND period < $2 AND validatorindex = ANY($3)`, pastSyncPeriodCutoff, currentSyncPeriod, validatorArr) @@ -822,7 +912,7 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex return ret, nil } -func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId t.VDBId, groupId int64, hours int) (elIncome decimal.Decimal, elAPR float64, clIncome decimal.Decimal, clAPR float64, err error) { +func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId t.VDBId, groupId int64, protocolModes t.VDBProtocolModes, rpInfos *t.RPInfo, hours int) (elIncome decimal.Decimal, elAPR float64, clIncome decimal.Decimal, clAPR float64, err error) { table := "" switch hours { @@ -840,24 +930,29 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("invalid hours value: %v", hours) } - type RewardsResult struct { - EpochStart uint64 `db:"epoch_start"` - EpochEnd uint64 `db:"epoch_end"` - ValidatorCount uint64 `db:"validator_count"` - Reward sql.NullInt64 `db:"reward"` + type ClRewardsResult struct { + ValidatorIndex uint64 `db:"validator_index"` + EpochStart uint64 `db:"epoch_start"` + EpochEnd uint64 `db:"epoch_end"` + Reward int64 `db:"reward"` + } + + type ElRewardsResult struct { + ValidatorIndex uint64 `db:"validator_index"` + Reward decimal.Decimal `db:"el_reward"` } - var rewardsResultTable RewardsResult - var rewardsResultTotal RewardsResult + var clRewardsResult []ClRewardsResult + var elRewardsResult []ElRewardsResult rewardsDs := goqu.Dialect("postgres"). From(goqu.L(fmt.Sprintf("%s AS r FINAL", table))). With("validators", goqu.L("(SELECT group_id, validator_index FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). Select( - goqu.L("MIN(epoch_start) AS epoch_start"), - goqu.L("MAX(epoch_end) AS epoch_end"), - goqu.L("COUNT(*) AS validator_count"), - goqu.L("(SUM(COALESCE(r.balance_end,0)) + SUM(COALESCE(r.withdrawals_amount,0)) - SUM(COALESCE(r.deposits_amount,0)) - SUM(COALESCE(r.balance_start,0))) AS reward")) + goqu.L("validator_index"), + goqu.L("epoch_start"), + goqu.L("epoch_end"), + goqu.L("(COALESCE(r.balance_end,0) + COALESCE(r.withdrawals_amount,0) - COALESCE(r.deposits_amount,0) - COALESCE(r.balance_start,0)) AS reward")) if len(dashboardId.Validators) > 0 { rewardsDs = rewardsDs. @@ -878,26 +973,51 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("error preparing query: %w", err) } - err = d.clickhouseReader.GetContext(ctx, &rewardsResultTable, query, args...) - if err != nil || !rewardsResultTable.Reward.Valid { + err = d.clickhouseReader.SelectContext(ctx, &clRewardsResult, query, args...) + if err != nil || len(clRewardsResult) == 0 { return decimal.Zero, 0, decimal.Zero, 0, err } - if rewardsResultTable.ValidatorCount == 0 { - return decimal.Zero, 0, decimal.Zero, 0, nil + epochStart := uint64(math.MaxInt32) + epochEnd := uint64(0) + epochStartTotal := uint64(math.MaxInt32) + epochEndTotal := uint64(0) + + validatorGroupMap := make(map[uint64]int64) + + rewards := decimal.Zero + deposits := decimal.Zero + + for _, row := range clRewardsResult { + if row.EpochStart < epochStart { + epochStart = row.EpochStart + } + if row.EpochEnd > epochEnd { + epochEnd = row.EpochEnd + } + + validatorGroupMap[row.ValidatorIndex] = groupId + + reward := utils.GWeiToWei(big.NewInt(row.Reward)) + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[row.ValidatorIndex]; ok { + rewards = rewards.Add(reward.Mul(d.getRocketPoolOperatorFactor(rpValidator))) + deposits = deposits.Add(rpValidator.NodeDepositBalance) + continue + } + } + rewards = rewards.Add(reward) + deposits = deposits.Add(decimal.New(32, 18)) } aprDivisor := hours if hours == -1 { // for all time APR aprDivisor = 90 * 24 } - clAPR = ((float64(rewardsResultTable.Reward.Int64) / float64(aprDivisor)) / (float64(32e9) * float64(rewardsResultTable.ValidatorCount))) * 24.0 * 365.0 * 100.0 - if math.IsNaN(clAPR) { - clAPR = 0 + if !deposits.IsZero() { + clAPR = rewards.Div(decimal.NewFromInt(int64(aprDivisor))).Div(deposits).Mul(decimal.NewFromInt(24 * 365 * 100)).InexactFloat64() } - clIncome = decimal.NewFromInt(rewardsResultTable.Reward.Int64).Mul(decimal.NewFromInt(1e9)) - if hours == -1 { rewardsDs = rewardsDs. From(goqu.L("validator_dashboard_data_rolling_total AS r FINAL")) @@ -907,16 +1027,36 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("error preparing query: %w", err) } - err = d.clickhouseReader.GetContext(ctx, &rewardsResultTotal, query, args...) - if err != nil || !rewardsResultTotal.Reward.Valid { + err = d.clickhouseReader.SelectContext(ctx, &clRewardsResult, query, args...) + if err != nil || len(clRewardsResult) == 0 { return decimal.Zero, 0, decimal.Zero, 0, err } - clIncome = decimal.NewFromInt(rewardsResultTotal.Reward.Int64).Mul(decimal.NewFromInt(1e9)) + rewards = decimal.Zero + for _, row := range clRewardsResult { + if row.EpochStart < epochStartTotal { + epochStartTotal = row.EpochStart + } + if row.EpochEnd > epochEndTotal { + epochEndTotal = row.EpochEnd + } + + reward := utils.GWeiToWei(big.NewInt(row.Reward)) + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[row.ValidatorIndex]; ok { + rewards = rewards.Add(reward.Mul(d.getRocketPoolOperatorFactor(rpValidator))) + continue + } + } + rewards = rewards.Add(reward) + } } + clIncome = rewards elDs := goqu.Dialect("postgres"). - Select(goqu.L("COALESCE(SUM(COALESCE(rb.value / 1e18, fee_recipient_reward)), 0) AS el_reward")). + Select( + goqu.L("b.proposer AS validator_index"), + goqu.L("COALESCE(SUM(COALESCE(rb.value, fee_recipient_reward * 1e18)), 0) AS el_reward")). From(goqu.L("blocks AS b")). LeftJoin(goqu.L("execution_payloads AS ep"), goqu.On(goqu.L("b.exec_block_hash = ep.block_hash"))). LeftJoin( @@ -924,12 +1064,20 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId From("relays_blocks"). Select( goqu.L("exec_block_hash"), + goqu.L("proposer_fee_recipient"), goqu.MAX("value").As("value")). Where(goqu.L("relays_blocks.exec_block_hash = b.exec_block_hash")). - GroupBy("exec_block_hash")).As("rb"), + GroupBy("exec_block_hash", "proposer_fee_recipient")).As("rb"), goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash")), ). - Where(goqu.L("b.status = '1'")) + Where(goqu.L("b.status = '1'")). + GroupBy(goqu.L("b.proposer")) + + if rpInfos != nil && protocolModes.RocketPool { + // Exclude rewards that went to the smoothing pool + elDs = elDs. + Where(goqu.L("(b.exec_fee_recipient != ? OR (rb.proposer_fee_recipient IS NOT NULL AND rb.proposer_fee_recipient != ?))", rpInfos.SmoothingPoolAddress, rpInfos.SmoothingPoolAddress)) + } if len(dashboardId.Validators) > 0 { elDs = elDs. @@ -946,38 +1094,85 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId } elTableDs := elDs. - Where(goqu.L("b.epoch >= ? AND b.epoch <= ?", rewardsResultTable.EpochStart, rewardsResultTable.EpochEnd)) + Where(goqu.L("b.epoch >= ? AND b.epoch <= ?", epochStart, epochEnd)) query, args, err = elTableDs.Prepared(true).ToSQL() if err != nil { return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("error preparing query: %w", err) } - err = d.alloyReader.GetContext(ctx, &elIncome, query, args...) + err = d.alloyReader.SelectContext(ctx, &elRewardsResult, query, args...) if err != nil { return decimal.Zero, 0, decimal.Zero, 0, err } - elIncomeFloat, _ := elIncome.Float64() // EL income is in ETH - elAPR = ((elIncomeFloat / float64(aprDivisor)) / (float64(32) * float64(rewardsResultTable.ValidatorCount))) * 24.0 * 365.0 * 100.0 - if math.IsNaN(elAPR) { - elAPR = 0 + + // Add up EL rewards + rewards = decimal.Zero + for _, row := range elRewardsResult { + reward := row.Reward + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[row.ValidatorIndex]; ok { + rewards = rewards.Add(reward.Mul(d.getRocketPoolOperatorFactor(rpValidator))) + continue + } + } + rewards = rewards.Add(reward) + } + + // Add up smoothing pool rewards + if rpInfos != nil && protocolModes.RocketPool { + for validatorIndex := range validatorGroupMap { + for epoch, reward := range rpInfos.Minipool[validatorIndex].SmoothingPoolRewards { + if epoch >= epochStart && epoch <= epochEnd { + rewards = rewards.Add(reward) + } + } + } + } + + if !deposits.IsZero() { + elAPR = rewards.Div(deposits).Div(decimal.NewFromInt(int64(aprDivisor))).Mul(decimal.NewFromInt(24 * 365 * 100)).InexactFloat64() } if hours == -1 { elTotalDs := elDs. - Where(goqu.L("b.epoch >= ? AND b.epoch <= ?", rewardsResultTotal.EpochStart, rewardsResultTotal.EpochEnd)) + Where(goqu.L("b.epoch >= ? AND b.epoch <= ?", epochStartTotal, epochEndTotal)) query, args, err = elTotalDs.Prepared(true).ToSQL() if err != nil { return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("error preparing query: %w", err) } - err = d.alloyReader.GetContext(ctx, &elIncome, query, args...) + err = d.alloyReader.SelectContext(ctx, &elRewardsResult, query, args...) if err != nil { return decimal.Zero, 0, decimal.Zero, 0, err } + + // Add up EL rewards + rewards = decimal.Zero + for _, row := range elRewardsResult { + reward := row.Reward + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[row.ValidatorIndex]; ok { + rewards = rewards.Add(reward.Mul(d.getRocketPoolOperatorFactor(rpValidator))) + continue + } + } + rewards = rewards.Add(reward) + } + + // Add up smoothing pool rewards + if rpInfos != nil && protocolModes.RocketPool { + for validatorIndex := range validatorGroupMap { + for epoch, reward := range rpInfos.Minipool[validatorIndex].SmoothingPoolRewards { + if epoch >= epochStart && epoch <= epochEnd { + rewards = rewards.Add(reward) + } + } + } + } } - elIncome = elIncome.Mul(decimal.NewFromInt(1e18)) + elIncome = rewards return elIncome, elAPR, clIncome, clAPR, nil } diff --git a/backend/pkg/api/data_access/vdb_withdrawals.go b/backend/pkg/api/data_access/vdb_withdrawals.go index 93124cd73..4974e8571 100644 --- a/backend/pkg/api/data_access/vdb_withdrawals.go +++ b/backend/pkg/api/data_access/vdb_withdrawals.go @@ -11,6 +11,8 @@ import ( "strconv" "strings" + "github.com/doug-martin/goqu/v9" + "github.com/doug-martin/goqu/v9/exp" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/gobitfly/beaconchain/pkg/api/enums" @@ -39,11 +41,17 @@ func (d *DataAccessService) GetValidatorDashboardWithdrawals(ctx context.Context } // Prepare the sorting + isReverseDirection := (colSort.Desc && !currentCursor.IsReverse()) || (!colSort.Desc && currentCursor.IsReverse()) sortSearchDirection := ">" - sortSearchOrder := " ASC" - if (colSort.Desc && !currentCursor.IsReverse()) || (!colSort.Desc && currentCursor.IsReverse()) { + if isReverseDirection { sortSearchDirection = "<" - sortSearchOrder = " DESC" + } + + orderFunc := func(col string) exp.OrderedExpression { + if isReverseDirection { + return goqu.I(col).Desc() + } + return goqu.I(col).Asc() } // Analyze the search term @@ -119,28 +127,20 @@ func (d *DataAccessService) GetValidatorDashboardWithdrawals(ctx context.Context Amount uint64 `db:"amount"` }{} - queryParams := []interface{}{} - withdrawalsQuery := ` - SELECT - w.block_slot, - b.exec_block_number, - w.withdrawalindex, - w.validatorindex, - w.address, - w.amount - FROM - blocks_withdrawals w - INNER JOIN blocks b ON w.block_slot = b.slot AND w.block_root = b.blockroot AND b.status = '1' - ` - - // Limit the query to relevant validators - queryParams = append(queryParams, pq.Array(validators)) - whereQuery := fmt.Sprintf(` - WHERE - validatorindex = ANY ($%d)`, len(queryParams)) + ds := goqu.Dialect("postgres"). + Select( + goqu.L("w.block_slot"), + goqu.L("b.exec_block_number"), + goqu.L("w.withdrawalindex"), + goqu.L("w.validatorindex"), + goqu.L("w.address"), + goqu.L("w.amount"), + ). + From(goqu.L("blocks_withdrawals AS w")). + InnerJoin(goqu.L("blocks AS b"), goqu.On(goqu.L("w.block_slot = b.slot AND w.block_root = b.blockroot AND b.status = '1'"))). + Where(goqu.L("validatorindex = ANY(?)", pq.Array(validators))) // Limit the query using sorting and the cursor - orderQuery := "" sortColName := "" sortColCursor := interface{}(nil) switch colSort.Column { @@ -160,35 +160,38 @@ func (d *DataAccessService) GetValidatorDashboardWithdrawals(ctx context.Context colSort.Column == enums.VDBWithdrawalsColumns.Slot { if currentCursor.IsValid() { // If we have a valid cursor only check the results before/after it - queryParams = append(queryParams, currentCursor.Slot, currentCursor.WithdrawalIndex) - whereQuery += fmt.Sprintf(" AND (w.block_slot%[1]s$%[2]d OR (w.block_slot=$%[2]d AND w.withdrawalindex%[1]s$%[3]d))", - sortSearchDirection, len(queryParams)-1, len(queryParams)) + ds = ds. + Where(goqu.L(fmt.Sprintf("(w.block_slot%[1]s? OR (w.block_slot=? AND w.withdrawalindex%[1]s?))", + sortSearchDirection), currentCursor.Slot, currentCursor.Slot, currentCursor.WithdrawalIndex)) } - orderQuery = fmt.Sprintf(" ORDER BY w.block_slot %[1]s, w.withdrawalindex %[1]s", sortSearchOrder) + ds = ds. + Order(orderFunc("w.block_slot"), orderFunc("w.withdrawalindex")) } else { if currentCursor.IsValid() { // If we have a valid cursor only check the results before/after it - queryParams = append(queryParams, sortColCursor, currentCursor.Slot, currentCursor.WithdrawalIndex) // The additional WHERE requirement is // WHERE sortColName>cursor OR (sortColName=cursor AND (block_slot>cursor OR (block_slot=cursor AND withdrawalindex>cursor))) // with the > flipped if the sort is descending - whereQuery += fmt.Sprintf(" AND (%[1]s%[2]s$%[3]d OR (%[1]s=$%[3]d AND (w.block_slot%[2]s$%[4]d OR (w.block_slot=$%[4]d AND w.withdrawalindex%[2]s$%[5]d))))", - sortColName, sortSearchDirection, len(queryParams)-2, len(queryParams)-1, len(queryParams)) + ds = ds. + Where(goqu.L(fmt.Sprintf("(%[1]s%[2]s? OR (%[1]s=? AND (w.block_slot%[2]s? OR (w.block_slot=? AND w.withdrawalindex%[2]s?))))", sortColName, sortSearchDirection), sortColCursor, sortColCursor, currentCursor.Slot, currentCursor.Slot, currentCursor.WithdrawalIndex)) } // The ordering is // ORDER BY sortColName ASC, block_slot ASC, withdrawalindex ASC // with the ASC flipped if the sort is descending - orderQuery = fmt.Sprintf(" ORDER BY %[1]s %[2]s, w.block_slot %[2]s, w.withdrawalindex %[2]s", - sortColName, sortSearchOrder) + ds = ds. + Order(orderFunc(sortColName), orderFunc("w.block_slot"), orderFunc("w.withdrawalindex")) } - queryParams = append(queryParams, limit+1) - limitQuery := fmt.Sprintf(" LIMIT $%d", len(queryParams)) + ds = ds. + Limit(uint(limit) + 1) - withdrawalsQuery += whereQuery + orderQuery + limitQuery + query, args, err := ds.Prepared(true).ToSQL() + if err != nil { + return nil, nil, fmt.Errorf("error preparing withdrawals query: %w", err) + } - err = d.readerDb.SelectContext(ctx, &queryResult, withdrawalsQuery, queryParams...) + err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { return nil, nil, fmt.Errorf("error getting withdrawals for dashboardId: %d (%d validators): %w", dashboardId.Id, len(validators), err) } @@ -223,17 +226,31 @@ func (d *DataAccessService) GetValidatorDashboardWithdrawals(ctx context.Context return nil, nil, err } + var rpInfos *t.RPInfo + if protocolModes.RocketPool { + rpInfos, err = d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups) + if err != nil { + return nil, nil, err + } + } + // Create the result cursorData := make([]t.WithdrawalsCursor, 0) for i, withdrawal := range queryResult { address := hexutil.Encode(withdrawal.Address) + amount := utils.GWeiToWei(big.NewInt(int64(withdrawal.Amount))) + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[withdrawal.ValidatorIndex]; ok { + amount = amount.Mul(d.getRocketPoolOperatorFactor(rpValidator)) + } + } result = append(result, t.VDBWithdrawalsTableRow{ Epoch: withdrawal.BlockSlot / utils.Config.Chain.ClConfig.SlotsPerEpoch, Slot: withdrawal.BlockSlot, Index: withdrawal.ValidatorIndex, Recipient: *addressMapping[address], GroupId: validatorGroupMap[withdrawal.ValidatorIndex], - Amount: utils.GWeiToWei(big.NewInt(int64(withdrawal.Amount))), + Amount: amount, }) result[i].Recipient.IsContract = contractStatuses[i] == types.CONTRACT_CREATION || contractStatuses[i] == types.CONTRACT_PRESENT cursorData = append(cursorData, t.WithdrawalsCursor{ @@ -272,6 +289,12 @@ func (d *DataAccessService) GetValidatorDashboardWithdrawals(ctx context.Context nextData.GroupId = validatorGroupMap[nextData.Index] // TODO integrate label/ens data for "next" row // nextData.Recipient.Ens = addressEns[string(nextData.Recipient.Hash)] + + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[nextData.Index]; ok { + nextData.Amount = nextData.Amount.Mul(d.getRocketPoolOperatorFactor(rpValidator)) + } + } } else { // If there is no next data, add a missing estimate row nextData = &t.VDBWithdrawalsTableRow{ @@ -458,43 +481,31 @@ func (d *DataAccessService) GetValidatorDashboardTotalWithdrawals(ctx context.Co Amount int64 `db:"acc_withdrawals_amount"` }{} - withdrawalsQuery := ` - WITH validators AS ( - SELECT validator_index FROM users_val_dashboards_validators WHERE (dashboard_id = $1) - ) - SELECT - validator_index, - SUM(withdrawals_amount) AS acc_withdrawals_amount, - MAX(epoch_end) AS epoch_end - FROM validator_dashboard_data_rolling_total FINAL - INNER JOIN validators v ON validator_dashboard_data_rolling_total.validator_index = v.validator_index - WHERE validator_index IN (select validator_index FROM validators) - GROUP BY validator_index - ` - - if dashboardId.Validators != nil { - withdrawalsQuery = ` - SELECT - validator_index, - SUM(withdrawals_amount) AS acc_withdrawals_amount, - MAX(epoch_end) AS epoch_end - from validator_dashboard_data_rolling_total FINAL - where validator_index IN ($1) - group by validator_index - ` - } + ds := goqu.Dialect("postgres"). + Select( + goqu.L("validator_index"), + goqu.L("SUM(withdrawals_amount) AS acc_withdrawals_amount"), + goqu.L("MAX(epoch_end) AS epoch_end"), + ). + From(goqu.L("validator_dashboard_data_rolling_total AS t FINAL")). + GroupBy("validator_index") - dashboardValidators := make([]t.VDBValidator, 0) - if dashboardId.Validators != nil { - dashboardValidators = dashboardId.Validators + if dashboardId.Validators == nil { + ds = ds. + With("validators", goqu.L("(SELECT validator_index FROM users_val_dashboards_validators WHERE (dashboard_id = ?))", dashboardId.Id)). + InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("t.validator_index = v.validator_index"))). + Where(goqu.L("validator_index IN (SELECT validator_index FROM validators)")) + } else { + ds = ds. + Where(goqu.L("validator_index IN ?", dashboardId.Validators)) } - if len(dashboardValidators) > 0 { - err = d.clickhouseReader.SelectContext(ctx, &queryResult, withdrawalsQuery, dashboardValidators) - } else { - err = d.clickhouseReader.SelectContext(ctx, &queryResult, withdrawalsQuery, dashboardId.Id) + query, args, err := ds.Prepared(true).ToSQL() + if err != nil { + return nil, fmt.Errorf("error preparing total withdrawals query: %w", err) } + err = d.clickhouseReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { return nil, fmt.Errorf("error getting total withdrawals for validators: %+v: %w", dashboardId, err) } @@ -504,34 +515,54 @@ func (d *DataAccessService) GetValidatorDashboardTotalWithdrawals(ctx context.Co return result, nil } - var totalAmount int64 + var rpInfos *t.RPInfo + if protocolModes.RocketPool { + rpInfos, err = d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups) + if err != nil { + return nil, err + } + } + var validators []t.VDBValidator lastEpoch := queryResult[0].Epoch lastSlot := (lastEpoch+1)*utils.Config.Chain.ClConfig.SlotsPerEpoch - 1 for _, res := range queryResult { - // Calculate the total amount of withdrawals - totalAmount += res.Amount + amount := utils.GWeiToWei(big.NewInt(res.Amount)) + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[res.ValidatorIndex]; ok { + amount = amount.Mul(d.getRocketPoolOperatorFactor(rpValidator)) + } + } + result.TotalAmount = result.TotalAmount.Add(amount) // Calculate the current validators validators = append(validators, res.ValidatorIndex) } - var latestWithdrawalsAmount int64 - err = d.readerDb.GetContext(ctx, &latestWithdrawalsAmount, ` + err = d.readerDb.SelectContext(ctx, &queryResult, ` SELECT - COALESCE(SUM(w.amount), 0) + w.validatorindex AS validator_index, + SUM(w.amount) AS acc_withdrawals_amount FROM blocks_withdrawals w INNER JOIN blocks b ON w.block_slot = b.slot AND w.block_root = b.blockroot AND b.status = '1' WHERE w.block_slot > $1 AND w.validatorindex = ANY ($2) + GROUP BY w.validatorindex `, lastSlot, validators) if err != nil && !errors.Is(err, sql.ErrNoRows) { return nil, fmt.Errorf("error getting latest withdrawals for validators: %+v: %w", dashboardId, err) } - totalAmount += latestWithdrawalsAmount - result.TotalAmount = utils.GWeiToWei(big.NewInt(totalAmount)) + for _, res := range queryResult { + amount := utils.GWeiToWei(big.NewInt(res.Amount)) + if rpInfos != nil && protocolModes.RocketPool { + if rpValidator, ok := rpInfos.Minipool[res.ValidatorIndex]; ok { + amount = amount.Mul(d.getRocketPoolOperatorFactor(rpValidator)) + } + } + result.TotalAmount = result.TotalAmount.Add(amount) + } return result, nil } diff --git a/backend/pkg/api/types/archiver.go b/backend/pkg/api/types/archiver.go index ac1666e4a..a2ead994e 100644 --- a/backend/pkg/api/types/archiver.go +++ b/backend/pkg/api/types/archiver.go @@ -1,6 +1,8 @@ package types -import "github.com/gobitfly/beaconchain/pkg/api/enums" +import ( + "github.com/gobitfly/beaconchain/pkg/api/enums" +) type ArchiverDashboard struct { DashboardId uint64 diff --git a/backend/pkg/api/types/rocketpool.go b/backend/pkg/api/types/rocketpool.go index 1d173bce8..d34324e5f 100644 --- a/backend/pkg/api/types/rocketpool.go +++ b/backend/pkg/api/types/rocketpool.go @@ -8,3 +8,15 @@ type RPNetworkStats struct { EffectiveRPLStaked decimal.Decimal `db:"effective_rpl_staked"` RPLPrice decimal.Decimal `db:"rpl_price"` } + +type RPInfo struct { + Minipool map[uint64]RPMinipoolInfo + SmoothingPoolAddress []byte +} + +type RPMinipoolInfo struct { + NodeFee float64 + NodeDepositBalance decimal.Decimal + UserDepositBalance decimal.Decimal + SmoothingPoolRewards map[uint64]decimal.Decimal +} diff --git a/backend/pkg/api/types/validator_dashboard.go b/backend/pkg/api/types/validator_dashboard.go index db3886072..ac38ba94c 100644 --- a/backend/pkg/api/types/validator_dashboard.go +++ b/backend/pkg/api/types/validator_dashboard.go @@ -98,11 +98,6 @@ type VDBGroupSummaryData struct { Apr ClElValue[float64] `json:"apr"` Luck Luck `json:"luck"` - - RocketPool struct { - Minipools uint64 `json:"minipools"` - Collateral float64 `json:"collateral"` - } `json:"rocket_pool,omitempty"` } type GetValidatorDashboardGroupSummaryResponse ApiDataResponse[VDBGroupSummaryData] diff --git a/frontend/types/api/validator_dashboard.ts b/frontend/types/api/validator_dashboard.ts index e85eac671..ac4546c0e 100644 --- a/frontend/types/api/validator_dashboard.ts +++ b/frontend/types/api/validator_dashboard.ts @@ -81,10 +81,6 @@ export interface VDBGroupSummaryData { missed_rewards: VDBGroupSummaryMissedRewards; apr: ClElValue; luck: Luck; - rocket_pool?: { - minipools: number /* uint64 */; - collateral: number /* float64 */; - }; } export type GetValidatorDashboardGroupSummaryResponse = ApiDataResponse; export type GetValidatorDashboardSummaryChartResponse = ApiDataResponse>; // line chart, series id is group id