Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beds 151/add rp mode test #1167

Open
wants to merge 5 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions backend/pkg/api/data_access/mobile.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex
data.NetworkEfficiency = utils.CalculateTotalEfficiency(
efficiency.AttestationEfficiency[enums.AllTime], efficiency.ProposalEfficiency[enums.AllTime], efficiency.SyncEfficiency[enums.AllTime])

protocolModes := t.VDBProtocolModes{RocketPool: true}
rpInfos, err := d.getRocketPoolInfos(ctx, wrappedDashboardId, t.AllGroups)
if err != nil {
return nil, fmt.Errorf("error retrieving rocketpool infos: %w", err)
}

// Validator status
eg.Go(func() error {
validatorMapping, err := d.services.GetCurrentValidatorMapping()
Expand Down Expand Up @@ -262,7 +268,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex

retrieveApr := func(hours int, apr *float64) {
eg.Go(func() error {
_, elApr, _, clApr, err := d.internal_getElClAPR(ctx, wrappedDashboardId, -1, hours)
_, elApr, _, clApr, err := d.internal_getElClAPR(ctx, wrappedDashboardId, t.AllGroups, protocolModes, rpInfos, hours)
if err != nil {
return err
}
Expand All @@ -273,7 +279,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex

retrieveRewards := func(hours int, rewards *decimal.Decimal) {
eg.Go(func() error {
clRewards, _, elRewards, _, err := d.internal_getElClAPR(ctx, wrappedDashboardId, -1, hours)
clRewards, _, elRewards, _, err := d.internal_getElClAPR(ctx, wrappedDashboardId, t.AllGroups, protocolModes, rpInfos, hours)
if err != nil {
return err
}
Expand Down
52 changes: 40 additions & 12 deletions backend/pkg/api/data_access/vdb_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
)

func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, dashboardId t.VDBId, cursor string, colSort t.Sort[enums.VDBBlocksColumn], search string, limit uint64, protocolModes t.VDBProtocolModes) ([]t.VDBBlocksTableRow, *t.Paging, error) {
// @DATA-ACCESS incorporate protocolModes

// -------------------------------------
// Setup
var err error
Expand All @@ -42,6 +40,15 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
}
}

// Get the rocketpool minipool infos
var rpInfos *t.RPInfo
if protocolModes.RocketPool {
rpInfos, err = d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups)
if err != nil {
return nil, nil, err
}
}

searchPubkey := regexp.MustCompile(`^0x[0-9a-fA-F]{96}$`).MatchString(search)
searchGroup := regexp.MustCompile(`^[a-zA-Z0-9_\-.\ ]+$`).MatchString(search)
searchIndex := regexp.MustCompile(`^[0-9]+$`).MatchString(search)
Expand Down Expand Up @@ -175,6 +182,11 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
goqu.COALESCE(goqu.L("rb.value / 1e18"), goqu.I("ep.fee_recipient_reward")).As("el_reward"),
)

if rpInfos != nil && protocolModes.RocketPool {
blocksDs = blocksDs.
SelectAppend(goqu.L("(blocks.exec_fee_recipient = ? AND (rb.proposer_fee_recipient IS NULL OR rb.proposer_fee_recipient = ?)) AS is_smoothing_pool", rpInfos.SmoothingPoolAddress, rpInfos.SmoothingPoolAddress))
}

// 3. Sorting and pagination
defaultColumns := []t.SortColumn{
{Column: enums.VDBBlocksColumns.Slot.ToExpr(), Desc: true, Offset: currentCursor.Slot},
Expand Down Expand Up @@ -272,6 +284,11 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
goqu.L("NULL::NUMERIC").As("el_reward"),
)

if rpInfos != nil && protocolModes.RocketPool {
scheduledDs = scheduledDs.
SelectAppend(goqu.L("NULL::BOOL").As("is_smoothing_pool"))
}

// We don't have access to exec_block_number and status for a WHERE without wrapping the query so if we sort by those get all the data
if colSort.Column == enums.VDBBlocksColumns.Proposer || colSort.Column == enums.VDBBlocksColumns.Slot {
scheduledDs = scheduledDs.
Expand Down Expand Up @@ -307,16 +324,17 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
// -------------------------------------
// Execute query
var proposals []struct {
Proposer t.VDBValidator `db:"validator_index"`
Group uint64 `db:"group_id"`
Epoch uint64 `db:"epoch"`
Slot uint64 `db:"slot"`
Status uint64 `db:"status"`
Block sql.NullInt64 `db:"exec_block_number"`
FeeRecipient []byte `db:"fee_recipient"`
ElReward decimal.NullDecimal `db:"el_reward"`
ClReward decimal.NullDecimal `db:"cl_reward"`
GraffitiText sql.NullString `db:"graffiti_text"`
Proposer t.VDBValidator `db:"validator_index"`
Group uint64 `db:"group_id"`
Epoch uint64 `db:"epoch"`
Slot uint64 `db:"slot"`
Status uint64 `db:"status"`
Block sql.NullInt64 `db:"exec_block_number"`
FeeRecipient []byte `db:"fee_recipient"`
ElReward decimal.NullDecimal `db:"el_reward"`
ClReward decimal.NullDecimal `db:"cl_reward"`
GraffitiText sql.NullString `db:"graffiti_text"`
IsSmoothingPool sql.NullBool `db:"is_smoothing_pool"`

// for cursor only
Reward decimal.Decimal
Expand Down Expand Up @@ -444,9 +462,19 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
TraceIdx: -1,
})
reward.El = proposal.ElReward.Decimal.Mul(decimal.NewFromInt(1e18))
if rpInfos != nil && protocolModes.RocketPool && !(proposal.IsSmoothingPool.Valid && proposal.IsSmoothingPool.Bool) {
if rpValidator, ok := rpInfos.Minipool[proposal.Proposer]; ok {
reward.El = reward.El.Mul(d.getRocketPoolOperatorFactor(rpValidator))
}
}
}
if clReward, ok := clRewards[proposal.Slot]; ok && clReward.Valid {
reward.Cl = clReward.Decimal.Mul(decimal.NewFromInt(1e18))
if rpInfos != nil && protocolModes.RocketPool {
if rpValidator, ok := rpInfos.Minipool[proposal.Proposer]; ok {
reward.Cl = reward.Cl.Mul(d.getRocketPoolOperatorFactor(rpValidator))
}
}
}
proposals[i].Reward = proposal.ElReward.Decimal.Add(proposal.ClReward.Decimal)
data[i].Reward = &reward
Expand Down
136 changes: 136 additions & 0 deletions backend/pkg/api/data_access/vdb_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"time"

"github.com/doug-martin/goqu/v9"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/gobitfly/beaconchain/pkg/api/enums"
t "github.com/gobitfly/beaconchain/pkg/api/types"
"github.com/gobitfly/beaconchain/pkg/commons/cache"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/shopspring/decimal"
"golang.org/x/sync/errgroup"
)

//////////////////// Helper functions (must be used by more than one VDB endpoint!)
Expand Down Expand Up @@ -130,3 +133,136 @@ func (d *DataAccessService) getTimeToNextWithdrawal(distance uint64) time.Time {

return timeToWithdrawal
}

func (d *DataAccessService) getRocketPoolInfos(ctx context.Context, dashboardId t.VDBId, groupId int64) (*t.RPInfo, error) {
var rpInfo t.RPInfo
wg := errgroup.Group{}

queryResult := []struct {
ValidatorIndex uint64 `db:"validator_index"`
NodeAddress []byte `db:"node_address"`
NodeFee float64 `db:"node_fee"`
NodeDepositBalance decimal.Decimal `db:"node_deposit_balance"`
UserDepositBalance decimal.Decimal `db:"user_deposit_balance"`
EndTime sql.NullTime `db:"end_time"`
SmoothingPoolEth *decimal.Decimal `db:"smoothing_pool_eth"`
}{}

wg.Go(func() error {
ds := goqu.Dialect("postgres").
Select(
goqu.L("rplm.validator_index"),
goqu.L("rplm.node_address"),
goqu.L("rplm.node_fee"),
goqu.L("rplm.node_deposit_balance"),
goqu.L("rplm.user_deposit_balance"),
goqu.L("rplrs.end_time"),
goqu.L("rplrs.smoothing_pool_eth"),
).
From(goqu.L("rocketpool_minipools AS rplm")).
LeftJoin(goqu.L("rocketpool_rewards_summary AS rplrs"), goqu.On(goqu.L("rplm.node_address = rplrs.node_address"))).
Where(goqu.L("rplm.node_deposit_balance IS NOT NULL")).
Where(goqu.L("rplm.user_deposit_balance IS NOT NULL"))

if len(dashboardId.Validators) == 0 {
ds = ds.
LeftJoin(goqu.L("users_val_dashboards_validators uvdv"), goqu.On(goqu.L("uvdv.validator_index = rplm.validator_index"))).
Where(goqu.L("uvdv.dashboard_id = ?", dashboardId.Id))

if groupId != t.AllGroups {
ds = ds.
Where(goqu.L("uvdv.group_id = ?", groupId))
}
} else {
ds = ds.
Where(goqu.L("rplm.validator_index = ANY(?)", pq.Array(dashboardId.Validators)))
}

query, args, err := ds.Prepared(true).ToSQL()
if err != nil {
return fmt.Errorf("error preparing query: %w", err)
}

err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...)
if err != nil {
return fmt.Errorf("error retrieving rocketpool validators data: %w", err)
}

return nil
})

nodeMinipoolCount := make(map[string]uint64)
wg.Go(func() error {
queryResult := []struct {
NodeAddress []byte `db:"node_address"`
MinipoolCount uint64 `db:"minipool_count"`
SmoothingPoolAddress []byte `db:"smoothing_pool_address"`
}{}

err := d.alloyReader.SelectContext(ctx, &queryResult, `
SELECT
rplm.node_address,
COUNT(rplm.node_address) AS minipool_count,
rploc.smoothing_pool_address
FROM rocketpool_minipools AS rplm
LEFT JOIN rocketpool_onchain_configs AS rploc ON rplm.rocketpool_storage_address = rploc.rocketpool_storage_address
GROUP BY node_address, smoothing_pool_address`)
if err != nil {
return fmt.Errorf("error retrieving rocketpool node deposits data: %w", err)
}

for _, res := range queryResult {
node := hexutil.Encode(res.NodeAddress)
nodeMinipoolCount[node] = res.MinipoolCount
}
if len(queryResult) > 0 {
// Smoothing pool address is the same for all nodes on the network so take the first result
rpInfo.SmoothingPoolAddress = queryResult[0].SmoothingPoolAddress
}

return nil
})

err := wg.Wait()
if err != nil {
return nil, err
}

if len(queryResult) == 0 {
return nil, nil
}

rpInfo.Minipool = make(map[uint64]t.RPMinipoolInfo)

for _, res := range queryResult {
if _, ok := rpInfo.Minipool[res.ValidatorIndex]; !ok {
rpInfo.Minipool[res.ValidatorIndex] = t.RPMinipoolInfo{
NodeFee: res.NodeFee,
NodeDepositBalance: res.NodeDepositBalance,
UserDepositBalance: res.UserDepositBalance,
SmoothingPoolRewards: make(map[uint64]decimal.Decimal),
}
}

node := hexutil.Encode(res.NodeAddress)
if res.EndTime.Valid && res.SmoothingPoolEth != nil {
epoch := uint64(utils.TimeToEpoch(res.EndTime.Time))
splitReward := res.SmoothingPoolEth.Div(decimal.NewFromUint64(nodeMinipoolCount[node]))
rpInfo.Minipool[res.ValidatorIndex].SmoothingPoolRewards[epoch] =
rpInfo.Minipool[res.ValidatorIndex].SmoothingPoolRewards[epoch].Add(splitReward)
}
Comment on lines +238 to +253
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks quite complicated, I don't think we need to handle multiple result rows with the same validator index, do we? Should be unique (unless we're planning for reusable indices maybe, but then the math would be wrong).

But I could be missing something, so as it's also not wrong I'll keep it as is

}

