Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Beds 151/add rp mode #895

Open
wants to merge 22 commits into
base: staging
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions backend/pkg/api/data_access/mobile.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,12 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex
data.NetworkEfficiency = utils.CalculateTotalEfficiency(
efficiency.AttestationEfficiency[enums.AllTime], efficiency.ProposalEfficiency[enums.AllTime], efficiency.SyncEfficiency[enums.AllTime])

protocolModes := t.VDBProtocolModes{RocketPool: true}
rpInfos, err := d.getRocketPoolInfos(ctx, wrappedDashboardId, t.AllGroups)
if err != nil {
return nil, fmt.Errorf("error retrieving rocketpool infos: %w", err)
}

// Validator status
eg.Go(func() error {
validatorMapping, err := d.services.GetCurrentValidatorMapping()
Expand Down Expand Up @@ -262,7 +268,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex

retrieveApr := func(hours int, apr *float64) {
eg.Go(func() error {
_, elApr, _, clApr, err := d.internal_getElClAPR(ctx, wrappedDashboardId, -1, hours)
_, elApr, _, clApr, err := d.internal_getElClAPR(ctx, wrappedDashboardId, t.AllGroups, protocolModes, rpInfos, hours)
if err != nil {
return err
}
Expand All @@ -273,7 +279,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex

retrieveRewards := func(hours int, rewards *decimal.Decimal) {
eg.Go(func() error {
clRewards, _, elRewards, _, err := d.internal_getElClAPR(ctx, wrappedDashboardId, -1, hours)
clRewards, _, elRewards, _, err := d.internal_getElClAPR(ctx, wrappedDashboardId, t.AllGroups, protocolModes, rpInfos, hours)
if err != nil {
return err
}
Expand Down
52 changes: 40 additions & 12 deletions backend/pkg/api/data_access/vdb_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ import (
)

func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, dashboardId t.VDBId, cursor string, colSort t.Sort[enums.VDBBlocksColumn], search string, limit uint64, protocolModes t.VDBProtocolModes) ([]t.VDBBlocksTableRow, *t.Paging, error) {
// @DATA-ACCESS incorporate protocolModes

// -------------------------------------
// Setup
var err error
Expand All @@ -42,6 +40,15 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
}
}

// Get the rocketpool minipool infos
var rpInfos *t.RPInfo
if protocolModes.RocketPool {
rpInfos, err = d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups)
if err != nil {
return nil, nil, err
}
}

searchPubkey := regexp.MustCompile(`^0x[0-9a-fA-F]{96}$`).MatchString(search)
searchGroup := regexp.MustCompile(`^[a-zA-Z0-9_\-.\ ]+$`).MatchString(search)
searchIndex := regexp.MustCompile(`^[0-9]+$`).MatchString(search)
Expand Down Expand Up @@ -175,6 +182,11 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
goqu.COALESCE(goqu.L("rb.value / 1e18"), goqu.I("ep.fee_recipient_reward")).As("el_reward"),
)

if rpInfos != nil && protocolModes.RocketPool {
blocksDs = blocksDs.
SelectAppend(goqu.L("(blocks.exec_fee_recipient = ? AND (rb.proposer_fee_recipient IS NULL OR rb.proposer_fee_recipient = ?)) AS is_smoothing_pool", rpInfos.SmoothingPoolAddress, rpInfos.SmoothingPoolAddress))
}

// 3. Sorting and pagination
defaultColumns := []t.SortColumn{
{Column: enums.VDBBlocksColumns.Slot.ToExpr(), Desc: true, Offset: currentCursor.Slot},
Expand Down Expand Up @@ -272,6 +284,11 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
goqu.L("NULL::NUMERIC").As("el_reward"),
)

if rpInfos != nil && protocolModes.RocketPool {
scheduledDs = scheduledDs.
SelectAppend(goqu.L("NULL::BOOL").As("is_smoothing_pool"))
}

// We don't have access to exec_block_number and status for a WHERE without wrapping the query so if we sort by those get all the data
if colSort.Column == enums.VDBBlocksColumns.Proposer || colSort.Column == enums.VDBBlocksColumns.Slot {
scheduledDs = scheduledDs.
Expand Down Expand Up @@ -307,16 +324,17 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
// -------------------------------------
// Execute query
var proposals []struct {
Proposer t.VDBValidator `db:"validator_index"`
Group uint64 `db:"group_id"`
Epoch uint64 `db:"epoch"`
Slot uint64 `db:"slot"`
Status uint64 `db:"status"`
Block sql.NullInt64 `db:"exec_block_number"`
FeeRecipient []byte `db:"fee_recipient"`
ElReward decimal.NullDecimal `db:"el_reward"`
ClReward decimal.NullDecimal `db:"cl_reward"`
GraffitiText sql.NullString `db:"graffiti_text"`
Proposer t.VDBValidator `db:"validator_index"`
Group uint64 `db:"group_id"`
Epoch uint64 `db:"epoch"`
Slot uint64 `db:"slot"`
Status uint64 `db:"status"`
Block sql.NullInt64 `db:"exec_block_number"`
FeeRecipient []byte `db:"fee_recipient"`
ElReward decimal.NullDecimal `db:"el_reward"`
ClReward decimal.NullDecimal `db:"cl_reward"`
GraffitiText sql.NullString `db:"graffiti_text"`
IsSmoothingPool sql.NullBool `db:"is_smoothing_pool"`

// for cursor only
Reward decimal.Decimal
Expand Down Expand Up @@ -444,9 +462,19 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
TraceIdx: -1,
})
reward.El = proposal.ElReward.Decimal.Mul(decimal.NewFromInt(1e18))
if rpInfos != nil && protocolModes.RocketPool && !(proposal.IsSmoothingPool.Valid && proposal.IsSmoothingPool.Bool) {
if rpValidator, ok := rpInfos.Minipool[proposal.Proposer]; ok {
reward.El = reward.El.Mul(d.getRocketPoolOperatorFactor(rpValidator))
}
}
}
if clReward, ok := clRewards[proposal.Slot]; ok && clReward.Valid {
reward.Cl = clReward.Decimal.Mul(decimal.NewFromInt(1e18))
if rpInfos != nil && protocolModes.RocketPool {
if rpValidator, ok := rpInfos.Minipool[proposal.Proposer]; ok {
reward.Cl = reward.Cl.Mul(d.getRocketPoolOperatorFactor(rpValidator))
}
}
}
proposals[i].Reward = proposal.ElReward.Decimal.Add(proposal.ClReward.Decimal)
data[i].Reward = &reward
Expand Down
136 changes: 136 additions & 0 deletions backend/pkg/api/data_access/vdb_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"time"

"github.com/doug-martin/goqu/v9"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/gobitfly/beaconchain/pkg/api/enums"
t "github.com/gobitfly/beaconchain/pkg/api/types"
"github.com/gobitfly/beaconchain/pkg/commons/cache"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/lib/pq"
"github.com/pkg/errors"
"github.com/shopspring/decimal"
"golang.org/x/sync/errgroup"
)

//////////////////// Helper functions (must be used by more than one VDB endpoint!)
Expand Down Expand Up @@ -130,3 +133,136 @@ func (d *DataAccessService) getTimeToNextWithdrawal(distance uint64) time.Time {

return timeToWithdrawal
}

func (d *DataAccessService) getRocketPoolInfos(ctx context.Context, dashboardId t.VDBId, groupId int64) (*t.RPInfo, error) {
wg := errgroup.Group{}

queryResult := []struct {
ValidatorIndex uint64 `db:"validatorindex"`
NodeAddress []byte `db:"node_address"`
NodeFee float64 `db:"node_fee"`
NodeDepositBalance decimal.Decimal `db:"node_deposit_balance"`
UserDepositBalance decimal.Decimal `db:"user_deposit_balance"`
EndTime sql.NullTime `db:"end_time"`
SmoothingPoolAddress []byte `db:"smoothing_pool_address"`
SmoothingPoolEth *decimal.Decimal `db:"smoothing_pool_eth"`
}{}

wg.Go(func() error {
ds := goqu.Dialect("postgres").
Select(
goqu.L("v.validatorindex"),
goqu.L("rplm.node_address"),
goqu.L("rplm.node_fee"),
goqu.L("rplm.node_deposit_balance"),
goqu.L("rplm.user_deposit_balance"),
goqu.L("rplrs.end_time"),
goqu.L("rploc.smoothing_pool_address"),
goqu.L("rplrs.smoothing_pool_eth"),
).
From(goqu.L("rocketpool_minipools AS rplm")).
LeftJoin(goqu.L("validators AS v"), goqu.On(goqu.L("rplm.pubkey = v.pubkey"))).
LeftJoin(goqu.L("rocketpool_rewards_summary AS rplrs"), goqu.On(goqu.L("rplm.node_address = rplrs.node_address"))).
LeftJoin(goqu.L("rocketpool_onchain_configs AS rploc"), goqu.On(goqu.L("rplm.rocketpool_storage_address = rploc.rocketpool_storage_address"))).
Where(goqu.L("rplm.node_deposit_balance IS NOT NULL")).
Where(goqu.L("rplm.user_deposit_balance IS NOT NULL"))

if len(dashboardId.Validators) == 0 {
ds = ds.
LeftJoin(goqu.L("users_val_dashboards_validators uvdv"), goqu.On(goqu.L("uvdv.validator_index = v.validatorindex"))).
Where(goqu.L("uvdv.dashboard_id = ?", dashboardId.Id))

if groupId != t.AllGroups {
ds = ds.
Where(goqu.L("uvdv.group_id = ?", groupId))
}
} else {
ds = ds.
Where(goqu.L("v.validatorindex = ANY(?)", pq.Array(dashboardId.Validators)))
}

query, args, err := ds.Prepared(true).ToSQL()
if err != nil {
return fmt.Errorf("error preparing query: %w", err)
}

err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...)
if err != nil {
return fmt.Errorf("error retrieving rocketpool validators data: %w", err)
}

return nil
})

nodeMinipoolCount := make(map[string]uint64)
wg.Go(func() error {
queryResult := []struct {
NodeAddress []byte `db:"node_address"`
MinipoolCount uint64 `db:"minipool_count"`
}{}

err := d.alloyReader.SelectContext(ctx, &queryResult, `
SELECT
node_address,
COUNT(node_address) AS minipool_count
FROM rocketpool_minipools
GROUP BY node_address`)
if err != nil {
return fmt.Errorf("error retrieving rocketpool node deposits data: %w", err)
}

for _, res := range queryResult {
node := hexutil.Encode(res.NodeAddress)
nodeMinipoolCount[node] = res.MinipoolCount
}

return nil
})

err := wg.Wait()
if err != nil {
return nil, err
}

if len(queryResult) == 0 {
return nil, nil
}

rpInfo := t.RPInfo{
Minipool: make(map[uint64]t.RPMinipoolInfo),
// Smoothing pool address is the same for all nodes on the network so take the first result
SmoothingPoolAddress: queryResult[0].SmoothingPoolAddress,
}

for _, res := range queryResult {
if _, ok := rpInfo.Minipool[res.ValidatorIndex]; !ok {
rpInfo.Minipool[res.ValidatorIndex] = t.RPMinipoolInfo{
NodeFee: res.NodeFee,
NodeDepositBalance: res.NodeDepositBalance,
UserDepositBalance: res.UserDepositBalance,
SmoothingPoolRewards: make(map[uint64]decimal.Decimal),
}
}

node := hexutil.Encode(res.NodeAddress)
if res.EndTime.Valid && res.SmoothingPoolEth != nil {
epoch := uint64(utils.TimeToEpoch(res.EndTime.Time))
splitReward := res.SmoothingPoolEth.Div(decimal.NewFromUint64(nodeMinipoolCount[node]))
rpInfo.Minipool[res.ValidatorIndex].SmoothingPoolRewards[epoch] =
rpInfo.Minipool[res.ValidatorIndex].SmoothingPoolRewards[epoch].Add(splitReward)
}
}

return &rpInfo, nil
}

func (d *DataAccessService) getRocketPoolOperatorFactor(minipool t.RPMinipoolInfo) decimal.Decimal {
fullDeposit := minipool.UserDepositBalance.Add(minipool.NodeDepositBalance)
operatorShare := minipool.NodeDepositBalance.Div(fullDeposit)
invOperatorShare := decimal.NewFromInt(1).Sub(operatorShare)

commissionReward := invOperatorShare.Mul(decimal.NewFromFloat(minipool.NodeFee))
operatorFactor := operatorShare.Add(commissionReward)

return operatorFactor
}
69 changes: 16 additions & 53 deletions backend/pkg/api/data_access/vdb_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,11 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d
})
}

