Skip to content

Commit

Permalink
feat(BEDS-151): Implement val dashboard RPL protocol mode
Browse files Browse the repository at this point in the history
  • Loading branch information
Eisei24 committed Nov 27, 2024
1 parent 1db1b7c commit 989033f
Show file tree
Hide file tree
Showing 11 changed files with 1,077 additions and 411 deletions.
10 changes: 8 additions & 2 deletions backend/pkg/api/data_access/mobile.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex
data.NetworkEfficiency = utils.CalculateTotalEfficiency(
efficiency.AttestationEfficiency[enums.AllTime], efficiency.ProposalEfficiency[enums.AllTime], efficiency.SyncEfficiency[enums.AllTime])

protocolModes := t.VDBProtocolModes{RocketPool: true}
rpInfos, err := d.getRocketPoolInfos(ctx, wrappedDashboardId, t.AllGroups)
if err != nil {
return nil, fmt.Errorf("error retrieving rocketpool infos: %w", err)
}

// Validator status
eg.Go(func() error {
validatorMapping, err := d.services.GetCurrentValidatorMapping()
Expand Down Expand Up @@ -262,7 +268,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex

retrieveApr := func(hours int, apr *float64) {
eg.Go(func() error {
_, elApr, _, clApr, err := d.internal_getElClAPR(ctx, wrappedDashboardId, -1, hours)
_, elApr, _, clApr, err := d.internal_getElClAPR(ctx, wrappedDashboardId, t.AllGroups, protocolModes, rpInfos, hours)
if err != nil {
return err
}
Expand All @@ -273,7 +279,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex

retrieveRewards := func(hours int, rewards *decimal.Decimal) {
eg.Go(func() error {
clRewards, _, elRewards, _, err := d.internal_getElClAPR(ctx, wrappedDashboardId, -1, hours)
clRewards, _, elRewards, _, err := d.internal_getElClAPR(ctx, wrappedDashboardId, t.AllGroups, protocolModes, rpInfos, hours)
if err != nil {
return err
}
Expand Down
52 changes: 40 additions & 12 deletions backend/pkg/api/data_access/vdb_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
)

func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, dashboardId t.VDBId, cursor string, colSort t.Sort[enums.VDBBlocksColumn], search string, limit uint64, protocolModes t.VDBProtocolModes) ([]t.VDBBlocksTableRow, *t.Paging, error) {
// @DATA-ACCESS incorporate protocolModes

// -------------------------------------
// Setup
var err error
Expand All @@ -42,6 +40,15 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
}
}

// Get the rocketpool minipool infos
var rpInfos *t.RPInfo
if protocolModes.RocketPool {
rpInfos, err = d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups)
if err != nil {
return nil, nil, err
}
}

searchPubkey := regexp.MustCompile(`^0x[0-9a-fA-F]{96}$`).MatchString(search)
searchGroup := regexp.MustCompile(`^[a-zA-Z0-9_\-.\ ]+$`).MatchString(search)
searchIndex := regexp.MustCompile(`^[0-9]+$`).MatchString(search)
Expand Down Expand Up @@ -175,6 +182,11 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
goqu.COALESCE(goqu.L("rb.value / 1e18"), goqu.I("ep.fee_recipient_reward")).As("el_reward"),
)

if rpInfos != nil && protocolModes.RocketPool {
blocksDs = blocksDs.
SelectAppend(goqu.L("(blocks.exec_fee_recipient = ? AND (rb.proposer_fee_recipient IS NULL OR rb.proposer_fee_recipient = ?)) AS is_smoothing_pool", rpInfos.SmoothingPoolAddress, rpInfos.SmoothingPoolAddress))
}

// 3. Sorting and pagination
defaultColumns := []t.SortColumn{
{Column: enums.VDBBlocksColumns.Slot.ToExpr(), Desc: true, Offset: currentCursor.Slot},
Expand Down Expand Up @@ -272,6 +284,11 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
goqu.L("NULL::NUMERIC").As("el_reward"),
)

if rpInfos != nil && protocolModes.RocketPool {
scheduledDs = scheduledDs.
SelectAppend(goqu.L("NULL::BOOL").As("is_smoothing_pool"))
}

// We don't have access to exec_block_number and status for a WHERE without wrapping the query so if we sort by those get all the data
if colSort.Column == enums.VDBBlocksColumns.Proposer || colSort.Column == enums.VDBBlocksColumns.Slot {
scheduledDs = scheduledDs.
Expand Down Expand Up @@ -307,16 +324,17 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
// -------------------------------------
// Execute query
var proposals []struct {
Proposer t.VDBValidator `db:"validator_index"`
Group uint64 `db:"group_id"`
Epoch uint64 `db:"epoch"`
Slot uint64 `db:"slot"`
Status uint64 `db:"status"`
Block sql.NullInt64 `db:"exec_block_number"`
FeeRecipient []byte `db:"fee_recipient"`
ElReward decimal.NullDecimal `db:"el_reward"`
ClReward decimal.NullDecimal `db:"cl_reward"`
GraffitiText sql.NullString `db:"graffiti_text"`
Proposer t.VDBValidator `db:"validator_index"`
Group uint64 `db:"group_id"`
Epoch uint64 `db:"epoch"`
Slot uint64 `db:"slot"`
Status uint64 `db:"status"`
Block sql.NullInt64 `db:"exec_block_number"`
FeeRecipient []byte `db:"fee_recipient"`
ElReward decimal.NullDecimal `db:"el_reward"`
ClReward decimal.NullDecimal `db:"cl_reward"`
GraffitiText sql.NullString `db:"graffiti_text"`
IsSmoothingPool sql.NullBool `db:"is_smoothing_pool"`

// for cursor only
Reward decimal.Decimal
Expand Down Expand Up @@ -444,9 +462,19 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
TraceIdx: -1,
})
reward.El = proposal.ElReward.Decimal.Mul(decimal.NewFromInt(1e18))
if rpInfos != nil && protocolModes.RocketPool && !(proposal.IsSmoothingPool.Valid && proposal.IsSmoothingPool.Bool) {
if rpValidator, ok := rpInfos.Minipool[proposal.Proposer]; ok {
reward.El = reward.El.Mul(d.getRocketPoolOperatorFactor(rpValidator))
}
}
}
if clReward, ok := clRewards[proposal.Slot]; ok && clReward.Valid {
reward.Cl = clReward.Decimal.Mul(decimal.NewFromInt(1e18))
if rpInfos != nil && protocolModes.RocketPool {
if rpValidator, ok := rpInfos.Minipool[proposal.Proposer]; ok {
reward.Cl = reward.Cl.Mul(d.getRocketPoolOperatorFactor(rpValidator))
}
}
}
proposals[i].Reward = proposal.ElReward.Decimal.Add(proposal.ClReward.Decimal)
data[i].Reward = &reward
Expand Down
136 changes: 136 additions & 0 deletions backend/pkg/api/data_access/vdb_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"time"

"github.com/doug-martin/goqu/v9"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/gobitfly/beaconchain/pkg/api/enums"
t "github.com/gobitfly/beaconchain/pkg/api/types"
"github.com/gobitfly/beaconchain/pkg/commons/cache"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/shopspring/decimal"
"golang.org/x/sync/errgroup"
)

//////////////////// Helper functions (must be used by more than one VDB endpoint!)
Expand Down Expand Up @@ -130,3 +133,136 @@ func (d *DataAccessService) getTimeToNextWithdrawal(distance uint64) time.Time {

return timeToWithdrawal
}

func (d *DataAccessService) getRocketPoolInfos(ctx context.Context, dashboardId t.VDBId, groupId int64) (*t.RPInfo, error) {
wg := errgroup.Group{}

queryResult := []struct {
ValidatorIndex uint64 `db:"validatorindex"`
NodeAddress []byte `db:"node_address"`
NodeFee float64 `db:"node_fee"`
NodeDepositBalance decimal.Decimal `db:"node_deposit_balance"`
UserDepositBalance decimal.Decimal `db:"user_deposit_balance"`
EndTime sql.NullTime `db:"end_time"`
SmoothingPoolAddress []byte `db:"smoothing_pool_address"`
SmoothingPoolEth *decimal.Decimal `db:"smoothing_pool_eth"`
}{}

wg.Go(func() error {
ds := goqu.Dialect("postgres").
Select(
goqu.L("v.validatorindex"),
goqu.L("rplm.node_address"),
goqu.L("rplm.node_fee"),
goqu.L("rplm.node_deposit_balance"),
goqu.L("rplm.user_deposit_balance"),
goqu.L("rplrs.end_time"),
goqu.L("rploc.smoothing_pool_address"),
goqu.L("rplrs.smoothing_pool_eth"),
).
From(goqu.L("rocketpool_minipools AS rplm")).
LeftJoin(goqu.L("validators AS v"), goqu.On(goqu.L("rplm.pubkey = v.pubkey"))).
LeftJoin(goqu.L("rocketpool_rewards_summary AS rplrs"), goqu.On(goqu.L("rplm.node_address = rplrs.node_address"))).
LeftJoin(goqu.L("rocketpool_onchain_configs AS rploc"), goqu.On(goqu.L("rplm.rocketpool_storage_address = rploc.rocketpool_storage_address"))).
Where(goqu.L("rplm.node_deposit_balance IS NOT NULL")).
Where(goqu.L("rplm.user_deposit_balance IS NOT NULL"))

if len(dashboardId.Validators) == 0 {
ds = ds.
LeftJoin(goqu.L("users_val_dashboards_validators uvdv"), goqu.On(goqu.L("uvdv.validator_index = v.validatorindex"))).
Where(goqu.L("uvdv.dashboard_id = ?", dashboardId.Id))

if groupId != t.AllGroups {
ds = ds.
Where(goqu.L("uvdv.group_id = ?", groupId))
}
} else {
ds = ds.
Where(goqu.L("v.validatorindex = ANY(?)", pq.Array(dashboardId.Validators)))
}

query, args, err := ds.Prepared(true).ToSQL()
if err != nil {
return fmt.Errorf("error preparing query: %w", err)
}

err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...)
if err != nil {
return fmt.Errorf("error retrieving rocketpool validators data: %w", err)
}

return nil
})

nodeMinipoolCount := make(map[string]uint64)
wg.Go(func() error {
queryResult := []struct {
NodeAddress []byte `db:"node_address"`
MinipoolCount uint64 `db:"minipool_count"`
}{}

err := d.alloyReader.SelectContext(ctx, &queryResult, `
SELECT
node_address,
COUNT(node_address) AS minipool_count
FROM rocketpool_minipools
GROUP BY node_address`)
if err != nil {
return fmt.Errorf("error retrieving rocketpool node deposits data: %w", err)
}

for _, res := range queryResult {
node := hexutil.Encode(res.NodeAddress)
nodeMinipoolCount[node] = res.MinipoolCount
}

return nil
})

err := wg.Wait()
if err != nil {
return nil, err
}

if len(queryResult) == 0 {
return nil, nil
}

rpInfo := t.RPInfo{
Minipool: make(map[uint64]t.RPMinipoolInfo),
// Smoothing pool address is the same for all nodes on the network so take the first result
SmoothingPoolAddress: queryResult[0].SmoothingPoolAddress,
}

for _, res := range queryResult {
if _, ok := rpInfo.Minipool[res.ValidatorIndex]; !ok {
rpInfo.Minipool[res.ValidatorIndex] = t.RPMinipoolInfo{
NodeFee: res.NodeFee,
NodeDepositBalance: res.NodeDepositBalance,
UserDepositBalance: res.UserDepositBalance,
SmoothingPoolRewards: make(map[uint64]decimal.Decimal),
}
}

node := hexutil.Encode(res.NodeAddress)
if res.EndTime.Valid && res.SmoothingPoolEth != nil {
epoch := uint64(utils.TimeToEpoch(res.EndTime.Time))
splitReward := res.SmoothingPoolEth.Div(decimal.NewFromUint64(nodeMinipoolCount[node]))
rpInfo.Minipool[res.ValidatorIndex].SmoothingPoolRewards[epoch] =
rpInfo.Minipool[res.ValidatorIndex].SmoothingPoolRewards[epoch].Add(splitReward)
}
}

return &rpInfo, nil
}

