Skip to content

Commit

Permalink
Merge pull request #181 from gobitfly/NOBIDS/cl-deposits-endpoint
Browse files Browse the repository at this point in the history
Nobids/cl deposits endpoint
  • Loading branch information
invis-bitfly authored Apr 8, 2024
2 parents 4df07c7 + 04435b3 commit 1c39ed5
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 2 deletions.
152 changes: 150 additions & 2 deletions backend/pkg/api/data_access/data_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"fmt"
"math"
"math/big"
"slices"
"sort"
isort "sort"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -1955,8 +1957,154 @@ func (d *DataAccessService) GetValidatorDashboardElDeposits(dashboardId t.VDBId,
}

func (d *DataAccessService) GetValidatorDashboardClDeposits(dashboardId t.VDBId, cursor string, search string, limit uint64) ([]t.VDBConsensusDepositsTableRow, *t.Paging, error) {
// WORKING @invis
return d.dummy.GetValidatorDashboardClDeposits(dashboardId, cursor, search, limit)
var err error
currentDirection := enums.DESC // TODO: expose over parameter
var currentCursor t.CLDepositsCursor

if cursor != "" {
currentCursor, err = utils.StringToCursor[t.CLDepositsCursor](cursor)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse passed cursor as CLDepositsCursor: %w", err)
}
}

var byteaArray pq.ByteaArray

// Resolve validator indices to pubkeys
if dashboardId.Validators != nil {
validatorsArray := make([]uint64, len(dashboardId.Validators))
for i, v := range dashboardId.Validators {
validatorsArray[i] = v.Index
}
validatorPubkeys, err := d.services.GetPubkeysOfValidatorIndexSlice(validatorsArray)
if err != nil {
return nil, nil, fmt.Errorf("failed to resolve validator indices to pubkeys: %w", err)
}

// Convert pubkeys to bytes for PostgreSQL
byteaArray = make(pq.ByteaArray, len(validatorPubkeys))
for i, p := range validatorPubkeys {
byteaArray[i], _ = hexutil.Decode(p)
}
}

// Custom type for block_index
var data []struct {
GroupId sql.NullInt64 `db:"group_id"`
PublicKey []byte `db:"publickey"`
Slot int64 `db:"block_slot"`
SlotIndex int64 `db:"block_index"`
WithdrawalCredential []byte `db:"withdrawalcredentials"`
Amount decimal.Decimal `db:"amount"`
Signature []byte `db:"signature"`
}

query := `
SELECT
bd.publickey,
bd.block_slot,
bd.block_index,
bd.amount,
bd.signature,
bd.withdrawalcredentials
`

var filter interface{}
if dashboardId.Validators != nil {
query += `
FROM
blocks_deposits bd
WHERE
bd.publickey = ANY ($1)`
filter = byteaArray
} else {
query += `
, cbdl.group_id
FROM
cached_blocks_deposits_lookup cbdl
LEFT JOIN blocks_deposits bd ON bd.block_slot = cbdl.block_slot
AND bd.block_index = cbdl.block_index
WHERE
cbdl.dashboard_id = $1`
filter = dashboardId.Id
}

params := []interface{}{filter}
filterFragment := ` ORDER BY bd.block_slot DESC, bd.block_index DESC`
if currentCursor.IsValid() {
filterFragment = ` AND (bd.block_slot < $2 or (bd.block_slot = $2 and bd.block_index < $3)) ` + filterFragment
params = append(params, currentCursor.Slot, currentCursor.SlotIndex)
}

if currentCursor.IsValid() && currentCursor.Direction == enums.ASC ||
!currentCursor.IsValid() && currentDirection == enums.ASC {
filterFragment = strings.Replace(strings.Replace(filterFragment, "<", ">", -1), "DESC", "ASC", -1)
}

if dashboardId.Validators == nil {
filterFragment = strings.Replace(filterFragment, "bd.", "cbdl.", -1)
}

params = append(params, limit+1)
filterFragment += fmt.Sprintf(" LIMIT $%d", len(params))

err = db.AlloyReader.Select(&data, query+filterFragment, params...)

if err != nil {
return nil, nil, err
}

pubkeys := make([]string, len(data))
for i, row := range data {
pubkeys[i] = hexutil.Encode(row.PublicKey)
}
indices, err := d.services.GetValidatorIndexOfPubkeySlice(pubkeys)
if err != nil {
return nil, nil, fmt.Errorf("failed to recover indices after query: %w", err)
}

responseData := make([]t.VDBConsensusDepositsTableRow, len(data))
for i, row := range data {
responseData[i] = t.VDBConsensusDepositsTableRow{
PublicKey: t.PubKey(pubkeys[i]),
Index: indices[i],
Epoch: utils.EpochOfSlot(uint64(row.Slot)),
Slot: uint64(row.Slot),
WithdrawalCredential: t.Hash(hexutil.Encode(row.WithdrawalCredential)),
Amount: row.Amount,
Signature: t.Hash(hexutil.Encode(row.WithdrawalCredential)),
}
if row.GroupId.Valid {
responseData[i].GroupId = uint64(row.GroupId.Int64)
} else {
responseData[i].GroupId = t.DefaultGroupId
}
}
var paging t.Paging

