From 9b2fcc27b1f8e97033208e9542c4d9af17ac5647 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Tue, 23 Jul 2024 12:12:03 +0000 Subject: [PATCH 01/11] initial port of summary data acess funcs to clickhouse --- backend/pkg/api/data_access/vdb_summary.go | 237 +++++++++++---------- 1 file changed, 127 insertions(+), 110 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index c332b16d2..6f29d9f70 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -36,7 +36,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da wg := errgroup.Group{} // Get the table name based on the period - table, _, _, err := d.getTablesForPeriod(period) + table, _, clickhouseTable, _, err := d.getTablesForPeriod(period) if err != nil { return nil, nil, err } @@ -104,7 +104,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da var queryResult []struct { GroupId int64 `db:"result_group_id"` GroupName string `db:"group_name"` - ValidatorIndices pq.Int64Array `db:"validator_indices"` + ValidatorIndices []uint64 `db:"validator_indices"` ClRewards int64 `db:"cl_rewards"` AttestationReward decimal.Decimal `db:"attestations_reward"` AttestationIdealReward decimal.Decimal `db:"attestations_ideal_reward"` @@ -114,113 +114,123 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da BlocksScheduled uint64 `db:"blocks_scheduled"` SyncExecuted uint64 `db:"sync_executed"` SyncScheduled uint64 `db:"sync_scheduled"` + MinEpochStart int64 `db:"min_epoch_start"` + MaxEpochEnd int64 `db:"max_epoch_end"` } - wg.Go(func() error { - ds := goqu.Dialect("postgres"). - Select( - goqu.L("ARRAY_AGG(r.validator_index) AS validator_indices"), - goqu.L("SUM(COALESCE(r.attestations_reward, 0) + COALESCE(r.blocks_cl_reward, 0) + COALESCE(r.sync_rewards, 0) + COALESCE(r.slasher_reward, 0)) AS cl_rewards"), - goqu.L("COALESCE(SUM(r.attestations_reward)::decimal, 0) AS attestations_reward"), - goqu.L("COALESCE(SUM(r.attestations_ideal_reward)::decimal, 0) AS attestations_ideal_reward"), - goqu.L("COALESCE(SUM(r.attestations_executed), 0) AS attestations_executed"), - goqu.L("COALESCE(SUM(r.attestations_scheduled), 0) AS attestations_scheduled"), - goqu.L("COALESCE(SUM(r.blocks_proposed), 0) AS blocks_proposed"), - goqu.L("COALESCE(SUM(r.blocks_scheduled), 0) AS blocks_scheduled"), - goqu.L("COALESCE(SUM(r.sync_executed), 0) AS sync_executed"), - goqu.L("COALESCE(SUM(r.sync_scheduled), 0) AS sync_scheduled")). - From(goqu.T(table).As("r")). - GroupBy(goqu.L("result_group_id")) - - if len(validators) > 0 { + ds := goqu.Dialect("postgres"). + From(goqu.L(fmt.Sprintf(`"%s" AS "r" FINAL`, clickhouseTable))). + With("validators", goqu.L("(SELECT validator_index as validator_index, group_id FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). + Select( + goqu.L("ARRAY_AGG(r.validator_index) AS validator_indices"), + goqu.L("SUM(COALESCE(r.attestations_reward, 0) + COALESCE(r.blocks_cl_reward, 0) + COALESCE(r.sync_rewards, 0) + COALESCE(r.blocks_cl_slasher_reward, 0)) AS cl_rewards"), + goqu.L("COALESCE(SUM(r.attestations_reward)::decimal, 0) AS attestations_reward"), + goqu.L("COALESCE(SUM(r.attestations_ideal_reward)::decimal, 0) AS attestations_ideal_reward"), + goqu.L("COALESCE(SUM(r.attestations_executed), 0) AS attestations_executed"), + goqu.L("COALESCE(SUM(r.attestations_scheduled), 0) AS attestations_scheduled"), + goqu.L("COALESCE(SUM(r.blocks_proposed), 0) AS blocks_proposed"), + goqu.L("COALESCE(SUM(r.blocks_scheduled), 0) AS blocks_scheduled"), + goqu.L("COALESCE(SUM(r.sync_executed), 0) AS sync_executed"), + goqu.L("COALESCE(SUM(r.sync_scheduled), 0) AS sync_scheduled"), + goqu.L("COALESCE(MIN(r.epoch_start), 0) AS min_epoch_start"), + goqu.L("COALESCE(MAX(r.epoch_end), 0) AS max_epoch_end")). + GroupBy(goqu.L("result_group_id")) + + if len(validators) > 0 { + ds = ds. + SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). + Where(goqu.L("r.validator_index IN (?)", validators)) + } else { + if dashboardId.AggregateGroups { ds = ds. - SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). - Where(goqu.L("r.validator_index = ANY(?)", pq.Array(validators))) + SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)) } else { - if dashboardId.AggregateGroups { - ds = ds. - SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)) - } else { - ds = ds. - SelectAppend(goqu.L("v.group_id AS result_group_id")) - } - ds = ds. - InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). - Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) - - if groupNameSearchEnabled && (search != "" || colSort.Column == enums.VDBSummaryColumns.Group) { - // Get the group names since we can filter and/or sort for them - ds = ds. - SelectAppend(goqu.L("g.name AS group_name")). - InnerJoin(goqu.L("users_val_dashboards_groups g"), goqu.On(goqu.L("v.group_id = g.id AND v.dashboard_id = g.dashboard_id"))). - GroupByAppend(goqu.L("group_name")) - } + SelectAppend(goqu.L("v.group_id AS result_group_id")) } - query, args, err := ds.Prepared(true).ToSQL() - if err != nil { - return fmt.Errorf("error preparing query: %v", err) - } + ds = ds. + InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). + Where(goqu.L("r.validator_index IN (SELECT validator_index FROM validators)")) - err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) - if err != nil { - return fmt.Errorf("error retrieving data from table %s: %v", table, err) + if groupNameSearchEnabled && (search != "" || colSort.Column == enums.VDBSummaryColumns.Group) { + // Get the group names since we can filter and/or sort for them + ds = ds. + SelectAppend(goqu.L("g.name AS group_name")). + InnerJoin(goqu.L("users_val_dashboards_groups g"), goqu.On(goqu.L("v.group_id = g.id AND v.dashboard_id = g.dashboard_id"))). + GroupByAppend(goqu.L("group_name")) } - return nil - }) + } + + query, args, err := ds.Prepared(true).ToSQL() + if err != nil { + return nil, nil, fmt.Errorf("error preparing query: %v", err) + } + + err = d.clickhouseReader.SelectContext(ctx, &queryResult, query, args...) + if err != nil { + return nil, nil, fmt.Errorf("error retrieving data from table %s: %v", table, err) + } + + epochMin := int64(math.MaxInt64) + epochMax := int64(0) + for _, row := range queryResult { + if row.MinEpochStart < epochMin { + epochMin = row.MinEpochStart + } + if row.MaxEpochEnd > epochMax { + epochMax = row.MaxEpochEnd + } + } // ------------------------------------------------------------------------------------------------------------------ // Get the EL rewards elRewards := make(map[int64]decimal.Decimal) - wg.Go(func() error { - ds := goqu.Dialect("postgres"). - Select( - goqu.L("SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)) AS el_rewards")). - From(goqu.T(table).As("r")). - LeftJoin(goqu.L("blocks b"), goqu.On(goqu.L("b.epoch >= r.epoch_start AND b.epoch <= r.epoch_end AND r.validator_index = b.proposer AND b.status = '1'"))). - LeftJoin(goqu.L("execution_payloads ep"), goqu.On(goqu.L("ep.block_hash = b.exec_block_hash"))). - LeftJoin(goqu.L("relays_blocks rb"), goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash"))). - GroupBy(goqu.L("result_group_id")) - - if len(validators) > 0 { + ds = goqu.Dialect("postgres"). + Select( + goqu.L("SUM(COALESCE(rb.value, ep.fee_recipient_reward * 1e18, 0)) AS el_rewards")). + From(goqu.L("blocks b")). + LeftJoin(goqu.L("execution_payloads ep"), goqu.On(goqu.L("ep.block_hash = b.exec_block_hash"))). + LeftJoin(goqu.L("relays_blocks rb"), goqu.On(goqu.L("rb.exec_block_hash = b.exec_block_hash"))). + Where(goqu.L("b.epoch >= ? AND b.epoch <= ? AND b.status = '1'", epochMin, epochMax)). + GroupBy(goqu.L("result_group_id")) + + if len(validators) > 0 { + ds = ds. + SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). + Where(goqu.L("b.proposer = ANY(?)", pq.Array(validators))) + } else { + if dashboardId.AggregateGroups { ds = ds. - SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). - Where(goqu.L("r.validator_index = ANY(?)", pq.Array(validators))) + SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)) } else { - if dashboardId.AggregateGroups { - ds = ds. - SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)) - } else { - ds = ds. - SelectAppend(goqu.L("v.group_id AS result_group_id")) - } - ds = ds. - InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). - Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) + SelectAppend(goqu.L("v.group_id AS result_group_id")) } - var queryResult []struct { - GroupId int64 `db:"result_group_id"` - ElRewards decimal.Decimal `db:"el_rewards"` - } + ds = ds. + InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("b.proposer = v.validator_index"))). + Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) + } - query, args, err := ds.Prepared(true).ToSQL() - if err != nil { - return fmt.Errorf("error preparing query: %v", err) - } + var elRewardsQueryResult []struct { + GroupId int64 `db:"result_group_id"` + ElRewards decimal.Decimal `db:"el_rewards"` + } - err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) - if err != nil { - return fmt.Errorf("error retrieving data from table %s: %v", table, err) - } + query, args, err = ds.Prepared(true).ToSQL() + if err != nil { + return nil, nil, fmt.Errorf("error preparing query: %v", err) + } - for _, entry := range queryResult { - elRewards[entry.GroupId] = entry.ElRewards - } - return nil - }) + err = d.alloyReader.SelectContext(ctx, &elRewardsQueryResult, query, args...) + if err != nil { + return nil, nil, fmt.Errorf("error retrieving data from table %s: %v", table, err) + } + + for _, entry := range elRewardsQueryResult { + elRewards[entry.GroupId] = entry.ElRewards + } // ------------------------------------------------------------------------------------------------------------------ // Get the current and next sync committee validators @@ -272,7 +282,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da for _, queryEntry := range queryResult { uiValidatorIndices := make([]uint64, len(queryEntry.ValidatorIndices)) for i, validatorIndex := range queryEntry.ValidatorIndices { - uiValidatorIndices[i] = uint64(validatorIndex) + uiValidatorIndices[i] = validatorIndex } resultEntry := t.VDBSummaryTableRow{ @@ -496,13 +506,17 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex } // Get the table names based on the period - table, slashedByCountTable, days, err := d.getTablesForPeriod(period) + _, _, clickhouseTable, days, err := d.getTablesForPeriod(period) if err != nil { return nil, err } - query := `select - users_val_dashboards_validators.validator_index, + query := ` + WITH validators AS ( + SELECT validator_index as validator_index, group_id FROM users_val_dashboards_validators WHERE (dashboard_id = $1 and (group_id = $2 OR $2 = -1)) + ) + select + validator_index, epoch_start, COALESCE(attestations_reward, 0) as attestations_reward, COALESCE(attestations_ideal_reward, 0) as attestations_ideal_reward, @@ -514,15 +528,14 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex COALESCE(blocks_proposed, 0) as blocks_proposed, COALESCE(sync_scheduled, 0) as sync_scheduled, COALESCE(sync_executed, 0) as sync_executed, - %[1]s.slashed_by IS NOT NULL AS slashed_in_period, - COALESCE(%[2]s.slashed_amount, 0) AS slashed_amount, + COALESCE(CASE WHEN slashed THEN 1 ELSE 0 END, 0) AS slashed_in_period, + COALESCE(blocks_slashing_count, 0) AS slashed_amount, COALESCE(blocks_expected, 0) as blocks_expected, COALESCE(inclusion_delay_sum, 0) as inclusion_delay_sum, COALESCE(sync_committees_expected, 0) as sync_committees_expected - from users_val_dashboards_validators - inner join %[1]s on %[1]s.validator_index = users_val_dashboards_validators.validator_index - left join %[2]s on %[2]s.slashed_by = users_val_dashboards_validators.validator_index - where (dashboard_id = $1 and (group_id = $2 OR $2 = -1)) + from %[1]s + inner join validators v on %[1]s.validator_index = v.validator_index + where validator_index IN (select validator_index FROM validators) ` if dashboardId.Validators != nil { @@ -539,14 +552,13 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex COALESCE(blocks_proposed, 0) as blocks_proposed, COALESCE(sync_scheduled, 0) as sync_scheduled, COALESCE(sync_executed, 0) as sync_executed, - %[1]s.slashed_by IS NOT NULL AS slashed_in_period, - COALESCE(%[2]s.slashed_amount, 0) AS slashed_amount, + COALESCE(CASE WHEN slashed THEN 1 ELSE 0 END, 0) AS slashed_in_period, + COALESCE(blocks_slashing_count, 0) AS slashed_amount, COALESCE(blocks_expected, 0) as blocks_expected, COALESCE(inclusion_delay_sum, 0) as inclusion_delay_sum, COALESCE(sync_committees_expected, 0) as sync_committees_expected from %[1]s - left join %[2]s on %[2]s.slashed_by = %[1]s.validator_index - where %[1]s.validator_index = ANY($1) + where %[1]s.validator_index IN($1) ` } @@ -584,9 +596,9 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex var rows []*queryResult if len(validators) > 0 { - err = d.alloyReader.SelectContext(ctx, &rows, fmt.Sprintf(query, table, slashedByCountTable), validators) + err = d.clickhouseReader.SelectContext(ctx, &rows, fmt.Sprintf(query, clickhouseTable), validators) } else { - err = d.alloyReader.SelectContext(ctx, &rows, fmt.Sprintf(query, table, slashedByCountTable), dashboardId.Id, groupId) + err = d.clickhouseReader.SelectContext(ctx, &rows, fmt.Sprintf(query, clickhouseTable), dashboardId.Id, groupId) } if err != nil { @@ -1118,7 +1130,7 @@ func (d *DataAccessService) GetValidatorDashboardSyncSummaryValidators(ctx conte wg := errgroup.Group{} // Get the table name based on the period - table, _, _, err := d.getTablesForPeriod(period) + table, _, _, _, err := d.getTablesForPeriod(period) if err != nil { return nil, err } @@ -1236,7 +1248,7 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx } // Get the table names based on the period - table, slashedByCountTable, _, err := d.getTablesForPeriod(period) + table, slashedByCountTable, _, _, err := d.getTablesForPeriod(period) if err != nil { return nil, err } @@ -1436,7 +1448,7 @@ func (d *DataAccessService) GetValidatorDashboardProposalSummaryValidators(ctx c } // Get the table name based on the period - table, _, _, err := d.getTablesForPeriod(period) + table, _, _, _, err := d.getTablesForPeriod(period) if err != nil { return nil, err } @@ -1555,31 +1567,36 @@ func (d *DataAccessService) getCurrentAndUpcomingSyncCommittees(ctx context.Cont return currentSyncCommitteeValidators, upcomingSyncCommitteeValidators, nil } -func (d *DataAccessService) getTablesForPeriod(period enums.TimePeriod) (string, string, int, error) { +func (d *DataAccessService) getTablesForPeriod(period enums.TimePeriod) (string, string, string, int, error) { table := "" + clickhouseTable := "" slashedByCountTable := "" days := 0 switch period { case enums.TimePeriods.Last24h: table = "validator_dashboard_data_rolling_daily" + clickhouseTable = "validator_dashboard_data_rolling_24h" slashedByCountTable = "validator_dashboard_data_rolling_daily_slashedby_count" days = 1 case enums.TimePeriods.Last7d: table = "validator_dashboard_data_rolling_weekly" + clickhouseTable = "validator_dashboard_data_rolling_7d" slashedByCountTable = "validator_dashboard_data_rolling_weekly_slashedby_count" days = 7 case enums.TimePeriods.Last30d: table = "validator_dashboard_data_rolling_monthly" + clickhouseTable = "validator_dashboard_data_rolling_30d" slashedByCountTable = "validator_dashboard_data_rolling_monthly_slashedby_count" days = 30 case enums.TimePeriods.AllTime: table = "validator_dashboard_data_rolling_total" + clickhouseTable = "validator_dashboard_data_rolling_total" slashedByCountTable = "validator_dashboard_data_rolling_total_slashedby_count" days = -1 default: - return "", "", 0, fmt.Errorf("not-implemented time period: %v", period) + return "", "", "", 0, fmt.Errorf("not-implemented time period: %v", period) } - return table, slashedByCountTable, days, nil + return table, slashedByCountTable, clickhouseTable, days, nil } From 660ec748b1f5831baffc643429b3d62ef5be6839 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Tue, 23 Jul 2024 12:48:26 +0000 Subject: [PATCH 02/11] fix query --- backend/pkg/api/data_access/vdb_summary.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index 210ef9a8a..9c9e9e37b 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -538,12 +538,12 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex COALESCE(blocks_proposed, 0) as blocks_proposed, COALESCE(sync_scheduled, 0) as sync_scheduled, COALESCE(sync_executed, 0) as sync_executed, - COALESCE(CASE WHEN slashed THEN 1 ELSE 0 END, 0) AS slashed_in_period, + slashed AS slashed_in_period, COALESCE(blocks_slashing_count, 0) AS slashed_amount, COALESCE(blocks_expected, 0) as blocks_expected, COALESCE(inclusion_delay_sum, 0) as inclusion_delay_sum, COALESCE(sync_committees_expected, 0) as sync_committees_expected - from %[1]s + from %[1]s FINAL inner join validators v on %[1]s.validator_index = v.validator_index where validator_index IN (select validator_index FROM validators) ` @@ -562,12 +562,12 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex COALESCE(blocks_proposed, 0) as blocks_proposed, COALESCE(sync_scheduled, 0) as sync_scheduled, COALESCE(sync_executed, 0) as sync_executed, - COALESCE(CASE WHEN slashed THEN 1 ELSE 0 END, 0) AS slashed_in_period, + slashed AS slashed_in_period, COALESCE(blocks_slashing_count, 0) AS slashed_amount, COALESCE(blocks_expected, 0) as blocks_expected, COALESCE(inclusion_delay_sum, 0) as inclusion_delay_sum, COALESCE(sync_committees_expected, 0) as sync_committees_expected - from %[1]s + from %[1]s FINAL where %[1]s.validator_index IN($1) ` } From 5ce588905a7cc30d6f04b4f9b42442fa8b2311b2 Mon Sep 17 00:00:00 2001 From: Stefan Pletka Date: Tue, 30 Jul 2024 18:16:54 +0200 Subject: [PATCH 03/11] Clickhouse for the validaltor slashings --- backend/pkg/api/data_access/vdb_summary.go | 28 +++++++++------------- 1 file changed, 11 insertions(+), 17 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index 9c9e9e37b..2030d0084 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -36,7 +36,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da wg := errgroup.Group{} // Get the table name based on the period - table, _, clickhouseTable, _, err := d.getTablesForPeriod(period) + table, clickhouseTable, _, err := d.getTablesForPeriod(period) if err != nil { return nil, nil, err } @@ -516,7 +516,7 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex } // Get the table names based on the period - _, _, clickhouseTable, days, err := d.getTablesForPeriod(period) + _, clickhouseTable, days, err := d.getTablesForPeriod(period) if err != nil { return nil, err } @@ -1140,7 +1140,7 @@ func (d *DataAccessService) GetValidatorDashboardSyncSummaryValidators(ctx conte wg := errgroup.Group{} // Get the table name based on the period - table, _, _, _, err := d.getTablesForPeriod(period) + table, _, _, err := d.getTablesForPeriod(period) if err != nil { return nil, err } @@ -1258,7 +1258,7 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx } // Get the table names based on the period - table, slashedByCountTable, _, _, err := d.getTablesForPeriod(period) + table, _, _, err := d.getTablesForPeriod(period) if err != nil { return nil, err } @@ -1282,10 +1282,9 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx goqu.L("r.epoch_end"), goqu.L("r.validator_index"), goqu.L("r.slashed_by"), - goqu.L("COALESCE(s.slashed_amount, 0) AS slashed_amount")). + goqu.L("COALESCE(r.blocks_slashing_count, 0) AS slashed_amount")). From(goqu.T(table).As("r")). - LeftJoin(goqu.T(slashedByCountTable).As("s"), goqu.On(goqu.L("r.validator_index = s.slashed_by"))). - Where(goqu.L("(r.slashed_by IS NOT NULL OR s.slashed_amount > 0)")) + Where(goqu.L("(r.slashed_by IS NOT NULL OR r.blocks_slashing_count > 0)")) // handle the case when we have a list of validators if len(dashboardId.Validators) > 0 { @@ -1306,7 +1305,7 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx return nil, err } - err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) + err = d.clickhouseReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { log.Error(err, "error while getting validator dashboard slashed validators list", 0) return nil, err @@ -1458,7 +1457,7 @@ func (d *DataAccessService) GetValidatorDashboardProposalSummaryValidators(ctx c } // Get the table name based on the period - table, _, _, _, err := d.getTablesForPeriod(period) + table, _, _, err := d.getTablesForPeriod(period) if err != nil { return nil, err } @@ -1577,36 +1576,31 @@ func (d *DataAccessService) getCurrentAndUpcomingSyncCommittees(ctx context.Cont return currentSyncCommitteeValidators, upcomingSyncCommitteeValidators, nil } -func (d *DataAccessService) getTablesForPeriod(period enums.TimePeriod) (string, string, string, int, error) { +func (d *DataAccessService) getTablesForPeriod(period enums.TimePeriod) (string, string, int, error) { table := "" clickhouseTable := "" - slashedByCountTable := "" days := 0 switch period { case enums.TimePeriods.Last24h: table = "validator_dashboard_data_rolling_daily" clickhouseTable = "validator_dashboard_data_rolling_24h" - slashedByCountTable = "validator_dashboard_data_rolling_daily_slashedby_count" days = 1 case enums.TimePeriods.Last7d: table = "validator_dashboard_data_rolling_weekly" clickhouseTable = "validator_dashboard_data_rolling_7d" - slashedByCountTable = "validator_dashboard_data_rolling_weekly_slashedby_count" days = 7 case enums.TimePeriods.Last30d: table = "validator_dashboard_data_rolling_monthly" clickhouseTable = "validator_dashboard_data_rolling_30d" - slashedByCountTable = "validator_dashboard_data_rolling_monthly_slashedby_count" days = 30 case enums.TimePeriods.AllTime: table = "validator_dashboard_data_rolling_total" clickhouseTable = "validator_dashboard_data_rolling_total" - slashedByCountTable = "validator_dashboard_data_rolling_total_slashedby_count" days = -1 default: - return "", "", "", 0, fmt.Errorf("not-implemented time period: %v", period) + return "", "", 0, fmt.Errorf("not-implemented time period: %v", period) } - return table, slashedByCountTable, clickhouseTable, days, nil + return table, clickhouseTable, days, nil } From fa94779da6aee70c8ce0cd0fc27025a865d8d438 Mon Sep 17 00:00:00 2001 From: Stefan Pletka Date: Wed, 31 Jul 2024 13:21:30 +0200 Subject: [PATCH 04/11] CH for validator lists & fixes & cleanup --- backend/pkg/api/data_access/vdb_summary.go | 97 +++++++++++++--------- 1 file changed, 58 insertions(+), 39 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index 2030d0084..dba3c53cf 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -17,7 +17,6 @@ import ( "github.com/gobitfly/beaconchain/pkg/api/enums" t "github.com/gobitfly/beaconchain/pkg/api/types" "github.com/gobitfly/beaconchain/pkg/commons/cache" - "github.com/gobitfly/beaconchain/pkg/commons/db" "github.com/gobitfly/beaconchain/pkg/commons/log" "github.com/gobitfly/beaconchain/pkg/commons/utils" constypes "github.com/gobitfly/beaconchain/pkg/consapi/types" @@ -36,7 +35,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da wg := errgroup.Group{} // Get the table name based on the period - table, clickhouseTable, _, err := d.getTablesForPeriod(period) + clickhouseTable, _, err := d.getTablesForPeriod(period) if err != nil { return nil, nil, err } @@ -120,7 +119,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da ds := goqu.Dialect("postgres"). From(goqu.L(fmt.Sprintf(`"%s" AS "r" FINAL`, clickhouseTable))). - With("validators", goqu.L("(SELECT validator_index as validator_index, group_id FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). + With("validators", goqu.L("(SELECT dashboard_id, group_id, validator_index FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). Select( goqu.L("ARRAY_AGG(r.validator_index) AS validator_indices"), goqu.L("SUM(COALESCE(r.attestations_reward, 0) + COALESCE(r.blocks_cl_reward, 0) + COALESCE(r.sync_rewards, 0) + COALESCE(r.blocks_cl_slasher_reward, 0)) AS cl_rewards"), @@ -169,7 +168,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da err = d.clickhouseReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { - return nil, nil, fmt.Errorf("error retrieving data from table %s: %v", table, err) + return nil, nil, fmt.Errorf("error retrieving data from table %s: %v", clickhouseTable, err) } epochMin := int64(math.MaxInt64) @@ -225,7 +224,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da err = d.alloyReader.SelectContext(ctx, &elRewardsQueryResult, query, args...) if err != nil { - return nil, nil, fmt.Errorf("error retrieving data from table %s: %v", table, err) + return nil, nil, fmt.Errorf("error retrieving data from table blocks: %v", err) } for _, entry := range elRewardsQueryResult { @@ -516,7 +515,7 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex } // Get the table names based on the period - _, clickhouseTable, days, err := d.getTablesForPeriod(period) + clickhouseTable, days, err := d.getTablesForPeriod(period) if err != nil { return nil, err } @@ -568,7 +567,7 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex COALESCE(inclusion_delay_sum, 0) as inclusion_delay_sum, COALESCE(sync_committees_expected, 0) as sync_committees_expected from %[1]s FINAL - where %[1]s.validator_index IN($1) + where %[1]s.validator_index IN ($1) ` } @@ -750,20 +749,24 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, validators switch days { case 1: - table = "validator_dashboard_data_rolling_daily" + table = "validator_dashboard_data_rolling_24h" case 7: - table = "validator_dashboard_data_rolling_weekly" + table = "validator_dashboard_data_rolling_7d" case 30: - table = "validator_dashboard_data_rolling_monthly" + table = "validator_dashboard_data_rolling_30d" case -1: table = "validator_dashboard_data_rolling_90d" default: return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("invalid days value: %v", days) } - query := `select (SUM(COALESCE(balance_end,0)) + SUM(COALESCE(withdrawals_amount,0)) - SUM(COALESCE(deposits_amount,0)) - SUM(COALESCE(balance_start,0))) reward FROM %s WHERE validator_index = ANY($1)` + query := ` + SELECT + (SUM(COALESCE(balance_end,0)) + SUM(COALESCE(withdrawals_amount,0)) - SUM(COALESCE(deposits_amount,0)) - SUM(COALESCE(balance_start,0))) AS reward + FROM %s + WHERE validator_index IN ($1)` - err = db.AlloyReader.GetContext(ctx, &reward, fmt.Sprintf(query, table), validators) + err = d.clickhouseReader.GetContext(ctx, &reward, fmt.Sprintf(query, table), validators) if err != nil || !reward.Valid { return decimal.Zero, 0, decimal.Zero, 0, err } @@ -777,7 +780,7 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, validators clAPR = 0 } if days == -1 { - err = db.AlloyReader.GetContext(ctx, &reward, fmt.Sprintf(query, "validator_dashboard_data_rolling_total"), validators) + err = d.clickhouseReader.GetContext(ctx, &reward, fmt.Sprintf(query, "validator_dashboard_data_rolling_total"), validators) if err != nil || !reward.Valid { return decimal.Zero, 0, decimal.Zero, 0, err } @@ -791,7 +794,7 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, validators LEFT JOIN execution_payloads ON blocks.exec_block_hash = execution_payloads.block_hash LEFT JOIN relays_blocks rb ON blocks.exec_block_hash = rb.exec_block_hash WHERE proposer = ANY($1) AND status = '1' AND slot >= (SELECT MIN(epoch_start) * $2 FROM %s WHERE validator_index = ANY($1));` - err = db.AlloyReader.GetContext(ctx, &elIncome, fmt.Sprintf(query, table), validators, utils.Config.Chain.ClConfig.SlotsPerEpoch) + err = d.alloyReader.GetContext(ctx, &elIncome, fmt.Sprintf(query, table), validators, utils.Config.Chain.ClConfig.SlotsPerEpoch) if err != nil { return decimal.Zero, 0, decimal.Zero, 0, err } @@ -799,7 +802,7 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, validators elAPR = ((elIncomeFloat / float64(aprDivisor)) / (float64(32e18) * float64(len(validators)))) * 365.0 * 100.0 if days == -1 { - err = db.AlloyReader.GetContext(ctx, &elIncome, fmt.Sprintf(query, "validator_dashboard_data_rolling_total"), validators, utils.Config.Chain.ClConfig.SlotsPerEpoch) + err = d.alloyReader.GetContext(ctx, &elIncome, fmt.Sprintf(query, "validator_dashboard_data_rolling_total"), validators, utils.Config.Chain.ClConfig.SlotsPerEpoch) if err != nil { return decimal.Zero, 0, decimal.Zero, 0, err } @@ -1140,7 +1143,7 @@ func (d *DataAccessService) GetValidatorDashboardSyncSummaryValidators(ctx conte wg := errgroup.Group{} // Get the table name based on the period - table, _, _, err := d.getTablesForPeriod(period) + clickhouseTable, _, err := d.getTablesForPeriod(period) if err != nil { return nil, err } @@ -1184,7 +1187,7 @@ func (d *DataAccessService) GetValidatorDashboardSyncSummaryValidators(ctx conte ds := goqu.Dialect("postgres"). Select( goqu.L("epoch_start")). - From(table). + From(clickhouseTable). Limit(1) query, args, err := ds.Prepared(true).ToSQL() @@ -1193,7 +1196,7 @@ func (d *DataAccessService) GetValidatorDashboardSyncSummaryValidators(ctx conte } var epochStart uint64 - err = d.alloyReader.GetContext(ctx, &epochStart, query, args...) + err = d.clickhouseReader.GetContext(ctx, &epochStart, query, args...) if err != nil { return fmt.Errorf("error retrieving cutoff epoch for past sync committees: %w", err) } @@ -1205,7 +1208,6 @@ func (d *DataAccessService) GetValidatorDashboardSyncSummaryValidators(ctx conte Select( goqu.L("sc.validatorindex")). From(goqu.L("sync_committees sc")). - LeftJoin(goqu.I(table).As("r"), goqu.On(goqu.L("sc.validatorindex = r.validator_index"))). Where(goqu.L("period >= ? AND period < ? AND validatorindex = ANY(?)", pastSyncPeriodCutoff, currentSyncPeriod, pq.Array(validatorIndices))) query, args, err = ds.Prepared(true).ToSQL() @@ -1258,7 +1260,7 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx } // Get the table names based on the period - table, _, _, err := d.getTablesForPeriod(period) + clickhouseTable, _, err := d.getTablesForPeriod(period) if err != nil { return nil, err } @@ -1283,7 +1285,7 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx goqu.L("r.validator_index"), goqu.L("r.slashed_by"), goqu.L("COALESCE(r.blocks_slashing_count, 0) AS slashed_amount")). - From(goqu.T(table).As("r")). + From(goqu.T(clickhouseTable).As("r")). Where(goqu.L("(r.slashed_by IS NOT NULL OR r.blocks_slashing_count > 0)")) // handle the case when we have a list of validators @@ -1362,7 +1364,7 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { - return fmt.Errorf("error retrieving data from table %s: %v", table, err) + return fmt.Errorf("error retrieving data from table blocks_proposerslashings: %v", err) } for _, queryEntry := range queryResult { @@ -1398,7 +1400,7 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { - return fmt.Errorf("error retrieving data from table %s: %v", table, err) + return fmt.Errorf("error retrieving data from table blocks_attesterslashings: %v", err) } for _, queryEntry := range queryResult { @@ -1457,11 +1459,33 @@ func (d *DataAccessService) GetValidatorDashboardProposalSummaryValidators(ctx c } // Get the table name based on the period - table, _, _, err := d.getTablesForPeriod(period) + clickhouseTable, _, err := d.getTablesForPeriod(period) if err != nil { return nil, err } + var epochQueryResult struct { + EpochStart uint64 `db:"epoch_start"` + EpochEnd uint64 `db:"epoch_end"` + } + + ds := goqu.Dialect("postgres"). + Select( + goqu.L("epoch_start"), + goqu.L("epoch_end")). + From(clickhouseTable). + Limit(1) + + query, args, err := ds.Prepared(true).ToSQL() + if err != nil { + return nil, fmt.Errorf("error preparing query: %w", err) + } + + err = d.clickhouseReader.GetContext(ctx, &epochQueryResult, query, args...) + if err != nil { + return nil, fmt.Errorf("error retrieving epoch info for proposals: %w", err) + } + // Build the query and get the data var queryResult []struct { Slot uint64 `db:"slot"` @@ -1470,21 +1494,21 @@ func (d *DataAccessService) GetValidatorDashboardProposalSummaryValidators(ctx c ValidatorIndex uint64 `db:"validator_index"` } - ds := goqu.Dialect("postgres"). + ds = goqu.Dialect("postgres"). Select( goqu.L("b.slot"), goqu.L("b.exec_block_number"), goqu.L("b.status"), - goqu.L("r.validator_index")). - From(goqu.T(table).As("r")). - InnerJoin(goqu.L("blocks AS b"), goqu.On(goqu.L("b.epoch >= r.epoch_start AND b.epoch <= r.epoch_end AND r.validator_index = b.proposer"))) + goqu.L("b.proposer")). + From(goqu.L("blocks b")). + Where(goqu.L("b.epoch >= ? AND b.epoch <= ?")) if len(dashboardId.Validators) > 0 { ds = ds. Where(goqu.L("r.validator_index = ANY(?)", pq.Array(dashboardId.Validators))) } else { ds = ds. - InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). + InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("b.proposer = v.validator_index"))). Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) if groupId != t.AllGroups { @@ -1492,14 +1516,14 @@ func (d *DataAccessService) GetValidatorDashboardProposalSummaryValidators(ctx c } } - query, args, err := ds.Prepared(true).ToSQL() + query, args, err = ds.Prepared(true).ToSQL() if err != nil { return nil, fmt.Errorf("error preparing query: %v", err) } err = d.alloyReader.SelectContext(ctx, &queryResult, query, args...) if err != nil { - return nil, fmt.Errorf("error retrieving data from table %s: %v", table, err) + return nil, fmt.Errorf("error retrieving data from table blocks: %v", err) } // Process the data @@ -1576,31 +1600,26 @@ func (d *DataAccessService) getCurrentAndUpcomingSyncCommittees(ctx context.Cont return currentSyncCommitteeValidators, upcomingSyncCommitteeValidators, nil } -func (d *DataAccessService) getTablesForPeriod(period enums.TimePeriod) (string, string, int, error) { - table := "" +func (d *DataAccessService) getTablesForPeriod(period enums.TimePeriod) (string, int, error) { clickhouseTable := "" days := 0 switch period { case enums.TimePeriods.Last24h: - table = "validator_dashboard_data_rolling_daily" clickhouseTable = "validator_dashboard_data_rolling_24h" days = 1 case enums.TimePeriods.Last7d: - table = "validator_dashboard_data_rolling_weekly" clickhouseTable = "validator_dashboard_data_rolling_7d" days = 7 case enums.TimePeriods.Last30d: - table = "validator_dashboard_data_rolling_monthly" clickhouseTable = "validator_dashboard_data_rolling_30d" days = 30 case enums.TimePeriods.AllTime: - table = "validator_dashboard_data_rolling_total" clickhouseTable = "validator_dashboard_data_rolling_total" days = -1 default: - return "", "", 0, fmt.Errorf("not-implemented time period: %v", period) + return "", 0, fmt.Errorf("not-implemented time period: %v", period) } - return table, clickhouseTable, days, nil + return clickhouseTable, days, nil } From 11aecba8ac6d22f73397ff60ea64a2db431abbf7 Mon Sep 17 00:00:00 2001 From: Stefan Pletka Date: Wed, 31 Jul 2024 17:46:57 +0200 Subject: [PATCH 05/11] Rewrote APR calculation for CH --- backend/pkg/api/data_access/vdb_management.go | 2 +- backend/pkg/api/data_access/vdb_summary.go | 122 ++++++++++++++---- 2 files changed, 98 insertions(+), 26 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_management.go b/backend/pkg/api/data_access/vdb_management.go index 03185c73a..9762e539e 100644 --- a/backend/pkg/api/data_access/vdb_management.go +++ b/backend/pkg/api/data_access/vdb_management.go @@ -305,7 +305,7 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d retrieveRewardsAndEfficiency := func(table string, days int, rewards *t.ClElValue[decimal.Decimal], apr *t.ClElValue[float64], efficiency *float64) { // Rewards + APR wg.Go(func() error { - (*rewards).El, (*apr).El, (*rewards).Cl, (*apr).Cl, err = d.internal_getElClAPR(ctx, validators, days) + (*rewards).El, (*apr).El, (*rewards).Cl, (*apr).Cl, err = d.internal_getElClAPR(ctx, dashboardId, -1, days) if err != nil { return err } diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index dba3c53cf..563d9b780 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -684,7 +684,7 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex } } - _, ret.Apr.El, _, ret.Apr.Cl, err = d.internal_getElClAPR(ctx, validatorArr, days) + _, ret.Apr.El, _, ret.Apr.Cl, err = d.internal_getElClAPR(ctx, dashboardId, groupId, days) if err != nil { return nil, err } @@ -743,8 +743,7 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex return ret, nil } -func (d *DataAccessService) internal_getElClAPR(ctx context.Context, validators []t.VDBValidator, days int) (elIncome decimal.Decimal, elAPR float64, clIncome decimal.Decimal, clAPR float64, err error) { - var reward sql.NullInt64 +func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId t.VDBId, groupId int64, days int) (elIncome decimal.Decimal, elAPR float64, clIncome decimal.Decimal, clAPR float64, err error) { table := "" switch days { @@ -760,14 +759,45 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, validators return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("invalid days value: %v", days) } - query := ` - SELECT - (SUM(COALESCE(balance_end,0)) + SUM(COALESCE(withdrawals_amount,0)) - SUM(COALESCE(deposits_amount,0)) - SUM(COALESCE(balance_start,0))) AS reward - FROM %s - WHERE validator_index IN ($1)` + type RewardsResult struct { + EpochStart uint64 `db:"epoch_start"` + EpochEnd uint64 `db:"epoch_end"` + ValidatorCount uint64 `db:"validator_count"` + Reward sql.NullInt64 `db:"reward"` + } + + var rewardsResultTable RewardsResult + var rewardsResultTotal RewardsResult + + rewardsDs := goqu.Dialect("postgres"). + Select( + goqu.L("MIN(epoch_start) AS epoch_start"), + goqu.L("MAX(epoch_end) AS epoch_end"), + goqu.L("COUNT(*) AS validator_count"), + goqu.L("(SUM(COALESCE(r.balance_end,0)) + SUM(COALESCE(r.withdrawals_amount,0)) - SUM(COALESCE(r.deposits_amount,0)) - SUM(COALESCE(r.balance_start,0))) AS reward")). + From(goqu.T(table).As("r")) + + if len(dashboardId.Validators) > 0 { + rewardsDs = rewardsDs. + Where(goqu.L("validator_index IN (?)", dashboardId.Validators)) + } else { + rewardsDs = rewardsDs. + InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). + Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) + + if groupId != -1 { + rewardsDs = rewardsDs. + Where(goqu.L("v.group_id = ?", groupId)) + } + } - err = d.clickhouseReader.GetContext(ctx, &reward, fmt.Sprintf(query, table), validators) - if err != nil || !reward.Valid { + query, args, err := rewardsDs.Prepared(true).ToSQL() + if err != nil { + return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("error preparing query: %v", err) + } + + err = d.clickhouseReader.GetContext(ctx, &rewardsResultTable, query, args...) + if err != nil || !rewardsResultTable.Reward.Valid { return decimal.Zero, 0, decimal.Zero, 0, err } @@ -775,34 +805,76 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, validators if days == -1 { // for all time APR aprDivisor = 90 } - clAPR = ((float64(reward.Int64) / float64(aprDivisor)) / (float64(32e9) * float64(len(validators)))) * 365.0 * 100.0 + clAPR = ((float64(rewardsResultTable.Reward.Int64) / float64(aprDivisor)) / (float64(32e9) * float64(rewardsResultTable.ValidatorCount))) * 365.0 * 100.0 if math.IsNaN(clAPR) { clAPR = 0 } + + clIncome = decimal.NewFromInt(rewardsResultTable.Reward.Int64).Mul(decimal.NewFromInt(1e9)) + if days == -1 { - err = d.clickhouseReader.GetContext(ctx, &reward, fmt.Sprintf(query, "validator_dashboard_data_rolling_total"), validators) - if err != nil || !reward.Valid { + rewardsDs = rewardsDs. + From(goqu.L("validator_dashboard_data_rolling_total AS r")) + + query, args, err = rewardsDs.Prepared(true).ToSQL() + if err != nil { + return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("error preparing query: %v", err) + } + + err = d.clickhouseReader.GetContext(ctx, &rewardsResultTotal, query, args...) + if err != nil || !rewardsResultTotal.Reward.Valid { return decimal.Zero, 0, decimal.Zero, 0, err } + + clIncome = decimal.NewFromInt(rewardsResultTotal.Reward.Int64).Mul(decimal.NewFromInt(1e9)) } - clIncome = decimal.NewFromInt(reward.Int64).Mul(decimal.NewFromInt(1e9)) - query = ` - SELECT - COALESCE(SUM(COALESCE(rb.value / 1e18, fee_recipient_reward)), 0) - FROM blocks - LEFT JOIN execution_payloads ON blocks.exec_block_hash = execution_payloads.block_hash - LEFT JOIN relays_blocks rb ON blocks.exec_block_hash = rb.exec_block_hash - WHERE proposer = ANY($1) AND status = '1' AND slot >= (SELECT MIN(epoch_start) * $2 FROM %s WHERE validator_index = ANY($1));` - err = d.alloyReader.GetContext(ctx, &elIncome, fmt.Sprintf(query, table), validators, utils.Config.Chain.ClConfig.SlotsPerEpoch) + elDs := goqu.Dialect("postgres"). + Select(goqu.L("SUM(COALESCE(rb.value / 1e18, ep.fee_recipient_reward, 0)) AS el_reward")). + From(goqu.L("blocks AS b")). + LeftJoin(goqu.L("execution_payloads AS ep"), goqu.On(goqu.L("b.exec_block_hash = ep.block_hash"))). + LeftJoin(goqu.L("relays_blocks AS rb"), goqu.On(goqu.L("b.exec_block_hash = rb.exec_block_hash"))). + Where(goqu.L("b.status = '1'")) + + if len(dashboardId.Validators) > 0 { + elDs = elDs. + Where(goqu.L("b.proposer = ANY(?)", pq.Array(dashboardId.Validators))) + } else { + elDs = elDs. + InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("b.proposer = v.validator_index"))). + Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) + + if groupId != -1 { + elDs = elDs. + Where(goqu.L("v.group_id = ?", groupId)) + } + } + + elTableDs := elDs. + Where(goqu.L("b.epoch >= ? AND b.epoch <= ?", rewardsResultTable.EpochStart, rewardsResultTable.EpochEnd)) + + query, args, err = elTableDs.Prepared(true).ToSQL() + if err != nil { + return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("error preparing query: %v", err) + } + + err = d.alloyReader.GetContext(ctx, &elIncome, query, args...) if err != nil { return decimal.Zero, 0, decimal.Zero, 0, err } elIncomeFloat, _ := elIncome.Float64() - elAPR = ((elIncomeFloat / float64(aprDivisor)) / (float64(32e18) * float64(len(validators)))) * 365.0 * 100.0 + elAPR = ((elIncomeFloat / float64(aprDivisor)) / (float64(32e18) * float64(rewardsResultTable.ValidatorCount))) * 365.0 * 100.0 if days == -1 { - err = d.alloyReader.GetContext(ctx, &elIncome, fmt.Sprintf(query, "validator_dashboard_data_rolling_total"), validators, utils.Config.Chain.ClConfig.SlotsPerEpoch) + elTotalDs := elDs. + Where(goqu.L("b.epoch >= ? AND b.epoch <= ?", rewardsResultTotal.EpochStart, rewardsResultTotal.EpochEnd)) + + query, args, err = elTotalDs.Prepared(true).ToSQL() + if err != nil { + return decimal.Zero, 0, decimal.Zero, 0, fmt.Errorf("error preparing query: %v", err) + } + + err = d.alloyReader.GetContext(ctx, &elIncome, query, args...) if err != nil { return decimal.Zero, 0, decimal.Zero, 0, err } @@ -1501,7 +1573,7 @@ func (d *DataAccessService) GetValidatorDashboardProposalSummaryValidators(ctx c goqu.L("b.status"), goqu.L("b.proposer")). From(goqu.L("blocks b")). - Where(goqu.L("b.epoch >= ? AND b.epoch <= ?")) + Where(goqu.L("b.epoch >= ? AND b.epoch <= ?", epochQueryResult.EpochStart, epochQueryResult.EpochEnd)) if len(dashboardId.Validators) > 0 { ds = ds. From 945f1132447376ea762d8d1e217963b6c67e7fb4 Mon Sep 17 00:00:00 2001 From: Stefan Pletka Date: Thu, 1 Aug 2024 11:10:04 +0200 Subject: [PATCH 06/11] Slashing validator list for CH --- backend/pkg/api/data_access/vdb_summary.go | 94 +++++++++++++++------- 1 file changed, 64 insertions(+), 30 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index 563d9b780..fd3b25b7d 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -118,7 +118,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da } ds := goqu.Dialect("postgres"). - From(goqu.L(fmt.Sprintf(`"%s" AS "r" FINAL`, clickhouseTable))). + From(goqu.L(fmt.Sprintf(`%s AS r FINAL`, clickhouseTable))). With("validators", goqu.L("(SELECT dashboard_id, group_id, validator_index FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). Select( goqu.L("ARRAY_AGG(r.validator_index) AS validator_indices"), @@ -775,7 +775,7 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId goqu.L("MAX(epoch_end) AS epoch_end"), goqu.L("COUNT(*) AS validator_count"), goqu.L("(SUM(COALESCE(r.balance_end,0)) + SUM(COALESCE(r.withdrawals_amount,0)) - SUM(COALESCE(r.deposits_amount,0)) - SUM(COALESCE(r.balance_start,0))) AS reward")). - From(goqu.T(table).As("r")) + From(goqu.L(fmt.Sprintf("%s AS r FINAL", table))) if len(dashboardId.Validators) > 0 { rewardsDs = rewardsDs. @@ -814,7 +814,7 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId if days == -1 { rewardsDs = rewardsDs. - From(goqu.L("validator_dashboard_data_rolling_total AS r")) + From(goqu.L("validator_dashboard_data_rolling_total AS r FINAL")) query, args, err = rewardsDs.Prepared(true).ToSQL() if err != nil { @@ -1259,7 +1259,7 @@ func (d *DataAccessService) GetValidatorDashboardSyncSummaryValidators(ctx conte ds := goqu.Dialect("postgres"). Select( goqu.L("epoch_start")). - From(clickhouseTable). + From(goqu.L(fmt.Sprintf("%s FINAL", clickhouseTable))). Limit(1) query, args, err := ds.Prepared(true).ToSQL() @@ -1343,11 +1343,11 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx } var queryResult []struct { - EpochStart uint64 `db:"epoch_start"` - EpochEnd uint64 `db:"epoch_end"` - ValidatorIndex uint64 `db:"validator_index"` - SlashedBy sql.NullInt64 `db:"slashed_by"` - SlashedAmount uint32 `db:"slashed_amount"` + EpochStart uint64 `db:"epoch_start"` + EpochEnd uint64 `db:"epoch_end"` + ValidatorIndex uint64 `db:"validator_index"` + Slashed bool `db:"slashed"` + SlashedAmount uint32 `db:"slashed_amount"` } // Build the query @@ -1355,10 +1355,10 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx goqu.L("r.epoch_start"), goqu.L("r.epoch_end"), goqu.L("r.validator_index"), - goqu.L("r.slashed_by"), + goqu.L("r.slashed"), goqu.L("COALESCE(r.blocks_slashing_count, 0) AS slashed_amount")). - From(goqu.T(clickhouseTable).As("r")). - Where(goqu.L("(r.slashed_by IS NOT NULL OR r.blocks_slashing_count > 0)")) + From(goqu.L(fmt.Sprintf("%s AS r FINAL", clickhouseTable))). + Where(goqu.L("(r.slashed OR r.blocks_slashing_count > 0)")) // handle the case when we have a list of validators if len(dashboardId.Validators) > 0 { @@ -1387,27 +1387,30 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx // Process the data and get the slashing validators var slashingValidators []uint64 + var slashedValidators []uint64 for _, queryEntry := range queryResult { - if queryEntry.SlashedBy.Valid { - result.GotSlashed = append(result.GotSlashed, GotSlashedStruct{ - Index: queryEntry.ValidatorIndex, - SlashedBy: uint64(queryEntry.SlashedBy.Int64), - }) - } - if queryEntry.SlashedAmount > 0 { slashingValidators = append(slashingValidators, queryEntry.ValidatorIndex) } + + if queryEntry.Slashed { + slashedValidators = append(slashingValidators, queryEntry.ValidatorIndex) + } } - if len(slashingValidators) == 0 { - // We don't have any slashing validators so we can return early + if len(slashingValidators) == 0 && len(slashedValidators) == 0 { + // We don't have any slashing or slashed validators so we can return early return result, nil } + slashingValidatorsMap := utils.SliceToMap(slashingValidators) + slashedValidatorsMap := utils.SliceToMap(slashedValidators) + // If we have slashing validators then get the validators that got slashed proposalSlashings := make(map[uint64][]uint64) + proposalSlashed := make(map[uint64]uint64) attestationSlashings := make(map[uint64][]uint64) + attestationSlashed := make(map[uint64]uint64) slotStart := queryResult[0].EpochStart * utils.Config.Chain.ClConfig.SlotsPerEpoch slotEnd := (queryResult[0].EpochEnd+1)*utils.Config.Chain.ClConfig.SlotsPerEpoch - 1 @@ -1427,7 +1430,8 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx goqu.L("bps.proposerindex")). From(goqu.L("blocks_proposerslashings bps")). LeftJoin(goqu.L("blocks b"), goqu.On(goqu.L("b.slot = bps.block_slot"))). - Where(goqu.L("bps.block_slot >= ? AND bps.block_slot <= ? AND b.proposer = ANY(?)", slotStart, slotEnd, pq.Array(slashingValidators))) + Where(goqu.L("bps.block_slot >= ? AND bps.block_slot <= ?", slotStart, slotEnd)). + Where(goqu.L("(b.proposer = ANY(?) OR bps.proposerindex = ANY(?))", pq.Array(slashingValidators), pq.Array(slashedValidators))) query, args, err := ds.Prepared(true).ToSQL() if err != nil { @@ -1440,10 +1444,15 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx } for _, queryEntry := range queryResult { - if _, ok := proposalSlashings[queryEntry.ProposerSlashing]; !ok { - proposalSlashings[queryEntry.ProposerSlashing] = make([]uint64, 0) + if _, ok := slashingValidatorsMap[queryEntry.ProposerSlashing]; ok { + if _, ok := proposalSlashings[queryEntry.ProposerSlashing]; !ok { + proposalSlashings[queryEntry.ProposerSlashing] = make([]uint64, 0) + } + proposalSlashings[queryEntry.ProposerSlashing] = append(proposalSlashings[queryEntry.ProposerSlashing], queryEntry.ProposerSlashed) + } + if _, ok := slashedValidatorsMap[queryEntry.ProposerSlashed]; ok { + proposalSlashed[queryEntry.ProposerSlashed] = queryEntry.ProposerSlashing } - proposalSlashings[queryEntry.ProposerSlashing] = append(proposalSlashings[queryEntry.ProposerSlashing], queryEntry.ProposerSlashed) } return nil }) @@ -1463,7 +1472,13 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx goqu.L("bas.attestation2_indices")). From(goqu.L("blocks_attesterslashings bas")). LeftJoin(goqu.L("blocks b"), goqu.On(goqu.L("b.slot = bas.block_slot"))). - Where(goqu.L("bas.block_slot >= ? AND bas.block_slot <= ? AND b.proposer = ANY(?)", slotStart, slotEnd, pq.Array(slashingValidators))) + Where(goqu.L("bas.block_slot >= ? AND bas.block_slot <= ?", slotStart, slotEnd)) + + if len(slashedValidators) == 0 { + // If we don't have any slashed validators then we can just get the slashing validators + ds = ds. + Where(goqu.L("b.proposer = ANY(?)", pq.Array(slashingValidators))) + } query, args, err := ds.Prepared(true).ToSQL() if err != nil { @@ -1481,10 +1496,15 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx log.WarnWithStackTrace(nil, "No intersection found for attestation violation", 0) } for _, v := range inter { - if _, ok := attestationSlashings[queryEntry.Proposer]; !ok { - attestationSlashings[queryEntry.Proposer] = make([]uint64, 0) + if _, ok := slashingValidatorsMap[queryEntry.Proposer]; ok { + if _, ok := attestationSlashings[queryEntry.Proposer]; !ok { + attestationSlashings[queryEntry.Proposer] = make([]uint64, 0) + } + attestationSlashings[queryEntry.Proposer] = append(attestationSlashings[queryEntry.Proposer], uint64(v.(int64))) + } + if _, ok := slashedValidatorsMap[uint64(v.(int64))]; ok { + attestationSlashed[uint64(v.(int64))] = queryEntry.Proposer } - attestationSlashings[queryEntry.Proposer] = append(attestationSlashings[queryEntry.Proposer], uint64(v.(int64))) } } return nil @@ -1518,6 +1538,20 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx }) } + // Fill the slashed validators + for slashedIdx, slashingIdx := range proposalSlashed { + result.GotSlashed = append(result.GotSlashed, GotSlashedStruct{ + Index: slashedIdx, + SlashedBy: slashingIdx, + }) + } + for slashedIdx, slashingIdx := range attestationSlashed { + result.GotSlashed = append(result.GotSlashed, GotSlashedStruct{ + Index: slashedIdx, + SlashedBy: slashingIdx, + }) + } + return result, nil } @@ -1545,7 +1579,7 @@ func (d *DataAccessService) GetValidatorDashboardProposalSummaryValidators(ctx c Select( goqu.L("epoch_start"), goqu.L("epoch_end")). - From(clickhouseTable). + From(goqu.L(fmt.Sprintf("%s FINAL", clickhouseTable))). Limit(1) query, args, err := ds.Prepared(true).ToSQL() From 8c05c89ce22b6a7525e7f7e02270f7ead6fd8b2c Mon Sep 17 00:00:00 2001 From: Stefan Pletka Date: Thu, 1 Aug 2024 11:58:56 +0200 Subject: [PATCH 07/11] Removed obsolete slice --- backend/pkg/api/data_access/vdb_summary.go | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index fd3b25b7d..7b62cfb0e 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -279,18 +279,13 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da } for _, queryEntry := range queryResult { - uiValidatorIndices := make([]uint64, len(queryEntry.ValidatorIndices)) - for i, validatorIndex := range queryEntry.ValidatorIndices { - uiValidatorIndices[i] = validatorIndex - } - resultEntry := t.VDBSummaryTableRow{ GroupId: queryEntry.GroupId, AverageNetworkEfficiency: averageNetworkEfficiency, } // Status - for _, validatorIndex := range uiValidatorIndices { + for _, validatorIndex := range queryEntry.ValidatorIndices { if currentSyncCommitteeValidators[validatorIndex] { resultEntry.Status.CurrentSyncCount++ } @@ -308,7 +303,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da return nil, nil, err } - for _, validator := range validators { + for _, validator := range queryEntry.ValidatorIndices { metadata := validatorMapping.ValidatorMetadata[validator] // As deposited and pending validators are neither online nor offline they are counted as the third state (exited) @@ -378,7 +373,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da // If the search permits it add the entry to the result if search != "" { prefixSearch := strings.ToLower(search) - for _, validatorIndex := range uiValidatorIndices { + for _, validatorIndex := range queryEntry.ValidatorIndices { if searchValidator != -1 && validatorIndex == uint64(searchValidator) || (groupNameSearchEnabled && strings.HasPrefix(strings.ToLower(queryEntry.GroupName), prefixSearch)) { result = append(result, resultEntry) From 3b15836a893806e38d16c0089ba457de476cb7a2 Mon Sep 17 00:00:00 2001 From: Stefan Pletka Date: Thu, 1 Aug 2024 12:31:31 +0200 Subject: [PATCH 08/11] Fixed proposal and slashing bug --- backend/pkg/api/data_access/vdb_summary.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index 7b62cfb0e..c21e3dde7 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -1389,7 +1389,7 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx } if queryEntry.Slashed { - slashedValidators = append(slashingValidators, queryEntry.ValidatorIndex) + slashedValidators = append(slashedValidators, queryEntry.ValidatorIndex) } } @@ -1592,7 +1592,7 @@ func (d *DataAccessService) GetValidatorDashboardProposalSummaryValidators(ctx c Slot uint64 `db:"slot"` Block sql.NullInt64 `db:"exec_block_number"` Status string `db:"status"` - ValidatorIndex uint64 `db:"validator_index"` + ValidatorIndex uint64 `db:"proposer"` } ds = goqu.Dialect("postgres"). From b44e337ead638ba34e78c2b456309310a74e6440 Mon Sep 17 00:00:00 2001 From: Stefan Pletka Date: Thu, 1 Aug 2024 13:10:25 +0200 Subject: [PATCH 09/11] Bug fixes --- backend/pkg/api/data_access/vdb_summary.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index c21e3dde7..6e18c5079 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -138,7 +138,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da if len(validators) > 0 { ds = ds. SelectAppend(goqu.L("?::smallint AS result_group_id", t.DefaultGroupId)). - Where(goqu.L("r.validator_index IN (?)", validators)) + Where(goqu.L("r.validator_index IN ?", validators)) } else { if dashboardId.AggregateGroups { ds = ds. @@ -774,7 +774,7 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId if len(dashboardId.Validators) > 0 { rewardsDs = rewardsDs. - Where(goqu.L("validator_index IN (?)", dashboardId.Validators)) + Where(goqu.L("validator_index IN ?", dashboardId.Validators)) } else { rewardsDs = rewardsDs. InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). @@ -825,7 +825,7 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId } elDs := goqu.Dialect("postgres"). - Select(goqu.L("SUM(COALESCE(rb.value / 1e18, ep.fee_recipient_reward, 0)) AS el_reward")). + Select(goqu.L("COALESCE(SUM(COALESCE(rb.value / 1e18, fee_recipient_reward)), 0) AS el_reward")). From(goqu.L("blocks AS b")). LeftJoin(goqu.L("execution_payloads AS ep"), goqu.On(goqu.L("b.exec_block_hash = ep.block_hash"))). LeftJoin(goqu.L("relays_blocks AS rb"), goqu.On(goqu.L("b.exec_block_hash = rb.exec_block_hash"))). @@ -1358,7 +1358,7 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx // handle the case when we have a list of validators if len(dashboardId.Validators) > 0 { ds = ds. - Where(goqu.L("r.validator_index = ANY(?)", pq.Array(dashboardId.Validators))) + Where(goqu.L("r.validator_index IN ?", dashboardId.Validators)) } else { ds = ds. InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). @@ -1606,7 +1606,7 @@ func (d *DataAccessService) GetValidatorDashboardProposalSummaryValidators(ctx c if len(dashboardId.Validators) > 0 { ds = ds. - Where(goqu.L("r.validator_index = ANY(?)", pq.Array(dashboardId.Validators))) + Where(goqu.L("b.proposer = ANY(?)", pq.Array(dashboardId.Validators))) } else { ds = ds. InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("b.proposer = v.validator_index"))). From ce88c5efe1e5ce9b8a2e216ec17a5b5a7e8abbed Mon Sep 17 00:00:00 2001 From: Stefan Pletka Date: Fri, 2 Aug 2024 14:02:09 +0200 Subject: [PATCH 10/11] Included subqueries for clickhouse for performance --- backend/pkg/api/data_access/vdb_summary.go | 33 +++++++++++----------- 1 file changed, 16 insertions(+), 17 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index c5822504d..4d3586db5 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -122,7 +122,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da With("validators", goqu.L("(SELECT dashboard_id, group_id, validator_index FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). Select( goqu.L("ARRAY_AGG(r.validator_index) AS validator_indices"), - goqu.L("SUM(COALESCE(r.attestations_reward, 0) + COALESCE(r.blocks_cl_reward, 0) + COALESCE(r.sync_rewards, 0) + COALESCE(r.blocks_cl_slasher_reward, 0)) AS cl_rewards"), + goqu.L("COALESCE(SUM(r.attestations_reward + r.blocks_cl_reward + r.sync_rewards + r.blocks_cl_slasher_reward), 0) AS cl_rewards"), goqu.L("COALESCE(SUM(r.attestations_reward)::decimal, 0) AS attestations_reward"), goqu.L("COALESCE(SUM(r.attestations_ideal_reward)::decimal, 0) AS attestations_ideal_reward"), goqu.L("COALESCE(SUM(r.attestations_executed), 0) AS attestations_executed"), @@ -149,8 +149,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da } ds = ds. - InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). - Where(goqu.L("r.validator_index IN (SELECT validator_index FROM validators)")) + InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))) if groupNameSearchEnabled && (search != "" || colSort.Column == enums.VDBSummaryColumns.Group) { // Get the group names since we can filter and/or sort for them @@ -540,7 +539,6 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex COALESCE(sync_committees_expected, 0) as sync_committees_expected from %[1]s FINAL inner join validators v on %[1]s.validator_index = v.validator_index - where validator_index IN (select validator_index FROM validators) ` if dashboardId.Validators != nil { @@ -766,20 +764,20 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId var rewardsResultTotal RewardsResult rewardsDs := goqu.Dialect("postgres"). + From(goqu.L(fmt.Sprintf("%s AS r FINAL", table))). + With("validators", goqu.L("(SELECT group_id, validator_index FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). Select( goqu.L("MIN(epoch_start) AS epoch_start"), goqu.L("MAX(epoch_end) AS epoch_end"), goqu.L("COUNT(*) AS validator_count"), - goqu.L("(SUM(COALESCE(r.balance_end,0)) + SUM(COALESCE(r.withdrawals_amount,0)) - SUM(COALESCE(r.deposits_amount,0)) - SUM(COALESCE(r.balance_start,0))) AS reward")). - From(goqu.L(fmt.Sprintf("%s AS r FINAL", table))) + goqu.L("(SUM(COALESCE(r.balance_end,0)) + SUM(COALESCE(r.withdrawals_amount,0)) - SUM(COALESCE(r.deposits_amount,0)) - SUM(COALESCE(r.balance_start,0))) AS reward")) if len(dashboardId.Validators) > 0 { rewardsDs = rewardsDs. Where(goqu.L("validator_index IN ?", dashboardId.Validators)) } else { rewardsDs = rewardsDs. - InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). - Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) + InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))) if groupId != -1 { rewardsDs = rewardsDs. @@ -958,7 +956,7 @@ func (d *DataAccessService) GetValidatorDashboardSummaryChart(ctx context.Contex COALESCE(SUM(d.sync_scheduled), 0) AS sync_scheduled FROM %[1]s d INNER JOIN validators v ON d.validator_index = v.validator_index - WHERE %[2]s >= fromUnixTimestamp($1) AND %[2]s <= fromUnixTimestamp($2) AND validator_index in (select validator_index from validators) + WHERE %[2]s >= fromUnixTimestamp($1) AND %[2]s <= fromUnixTimestamp($2) GROUP BY 1, 2;`, dataTable, dateColumn) err := d.clickhouseReader.SelectContext(ctx, &queryResults, query, afterTs, beforeTs, dashboardId.Id, groupIds, totalLineRequested) @@ -1372,13 +1370,15 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx } // Build the query - ds := goqu.Dialect("postgres").Select( - goqu.L("r.epoch_start"), - goqu.L("r.epoch_end"), - goqu.L("r.validator_index"), - goqu.L("r.slashed"), - goqu.L("COALESCE(r.blocks_slashing_count, 0) AS slashed_amount")). + ds := goqu.Dialect("postgres"). From(goqu.L(fmt.Sprintf("%s AS r FINAL", clickhouseTable))). + With("validators", goqu.L("(SELECT group_id, validator_index FROM users_val_dashboards_validators WHERE dashboard_id = ?)", dashboardId.Id)). + Select( + goqu.L("r.epoch_start"), + goqu.L("r.epoch_end"), + goqu.L("r.validator_index"), + goqu.L("r.slashed"), + goqu.L("COALESCE(r.blocks_slashing_count, 0) AS slashed_amount")). Where(goqu.L("(r.slashed OR r.blocks_slashing_count > 0)")) // handle the case when we have a list of validators @@ -1387,8 +1387,7 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx Where(goqu.L("r.validator_index IN ?", dashboardId.Validators)) } else { ds = ds. - InnerJoin(goqu.L("users_val_dashboards_validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). - Where(goqu.L("v.dashboard_id = ?", dashboardId.Id)) + InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))) if groupId != t.AllGroups { ds = ds.Where(goqu.L("v.group_id = ?", groupId)) From b5e42dca43cbf5a17a67b07fd2ca8b344da02786 Mon Sep 17 00:00:00 2001 From: Stefan Pletka Date: Fri, 2 Aug 2024 14:39:53 +0200 Subject: [PATCH 11/11] Reverted removal of WHERE condition --- backend/pkg/api/data_access/vdb_summary.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index 4d3586db5..578b09b9c 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -149,7 +149,8 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da } ds = ds. - InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))) + InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). + Where(goqu.L("r.validator_index IN (SELECT validator_index FROM validators)")) if groupNameSearchEnabled && (search != "" || colSort.Column == enums.VDBSummaryColumns.Group) { // Get the group names since we can filter and/or sort for them @@ -539,6 +540,7 @@ func (d *DataAccessService) GetValidatorDashboardGroupSummary(ctx context.Contex COALESCE(sync_committees_expected, 0) as sync_committees_expected from %[1]s FINAL inner join validators v on %[1]s.validator_index = v.validator_index + where validator_index IN (select validator_index FROM validators) ` if dashboardId.Validators != nil { @@ -777,7 +779,8 @@ func (d *DataAccessService) internal_getElClAPR(ctx context.Context, dashboardId Where(goqu.L("validator_index IN ?", dashboardId.Validators)) } else { rewardsDs = rewardsDs. - InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))) + InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). + Where(goqu.L("r.validator_index IN (SELECT validator_index FROM validators)")) if groupId != -1 { rewardsDs = rewardsDs. @@ -956,7 +959,7 @@ func (d *DataAccessService) GetValidatorDashboardSummaryChart(ctx context.Contex COALESCE(SUM(d.sync_scheduled), 0) AS sync_scheduled FROM %[1]s d INNER JOIN validators v ON d.validator_index = v.validator_index - WHERE %[2]s >= fromUnixTimestamp($1) AND %[2]s <= fromUnixTimestamp($2) + WHERE %[2]s >= fromUnixTimestamp($1) AND %[2]s <= fromUnixTimestamp($2) AND validator_index in (select validator_index from validators) GROUP BY 1, 2;`, dataTable, dateColumn) err := d.clickhouseReader.SelectContext(ctx, &queryResults, query, afterTs, beforeTs, dashboardId.Id, groupIds, totalLineRequested) @@ -1387,7 +1390,8 @@ func (d *DataAccessService) GetValidatorDashboardSlashingsSummaryValidators(ctx Where(goqu.L("r.validator_index IN ?", dashboardId.Validators)) } else { ds = ds. - InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))) + InnerJoin(goqu.L("validators v"), goqu.On(goqu.L("r.validator_index = v.validator_index"))). + Where(goqu.L("r.validator_index IN (SELECT validator_index FROM validators)")) if groupId != t.AllGroups { ds = ds.Where(goqu.L("v.group_id = ?", groupId))