func (d *DataAccessService) getRocketPoolOperatorFactor(minipool t.RPMinipoolInfo) decimal.Decimal {
fullDeposit := minipool.UserDepositBalance.Add(minipool.NodeDepositBalance)
operatorShare := minipool.NodeDepositBalance.Div(fullDeposit)
invOperatorShare := decimal.NewFromInt(1).Sub(operatorShare)

commissionReward := invOperatorShare.Mul(decimal.NewFromFloat(minipool.NodeFee))
operatorFactor := operatorShare.Add(commissionReward)

return operatorFactor
}
69 changes: 16 additions & 53 deletions backend/pkg/api/data_access/vdb_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,11 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d
})
}

rpInfos, err := d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups)
if err != nil {
return nil, fmt.Errorf("error retrieving rocketpool validators: %w", err)
}

// Validator status and balance
eg.Go(func() error {
validatorMapping, err := d.services.GetCurrentValidatorMapping()
Expand All @@ -346,10 +351,13 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d
data.Groups = append(data.Groups, t.VDBOverviewGroup{Id: t.DefaultGroupId, Name: t.DefaultGroupName, Count: uint64(len(validators))})
}

// Status
// Create a new sub-dashboard to get the total cl deposits for non-rocketpool validators
var nonRpDashboardId t.VDBId

for _, validator := range validators {
metadata := validatorMapping.ValidatorMetadata[validator]

// Status
switch constypes.ValidatorDbStatus(metadata.Status) {
case constypes.DbExitingOnline, constypes.DbSlashingOnline, constypes.DbActiveOnline:
data.Validators.Online++
Expand All @@ -362,61 +370,16 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d
case constypes.DbExited:
data.Validators.Exited++
}
}

// Find rocketpool validators
type RpOperatorInfo struct {
ValidatorIndex uint64 `db:"validatorindex"`
NodeFee float64 `db:"node_fee"`
NodeDepositBalance decimal.Decimal `db:"node_deposit_balance"`
UserDepositBalance decimal.Decimal `db:"user_deposit_balance"`
}
var queryResult []RpOperatorInfo

ds := goqu.Dialect("postgres").
Select(
goqu.L("v.validatorindex"),
goqu.L("rplm.node_fee"),
goqu.L("rplm.node_deposit_balance"),
goqu.L("rplm.user_deposit_balance")).
From(goqu.L("rocketpool_minipools AS rplm")).
LeftJoin(goqu.L("validators AS v"), goqu.On(goqu.L("rplm.pubkey = v.pubkey"))).
Where(goqu.L("node_deposit_balance IS NOT NULL")).
Where(goqu.L("user_deposit_balance IS NOT NULL"))

if len(dashboardId.Validators) == 0 {
ds = ds.
LeftJoin(goqu.L("users_val_dashboards_validators uvdv"), goqu.On(goqu.L("uvdv.validator_index = v.validatorindex"))).
Where(goqu.L("uvdv.dashboard_id = ?", dashboardId.Id))
} else {
ds = ds.
Where(goqu.L("v.validatorindex = ANY(?)", pq.Array(dashboardId.Validators)))
}

query, args, err := ds.Prepared(true).ToSQL()
if err != nil {
return fmt.Errorf("error preparing query: %w", err)
}

err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...)
if err != nil {
return fmt.Errorf("error retrieving rocketpool validators data: %w", err)
}