rpInfos, err := d.getRocketPoolInfos(ctx, dashboardId, t.AllGroups)
if err != nil {
return nil, fmt.Errorf("error retrieving rocketpool validators: %w", err)
}

// Validator status and balance
eg.Go(func() error {
validatorMapping, err := d.services.GetCurrentValidatorMapping()
Expand All @@ -346,10 +351,13 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d
data.Groups = append(data.Groups, t.VDBOverviewGroup{Id: t.DefaultGroupId, Name: t.DefaultGroupName, Count: uint64(len(validators))})
}

// Status
// Create a new sub-dashboard to get the total cl deposits for non-rocketpool validators
var nonRpDashboardId t.VDBId

for _, validator := range validators {
metadata := validatorMapping.ValidatorMetadata[validator]

// Status
switch constypes.ValidatorDbStatus(metadata.Status) {
case constypes.DbExitingOnline, constypes.DbSlashingOnline, constypes.DbActiveOnline:
data.Validators.Online++
Expand All @@ -362,61 +370,16 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d
case constypes.DbExited:
data.Validators.Exited++
}
}

// Find rocketpool validators
type RpOperatorInfo struct {
ValidatorIndex uint64 `db:"validatorindex"`
NodeFee float64 `db:"node_fee"`
NodeDepositBalance decimal.Decimal `db:"node_deposit_balance"`
UserDepositBalance decimal.Decimal `db:"user_deposit_balance"`
}
var queryResult []RpOperatorInfo

ds := goqu.Dialect("postgres").
Select(
goqu.L("v.validatorindex"),
goqu.L("rplm.node_fee"),
goqu.L("rplm.node_deposit_balance"),
goqu.L("rplm.user_deposit_balance")).
From(goqu.L("rocketpool_minipools AS rplm")).
LeftJoin(goqu.L("validators AS v"), goqu.On(goqu.L("rplm.pubkey = v.pubkey"))).
Where(goqu.L("node_deposit_balance IS NOT NULL")).
Where(goqu.L("user_deposit_balance IS NOT NULL"))

if len(dashboardId.Validators) == 0 {
ds = ds.
LeftJoin(goqu.L("users_val_dashboards_validators uvdv"), goqu.On(goqu.L("uvdv.validator_index = v.validatorindex"))).
Where(goqu.L("uvdv.dashboard_id = ?", dashboardId.Id))
} else {
ds = ds.
Where(goqu.L("v.validatorindex = ANY(?)", pq.Array(dashboardId.Validators)))
}

query, args, err := ds.Prepared(true).ToSQL()
if err != nil {
return fmt.Errorf("error preparing query: %w", err)
}

err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...)
if err != nil {
return fmt.Errorf("error retrieving rocketpool validators data: %w", err)
}

