Skip to content

Commit

Permalink
Merge pull request #900 from gobitfly/BEDS-520/fix-blocks-query
Browse files Browse the repository at this point in the history
(BEDS-520) DA: removed query to mat view
  • Loading branch information
remoterami authored Oct 2, 2024
2 parents 09b2451 + 3e1f57c commit 42a8588
Showing 1 changed file with 91 additions and 76 deletions.
167 changes: 91 additions & 76 deletions backend/pkg/api/data_access/vdb_blocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
Epoch uint64 `db:"epoch"`
Slot uint64 `db:"slot"`
Status uint64 `db:"status"`
Block sql.NullInt64 `db:"block"`
Block sql.NullInt64 `db:"exec_block_number"`
FeeRecipient []byte `db:"fee_recipient"`
ElReward decimal.NullDecimal `db:"el_reward"`
ClReward decimal.NullDecimal `db:"cl_reward"`
Expand All @@ -132,7 +132,7 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
if colSort.Desc {
sortOrder = ` DESC`
}
val := t.VDBValidator(0)
var val any
sortColName := `slot`
switch colSort.Column {
case enums.VDBBlockProposer:
Expand All @@ -142,8 +142,8 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
sortColName = `status`
val = currentCursor.Status
case enums.VDBBlockProposerReward:
sortColName = `reward`
val = currentCursor.Reward.BigInt().Uint64()
sortColName = `el_reward + cl_reward`
val = currentCursor.Reward
}
onlyPrimarySort := sortColName == `slot`
if currentCursor.IsValid() {
Expand Down Expand Up @@ -218,78 +218,28 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
} else {
log.Debugf("duties info not available, skipping scheduled slots: %s", err)
}
if len(scheduledProposers) > 0 {
// make sure the distinct clause filters out the correct duplicated row (e.g. block=nil)
orderBy += `, block`
}
}

groupIdCol := "group_id"
// this is actually just used for sorting for "reward".. will not consider EL rewards of unfinalized blocks atm
reward := "reward"
if dashboardId.Validators != nil {
groupIdCol = fmt.Sprintf("%d AS %s", t.DefaultGroupId, groupIdCol)
reward = "coalesce(rb.value / 1e18, ep.fee_recipient_reward) AS " + reward
}
selectFields := fmt.Sprintf(`
r.proposer,
blocks.proposer,
blocks.epoch,
blocks.slot,
%s,
r.epoch,
r.slot,
r.status,
block,
blocks.status,
exec_block_number,
COALESCE(rb.proposer_fee_recipient, blocks.exec_fee_recipient) AS fee_recipient,
COALESCE(rb.value / 1e18, ep.fee_recipient_reward) AS el_reward,
cp.cl_attestations_reward / 1e9 + cp.cl_sync_aggregate_reward / 1e9 + cp.cl_slashing_inclusion_reward / 1e9 as cl_reward,
r.graffiti_text`, groupIdCol)
query := fmt.Sprintf(`SELECT distinct on (slot)
%s
FROM ( SELECT * FROM (`, selectFields)
// supply scheduled proposals, if any
if len(scheduledProposers) > 0 {
// distinct to filter out duplicates in an edge case (if dutiesInfo didn't update yet after a block was proposed, but the blocks table was)
// might be possible to remove this once the TODO in service_slot_viz.go:startSlotVizDataService is resolved
distinct := "slot"
if !onlyPrimarySort {
distinct = sortColName + ", " + distinct
}
params = append(params, scheduledProposers)
params = append(params, scheduledEpochs)
params = append(params, scheduledSlots)
query = fmt.Sprintf(`SELECT distinct on (%s)
blocks.graffiti_text`, groupIdCol)
cte := fmt.Sprintf(`WITH past_blocks AS (SELECT
%s
FROM ( SELECT * FROM (WITH scheduled_proposals (
proposer,
epoch,
slot,
status,
block,
reward,
graffiti_text
) AS (SELECT
*,
'0',
null::int,
null::int,
''
FROM unnest($%d::int[], $%d::int[], $%d::int[]))
SELECT * FROM scheduled_proposals
UNION
(`, distinct, selectFields, len(params)-2, len(params)-1, len(params))
}
query += fmt.Sprintf(`
SELECT
proposer,
epoch,
blocks.slot,
status,
exec_block_number AS block,
%s,
graffiti_text
FROM blocks
`, reward)

