From 849bdf672ba2ddf668bac8bcd4137af251e0f548 Mon Sep 17 00:00:00 2001 From: invis-bitfly Date: Mon, 8 Apr 2024 10:46:07 +0200 Subject: [PATCH 1/2] exporter: add task to generate materialized cl deposits view --- backend/pkg/exporter/modules/slot_exporter.go | 71 +++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/backend/pkg/exporter/modules/slot_exporter.go b/backend/pkg/exporter/modules/slot_exporter.go index 6bf6bd2fb..29c7a1fb8 100644 --- a/backend/pkg/exporter/modules/slot_exporter.go +++ b/backend/pkg/exporter/modules/slot_exporter.go @@ -543,6 +543,77 @@ func ExportSlot(client rpc.Client, slot uint64, isHeadEpoch bool, tx *sqlx.Tx) e log.Infof("writing validator mapping to redis done, took %s", time.Since(start)) return nil }) + // update cached view of consensus desposits + g.Go(func() error { + start := time.Now() + tx, err := db.AlloyWriter.Beginx() + if err != nil { + return fmt.Errorf("error starting tx: %w", err) + } + defer utils.Rollback(tx) + // clean up if we for some godforsaken reason have ended up in a situation where the temporary views still exists + _, err = tx.Exec(`drop materialized view if exists "_tmp_cached_blocks_deposits_lookup"`) + if err != nil { + return fmt.Errorf("error dropping tmp materialized view: %w", err) + } + _, err = tx.Exec(`drop materialized view if exists "_trash_blocks_deposits_lookup"`) + if err != nil { + return fmt.Errorf("error dropping trash materialized view: %w", err) + } + _, err = tx.Exec(` + CREATE MATERIALIZED VIEW _tmp_cached_blocks_deposits_lookup AS + SELECT + uvdv.dashboard_id, + uvdv.group_id, + bd.block_slot, + bd.block_index + FROM + blocks_deposits bd + INNER JOIN validators v ON bd.publickey = v.pubkey + INNER JOIN users_val_dashboards_validators uvdv ON v.validatorindex = uvdv.validator_index + ORDER BY + uvdv.dashboard_id DESC, + bd.block_slot DESC, + bd.block_index DESC; + `) + if err != nil { + return fmt.Errorf("error creating tmp materialized view: %w", err) + } + _, err = tx.Exec(`CREATE UNIQUE INDEX "_tmp_cached_blocks_deposits_lookup_block_index_idx" ON "_tmp_cached_blocks_deposits_lookup" (dashboard_id, block_slot, block_index);`) + if err != nil { + return fmt.Errorf("error creating index for tmp materialized view: %w", err) + } + _, err = tx.Exec(`GRANT SELECT ON _tmp_cached_blocks_deposits_lookup TO readaccess;`) + if err != nil { + return fmt.Errorf("error granting select on tmp materialized view: %w", err) + } + _, err = tx.Exec(`GRANT ALL ON _tmp_cached_blocks_deposits_lookup TO alloydbsuperuser;`) + if err != nil { + return fmt.Errorf("error granting all on tmp materialized view: %w", err) + } + _, err = tx.Exec(`ALTER MATERIALIZED VIEW if exists cached_blocks_deposits_lookup RENAME TO _trash_blocks_deposits_lookup;`) + if err != nil { + return fmt.Errorf("error renaming existing materialized view: %w", err) + } + _, err = tx.Exec(`ALTER MATERIALIZED VIEW _tmp_cached_blocks_deposits_lookup RENAME TO cached_blocks_deposits_lookup;`) + if err != nil { + return fmt.Errorf("error renaming tmp materialized view: %w", err) + } + _, err = tx.Exec(`drop materialized view if exists "_trash_blocks_deposits_lookup"`) + if err != nil { + return fmt.Errorf("error dropping trash materialized view: %w", err) + } + _, err = tx.Exec(`ALTER INDEX "_tmp_cached_blocks_deposits_lookup_block_index_idx" RENAME TO "cached_blocks_deposits_lookup_block_index_idx";`) + if err != nil { + return fmt.Errorf("error renaming index: %w", err) + } + err = tx.Commit() + if err != nil { + return fmt.Errorf("error committing tx: %w", err) + } + log.Infof("updating cached view of consensus deposits took %s", time.Since(start)) + return nil + }) } var epochParticipationStats *types.ValidatorParticipation if epoch > 0 { From 04435b3feef8f05e1c3739f8d57b9c43906ca9e0 Mon Sep 17 00:00:00 2001 From: invis-bitfly Date: Mon, 8 Apr 2024 10:46:32 +0200 Subject: [PATCH 2/2] api: add cl deposit endpoint --- backend/pkg/api/data_access/data_access.go | 152 ++++++++++++++++++++- backend/pkg/api/types/data_access.go | 6 + 2 files changed, 156 insertions(+), 2 deletions(-) diff --git a/backend/pkg/api/data_access/data_access.go b/backend/pkg/api/data_access/data_access.go index a6f6b5273..e9f9e48ff 100644 --- a/backend/pkg/api/data_access/data_access.go +++ b/backend/pkg/api/data_access/data_access.go @@ -6,8 +6,10 @@ import ( "fmt" "math" "math/big" + "slices" "sort" isort "sort" + "strings" "sync" "time" @@ -1955,8 +1957,154 @@ func (d *DataAccessService) GetValidatorDashboardElDeposits(dashboardId t.VDBId, } func (d *DataAccessService) GetValidatorDashboardClDeposits(dashboardId t.VDBId, cursor string, search string, limit uint64) ([]t.VDBConsensusDepositsTableRow, *t.Paging, error) { - // WORKING @invis - return d.dummy.GetValidatorDashboardClDeposits(dashboardId, cursor, search, limit) + var err error + currentDirection := enums.DESC // TODO: expose over parameter + var currentCursor t.CLDepositsCursor + + if cursor != "" { + currentCursor, err = utils.StringToCursor[t.CLDepositsCursor](cursor) + if err != nil { + return nil, nil, fmt.Errorf("failed to parse passed cursor as CLDepositsCursor: %w", err) + } + } + + var byteaArray pq.ByteaArray + + // Resolve validator indices to pubkeys + if dashboardId.Validators != nil { + validatorsArray := make([]uint64, len(dashboardId.Validators)) + for i, v := range dashboardId.Validators { + validatorsArray[i] = v.Index + } + validatorPubkeys, err := d.services.GetPubkeysOfValidatorIndexSlice(validatorsArray) + if err != nil { + return nil, nil, fmt.Errorf("failed to resolve validator indices to pubkeys: %w", err) + } + + // Convert pubkeys to bytes for PostgreSQL + byteaArray = make(pq.ByteaArray, len(validatorPubkeys)) + for i, p := range validatorPubkeys { + byteaArray[i], _ = hexutil.Decode(p) + } + } + + // Custom type for block_index + var data []struct { + GroupId sql.NullInt64 `db:"group_id"` + PublicKey []byte `db:"publickey"` + Slot int64 `db:"block_slot"` + SlotIndex int64 `db:"block_index"` + WithdrawalCredential []byte `db:"withdrawalcredentials"` + Amount decimal.Decimal `db:"amount"` + Signature []byte `db:"signature"` + } + + query := ` + SELECT + bd.publickey, + bd.block_slot, + bd.block_index, + bd.amount, + bd.signature, + bd.withdrawalcredentials + ` + + var filter interface{} + if dashboardId.Validators != nil { + query += ` + FROM + blocks_deposits bd + WHERE + bd.publickey = ANY ($1)` + filter = byteaArray + } else { + query += ` + , cbdl.group_id + FROM + cached_blocks_deposits_lookup cbdl + LEFT JOIN blocks_deposits bd ON bd.block_slot = cbdl.block_slot + AND bd.block_index = cbdl.block_index + WHERE + cbdl.dashboard_id = $1` + filter = dashboardId.Id + } + + params := []interface{}{filter} + filterFragment := ` ORDER BY bd.block_slot DESC, bd.block_index DESC` + if currentCursor.IsValid() { + filterFragment = ` AND (bd.block_slot < $2 or (bd.block_slot = $2 and bd.block_index < $3)) ` + filterFragment + params = append(params, currentCursor.Slot, currentCursor.SlotIndex) + } + + if currentCursor.IsValid() && currentCursor.Direction == enums.ASC || + !currentCursor.IsValid() && currentDirection == enums.ASC { + filterFragment = strings.Replace(strings.Replace(filterFragment, "<", ">", -1), "DESC", "ASC", -1) + } + + if dashboardId.Validators == nil { + filterFragment = strings.Replace(filterFragment, "bd.", "cbdl.", -1) + } + + params = append(params, limit+1) + filterFragment += fmt.Sprintf(" LIMIT $%d", len(params)) + + err = db.AlloyReader.Select(&data, query+filterFragment, params...) + + if err != nil { + return nil, nil, err + } + + pubkeys := make([]string, len(data)) + for i, row := range data { + pubkeys[i] = hexutil.Encode(row.PublicKey) + } + indices, err := d.services.GetValidatorIndexOfPubkeySlice(pubkeys) + if err != nil { + return nil, nil, fmt.Errorf("failed to recover indices after query: %w", err) + } + + responseData := make([]t.VDBConsensusDepositsTableRow, len(data)) + for i, row := range data { + responseData[i] = t.VDBConsensusDepositsTableRow{ + PublicKey: t.PubKey(pubkeys[i]), + Index: indices[i], + Epoch: utils.EpochOfSlot(uint64(row.Slot)), + Slot: uint64(row.Slot), + WithdrawalCredential: t.Hash(hexutil.Encode(row.WithdrawalCredential)), + Amount: row.Amount, + Signature: t.Hash(hexutil.Encode(row.WithdrawalCredential)), + } + if row.GroupId.Valid { + responseData[i].GroupId = uint64(row.GroupId.Int64) + } else { + responseData[i].GroupId = t.DefaultGroupId + } + } + var paging t.Paging + + moreDataFlag := len(responseData) > int(limit) + if !moreDataFlag && !currentCursor.IsValid() { + // No paging required + return responseData, &paging, nil + } + if moreDataFlag { + // Remove the last entry as it is only required for the more data flag + responseData = responseData[:len(responseData)-1] + data = data[:len(data)-1] + } + + if currentCursor.IsValid() && currentDirection != currentCursor.Direction { + // Invert query result so response matches requested direction + slices.Reverse(responseData) + slices.Reverse(data) + } + + p, err := utils.GetPagingFromData(utils.DataStructure(data), currentCursor, currentDirection, moreDataFlag) + if err != nil { + return nil, nil, fmt.Errorf("failed to get paging: %w", err) + } + + return responseData, p, nil } func (d *DataAccessService) GetValidatorDashboardWithdrawals(dashboardId t.VDBId, cursor string, sort []t.Sort[enums.VDBWithdrawalsColumn], search string, limit uint64) ([]t.VDBWithdrawalsTableRow, *t.Paging, error) { diff --git a/backend/pkg/api/types/data_access.go b/backend/pkg/api/types/data_access.go index 8c8913b1e..56eb7e123 100644 --- a/backend/pkg/api/types/data_access.go +++ b/backend/pkg/api/types/data_access.go @@ -54,3 +54,9 @@ func (c GenericCursor) IsValid() bool { func (c GenericCursor) GetDirection() enums.SortOrder { return c.Direction } + +type CLDepositsCursor struct { + GenericCursor + Slot int64 + SlotIndex int64 +}