diff --git a/backend/pkg/api/data_access/vdb_management.go b/backend/pkg/api/data_access/vdb_management.go index 0790ed759..04c77e0d2 100644 --- a/backend/pkg/api/data_access/vdb_management.go +++ b/backend/pkg/api/data_access/vdb_management.go @@ -315,7 +315,7 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d retrieveRewardsAndEfficiency := func(table string, days int, rewards *t.ClElValue[decimal.Decimal], apr *t.ClElValue[float64], efficiency *float64) { // Rewards + APR wg.Go(func() error { - (*rewards).El, (*apr).El, (*rewards).Cl, (*apr).Cl, err = d.internal_getElClAPR(ctx, validators, days) + (*rewards).El, (*apr).El, (*rewards).Cl, (*apr).Cl, err = d.internal_getElClAPR(ctx, dashboardId, -1, days) if err != nil { return err } diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index 8f4ce2e78..578b09b9c 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -17,7 +17,6 @@ import ( "github.com/gobitfly/beaconchain/pkg/api/enums" t "github.com/gobitfly/beaconchain/pkg/api/types" "github.com/gobitfly/beaconchain/pkg/commons/cache" - "github.com/gobitfly/beaconchain/pkg/commons/db" "github.com/gobitfly/beaconchain/pkg/commons/log" "github.com/gobitfly/beaconchain/pkg/commons/utils" constypes "github.com/gobitfly/beaconchain/pkg/consapi/types" @@ -36,7 +35,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da wg := errgroup.Group{} // Get the table name based on the period - table, _, _, err := d.getTablesForPeriod(period) + clickhouseTable, _, err := d.getTablesForPeriod(period) if err != nil { return nil, nil, err } @@ -104,7 +103,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da var queryResult []struct { GroupId int64 `db:"result_group_id"` GroupName string `db:"group_name"` - ValidatorIndices pq.Int64Array `db:"validator_indices"` + ValidatorIndices []uint64 `db:"validator_indices"` ClRewards int64 `db:"cl_rewards"` AttestationReward decimal.Decimal `db:"attestations_reward"` AttestationIdealReward decimal.Decimal `db:"attestations_ideal_reward"` @@ -114,113 +113,123 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da BlocksScheduled uint64 `db:"blocks_scheduled"` SyncExecuted uint64 `db:"sync_executed"` SyncScheduled uint64 `db:"sync_scheduled"` + MinEpochStart int64 `db:"min_epoch_start"` + MaxEpochEnd int64 `db:"max_epoch_end"` } - wg.Go(func() error { - ds := goqu.Dialect("postgres"). - Select( - goqu.L("ARRAY_AGG(r.validator_index) AS validator_indices"), - goqu.L("SUM(COALESCE(r.attestations_reward, 0) + COALESCE(r.blocks_cl_reward, 0) + COALESCE(r.sync_rewards, 0) + COALESCE(r.slasher_reward, 0)) AS cl_rewards"), - goqu.L("COALESCE(SUM(r.attestations_reward)::decimal, 0) AS attestations_reward"), - goqu.L("COALESCE(SUM(r.attestations_ideal_reward)::decimal, 0) AS attestations_ideal_reward"), - goqu.L("COALESCE(SUM(r.attestations_executed), 0) AS attestations_executed"), - goqu.L("COALESCE(SUM(r.attestations_scheduled), 0) AS attestations_scheduled"), - goqu.L("COALESCE(SUM(r.blocks_proposed), 0) AS blocks_proposed"), - goqu.L("COALESCE(SUM(r.blocks_scheduled), 0) AS blocks_scheduled"), - goqu.L("COALESCE(SUM(r.sync_executed), 0) AS sync_executed"), - goqu.L("COALESCE(SUM(r.sync_scheduled), 0) AS sync_scheduled")). - From(goqu.T(table).As("r")). - GroupBy(goqu.L("result_group_id")) - - if len(validators) > 0 { + ds := goqu.Dialect("postgres"). + From(goqu.L(fmt.Sprintf(`%s AS r FINAL`, clickhouseTable))). + With("validators", goqu.L("(SELECT dashboard_id, group_id, validator_index FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). + Select( + goqu.L("ARRAY_AGG(r.validator_index) AS validator_indices"), + goqu.L("COALESCE(SUM(r.attestations_reward + r.blocks_cl_reward + r.sync_rewards + r.blocks_cl_slasher_reward), 0) AS cl_rewards"), + goqu.L("COALESCE(SUM(r.attestations_reward)::decimal, 0) AS attestations_reward"), + goqu.L("COALESCE(SUM(r.attestations_ideal_reward)::decimal, 0) AS attestations_ideal_reward"), + goqu.L("COALESCE(SUM(r.attestations_executed), 0) AS attestations_executed"), + goqu.L("COALESCE(SUM(r.attestations_scheduled), 0) AS attestations_scheduled"), + goqu.L("COALESCE(SUM(r.blocks_proposed), 0) AS blocks_proposed"), + goqu.L("COALESCE(SUM(r.blocks_scheduled), 0) AS blocks_scheduled"), + goqu.L("COALESCE(SUM(r.sync_executed), 0) AS sync_executed"), + goqu.L("COALESCE(SUM(r.sync_scheduled), 0) AS sync_scheduled"), + goqu.L("COALESCE(MIN(r.epoch_start), 0) AS min_epoch_start"), + goqu.L("COALESCE(MAX(r.epoch_end), 0) AS max_epoch_end")). + GroupBy(goqu.L("result_group_id")) + + if len(validators) > 0 { + ds = ds. + SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). + Where(goqu.L("r.validator_index IN ?", validators)) + } else { + if dashboardId.AggregateGroups { ds = ds. - SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). - Where(goqu.L("r.validator_index = ANY(?)", pq.Array(validators))) + SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)) } else { - if dashboardId.AggregateGroups { - ds = ds. - SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)) - } else { - ds = ds. - SelectAppend(goqu.L("v.group_id AS result_group_id")) - } - ds = ds. - InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). - Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) - - if groupNameSearchEnabled && (search != "" || colSort.Column == enums.VDBSummaryColumns.Group) { - // Get the group names since we can filter and/or sort for them - ds = ds. - SelectAppend(goqu.L("g.name AS group_name")). - InnerJoin(goqu.L("users_val_dashboards_groups g"), goqu.On(goqu.L("v.group_id = g.id AND v.dashboard_id = g.dashboard_id"))). - GroupByAppend(goqu.L("group_name")) - } + SelectAppend(goqu.L("v.group_id AS result_group_id")) } - query, args, err := ds.Prepared(true).ToSQL() - if err != nil { - return fmt.Errorf("error preparing query: %v", err) - } + ds = ds. + InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). + Where(goqu.L("r.validator_index IN (SELECT validator_index FROM validators)")) - err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) - if err != nil { - return fmt.Errorf("error retrieving data from table %s: %v", table, err) + if groupNameSearchEnabled && (search != "" || colSort.Column == enums.VDBSummaryColumns.Group) { + // Get the group names since we can filter and/or sort for them + ds = ds. + SelectAppend(goqu.L("g.name AS group_name")). + InnerJoin(goqu.L("users_val_dashboards_groups g"), goqu.On(goqu.L("v.group_id = g.id AND v.dashboard_id = g.dashboard_id"))). + GroupByAppend(goqu.L("group_name")) } - return nil - }) + } + query, args, err := ds.Prepared(true).ToSQL() + if err != nil { + return nil, nil, fmt.Errorf("error preparing query: %v", err) + } + + err = d.clickhouseReader.SelectContext(ctx, &queryResult, query, args...) + if err != nil { + return nil, nil, fmt.Errorf("error retrieving data from table %s: %v", clickhouseTable, err) + } + + epochMin := int64(math.MaxInt64) + epochMax := int64(0) + + for _, row := range queryResult { + if row.MinEpochStart < epochMin { + epochMin = row.MinEpochStart + } + if row.MaxEpochEnd > epochMax { + epochMax = row.MaxEpochEnd + } + } // ------------------------------------------------------------------------------------------------------------------ // Get the EL rewards elRewards := make(map[int64]decimal.Decimal) - wg.Go(func() error { - ds := goqu.Dialect("postgres"). - Select( - goqu.L("SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)) AS el_rewards")). - From(goqu.T(table).As("r")). - LeftJoin(goqu.L("blocks b"), goqu.On(goqu.L("b.epoch >= r.epoch_start AND b.epoch <= r.epoch_end AND r.validator_index = b.proposer AND b.status = '1'"))). - LeftJoin(goqu.L("execution_payloads ep"), goqu.On(goqu.L("ep.block_hash = b.exec_block_hash"))). - LeftJoin(goqu.L("relays_blocks rb"), goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash"))). - GroupBy(goqu.L("result_group_id")) - - if len(validators) > 0 { + ds = goqu.Dialect("postgres"). + Select( + goqu.L("SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)) AS el_rewards")). + From(goqu.L("blocks b")). + LeftJoin(goqu.L("execution_payloads ep"), goqu.On(goqu.L("ep.block_hash = b.exec_block_hash"))). + LeftJoin(goqu.L("relays_blocks rb"), goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash"))). + Where(goqu.L("b.epoch >= ? AND b.epoch <= ? AND b.status = '1'", epochMin, epochMax)). + GroupBy(goqu.L("result_group_id")) + + if len(validators) > 0 { + ds = ds. + SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). + Where(goqu.L("b.proposer = ANY(?)", pq.Array(validators))) + } else { + if dashboardId.AggregateGroups { ds = ds. - SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). - Where(goqu.L("r.validator_index = ANY(?)", pq.Array(validators))) + SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)) } else { - if dashboardId.AggregateGroups { - ds = ds. - SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)) - } else { - ds = ds. - SelectAppend(goqu.L("v.group_id AS result_group_id")) - } - ds = ds. - InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). - Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) + SelectAppend(goqu.L("v.group_id AS result_group_id")) } - var queryResult []struct { - GroupId int64 `db:"result_group_id"` - ElRewards decimal.Decimal `db:"el_rewards"` - } + ds = ds. + InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("b.proposer = v.validator_index"))). + Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) + } - query, args, err := ds.Prepared(true).ToSQL() - if err != nil { - return fmt.Errorf("error preparing query: %v", err) - } + var elRewardsQueryResult []struct { + GroupId int64 `db:"result_group_id"` + ElRewards decimal.Decimal `db:"el_rewards"` + } - err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) - if err != nil { - return fmt.Errorf("error retrieving data from table %s: %v", table, err) - } + query, args, err = ds.Prepared(true).ToSQL() + if err != nil { + return nil, nil, fmt.Errorf("error preparing query: %v", err) + } - for _, entry := range queryResult { - elRewards[entry.GroupId] = entry.ElRewards - } - return nil - }) + err = d.alloyReader.SelectContext(ctx, &elRewardsQueryResult, query, args...) + if err != nil { + return nil, nil, fmt.Errorf("error retrieving data from table blocks: %v", err) + } + + for _, entry := range elRewardsQueryResult { + elRewards[entry.GroupId] = entry.ElRewards + } // ------------------------------------------------------------------------------------------------------------------ // Get the current and next sync committee validators @@ -270,18 +279,13 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da } for _, queryEntry := range queryResult { - uiValidatorIndices := make([]uint64, len(queryEntry.ValidatorIndices)) - for i, validatorIndex := range queryEntry.ValidatorIndices { - uiValidatorIndices[i] = uint64(validatorIndex) - } - resultEntry := t.VDBSummaryTableRow{ GroupId: queryEntry.GroupId, AverageNetworkEfficiency: averageNetworkEfficiency, } // Status - for _, validatorIndex := range uiValidatorIndices { + for _, validatorIndex := range queryEntry.ValidatorIndices { if currentSyncCommitteeValidators[validatorIndex] { resultEntry.Status.CurrentSyncCount++ } @@ -299,7 +303,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da return nil, nil, err } - for _, validator := range uiValidatorIndices { + for _, validator := range queryEntry.ValidatorIndices { metadata := validatorMapping.ValidatorMetadata[validator] // As deposited and pending validators are neither online nor offline they are counted as the third state (exited) @@ -369,7 +373,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da // If the search permits it add the entry to the result if search != "" { prefixSearch := strings.ToLower(search) - for _, validatorIndex := range uiValidatorIndices { + for _, validatorIndex := range queryEntry.ValidatorIndices { if searchValidator != -1 && validatorIndex == uint64(searchValidator) || (groupNameSearchEnabled && strings.HasPrefix(strings.ToLower(queryEntry.GroupName), prefixSearch)) { result = append(result, resultEntry) @@ -507,13 +511,17 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex } // Get the table names based on the period - table, slashedByCountTable, days, err := d.getTablesForPeriod(period) + clickhouseTable, days, err := d.getTablesForPeriod(period) if err != nil { return nil, err } - query := `select - users_val_dashboards_validators.validator_index, + query := ` + WITH validators AS ( + SELECT validator_index as validator_index, group_id FROM users_val_dashboards_validators WHERE (dashboard_id = $1 and (group_id = $2 OR $2 = -1)) + ) + select + validator_index, epoch_start, COALESCE(attestations_reward, 0) as attestations_reward, COALESCE(attestations_ideal_reward, 0) as attestations_ideal_reward, @@ -525,15 +533,14 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex COALESCE(blocks_proposed, 0) as blocks_proposed, COALESCE(sync_scheduled, 0) as sync_scheduled, COALESCE(sync_executed, 0) as sync_executed, - %[1]s.slashed_by IS NOT NULL AS slashed_in_period, - COALESCE(%[2]s.slashed_amount, 0) AS slashed_amount, + slashed AS slashed_in_period, + COALESCE(blocks_slashing_count, 0) AS slashed_amount, COALESCE(blocks_expected, 0) as blocks_expected, COALESCE(inclusion_delay_sum, 0) as inclusion_delay_sum, COALESCE(sync_committees_expected, 0) as sync_committees_expected - from users_val_dashboards_validators - inner join %[1]s on %[1]s.validator_index = users_val_dashboards_validators.validator_index - left join %[2]s on %[2]s.slashed_by = users_val_dashboards_validators.validator_index - where (dashboard_id = $1 and (group_id = $2 OR $2 = -1)) + from %[1]s FINAL + inner join validators v on %[1]s.validator_index = v.validator_index + where validator_index IN (select validator_index FROM validators) ` if dashboardId.Validators != nil { @@ -550,14 +557,13 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex COALESCE(blocks_proposed, 0) as blocks_proposed, COALESCE(sync_scheduled, 0) as sync_scheduled, COALESCE(sync_executed, 0) as sync_executed, - %[1]s.slashed_by IS NOT NULL AS slashed_in_period, - COALESCE(%[2]s.slashed_amount, 0) AS slashed_amount, + slashed AS slashed_in_period, + COALESCE(blocks_slashing_count, 0) AS slashed_amount, COALESCE(blocks_expected, 0) as blocks_expected, COALESCE(inclusion_delay_sum, 0) as inclusion_delay_sum, COALESCE(sync_committees_expected, 0) as sync_committees_expected - from %[1]s - left join %[2]s on %[2]s.slashed_by = %[1]s.validator_index - where %[1]s.validator_index = ANY($1) + from %[1]s FINAL + where %[1]s.validator_index IN ($1) ` } @@ -595,9 +601,9 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex var rows []*queryResult if len(validators) > 0 { - err = d.alloyReader.SelectContext(ctx, &rows, fmt.Sprintf(query, table, slashedByCountTable), validators) + err = d.clickhouseReader.SelectContext(ctx, &rows, fmt.Sprintf(query, clickhouseTable), validators) } else { - err = d.alloyReader.SelectContext(ctx, &rows, fmt.Sprintf(query, table, slashedByCountTable), dashboardId.Id, groupId) + err = d.clickhouseReader.SelectContext(ctx, &rows, fmt.Sprintf(query, clickhouseTable), dashboardId.Id, groupId) } if err != nil { @@ -674,7 +680,7 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex } } - _, ret.Apr.El, _, ret.Apr.Cl, err = d.internal_getElClAPR(ctx, validatorArr, days) + _, ret.Apr.El, _, ret.Apr.Cl, err = d.internal_getElClAPR(ctx, dashboardId, groupId, days) if err != nil { return nil, err } @@ -733,27 +739,62 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex return ret, nil } -func (d *DataAccessService) internal_getElClAPR(ctx context.Context, validators []t.VDBValidator, days int) (elIncome decimal.Decimal, elAPR float64, clIncome decimal.Decimal, clAPR float64, err error) { - var reward sql.NullInt64 +func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId t.VDBId, groupId int64, days int) (elIncome decimal.Decimal, elAPR float64, clIncome decimal.Decimal, clAPR float64, err error) { table := "" switch days { case 1: - table = "validator_dashboard_data_rolling_daily" + table = "validator_dashboard_data_rolling_24h" case 7: - table = "validator_dashboard_data_rolling_weekly" + table = "validator_dashboard_data_rolling_7d" case 30: - table = "validator_dashboard_data_rolling_monthly" + table = "validator_dashboard_data_rolling_30d" case -1: table = "validator_dashboard_data_rolling_90d" default: return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("invalid days value: %v", days) } - query := `select (SUM(COALESCE(balance_end,0)) + SUM(COALESCE(withdrawals_amount,0)) - SUM(COALESCE(deposits_amount,0)) - SUM(COALESCE(balance_start,0))) reward FROM %s WHERE validator_index = ANY($1)` + type RewardsResult struct { + EpochStart uint64 `db:"epoch_start"` + EpochEnd uint64 `db:"epoch_end"` + ValidatorCount uint64 `db:"validator_count"` + Reward sql.NullInt64 `db:"reward"` + } - err = db.AlloyReader.GetContext(ctx, &reward, fmt.Sprintf(query, table), validators) - if err != nil || !reward.Valid { + var rewardsResultTable RewardsResult + var rewardsResultTotal RewardsResult + + rewardsDs := goqu.Dialect("postgres"). + From(goqu.L(fmt.Sprintf("%s AS r FINAL", table))). + With("validators", goqu.L("(SELECT group_id, validator_index FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). + Select( + goqu.L("MIN(epoch_start) AS epoch_start"), + goqu.L("MAX(epoch_end) AS epoch_end"), + goqu.L("COUNT(*) AS validator_count"), + goqu.L("(SUM(COALESCE(r.balance_end,0)) + SUM(COALESCE(r.withdrawals_amount,0)) - SUM(COALESCE(r.deposits_amount,0)) - SUM(COALESCE(r.balance_start,0))) AS reward")) + + if len(dashboardId.Validators) > 0 { + rewardsDs = rewardsDs. + Where(goqu.L("validator_index IN ?", dashboardId.Validators)) + } else { + rewardsDs = rewardsDs. + InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). + Where(goqu.L("r.validator_index IN (SELECT validator_index FROM validators)")) + + if groupId != -1 { + rewardsDs = rewardsDs. + Where(goqu.L("v.group_id = ?", groupId)) + } + } + + query, args, err := rewardsDs.Prepared(true).ToSQL() + if err != nil { + return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("error preparing query: %v", err) + } + + err = d.clickhouseReader.GetContext(ctx, &rewardsResultTable, query, args...) + if err != nil || !rewardsResultTable.Reward.Valid { return decimal.Zero, 0, decimal.Zero, 0, err } @@ -761,34 +802,76 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, validators if days == -1 { // for all time APR aprDivisor = 90 } - clAPR = ((float64(reward.Int64) / float64(aprDivisor)) / (float64(32e9) * float64(len(validators)))) * 365.0 * 100.0 + clAPR = ((float64(rewardsResultTable.Reward.Int64) / float64(aprDivisor)) / (float64(32e9) * float64(rewardsResultTable.ValidatorCount))) * 365.0 * 100.0 if math.IsNaN(clAPR) { clAPR = 0 } + + clIncome = decimal.NewFromInt(rewardsResultTable.Reward.Int64).Mul(decimal.NewFromInt(1e9)) + if days == -1 { - err = db.AlloyReader.GetContext(ctx, &reward, fmt.Sprintf(query, "validator_dashboard_data_rolling_total"), validators) - if err != nil || !reward.Valid { + rewardsDs = rewardsDs. + From(goqu.L("validator_dashboard_data_rolling_total AS r FINAL")) + + query, args, err = rewardsDs.Prepared(true).ToSQL() + if err != nil { + return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("error preparing query: %v", err) + } + + err = d.clickhouseReader.GetContext(ctx, &rewardsResultTotal, query, args...) + if err != nil || !rewardsResultTotal.Reward.Valid { return decimal.Zero, 0, decimal.Zero, 0, err } + + clIncome = decimal.NewFromInt(rewardsResultTotal.Reward.Int64).Mul(decimal.NewFromInt(1e9)) } - clIncome = decimal.NewFromInt(reward.Int64).Mul(decimal.NewFromInt(1e9)) - query = ` - SELECT - COALESCE(SUM(COALESCE(rb.value / 1e18, fee_recipient_reward)), 0) - FROM blocks - LEFT JOIN execution_payloads ON blocks.exec_block_hash = execution_payloads.block_hash - LEFT JOIN relays_blocks rb ON blocks.exec_block_hash = rb.exec_block_hash - WHERE proposer = ANY($1) AND status = '1' AND slot >= (SELECT MIN(epoch_start) * $2 FROM %s WHERE validator_index = ANY($1));` - err = db.AlloyReader.GetContext(ctx, &elIncome, fmt.Sprintf(query, table), validators, utils.Config.Chain.ClConfig.SlotsPerEpoch) + elDs := goqu.Dialect("postgres"). + Select(goqu.L("COALESCE(SUM(COALESCE(rb.value / 1e18, fee_recipient_reward)), 0) AS el_reward")). + From(goqu.L("blocks AS b")). + LeftJoin(goqu.L("execution_payloads AS ep"), goqu.On(goqu.L("b.exec_block_hash = ep.block_hash"))). + LeftJoin(goqu.L("relays_blocks AS rb"), goqu.On(goqu.L("b.exec_block_hash = rb.exec_block_hash"))). + Where(goqu.L("b.status = '1'")) + + if len(dashboardId.Validators) > 0 { + elDs = elDs. + Where(goqu.L("b.proposer = ANY(?)", pq.Array(dashboardId.Validators))) + } else { + elDs = elDs. + InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("b.proposer = v.validator_index"))). + Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) + + if groupId != -1 { + elDs = elDs. + Where(goqu.L("v.group_id = ?", groupId)) + } + } + + elTableDs := elDs. + Where(goqu.L("b.epoch >= ? AND b.epoch <= ?", rewardsResultTable.EpochStart, rewardsResultTable.EpochEnd)) + + query, args, err = elTableDs.Prepared(true).ToSQL() + if err != nil { + return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("error preparing query: %v", err) + } + + err = d.alloyReader.GetContext(ctx, &elIncome, query, args...) if err != nil { return decimal.Zero, 0, decimal.Zero, 0, err } elIncomeFloat, _ := elIncome.Float64() - elAPR = ((elIncomeFloat / float64(aprDivisor)) / (float64(32e18) * float64(len(validators)))) * 365.0 * 100.0 + elAPR = ((elIncomeFloat / float64(aprDivisor)) / (float64(32e18) * float64(rewardsResultTable.ValidatorCount))) * 365.0 * 100.0 if days == -1 { - err = db.AlloyReader.GetContext(ctx, &elIncome, fmt.Sprintf(query, "validator_dashboard_data_rolling_total"), validators, utils.Config.Chain.ClConfig.SlotsPerEpoch) + elTotalDs := elDs. + Where(goqu.L("b.epoch >= ? AND b.epoch <= ?", rewardsResultTotal.EpochStart, rewardsResultTotal.EpochEnd)) + + query, args, err = elTotalDs.Prepared(true).ToSQL() + if err != nil { + return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("error preparing query: %v", err) + } + + err = d.alloyReader.GetContext(ctx, &elIncome, query, args...) if err != nil { return decimal.Zero, 0, decimal.Zero, 0, err } @@ -1154,7 +1237,7 @@ func (d *DataAccessService) GetValidatorDashboardSyncSummaryValidators(ctx conte wg := errgroup.Group{} // Get the table name based on the period - table, _, _, err := d.getTablesForPeriod(period) + clickhouseTable, _, err := d.getTablesForPeriod(period) if err != nil { return nil, err } @@ -1198,7 +1281,7 @@ func (d *DataAccessService) GetValidatorDashboardSyncSummaryValidators(ctx conte ds := goqu.Dialect("postgres"). Select( goqu.L("epoch_start")). - From(table). + From(goqu.L(fmt.Sprintf("%s FINAL", clickhouseTable))). Limit(1) query, args, err := ds.Prepared(true).ToSQL() @@ -1207,7 +1290,7 @@ func (d *DataAccessService) GetValidatorDashboardSyncSummaryValidators(ctx conte } var epochStart uint64 - err = d.alloyReader.GetContext(ctx, &epochStart, query, args...) + err = d.clickhouseReader.GetContext(ctx, &epochStart, query, args...) if err != nil { return fmt.Errorf("error retrieving cutoff epoch for past sync committees: %w", err) } @@ -1219,7 +1302,6 @@ func (d *DataAccessService) GetValidatorDashboardSyncSummaryValidators(ctx conte Select( goqu.L("sc.validatorindex")). From(goqu.L("sync_committees sc")). - LeftJoin(goqu.I(table).As("r"), goqu.On(goqu.L("sc.validatorindex = r.validator_index"))). Where(goqu.L("period >= ? AND period < ? AND validatorindex = ANY(?)", pastSyncPeriodCutoff, currentSyncPeriod, pq.Array(validatorIndices))) query, args, err = ds.Prepared(true).ToSQL() @@ -1272,7 +1354,7 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx } // Get the table names based on the period - table, slashedByCountTable, _, err := d.getTablesForPeriod(period) + clickhouseTable, _, err := d.getTablesForPeriod(period) if err != nil { return nil, err } @@ -1283,32 +1365,33 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx } var queryResult []struct { - EpochStart uint64 `db:"epoch_start"` - EpochEnd uint64 `db:"epoch_end"` - ValidatorIndex uint64 `db:"validator_index"` - SlashedBy sql.NullInt64 `db:"slashed_by"` - SlashedAmount uint32 `db:"slashed_amount"` + EpochStart uint64 `db:"epoch_start"` + EpochEnd uint64 `db:"epoch_end"` + ValidatorIndex uint64 `db:"validator_index"` + Slashed bool `db:"slashed"` + SlashedAmount uint32 `db:"slashed_amount"` } // Build the query - ds := goqu.Dialect("postgres").Select( - goqu.L("r.epoch_start"), - goqu.L("r.epoch_end"), - goqu.L("r.validator_index"), - goqu.L("r.slashed_by"), - goqu.L("COALESCE(s.slashed_amount, 0) AS slashed_amount")). - From(goqu.T(table).As("r")). - LeftJoin(goqu.T(slashedByCountTable).As("s"), goqu.On(goqu.L("r.validator_index = s.slashed_by"))). - Where(goqu.L("(r.slashed_by IS NOT NULL OR s.slashed_amount > 0)")) + ds := goqu.Dialect("postgres"). + From(goqu.L(fmt.Sprintf("%s AS r FINAL", clickhouseTable))). + With("validators", goqu.L("(SELECT group_id, validator_index FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). + Select( + goqu.L("r.epoch_start"), + goqu.L("r.epoch_end"), + goqu.L("r.validator_index"), + goqu.L("r.slashed"), + goqu.L("COALESCE(r.blocks_slashing_count, 0) AS slashed_amount")). + Where(goqu.L("(r.slashed OR r.blocks_slashing_count > 0)")) // handle the case when we have a list of validators if len(dashboardId.Validators) > 0 { ds = ds. - Where(goqu.L("r.validator_index = ANY(?)", pq.Array(dashboardId.Validators))) + Where(goqu.L("r.validator_index IN ?", dashboardId.Validators)) } else { ds = ds. - InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). - Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) + InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). + Where(goqu.L("r.validator_index IN (SELECT validator_index FROM validators)")) if groupId != t.AllGroups { ds = ds.Where(goqu.L("v.group_id = ?", groupId)) @@ -1320,7 +1403,7 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx return nil, err } - err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) + err = d.clickhouseReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { log.Error(err, "error while getting validator dashboard slashed validators list", 0) return nil, err @@ -1328,27 +1411,30 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx // Process the data and get the slashing validators var slashingValidators []uint64 + var slashedValidators []uint64 for _, queryEntry := range queryResult { - if queryEntry.SlashedBy.Valid { - result.GotSlashed = append(result.GotSlashed, GotSlashedStruct{ - Index: queryEntry.ValidatorIndex, - SlashedBy: uint64(queryEntry.SlashedBy.Int64), - }) - } - if queryEntry.SlashedAmount > 0 { slashingValidators = append(slashingValidators, queryEntry.ValidatorIndex) } + + if queryEntry.Slashed { + slashedValidators = append(slashedValidators, queryEntry.ValidatorIndex) + } } - if len(slashingValidators) == 0 { - // We don't have any slashing validators so we can return early + if len(slashingValidators) == 0 && len(slashedValidators) == 0 { + // We don't have any slashing or slashed validators so we can return early return result, nil } + slashingValidatorsMap := utils.SliceToMap(slashingValidators) + slashedValidatorsMap := utils.SliceToMap(slashedValidators) + // If we have slashing validators then get the validators that got slashed proposalSlashings := make(map[uint64][]uint64) + proposalSlashed := make(map[uint64]uint64) attestationSlashings := make(map[uint64][]uint64) + attestationSlashed := make(map[uint64]uint64) slotStart := queryResult[0].EpochStart * utils.Config.Chain.ClConfig.SlotsPerEpoch slotEnd := (queryResult[0].EpochEnd+1)*utils.Config.Chain.ClConfig.SlotsPerEpoch - 1 @@ -1368,7 +1454,8 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx goqu.L("bps.proposerindex")). From(goqu.L("blocks_proposerslashings bps")). LeftJoin(goqu.L("blocks b"), goqu.On(goqu.L("b.slot = bps.block_slot"))). - Where(goqu.L("bps.block_slot >= ? AND bps.block_slot <= ? AND b.proposer = ANY(?)", slotStart, slotEnd, pq.Array(slashingValidators))) + Where(goqu.L("bps.block_slot >= ? AND bps.block_slot <= ?", slotStart, slotEnd)). + Where(goqu.L("(b.proposer = ANY(?) OR bps.proposerindex = ANY(?))", pq.Array(slashingValidators), pq.Array(slashedValidators))) query, args, err := ds.Prepared(true).ToSQL() if err != nil { @@ -1377,14 +1464,19 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { - return fmt.Errorf("error retrieving data from table %s: %v", table, err) + return fmt.Errorf("error retrieving data from table blocks_proposerslashings: %v", err) } for _, queryEntry := range queryResult { - if _, ok := proposalSlashings[queryEntry.ProposerSlashing]; !ok { - proposalSlashings[queryEntry.ProposerSlashing] = make([]uint64, 0) + if _, ok := slashingValidatorsMap[queryEntry.ProposerSlashing]; ok { + if _, ok := proposalSlashings[queryEntry.ProposerSlashing]; !ok { + proposalSlashings[queryEntry.ProposerSlashing] = make([]uint64, 0) + } + proposalSlashings[queryEntry.ProposerSlashing] = append(proposalSlashings[queryEntry.ProposerSlashing], queryEntry.ProposerSlashed) + } + if _, ok := slashedValidatorsMap[queryEntry.ProposerSlashed]; ok { + proposalSlashed[queryEntry.ProposerSlashed] = queryEntry.ProposerSlashing } - proposalSlashings[queryEntry.ProposerSlashing] = append(proposalSlashings[queryEntry.ProposerSlashing], queryEntry.ProposerSlashed) } return nil }) @@ -1404,7 +1496,13 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx goqu.L("bas.attestation2_indices")). From(goqu.L("blocks_attesterslashings bas")). LeftJoin(goqu.L("blocks b"), goqu.On(goqu.L("b.slot = bas.block_slot"))). - Where(goqu.L("bas.block_slot >= ? AND bas.block_slot <= ? AND b.proposer = ANY(?)", slotStart, slotEnd, pq.Array(slashingValidators))) + Where(goqu.L("bas.block_slot >= ? AND bas.block_slot <= ?", slotStart, slotEnd)) + + if len(slashedValidators) == 0 { + // If we don't have any slashed validators then we can just get the slashing validators + ds = ds. + Where(goqu.L("b.proposer = ANY(?)", pq.Array(slashingValidators))) + } query, args, err := ds.Prepared(true).ToSQL() if err != nil { @@ -1413,7 +1511,7 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { - return fmt.Errorf("error retrieving data from table %s: %v", table, err) + return fmt.Errorf("error retrieving data from table blocks_attesterslashings: %v", err) } for _, queryEntry := range queryResult { @@ -1422,10 +1520,15 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx log.WarnWithStackTrace(nil, "No intersection found for attestation violation", 0) } for _, v := range inter { - if _, ok := attestationSlashings[queryEntry.Proposer]; !ok { - attestationSlashings[queryEntry.Proposer] = make([]uint64, 0) + if _, ok := slashingValidatorsMap[queryEntry.Proposer]; ok { + if _, ok := attestationSlashings[queryEntry.Proposer]; !ok { + attestationSlashings[queryEntry.Proposer] = make([]uint64, 0) + } + attestationSlashings[queryEntry.Proposer] = append(attestationSlashings[queryEntry.Proposer], uint64(v.(int64))) + } + if _, ok := slashedValidatorsMap[uint64(v.(int64))]; ok { + attestationSlashed[uint64(v.(int64))] = queryEntry.Proposer } - attestationSlashings[queryEntry.Proposer] = append(attestationSlashings[queryEntry.Proposer], uint64(v.(int64))) } } return nil @@ -1459,6 +1562,20 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx }) } + // Fill the slashed validators + for slashedIdx, slashingIdx := range proposalSlashed { + result.GotSlashed = append(result.GotSlashed, GotSlashedStruct{ + Index: slashedIdx, + SlashedBy: slashingIdx, + }) + } + for slashedIdx, slashingIdx := range attestationSlashed { + result.GotSlashed = append(result.GotSlashed, GotSlashedStruct{ + Index: slashedIdx, + SlashedBy: slashingIdx, + }) + } + return result, nil } @@ -1472,34 +1589,56 @@ func (d *DataAccessService) GetValidatorDashboardProposalSummaryValidators(ctx c } // Get the table name based on the period - table, _, _, err := d.getTablesForPeriod(period) + clickhouseTable, _, err := d.getTablesForPeriod(period) if err != nil { return nil, err } + var epochQueryResult struct { + EpochStart uint64 `db:"epoch_start"` + EpochEnd uint64 `db:"epoch_end"` + } + + ds := goqu.Dialect("postgres"). + Select( + goqu.L("epoch_start"), + goqu.L("epoch_end")). + From(goqu.L(fmt.Sprintf("%s FINAL", clickhouseTable))). + Limit(1) + + query, args, err := ds.Prepared(true).ToSQL() + if err != nil { + return nil, fmt.Errorf("error preparing query: %w", err) + } + + err = d.clickhouseReader.GetContext(ctx, &epochQueryResult, query, args...) + if err != nil { + return nil, fmt.Errorf("error retrieving epoch info for proposals: %w", err) + } + // Build the query and get the data var queryResult []struct { Slot uint64 `db:"slot"` Block sql.NullInt64 `db:"exec_block_number"` Status string `db:"status"` - ValidatorIndex uint64 `db:"validator_index"` + ValidatorIndex uint64 `db:"proposer"` } - ds := goqu.Dialect("postgres"). + ds = goqu.Dialect("postgres"). Select( goqu.L("b.slot"), goqu.L("b.exec_block_number"), goqu.L("b.status"), - goqu.L("r.validator_index")). - From(goqu.T(table).As("r")). - InnerJoin(goqu.L("blocks AS b"), goqu.On(goqu.L("b.epoch >= r.epoch_start AND b.epoch <= r.epoch_end AND r.validator_index = b.proposer"))) + goqu.L("b.proposer")). + From(goqu.L("blocks b")). + Where(goqu.L("b.epoch >= ? AND b.epoch <= ?", epochQueryResult.EpochStart, epochQueryResult.EpochEnd)) if len(dashboardId.Validators) > 0 { ds = ds. - Where(goqu.L("r.validator_index = ANY(?)", pq.Array(dashboardId.Validators))) + Where(goqu.L("b.proposer = ANY(?)", pq.Array(dashboardId.Validators))) } else { ds = ds. - InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). + InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("b.proposer = v.validator_index"))). Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) if groupId != t.AllGroups { @@ -1507,14 +1646,14 @@ func (d *DataAccessService) GetValidatorDashboardProposalSummaryValidators(ctx c } } - query, args, err := ds.Prepared(true).ToSQL() + query, args, err = ds.Prepared(true).ToSQL() if err != nil { return nil, fmt.Errorf("error preparing query: %v", err) } err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { - return nil, fmt.Errorf("error retrieving data from table %s: %v", table, err) + return nil, fmt.Errorf("error retrieving data from table blocks: %v", err) } // Process the data @@ -1591,31 +1730,26 @@ func (d *DataAccessService) getCurrentAndUpcomingSyncCommittees(ctx context.Cont return currentSyncCommitteeValidators, upcomingSyncCommitteeValidators, nil } -func (d *DataAccessService) getTablesForPeriod(period enums.TimePeriod) (string, string, int, error) { - table := "" - slashedByCountTable := "" +func (d *DataAccessService) getTablesForPeriod(period enums.TimePeriod) (string, int, error) { + clickhouseTable := "" days := 0 switch period { case enums.TimePeriods.Last24h: - table = "validator_dashboard_data_rolling_daily" - slashedByCountTable = "validator_dashboard_data_rolling_daily_slashedby_count" + clickhouseTable = "validator_dashboard_data_rolling_24h" days = 1 case enums.TimePeriods.Last7d: - table = "validator_dashboard_data_rolling_weekly" - slashedByCountTable = "validator_dashboard_data_rolling_weekly_slashedby_count" + clickhouseTable = "validator_dashboard_data_rolling_7d" days = 7 case enums.TimePeriods.Last30d: - table = "validator_dashboard_data_rolling_monthly" - slashedByCountTable = "validator_dashboard_data_rolling_monthly_slashedby_count" + clickhouseTable = "validator_dashboard_data_rolling_30d" days = 30 case enums.TimePeriods.AllTime: - table = "validator_dashboard_data_rolling_total" - slashedByCountTable = "validator_dashboard_data_rolling_total_slashedby_count" + clickhouseTable = "validator_dashboard_data_rolling_total" days = -1 default: - return "", "", 0, fmt.Errorf("not-implemented time period: %v", period) + return "", 0, fmt.Errorf("not-implemented time period: %v", period) } - return table, slashedByCountTable, days, nil + return clickhouseTable, days, nil }