rpValidators := make(map[uint64]RpOperatorInfo)
for _, res := range queryResult {
rpValidators[res.ValidatorIndex] = res
}

// Create a new sub-dashboard to get the total cl deposits for non-rocketpool validators
var nonRpDashboardId t.VDBId

for _, validator := range validators {
metadata := validatorMapping.ValidatorMetadata[validator]
// Balance
validatorBalance := utils.GWeiToWei(big.NewInt(int64(metadata.Balance)))
effectiveBalance := utils.GWeiToWei(big.NewInt(int64(metadata.EffectiveBalance)))

if rpValidator, ok := rpValidators[validator]; ok {
if rpInfos == nil {
data.Balances.Total = data.Balances.Total.Add(validatorBalance)

nonRpDashboardId.Validators = append(nonRpDashboardId.Validators, validator)
} else if rpValidator, ok := rpInfos.Minipool[validator]; ok {
if protocolModes.RocketPool {
// Calculate the balance of the operator
fullDeposit := rpValidator.UserDepositBalance.Add(rpValidator.NodeDepositBalance)
Expand Down Expand Up @@ -457,7 +420,7 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d
retrieveRewardsAndEfficiency := func(table string, hours int, rewards *t.ClElValue[decimal.Decimal], apr *t.ClElValue[float64], efficiency *float64) {
// Rewards + APR
eg.Go(func() error {
(*rewards).El, (*apr).El, (*rewards).Cl, (*apr).Cl, err = d.internal_getElClAPR(ctx, dashboardId, -1, hours)
(*rewards).El, (*apr).El, (*rewards).Cl, (*apr).Cl, err = d.internal_getElClAPR(ctx, dashboardId, t.AllGroups, protocolModes, rpInfos, hours)
if err != nil {
return err
}
Expand Down
Loading
Loading