return &rpInfo, nil
}

func (d *DataAccessService) getRocketPoolOperatorFactor(minipool t.RPMinipoolInfo) decimal.Decimal {
fullDeposit := minipool.UserDepositBalance.Add(minipool.NodeDepositBalance)
operatorShare := minipool.NodeDepositBalance.Div(fullDeposit)
invOperatorShare := decimal.NewFromInt(1).Sub(operatorShare)

commissionReward := invOperatorShare.Mul(decimal.NewFromFloat(minipool.NodeFee))
operatorFactor := operatorShare.Add(commissionReward)

return operatorFactor
}
69 changes: 16 additions & 53 deletions backend/pkg/api/data_access/vdb_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,11 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d
})
}

rpInfos, err := d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups)
if err != nil {
return nil, fmt.Errorf("error retrieving rocketpool validators: %w", err)
}

// Validator status and balance
eg.Go(func() error {
validatorMapping, err := d.services.GetCurrentValidatorMapping()
Expand All @@ -346,10 +351,13 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d
data.Groups = append(data.Groups, t.VDBOverviewGroup{Id: t.DefaultGroupId, Name: t.DefaultGroupName, Count: uint64(len(validators))})
}

// Status
// Create a new sub-dashboard to get the total cl deposits for non-rocketpool validators
var nonRpDashboardId t.VDBId

for _, validator := range validators {
metadata := validatorMapping.ValidatorMetadata[validator]

// Status
switch constypes.ValidatorDbStatus(metadata.Status) {
case constypes.DbExitingOnline, constypes.DbSlashingOnline, constypes.DbActiveOnline:
data.Validators.Online++
Expand All @@ -362,61 +370,16 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d
case constypes.DbExited:
data.Validators.Exited++
}
}

// Find rocketpool validators
type RpOperatorInfo struct {
ValidatorIndex uint64 `db:"validatorindex"`
NodeFee float64 `db:"node_fee"`
NodeDepositBalance decimal.Decimal `db:"node_deposit_balance"`
UserDepositBalance decimal.Decimal `db:"user_deposit_balance"`
}
var queryResult []RpOperatorInfo

ds := goqu.Dialect("postgres").
Select(
goqu.L("v.validatorindex"),
goqu.L("rplm.node_fee"),
goqu.L("rplm.node_deposit_balance"),
goqu.L("rplm.user_deposit_balance")).
From(goqu.L("rocketpool_minipools AS rplm")).
LeftJoin(goqu.L("validators AS v"), goqu.On(goqu.L("rplm.pubkey = v.pubkey"))).
Where(goqu.L("node_deposit_balance IS NOT NULL")).
Where(goqu.L("user_deposit_balance IS NOT NULL"))

if len(dashboardId.Validators) == 0 {
ds = ds.
LeftJoin(goqu.L("users_val_dashboards_validators uvdv"), goqu.On(goqu.L("uvdv.validator_index = v.validatorindex"))).
Where(goqu.L("uvdv.dashboard_id = ?", dashboardId.Id))
} else {
ds = ds.
Where(goqu.L("v.validatorindex = ANY(?)", pq.Array(dashboardId.Validators)))
}

query, args, err := ds.Prepared(true).ToSQL()
if err != nil {
return fmt.Errorf("error preparing query: %w", err)
}

err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...)
if err != nil {
return fmt.Errorf("error retrieving rocketpool validators data: %w", err)
}

