From 9da57af5b81416f6c22492fb8d1dc0057641b8c2 Mon Sep 17 00:00:00 2001 From: remoterami <142154971+remoterami@users.noreply.github.com> Date: Mon, 30 Sep 2024 18:26:21 +0200 Subject: [PATCH 1/3] removed query to mat view --- backend/pkg/api/data_access/vdb_blocks.go | 58 ++++++++++------------- 1 file changed, 25 insertions(+), 33 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_blocks.go b/backend/pkg/api/data_access/vdb_blocks.go index 2a2a0953a..14cbb3e1e 100644 --- a/backend/pkg/api/data_access/vdb_blocks.go +++ b/backend/pkg/api/data_access/vdb_blocks.go @@ -115,7 +115,7 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das Epoch uint64 `db:"epoch"` Slot uint64 `db:"slot"` Status uint64 `db:"status"` - Block sql.NullInt64 `db:"block"` + Block sql.NullInt64 `db:"exec_block_number"` FeeRecipient []byte `db:"fee_recipient"` ElReward decimal.NullDecimal `db:"el_reward"` ClReward decimal.NullDecimal `db:"cl_reward"` @@ -161,7 +161,7 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das params = append(params, currentCursor.Slot) where += `WHERE (` if onlyPrimarySort { - where += `slot` + sign + fmt.Sprintf(`$%d`, len(params)) + where += `blocks.slot` + sign + fmt.Sprintf(`$%d`, len(params)) } else { params = append(params, val) secSign := ` < ` @@ -172,7 +172,7 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das // explicit cast to int because type of 'status' column is text for some reason sortColName += "::int" } - where += fmt.Sprintf(`(slot`+secSign+`$%d AND `+sortColName+` = $%d) OR `+sortColName+sign+`$%d`, len(params)-1, len(params), len(params)) + where += fmt.Sprintf(`(blocks.slot`+secSign+`$%d AND `+sortColName+` = $%d) OR `+sortColName+sign+`$%d`, len(params)-1, len(params), len(params)) } where += `) ` } @@ -226,25 +226,30 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das groupIdCol := "group_id" // this is actually just used for sorting for "reward".. will not consider EL rewards of unfinalized blocks atm - reward := "reward" + // reward := "reward" if dashboardId.Validators != nil { groupIdCol = fmt.Sprintf("%d AS %s", t.DefaultGroupId, groupIdCol) - reward = "coalesce(rb.value / 1e18, ep.fee_recipient_reward) AS " + reward + // reward = "coalesce(rb.value / 1e18, ep.fee_recipient_reward) AS " + reward } selectFields := fmt.Sprintf(` - r.proposer, + blocks.proposer, %s, - r.epoch, - r.slot, - r.status, - block, + blocks.epoch, + blocks.slot, + blocks.status, + exec_block_number, COALESCE(rb.proposer_fee_recipient, blocks.exec_fee_recipient) AS fee_recipient, COALESCE(rb.value / 1e18, ep.fee_recipient_reward) AS el_reward, cp.cl_attestations_reward / 1e9 + cp.cl_sync_aggregate_reward / 1e9 + cp.cl_slashing_inclusion_reward / 1e9 as cl_reward, - r.graffiti_text`, groupIdCol) - query := fmt.Sprintf(`SELECT distinct on (slot) + blocks.graffiti_text`, groupIdCol) + distinct := "slot" + if !onlyPrimarySort { + distinct = sortColName + ", " + distinct + } + query := fmt.Sprintf(`SELECT distinct on (%s) %s - FROM ( SELECT * FROM (`, selectFields) + FROM blocks + `, distinct, selectFields) // supply scheduled proposals, if any if len(scheduledProposers) > 0 { // distinct to filter out duplicates in an edge case (if dutiesInfo didn't update yet after a block was proposed, but the blocks table was) @@ -277,19 +282,8 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das UNION (`, distinct, selectFields, len(params)-2, len(params)-1, len(params)) } - query += fmt.Sprintf(` - SELECT - proposer, - epoch, - blocks.slot, - status, - exec_block_number AS block, - %s, - graffiti_text - FROM blocks - `, reward) - if dashboardId.Validators == nil { + /*if dashboardId.Validators == nil { query += ` LEFT JOIN cached_proposal_rewards ON cached_proposal_rewards.dashboard_id = $1 AND blocks.slot = cached_proposal_rewards.slot ` @@ -304,32 +298,30 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das if len(scheduledProposers) > 0 { query += `)` } - query += `) as u ` + query += `) as u `*/ if dashboardId.Validators == nil { query += fmt.Sprintf(` INNER JOIN (%s) validators ON validators.validator_index = proposer `, filteredValidatorsQuery) } else { - query += `WHERE proposer = ANY($1) ` + where += `WHERE proposer = ANY($1) ` } params = append(params, limit+1) limitStr := fmt.Sprintf(` LIMIT $%d `, len(params)) - rewardsStr := `) r - LEFT JOIN consensus_payloads cp on r.slot = cp.slot - LEFT JOIN blocks on r.slot = blocks.slot + rewardsStr := ` + LEFT JOIN consensus_payloads cp on blocks.slot = cp.slot LEFT JOIN execution_payloads ep ON ep.block_hash = blocks.exec_block_hash LEFT JOIN relays_blocks rb ON rb.exec_block_hash = blocks.exec_block_hash ` // relay bribe deduplication; select most likely (=max) relay bribe value for each block - relayOrder := `` if colSort.Column != enums.VDBBlockProposerReward { - relayOrder += `, rb.value ` + secSort + orderBy += `, rb.value ` + secSort } startTime := time.Now() - err = d.alloyReader.SelectContext(ctx, &proposals, query+where+orderBy+limitStr+rewardsStr+orderBy+relayOrder, params...) + err = d.alloyReader.SelectContext(ctx, &proposals, query+rewardsStr+where+orderBy+limitStr, params...) log.Debugf("=== getting past blocks took %s", time.Since(startTime)) if err != nil { return nil, nil, err From 6db47db73a8f09c974373a603284813fb95a7eaf Mon Sep 17 00:00:00 2001 From: remoterami <142154971+remoterami@users.noreply.github.com> Date: Tue, 1 Oct 2024 14:25:26 +0200 Subject: [PATCH 2/3] fixed rewards handling --- backend/pkg/api/data_access/vdb_blocks.go | 141 +++++++++++++--------- 1 file changed, 84 insertions(+), 57 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_blocks.go b/backend/pkg/api/data_access/vdb_blocks.go index 14cbb3e1e..39bd95d6e 100644 --- a/backend/pkg/api/data_access/vdb_blocks.go +++ b/backend/pkg/api/data_access/vdb_blocks.go @@ -132,7 +132,7 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das if colSort.Desc { sortOrder = ` DESC` } - val := t.VDBValidator(0) + var val any sortColName := `slot` switch colSort.Column { case enums.VDBBlockProposer: @@ -142,8 +142,8 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das sortColName = `status` val = currentCursor.Status case enums.VDBBlockProposerReward: - sortColName = `reward` - val = currentCursor.Reward.BigInt().Uint64() + sortColName = `el_reward + cl_reward` + val = currentCursor.Reward } onlyPrimarySort := sortColName == `slot` if currentCursor.IsValid() { @@ -161,7 +161,7 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das params = append(params, currentCursor.Slot) where += `WHERE (` if onlyPrimarySort { - where += `blocks.slot` + sign + fmt.Sprintf(`$%d`, len(params)) + where += `slot` + sign + fmt.Sprintf(`$%d`, len(params)) } else { params = append(params, val) secSign := ` < ` @@ -172,7 +172,7 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das // explicit cast to int because type of 'status' column is text for some reason sortColName += "::int" } - where += fmt.Sprintf(`(blocks.slot`+secSign+`$%d AND `+sortColName+` = $%d) OR `+sortColName+sign+`$%d`, len(params)-1, len(params), len(params)) + where += fmt.Sprintf(`(slot`+secSign+`$%d AND `+sortColName+` = $%d) OR `+sortColName+sign+`$%d`, len(params)-1, len(params), len(params)) } where += `) ` } @@ -218,10 +218,6 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das } else { log.Debugf("duties info not available, skipping scheduled slots: %s", err) } - if len(scheduledProposers) > 0 { - // make sure the distinct clause filters out the correct duplicated row (e.g. block=nil) - orderBy += `, block` - } } groupIdCol := "group_id" @@ -233,56 +229,19 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das } selectFields := fmt.Sprintf(` blocks.proposer, - %s, blocks.epoch, blocks.slot, + %s, blocks.status, exec_block_number, COALESCE(rb.proposer_fee_recipient, blocks.exec_fee_recipient) AS fee_recipient, COALESCE(rb.value / 1e18, ep.fee_recipient_reward) AS el_reward, cp.cl_attestations_reward / 1e9 + cp.cl_sync_aggregate_reward / 1e9 + cp.cl_slashing_inclusion_reward / 1e9 as cl_reward, blocks.graffiti_text`, groupIdCol) - distinct := "slot" - if !onlyPrimarySort { - distinct = sortColName + ", " + distinct - } - query := fmt.Sprintf(`SELECT distinct on (%s) + cte := fmt.Sprintf(`WITH past_blocks AS (SELECT %s FROM blocks - `, distinct, selectFields) - // supply scheduled proposals, if any - if len(scheduledProposers) > 0 { - // distinct to filter out duplicates in an edge case (if dutiesInfo didn't update yet after a block was proposed, but the blocks table was) - // might be possible to remove this once the TODO in service_slot_viz.go:startSlotVizDataService is resolved - distinct := "slot" - if !onlyPrimarySort { - distinct = sortColName + ", " + distinct - } - params = append(params, scheduledProposers) - params = append(params, scheduledEpochs) - params = append(params, scheduledSlots) - query = fmt.Sprintf(`SELECT distinct on (%s) - %s - FROM ( SELECT * FROM (WITH scheduled_proposals ( - proposer, - epoch, - slot, - status, - block, - reward, - graffiti_text - ) AS (SELECT - *, - '0', - null::int, - null::int, - '' - FROM unnest($%d::int[], $%d::int[], $%d::int[])) - SELECT * FROM scheduled_proposals - UNION - (`, distinct, selectFields, len(params)-2, len(params)-1, len(params)) - } - + `, selectFields) /*if dashboardId.Validators == nil { query += ` LEFT JOIN cached_proposal_rewards ON cached_proposal_rewards.dashboard_id = $1 AND blocks.slot = cached_proposal_rewards.slot @@ -300,28 +259,96 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das } query += `) as u `*/ if dashboardId.Validators == nil { - query += fmt.Sprintf(` + cte += fmt.Sprintf(` INNER JOIN (%s) validators ON validators.validator_index = proposer `, filteredValidatorsQuery) } else { - where += `WHERE proposer = ANY($1) ` + if len(where) == 0 { + where += `WHERE ` + } else { + where += `AND ` + } + where += `proposer = ANY($1) ` } params = append(params, limit+1) limitStr := fmt.Sprintf(` LIMIT $%d `, len(params)) - rewardsStr := ` + // relay bribe deduplication; select most likely (=max) relay bribe value for each block + cte += ` LEFT JOIN consensus_payloads cp on blocks.slot = cp.slot LEFT JOIN execution_payloads ep ON ep.block_hash = blocks.exec_block_hash - LEFT JOIN relays_blocks rb ON rb.exec_block_hash = blocks.exec_block_hash + LEFT JOIN LATERAL (SELECT exec_block_hash, proposer_fee_recipient, max(value) as value + FROM relays_blocks + WHERE relays_blocks.exec_block_hash = blocks.exec_block_hash + GROUP BY exec_block_hash, proposer_fee_recipient + ) rb ON rb.exec_block_hash = blocks.exec_block_hash + ) ` - // relay bribe deduplication; select most likely (=max) relay bribe value for each block - if colSort.Column != enums.VDBBlockProposerReward { - orderBy += `, rb.value ` + secSort + + distinct := "" + if !onlyPrimarySort { + distinct = sortColName } + from := `past_blocks ` + selectStr := `SELECT * FROM ` + from + if len(distinct) > 0 { + selectStr = `SELECT DISTINCT ON (` + distinct + `) * FROM ` + from + } + + query := selectStr + from + where + orderBy + limitStr + // supply scheduled proposals, if any + if len(scheduledProposers) > 0 { + // distinct to filter out duplicates in an edge case (if dutiesInfo didn't update yet after a block was proposed, but the blocks table was) + // might be possible to remove this once the TODO in service_slot_viz.go:startSlotVizDataService is resolved + params = append(params, scheduledProposers) + params = append(params, scheduledEpochs) + params = append(params, scheduledSlots) + cte += fmt.Sprintf(`, + scheduled_blocks ( + proposer, + epoch, + slot, + group_id, + status, + exec_block_number, + fee_recipient, + el_reward, + cl_reward, + graffiti_text + ) AS (SELECT + *, + 0, + '0', + null::int, + ''::bytea, + null::int, + null::int, + '' + FROM unnest($%d::int[], $%d::int[], $%d::int[]) + ) + `, len(params)-2, len(params)-1, len(params)) + if len(distinct) != 0 { + distinct += ", " + } + // keep all ordering, sorting etc + distinct += "slot" + selectStr = `SELECT DISTINCT ON (` + distinct + `) * FROM ` + // encapsulate past blocks query to ensure performance + from = `( + ( ` + query + ` ) + UNION ALL + SELECT * FROM scheduled_blocks + ) as combined + ` + // make sure the distinct clause filters out the correct duplicated row (e.g. block=nil) + orderBy += `, exec_block_number NULLS LAST` + query = selectStr + from + where + orderBy + limitStr + } + startTime := time.Now() - err = d.alloyReader.SelectContext(ctx, &proposals, query+rewardsStr+where+orderBy+limitStr, params...) + err = d.alloyReader.SelectContext(ctx, &proposals, cte+query, params...) log.Debugf("=== getting past blocks took %s", time.Since(startTime)) if err != nil { return nil, nil, err From 3e1f57c513855af56dbed1031b7d573523701af9 Mon Sep 17 00:00:00 2001 From: remoterami <142154971+remoterami@users.noreply.github.com> Date: Wed, 2 Oct 2024 10:07:29 +0200 Subject: [PATCH 3/3] group ids fixed --- backend/pkg/api/data_access/vdb_blocks.go | 48 +++++++++++------------ 1 file changed, 22 insertions(+), 26 deletions(-) diff --git a/backend/pkg/api/data_access/vdb_blocks.go b/backend/pkg/api/data_access/vdb_blocks.go index 39bd95d6e..ca881c0fb 100644 --- a/backend/pkg/api/data_access/vdb_blocks.go +++ b/backend/pkg/api/data_access/vdb_blocks.go @@ -221,11 +221,8 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das } groupIdCol := "group_id" - // this is actually just used for sorting for "reward".. will not consider EL rewards of unfinalized blocks atm - // reward := "reward" if dashboardId.Validators != nil { groupIdCol = fmt.Sprintf("%d AS %s", t.DefaultGroupId, groupIdCol) - // reward = "coalesce(rb.value / 1e18, ep.fee_recipient_reward) AS " + reward } selectFields := fmt.Sprintf(` blocks.proposer, @@ -306,29 +303,28 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das params = append(params, scheduledEpochs) params = append(params, scheduledSlots) cte += fmt.Sprintf(`, - scheduled_blocks ( - proposer, - epoch, - slot, - group_id, - status, - exec_block_number, - fee_recipient, - el_reward, - cl_reward, - graffiti_text - ) AS (SELECT - *, - 0, - '0', - null::int, - ''::bytea, - null::int, - null::int, - '' - FROM unnest($%d::int[], $%d::int[], $%d::int[]) - ) - `, len(params)-2, len(params)-1, len(params)) + scheduled_blocks as ( + SELECT + prov.proposer, + prov.epoch, + prov.slot, + %s, + '0'::text AS status, + NULL::int AS exec_block_number, + ''::bytea AS fee_recipient, + NULL::float AS el_reward, + NULL::float AS cl_reward, + ''::text AS graffiti_text + FROM unnest($%d::int[], $%d::int[], $%d::int[]) AS prov(proposer, epoch, slot) + `, groupIdCol, len(params)-2, len(params)-1, len(params)) + if dashboardId.Validators == nil { + // add group id + cte += fmt.Sprintf(`INNER JOIN users_val_dashboards_validators validators + ON validators.dashboard_id = $1 + AND validators.validator_index = ANY($%d::int[]) + `, len(params)-2) + } + cte += `) ` if len(distinct) != 0 { distinct += ", " }