Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(BEDS-520) DA: removed query to mat view #900

Merged
merged 3 commits into from
Oct 2, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 91 additions & 76 deletions backend/pkg/api/data_access/vdb_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
Epoch uint64 `db:"epoch"`
Slot uint64 `db:"slot"`
Status uint64 `db:"status"`
Block sql.NullInt64 `db:"block"`
Block sql.NullInt64 `db:"exec_block_number"`
FeeRecipient []byte `db:"fee_recipient"`
ElReward decimal.NullDecimal `db:"el_reward"`
ClReward decimal.NullDecimal `db:"cl_reward"`
Expand All @@ -132,7 +132,7 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
if colSort.Desc {
sortOrder = ` DESC`
}
val := t.VDBValidator(0)
var val any
sortColName := `slot`
switch colSort.Column {
case enums.VDBBlockProposer:
Expand All @@ -142,8 +142,8 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
sortColName = `status`
val = currentCursor.Status
case enums.VDBBlockProposerReward:
sortColName = `reward`
val = currentCursor.Reward.BigInt().Uint64()
sortColName = `el_reward + cl_reward`
val = currentCursor.Reward
}
onlyPrimarySort := sortColName == `slot`
if currentCursor.IsValid() {
Expand Down Expand Up @@ -218,78 +218,28 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
} else {
log.Debugf("duties info not available, skipping scheduled slots: %s", err)
}
if len(scheduledProposers) > 0 {
// make sure the distinct clause filters out the correct duplicated row (e.g. block=nil)
orderBy += `, block`
}
}

groupIdCol := "group_id"
// this is actually just used for sorting for "reward".. will not consider EL rewards of unfinalized blocks atm
reward := "reward"
if dashboardId.Validators != nil {
groupIdCol = fmt.Sprintf("%d AS %s", t.DefaultGroupId, groupIdCol)
reward = "coalesce(rb.value / 1e18, ep.fee_recipient_reward) AS " + reward
}
selectFields := fmt.Sprintf(`
r.proposer,
blocks.proposer,
blocks.epoch,
blocks.slot,
%s,
r.epoch,
r.slot,
r.status,
block,
blocks.status,
exec_block_number,
COALESCE(rb.proposer_fee_recipient, blocks.exec_fee_recipient) AS fee_recipient,
COALESCE(rb.value / 1e18, ep.fee_recipient_reward) AS el_reward,
cp.cl_attestations_reward / 1e9 + cp.cl_sync_aggregate_reward / 1e9 + cp.cl_slashing_inclusion_reward / 1e9 as cl_reward,
r.graffiti_text`, groupIdCol)
query := fmt.Sprintf(`SELECT distinct on (slot)
%s
FROM ( SELECT * FROM (`, selectFields)
// supply scheduled proposals, if any
if len(scheduledProposers) > 0 {
// distinct to filter out duplicates in an edge case (if dutiesInfo didn't update yet after a block was proposed, but the blocks table was)
// might be possible to remove this once the TODO in service_slot_viz.go:startSlotVizDataService is resolved
distinct := "slot"
if !onlyPrimarySort {
distinct = sortColName + ", " + distinct
}
params = append(params, scheduledProposers)
params = append(params, scheduledEpochs)
params = append(params, scheduledSlots)
query = fmt.Sprintf(`SELECT distinct on (%s)
blocks.graffiti_text`, groupIdCol)
cte := fmt.Sprintf(`WITH past_blocks AS (SELECT
%s
FROM ( SELECT * FROM (WITH scheduled_proposals (
proposer,
epoch,
slot,
status,
block,
reward,
graffiti_text
) AS (SELECT
*,
'0',
null::int,
null::int,
''
FROM unnest($%d::int[], $%d::int[], $%d::int[]))
SELECT * FROM scheduled_proposals
UNION
(`, distinct, selectFields, len(params)-2, len(params)-1, len(params))
}
query += fmt.Sprintf(`
SELECT
proposer,
epoch,
blocks.slot,
status,
exec_block_number AS block,
%s,
graffiti_text
FROM blocks
`, reward)