rpValidators := make(map[uint64]RpOperatorInfo)
for _, res := range queryResult {
rpValidators[res.ValidatorIndex] = res
}

// Create a new sub-dashboard to get the total cl deposits for non-rocketpool validators
var nonRpDashboardId t.VDBId

for _, validator := range validators {
metadata := validatorMapping.ValidatorMetadata[validator]
// Balance
validatorBalance := utils.GWeiToWei(big.NewInt(int64(metadata.Balance)))
effectiveBalance := utils.GWeiToWei(big.NewInt(int64(metadata.EffectiveBalance)))

if rpValidator, ok := rpValidators[validator]; ok {
if rpInfos == nil {
data.Balances.Total = data.Balances.Total.Add(validatorBalance)

nonRpDashboardId.Validators = append(nonRpDashboardId.Validators, validator)
} else if rpValidator, ok := rpInfos.Minipool[validator]; ok {
if protocolModes.RocketPool {
// Calculate the balance of the operator
fullDeposit := rpValidator.UserDepositBalance.Add(rpValidator.NodeDepositBalance)
Expand Down Expand Up @@ -457,7 +420,7 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d
retrieveRewardsAndEfficiency := func(table string, hours int, rewards *t.ClElValue[decimal.Decimal], apr *t.ClElValue[float64], efficiency *float64) {
// Rewards + APR
eg.Go(func() error {
(*rewards).El, (*apr).El, (*rewards).Cl, (*apr).Cl, err = d.internal_getElClAPR(ctx, dashboardId, -1, hours)
(*rewards).El, (*apr).El, (*rewards).Cl, (*apr).Cl, err = d.internal_getElClAPR(ctx, dashboardId, t.AllGroups, protocolModes, rpInfos, hours)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 989033f

Please sign in to comment.