Skip to content

Commit

Permalink
Merge pull request #402 from gobitfly/BIDS-3096/Aggregate_validator_d…
Browse files Browse the repository at this point in the history
…ashboard_group_rewards_and_withdrawals

Bids 3096/aggregate validator dashboard group rewards and withdrawals
  • Loading branch information
Eisei24 authored Jun 12, 2024
2 parents bf24fca + 478fe0c commit dbb6322
Show file tree
Hide file tree
Showing 3 changed files with 109 additions and 55 deletions.
130 changes: 90 additions & 40 deletions backend/pkg/api/data_access/vdb_rewards.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,54 +130,88 @@ func (d *DataAccessService) GetValidatorDashboardRewards(dashboardId t.VDBId, cu
}

if search != "" {
// Create a secondary query to get the group ids that match the search term
// We cannot do everything in one query because we need to know the "epoch total" even for groups we do not search for
groupIdQueryParams := []interface{}{}
if dashboardId.AggregateGroups {
if epochSearch == -1 && indexSearch == -1 {
// If we have a search term but no epoch or index search then we can return empty results
return result, &paging, nil
}

indexSearchQuery := ""
if indexSearch != -1 {
groupIdQueryParams = append(groupIdQueryParams, indexSearch)
indexSearchQuery = fmt.Sprintf(" OR v.validator_index = $%d", len(groupIdQueryParams))
}
found := false
if indexSearch != -1 {
// Find whether the index is in the dashboard
// If it is then show all the data
err = d.alloyReader.Get(&found, `
SELECT EXISTS(
SELECT 1
FROM users_val_dashboards_validators
WHERE dashboard_id = $1 AND validator_index = $2)
`, dashboardId.Id, indexSearch)
if err != nil {
return nil, nil, err
}
}
if !found && epochSearch != -1 {
queryParams = append(queryParams, epochSearch)
whereQuery += fmt.Sprintf(" AND e.epoch = $%d", len(queryParams))
}
} else {
// Create a secondary query to get the group ids that match the search term
// We cannot do everything in one query because we need to know the "epoch total" even for groups we do not search for
groupIdQueryParams := []interface{}{}

indexSearchQuery := ""
if indexSearch != -1 {
groupIdQueryParams = append(groupIdQueryParams, indexSearch)
indexSearchQuery = fmt.Sprintf(" OR v.validator_index = $%d", len(groupIdQueryParams))
}

groupIdQueryParams = append(groupIdQueryParams, dashboardId.Id, search)
groupIdQuery := fmt.Sprintf(`
groupIdQueryParams = append(groupIdQueryParams, dashboardId.Id, search)
groupIdQuery := fmt.Sprintf(`
SELECT
DISTINCT(group_id)
FROM users_val_dashboards_validators v
INNER JOIN users_val_dashboards_groups g ON v.group_id = g.id AND v.dashboard_id = g.dashboard_id
WHERE v.dashboard_id = $%d AND (g.name ILIKE ($%d||'%%') %s)
`, len(groupIdQueryParams)-1, len(groupIdQueryParams), indexSearchQuery)

var groupIdSearch []uint64
err = d.alloyReader.Select(&groupIdSearch, groupIdQuery, groupIdQueryParams...)
if err != nil {
return nil, nil, err
}
var groupIdSearch []uint64
err = d.alloyReader.Select(&groupIdSearch, groupIdQuery, groupIdQueryParams...)
if err != nil {
return nil, nil, err
}

// Convert to a map for an easy check later
for _, groupId := range groupIdSearch {
groupIdSearchMap[groupId] = true
}
// Convert to a map for an easy check later
for _, groupId := range groupIdSearch {
groupIdSearchMap[groupId] = true
}

if len(groupIdSearchMap) == 0 {
if epochSearch != -1 {
// If we have an epoch search but no group search then we can restrict the query to the epoch
queryParams = append(queryParams, epochSearch)
whereQuery += fmt.Sprintf(" AND e.epoch = $%d", len(queryParams))
} else {
// No search for goup or epoch possible, return empty results
return result, &paging, nil
if len(groupIdSearchMap) == 0 {
if epochSearch != -1 {
// If we have an epoch search but no group search then we can restrict the query to the epoch
queryParams = append(queryParams, epochSearch)
whereQuery += fmt.Sprintf(" AND e.epoch = $%d", len(queryParams))
} else {
// No search for goup or epoch possible, return empty results
return result, &paging, nil
}
}
}
}

groupIdQuery := "v.group_id,"
groupByQuery := "GROUP BY e.epoch, v.group_id"
orderQuery := fmt.Sprintf("ORDER BY e.epoch %[1]s, v.group_id %[1]s", sortSearchOrder)
if dashboardId.AggregateGroups {
queryParams = append(queryParams, t.DefaultGroupId)
groupIdQuery = fmt.Sprintf("$%d::smallint AS group_id,", len(queryParams))
groupByQuery = "GROUP BY e.epoch"
orderQuery = fmt.Sprintf("ORDER BY e.epoch %s", sortSearchOrder)
}

rewardsQuery = fmt.Sprintf(`
SELECT
e.epoch,
v.group_id,
%s
%s
FROM validator_dashboard_data_epoch e
INNER JOIN users_val_dashboards_validators v ON e.validator_index = v.validator_index
Expand All @@ -186,8 +220,8 @@ func (d *DataAccessService) GetValidatorDashboardRewards(dashboardId t.VDBId, cu
LEFT JOIN execution_payloads ep ON ep.block_hash = b.exec_block_hash
LEFT JOIN relays_blocks r ON r.exec_block_hash = b.exec_block_hash
%s
GROUP BY e.epoch, v.group_id
%s`, rewardsDataQuery, whereQuery, orderQuery)
%s
%s`, groupIdQuery, rewardsDataQuery, whereQuery, groupByQuery, orderQuery)
} else {
// In case a list of validators is provided set the group to the default id
queryParams = append(queryParams, pq.Array(dashboardId.Validators), latestFinalizedEpoch-epochLookBack)
Expand All @@ -206,12 +240,7 @@ func (d *DataAccessService) GetValidatorDashboardRewards(dashboardId t.VDBId, cu
if indexSearch != -1 {
// Find whether the index is in the list of validators
// If it is then show all the data
for _, validator := range dashboardId.Validators {
if validator == t.VDBValidator(indexSearch) {
found = true
break
}
}
found = utils.ElementExists(dashboardId.Validators, t.VDBValidator(indexSearch))
}
if !found && epochSearch != -1 {
queryParams = append(queryParams, epochSearch)
Expand Down Expand Up @@ -404,6 +433,11 @@ func (d *DataAccessService) GetValidatorDashboardRewards(dashboardId t.VDBId, cu
func (d *DataAccessService) GetValidatorDashboardGroupRewards(dashboardId t.VDBId, groupId int64, epoch uint64) (*t.VDBGroupRewardsData, error) {
ret := &t.VDBGroupRewardsData{}

if dashboardId.AggregateGroups {
// If we are aggregating groups then ignore the group id and sum up everything
groupId = t.AllGroups
}

type queryResult struct {
AttestationSourceReward decimal.Decimal `db:"attestations_source_reward"`
AttestationTargetReward decimal.Decimal `db:"attestations_target_reward"`
Expand Down Expand Up @@ -541,7 +575,8 @@ func (d *DataAccessService) GetValidatorDashboardGroupRewards(dashboardId t.VDBI
}

func (d *DataAccessService) GetValidatorDashboardRewardsChart(dashboardId t.VDBId) (*t.ChartData[int, decimal.Decimal], error) {
// bar chart for the CL and EL rewards for each group for each epoch. NO series for all groups combined
// bar chart for the CL and EL rewards for each group for each epoch.
// NO series for all groups combined except if AggregateGroups is true.
// series id is group id, series property is 'cl' or 'el'

latestFinalizedEpoch := cache.LatestFinalizedEpoch.Get()
Expand All @@ -564,20 +599,30 @@ func (d *DataAccessService) GetValidatorDashboardRewardsChart(dashboardId t.VDBI
`

if dashboardId.Validators == nil {
groupIdQuery := "v.group_id,"
groupByQuery := "GROUP BY e.epoch, v.group_id"
orderQuery := "ORDER BY e.epoch, v.group_id"
if dashboardId.AggregateGroups {
queryParams = append(queryParams, t.DefaultGroupId)
groupIdQuery = fmt.Sprintf("$%d::smallint AS group_id,", len(queryParams))
groupByQuery = "GROUP BY e.epoch"
orderQuery = "ORDER BY e.epoch"
}

queryParams = append(queryParams, dashboardId.Id, latestFinalizedEpoch-epochLookBack)
rewardsQuery = fmt.Sprintf(`
SELECT
e.epoch,
v.group_id,
%s
%s
FROM validator_dashboard_data_epoch e
INNER JOIN users_val_dashboards_validators v ON e.validator_index = v.validator_index
LEFT JOIN blocks b ON e.epoch = b.epoch AND e.validator_index = b.proposer AND b.status = '1'
LEFT JOIN execution_payloads ep ON ep.block_hash = b.exec_block_hash
LEFT JOIN relays_blocks r ON r.exec_block_hash = b.exec_block_hash
WHERE v.dashboard_id = $%d AND e.epoch > $%d
GROUP BY e.epoch, v.group_id
ORDER BY e.epoch, v.group_id`, rewardsDataQuery, len(queryParams)-1, len(queryParams))
%s
%s`, groupIdQuery, rewardsDataQuery, len(queryParams)-1, len(queryParams), groupByQuery, orderQuery)
} else {
// In case a list of validators is provided set the group to the default id
queryParams = append(queryParams, t.DefaultGroupId, pq.Array(dashboardId.Validators), latestFinalizedEpoch-epochLookBack)
Expand Down Expand Up @@ -662,6 +707,11 @@ func (d *DataAccessService) GetValidatorDashboardDuties(dashboardId t.VDBId, epo
result := make([]t.VDBEpochDutiesTableRow, 0)
var paging t.Paging

if dashboardId.AggregateGroups {
// If we are aggregating groups then ignore the group id and sum up everything
groupId = t.AllGroups
}

// Initialize the cursor
var currentCursor t.ValidatorDutiesCursor
var err error
Expand Down
32 changes: 18 additions & 14 deletions backend/pkg/api/data_access/vdb_withdrawals.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,27 +64,31 @@ func (d *DataAccessService) GetValidatorDashboardWithdrawals(dashboardId t.VDBId
GroupId uint64 `db:"group_id"`
}{}

queryArgs := []interface{}{dashboardId.Id}
queryParams := []interface{}{dashboardId.Id}
validatorsQuery := fmt.Sprintf(`
SELECT
validator_index,
group_id
FROM users_val_dashboards_validators
WHERE dashboard_id = $%d
`, len(queryArgs))
`, len(queryParams))

if len(validatorSearch) > 0 {
queryArgs = append(queryArgs, pq.Array(validatorSearch))
validatorsQuery += fmt.Sprintf(" AND validator_index = ANY ($%d)", len(queryArgs))
queryParams = append(queryParams, pq.Array(validatorSearch))
validatorsQuery += fmt.Sprintf(" AND validator_index = ANY ($%d)", len(queryParams))
}

err := d.alloyReader.Select(&queryResult, validatorsQuery, queryArgs...)
err := d.alloyReader.Select(&queryResult, validatorsQuery, queryParams...)
if err != nil {
return nil, nil, err
}

for _, res := range queryResult {
validatorGroupMap[res.ValidatorIndex] = res.GroupId
groupId := res.GroupId
if dashboardId.AggregateGroups {
groupId = t.DefaultGroupId
}
validatorGroupMap[res.ValidatorIndex] = groupId
validators = append(validators, res.ValidatorIndex)
}
} else {
Expand Down Expand Up @@ -472,7 +476,7 @@ func (d *DataAccessService) GetValidatorDashboardTotalWithdrawals(dashboardId t.
Amount int64 `db:"acc_withdrawals_amount"`
}{}

queryArgs := []interface{}{}
queryParams := []interface{}{}
withdrawalsQuery := `
SELECT
t.validator_index,
Expand All @@ -484,14 +488,14 @@ func (d *DataAccessService) GetValidatorDashboardTotalWithdrawals(dashboardId t.
`

if dashboardId.Validators == nil {
queryArgs = append(queryArgs, dashboardId.Id)
queryParams = append(queryParams, dashboardId.Id)
dashboardIdQuery := fmt.Sprintf(`
INNER JOIN users_val_dashboards_validators v ON v.validator_index = t.validator_index
WHERE v.dashboard_id = $%d`, len(queryArgs))
WHERE v.dashboard_id = $%d`, len(queryParams))

if len(validatorSearch) > 0 {
queryArgs = append(queryArgs, pq.Array(validatorSearch))
dashboardIdQuery += fmt.Sprintf(" AND t.validator_index = ANY ($%d)", len(queryArgs))
queryParams = append(queryParams, pq.Array(validatorSearch))
dashboardIdQuery += fmt.Sprintf(" AND t.validator_index = ANY ($%d)", len(queryParams))
}

withdrawalsQuery = fmt.Sprintf(withdrawalsQuery, dashboardIdQuery)
Expand All @@ -509,14 +513,14 @@ func (d *DataAccessService) GetValidatorDashboardTotalWithdrawals(dashboardId t.
return result, nil
}

queryArgs = append(queryArgs, pq.Array(validators))
queryParams = append(queryParams, pq.Array(validators))
validatorsQuery := fmt.Sprintf(`
WHERE t.validator_index = ANY ($%d)`, len(queryArgs))
WHERE t.validator_index = ANY ($%d)`, len(queryParams))

withdrawalsQuery = fmt.Sprintf(withdrawalsQuery, validatorsQuery)
}

err = d.alloyReader.Select(&queryResult, withdrawalsQuery, queryArgs...)
err = d.alloyReader.Select(&queryResult, withdrawalsQuery, queryParams...)
if err != nil {
return nil, fmt.Errorf("error getting total withdrawals for validators: %+v: %w", dashboardId, err)
}
Expand Down
2 changes: 1 addition & 1 deletion backend/pkg/commons/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func GetNetwork() string {
return strings.ToLower(Config.Chain.ClConfig.ConfigName)
}

func ElementExists(arr []string, el string) bool {
func ElementExists[T comparable](arr []T, el T) bool {
for _, e := range arr {
if e == el {
return true
Expand Down

0 comments on commit dbb6322

Please sign in to comment.