moreDataFlag := len(responseData) > int(limit)
if !moreDataFlag && !currentCursor.IsValid() {
// No paging required
return responseData, &paging, nil
}
if moreDataFlag {
// Remove the last entry as it is only required for the more data flag
responseData = responseData[:len(responseData)-1]
data = data[:len(data)-1]
}

if currentCursor.IsValid() && currentDirection != currentCursor.Direction {
// Invert query result so response matches requested direction
slices.Reverse(responseData)
slices.Reverse(data)
}

p, err := utils.GetPagingFromData(utils.DataStructure(data), currentCursor, currentDirection, moreDataFlag)
if err != nil {
return nil, nil, fmt.Errorf("failed to get paging: %w", err)
}

return responseData, p, nil
}

func (d *DataAccessService) GetValidatorDashboardWithdrawals(dashboardId t.VDBId, cursor string, sort []t.Sort[enums.VDBWithdrawalsColumn], search string, limit uint64) ([]t.VDBWithdrawalsTableRow, *t.Paging, error) {
Expand Down
6 changes: 6 additions & 0 deletions backend/pkg/api/types/data_access.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,9 @@ func (c GenericCursor) IsValid() bool {
func (c GenericCursor) GetDirection() enums.SortOrder {
return c.Direction
}

type CLDepositsCursor struct {
GenericCursor
Slot int64
SlotIndex int64
}
71 changes: 71 additions & 0 deletions backend/pkg/exporter/modules/slot_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,77 @@ func ExportSlot(client rpc.Client, slot uint64, isHeadEpoch bool, tx *sqlx.Tx) e
log.Infof("writing validator mapping to redis done, took %s", time.Since(start))
return nil
})
// update cached view of consensus desposits
g.Go(func() error {
start := time.Now()
tx, err := db.AlloyWriter.Beginx()
if err != nil {
return fmt.Errorf("error starting tx: %w", err)
}
defer utils.Rollback(tx)
// clean up if we for some godforsaken reason have ended up in a situation where the temporary views still exists
_, err = tx.Exec(`drop materialized view if exists "_tmp_cached_blocks_deposits_lookup"`)
if err != nil {
return fmt.Errorf("error dropping tmp materialized view: %w", err)
}
_, err = tx.Exec(`drop materialized view if exists "_trash_blocks_deposits_lookup"`)
if err != nil {
return fmt.Errorf("error dropping trash materialized view: %w", err)
}
_, err = tx.Exec(`
CREATE MATERIALIZED VIEW _tmp_cached_blocks_deposits_lookup AS
SELECT
uvdv.dashboard_id,
uvdv.group_id,
bd.block_slot,
bd.block_index
FROM
blocks_deposits bd
INNER JOIN validators v ON bd.publickey = v.pubkey
INNER JOIN users_val_dashboards_validators uvdv ON v.validatorindex = uvdv.validator_index
ORDER BY
uvdv.dashboard_id DESC,
bd.block_slot DESC,
bd.block_index DESC;
`)
if err != nil {
return fmt.Errorf("error creating tmp materialized view: %w", err)
}
_, err = tx.Exec(`CREATE UNIQUE INDEX "_tmp_cached_blocks_deposits_lookup_block_index_idx" ON "_tmp_cached_blocks_deposits_lookup" (dashboard_id, block_slot, block_index);`)
if err != nil {
return fmt.Errorf("error creating index for tmp materialized view: %w", err)
}
_, err = tx.Exec(`GRANT SELECT ON _tmp_cached_blocks_deposits_lookup TO readaccess;`)
if err != nil {
return fmt.Errorf("error granting select on tmp materialized view: %w", err)
}
_, err = tx.Exec(`GRANT ALL ON _tmp_cached_blocks_deposits_lookup TO alloydbsuperuser;`)
if err != nil {
return fmt.Errorf("error granting all on tmp materialized view: %w", err)
}
_, err = tx.Exec(`ALTER MATERIALIZED VIEW if exists cached_blocks_deposits_lookup RENAME TO _trash_blocks_deposits_lookup;`)
if err != nil {
return fmt.Errorf("error renaming existing materialized view: %w", err)
}
_, err = tx.Exec(`ALTER MATERIALIZED VIEW _tmp_cached_blocks_deposits_lookup RENAME TO cached_blocks_deposits_lookup;`)
if err != nil {
return fmt.Errorf("error renaming tmp materialized view: %w", err)
}
_, err = tx.Exec(`drop materialized view if exists "_trash_blocks_deposits_lookup"`)
if err != nil {
return fmt.Errorf("error dropping trash materialized view: %w", err)
}
_, err = tx.Exec(`ALTER INDEX "_tmp_cached_blocks_deposits_lookup_block_index_idx" RENAME TO "cached_blocks_deposits_lookup_block_index_idx";`)
if err != nil {
return fmt.Errorf("error renaming index: %w", err)
}
err = tx.Commit()
if err != nil {
return fmt.Errorf("error committing tx: %w", err)
}
log.Infof("updating cached view of consensus deposits took %s", time.Since(start))
return nil
})
}
var epochParticipationStats *types.ValidatorParticipation
if epoch > 0 {
Expand Down

0 comments on commit 1c39ed5

Please sign in to comment.