rpValidators := make(map[uint64]RpOperatorInfo)
for _, res := range queryResult {
rpValidators[res.ValidatorIndex] = res
}

// Create a new sub-dashboard to get the total cl deposits for non-rocketpool validators
var nonRpDashboardId t.VDBId

for _, validator := range validators {
metadata := validatorMapping.ValidatorMetadata[validator]
// Balance
validatorBalance := utils.GWeiToWei(big.NewInt(int64(metadata.Balance)))
effectiveBalance := utils.GWeiToWei(big.NewInt(int64(metadata.EffectiveBalance)))

if rpValidator, ok := rpValidators[validator]; ok {
if rpInfos == nil {
data.Balances.Total = data.Balances.Total.Add(validatorBalance)

nonRpDashboardId.Validators = append(nonRpDashboardId.Validators, validator)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does the same as the else block, cleaned this up slightly

} else if rpValidator, ok := rpInfos.Minipool[validator]; ok {
if protocolModes.RocketPool {
// Calculate the balance of the operator
fullDeposit := rpValidator.UserDepositBalance.Add(rpValidator.NodeDepositBalance)
Expand Down Expand Up @@ -457,7 +420,7 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d
retrieveRewardsAndEfficiency := func(table string, hours int, rewards *t.ClElValue[decimal.Decimal], apr *t.ClElValue[float64], efficiency *float64) {
// Rewards + APR
eg.Go(func() error {
(*rewards).El, (*apr).El, (*rewards).Cl, (*apr).Cl, err = d.internal_getElClAPR(ctx, dashboardId, -1, hours)
(*rewards).El, (*apr).El, (*rewards).Cl, (*apr).Cl, err = d.internal_getElClAPR(ctx, dashboardId, t.AllGroups, protocolModes, rpInfos, hours)
if err != nil {
return err
}
Expand Down
Loading
Loading