if dashboardId.Validators == nil {
FROM blocks
`, selectFields)
/*if dashboardId.Validators == nil {
query += `
LEFT JOIN cached_proposal_rewards ON cached_proposal_rewards.dashboard_id = $1 AND blocks.slot = cached_proposal_rewards.slot
`
Expand All @@ -304,32 +254,97 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
if len(scheduledProposers) > 0 {
query += `)`
}
query += `) as u `
query += `) as u `*/
if dashboardId.Validators == nil {
query += fmt.Sprintf(`
cte += fmt.Sprintf(`
INNER JOIN (%s) validators ON validators.validator_index = proposer
`, filteredValidatorsQuery)
} else {
query += `WHERE proposer = ANY($1) `
if len(where) == 0 {
where += `WHERE `
} else {
where += `AND `
}
where += `proposer = ANY($1) `
}

params = append(params, limit+1)
limitStr := fmt.Sprintf(`
LIMIT $%d
`, len(params))
rewardsStr := `) r
LEFT JOIN consensus_payloads cp on r.slot = cp.slot
LEFT JOIN blocks on r.slot = blocks.slot
// relay bribe deduplication; select most likely (=max) relay bribe value for each block
cte += `
LEFT JOIN consensus_payloads cp on blocks.slot = cp.slot
LEFT JOIN execution_payloads ep ON ep.block_hash = blocks.exec_block_hash
LEFT JOIN relays_blocks rb ON rb.exec_block_hash = blocks.exec_block_hash
LEFT JOIN LATERAL (SELECT exec_block_hash, proposer_fee_recipient, max(value) as value
FROM relays_blocks
WHERE relays_blocks.exec_block_hash = blocks.exec_block_hash
GROUP BY exec_block_hash, proposer_fee_recipient
) rb ON rb.exec_block_hash = blocks.exec_block_hash
)
`
// relay bribe deduplication; select most likely (=max) relay bribe value for each block
relayOrder := ``
if colSort.Column != enums.VDBBlockProposerReward {
relayOrder += `, rb.value ` + secSort

distinct := ""
if !onlyPrimarySort {
distinct = sortColName
}
from := `past_blocks `
selectStr := `SELECT * FROM ` + from
if len(distinct) > 0 {
selectStr = `SELECT DISTINCT ON (` + distinct + `) * FROM ` + from
}

query := selectStr + from + where + orderBy + limitStr
// supply scheduled proposals, if any
if len(scheduledProposers) > 0 {
// distinct to filter out duplicates in an edge case (if dutiesInfo didn't update yet after a block was proposed, but the blocks table was)
// might be possible to remove this once the TODO in service_slot_viz.go:startSlotVizDataService is resolved
params = append(params, scheduledProposers)
params = append(params, scheduledEpochs)
params = append(params, scheduledSlots)
cte += fmt.Sprintf(`,
scheduled_blocks as (
SELECT
prov.proposer,
prov.epoch,
prov.slot,
%s,
'0'::text AS status,
NULL::int AS exec_block_number,
''::bytea AS fee_recipient,
NULL::float AS el_reward,
NULL::float AS cl_reward,
''::text AS graffiti_text
FROM unnest($%d::int[], $%d::int[], $%d::int[]) AS prov(proposer, epoch, slot)
`, groupIdCol, len(params)-2, len(params)-1, len(params))
if dashboardId.Validators == nil {
// add group id
cte += fmt.Sprintf(`INNER JOIN users_val_dashboards_validators validators
ON validators.dashboard_id = $1
AND validators.validator_index = ANY($%d::int[])
`, len(params)-2)
}
cte += `) `
if len(distinct) != 0 {
distinct += ", "
}
// keep all ordering, sorting etc
distinct += "slot"
selectStr = `SELECT DISTINCT ON (` + distinct + `) * FROM `
// encapsulate past blocks query to ensure performance
from = `(
( ` + query + ` )
UNION ALL
SELECT * FROM scheduled_blocks
) as combined
`
// make sure the distinct clause filters out the correct duplicated row (e.g. block=nil)
orderBy += `, exec_block_number NULLS LAST`
query = selectStr + from + where + orderBy + limitStr
}

startTime := time.Now()
err = d.alloyReader.SelectContext(ctx, &proposals, query+where+orderBy+limitStr+rewardsStr+orderBy+relayOrder, params...)
err = d.alloyReader.SelectContext(ctx, &proposals, cte+query, params...)
log.Debugf("=== getting past blocks took %s", time.Since(startTime))
if err != nil {
return nil, nil, err
Expand Down
Loading