if dashboardId.Validators == nil {
FROM blocks
`, selectFields)
/*if dashboardId.Validators == nil {
query += `
LEFT JOIN cached_proposal_rewards ON cached_proposal_rewards.dashboard_id = $1 AND blocks.slot = cached_proposal_rewards.slot
`
Expand All @@ -304,32 +254,97 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das
if len(scheduledProposers) > 0 {
query += `)`
}
query += `) as u `
query += `) as u `*/
if dashboardId.Validators == nil {
query += fmt.Sprintf(`
cte += fmt.Sprintf(`
INNER JOIN (%s) validators ON validators.validator_index = proposer
`, filteredValidatorsQuery)
} else {
query += `WHERE proposer = ANY($1) `
if len(where) == 0 {
where += `WHERE `
} else {
where += `AND `
}
where += `proposer = ANY($1) `
}

params = append(params, limit+1)
limitStr := fmt.Sprintf(`
LIMIT $%d
`, len(params))
rewardsStr := `) r
LEFT JOIN consensus_payloads cp on r.slot = cp.slot
LEFT JOIN blocks on r.slot = blocks.slot
// relay bribe deduplication; select most likely (=max) relay bribe value for each block
cte += `
LEFT JOIN consensus_payloads cp on blocks.slot = cp.slot
LEFT JOIN execution_payloads ep ON ep.block_hash = blocks.exec_block_hash
LEFT JOIN relays_blocks rb ON rb.exec_block_hash = blocks.exec_block_hash
LEFT JOIN LATERAL (SELECT exec_block_hash, proposer_fee_recipient, max(value) as value
FROM relays_blocks
WHERE relays_blocks.exec_block_hash = blocks.exec_block_hash
GROUP BY exec_block_hash, proposer_fee_recipient
) rb ON rb.exec_block_hash = blocks.exec_block_hash
)
`
// relay bribe deduplication; select most likely (=max) relay bribe value for each block
relayOrder := ``
if colSort.Column != enums.VDBBlockProposerReward {
relayOrder += `, rb.value ` + secSort

distinct := ""
if !onlyPrimarySort {
distinct = sortColName
}
from := `past_blocks `
selectStr := `SELECT * FROM ` + from
if len(distinct) > 0 {
selectStr = `SELECT DISTINCT ON (` + distinct + `) * FROM ` + from
}

query := selectStr + from + where + orderBy + limitStr
// supply scheduled proposals, if any
if len(scheduledProposers) > 0 {
// distinct to filter out duplicates in an edge case (if dutiesInfo didn't update yet after a block was proposed, but the blocks table was)
// might be possible to remove this once the TODO in service_slot_viz.go:startSlotVizDataService is resolved
params = append(params, scheduledProposers)
params = append(params, scheduledEpochs)
params = append(params, scheduledSlots)
cte += fmt.Sprintf(`,
scheduled_blocks as (
SELECT
prov.proposer,
prov.epoch,
prov.slot,
%s,
'0'::text AS status,
NULL::int AS exec_block_number,
''::bytea AS fee_recipient,
NULL::float AS el_reward,
NULL::float AS cl_reward,
''::text AS graffiti_text
FROM unnest($%d::int[], $%d::int[], $%d::int[]) AS prov(proposer, epoch, slot)
`, groupIdCol, len(params)-2, len(params)-1, len(params))
if dashboardId.Validators == nil {
// add group id
cte += fmt.Sprintf(`INNER JOIN users_val_dashboards_validators validators
ON validators.dashboard_id = $1
AND validators.validator_index = ANY($%d::int[])
`, len(params)-2)
}
cte += `) `
if len(distinct) != 0 {
distinct += ", "
}
// keep all ordering, sorting etc
distinct += "slot"
selectStr = `SELECT DISTINCT ON (` + distinct + `) * FROM `
// encapsulate past blocks query to ensure performance
from = `(
( ` + query + ` )
UNION ALL
SELECT * FROM scheduled_blocks
) as combined
`
// make sure the distinct clause filters out the correct duplicated row (e.g. block=nil)
orderBy += `, exec_block_number NULLS LAST`
query = selectStr + from + where + orderBy + limitStr
}

startTime := time.Now()
err = d.alloyReader.SelectContext(ctx, &proposals, query+where+orderBy+limitStr+rewardsStr+orderBy+relayOrder, params...)
err = d.alloyReader.SelectContext(ctx, &proposals, cte+query, params...)
log.Debugf("=== getting past blocks took %s", time.Since(startTime))
if err != nil {
return nil, nil, err
Expand Down

0 comments on commit 42a8588

Please sign in to comment.