Skip to content

Commit

Permalink
Added batch behaviour and simpler queries
Browse files Browse the repository at this point in the history
Fixed missing tx commit

Added insert count to standard insert

Get the count of inserted validators

Get the inserted count for each query

Get the total amount of rows not just inserted ones

Removed test comments

Returned the filled result

Changed to return pubkeys instead of count

Changed the return type from pubkey to index

chore: convert ts types

See: BEDS-880
  • Loading branch information
Eisei24 committed Nov 13, 2024
1 parent 310860a commit bac8e1e
Show file tree
Hide file tree
Showing 3 changed files with 145 additions and 141 deletions.
280 changes: 142 additions & 138 deletions backend/pkg/api/data_access/vdb_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/hex"
"fmt"
"math/big"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -836,83 +835,68 @@ func (d *DataAccessService) GetValidatorDashboardGroupExists(ctx context.Context
}

func (d *DataAccessService) AddValidatorDashboardValidators(ctx context.Context, dashboardId t.VDBIdPrimary, groupId uint64, validators []t.VDBValidator) ([]t.VDBPostValidatorsData, error) {
result := []t.VDBPostValidatorsData{}

if len(validators) == 0 {
// No validators to add
return nil, nil
}

pubkeys := []struct {
ValidatorIndex t.VDBValidator `db:"validatorindex"`
Pubkey []byte `db:"pubkey"`
}{}

addedValidators := []struct {
ValidatorIndex t.VDBValidator `db:"validator_index"`
GroupId uint64 `db:"group_id"`
}{}

// Query to find the pubkey for each validator index
pubkeysQuery := `
SELECT
validatorindex,
pubkey
FROM validators
WHERE validatorindex = ANY($1)
`

// Query to add the validators to the dashboard and group
addValidatorsQuery := `
INSERT INTO users_val_dashboards_validators (dashboard_id, group_id, validator_index)
VALUES
`

for idx := range validators {
addValidatorsQuery += fmt.Sprintf("($1, $2, $%d), ", idx+3)
}
addValidatorsQuery = addValidatorsQuery[:len(addValidatorsQuery)-2] // remove trailing comma

// If a validator is already in the dashboard, update the group
// If the validator is already in that group nothing changes but we will include it in the result anyway
addValidatorsQuery += `
ON CONFLICT (dashboard_id, validator_index) DO UPDATE SET
dashboard_id = EXCLUDED.dashboard_id,
group_id = EXCLUDED.group_id,
validator_index = EXCLUDED.validator_index
RETURNING validator_index, group_id
`

// Find all the pubkeys
err := d.alloyReader.SelectContext(ctx, &pubkeys, pubkeysQuery, pq.Array(validators))
tx, err := d.userWriter.BeginTxx(ctx, nil)
if err != nil {
return nil, err
return nil, fmt.Errorf("error starting db transactions to insert validators for a dashboard: %w", err)
}
defer utils.Rollback(tx)

// Add all the validators to the dashboard and group
addValidatorsArgsIntf := []interface{}{dashboardId, groupId}
for _, validatorIndex := range validators {
addValidatorsArgsIntf = append(addValidatorsArgsIntf, validatorIndex)
}
err = d.alloyWriter.SelectContext(ctx, &addedValidators, addValidatorsQuery, addValidatorsArgsIntf...)
if err != nil {
return nil, err
}
numArgs := 3
batchSize := 65535 / numArgs // max 65535 params per batch, since postgres uses int16 for binding input params
batchIdx, allIdx := 0, 0
var validatorsToInsert []goqu.Record
for _, validatorIdx := range validators {
validatorsToInsert = append(validatorsToInsert,
goqu.Record{"dashboard_id": dashboardId, "group_id": groupId, "validator_index": validatorIdx})

batchIdx++
allIdx++

if batchIdx >= batchSize || allIdx >= len(validators) {
insertDs := goqu.Dialect("postgres").
Insert("users_val_dashboards_validators").
Cols("dashboard_id", "group_id", "validator_index").
Rows(validatorsToInsert).
OnConflict(goqu.DoUpdate(
"dashboard_id, validator_index",
goqu.Record{
"dashboard_id": goqu.L("EXCLUDED.dashboard_id"),
"group_id": goqu.L("EXCLUDED.group_id"),
"validator_index": goqu.L("EXCLUDED.validator_index"),
},
))

query, args, err := insertDs.Prepared(true).ToSQL()
if err != nil {
return nil, fmt.Errorf("error preparing query: %w", err)
}

// Combine the pubkeys and group ids for the result
pubkeysMap := make(map[t.VDBValidator]string, len(pubkeys))
for _, pubKeyInfo := range pubkeys {
pubkeysMap[pubKeyInfo.ValidatorIndex] = fmt.Sprintf("%#x", pubKeyInfo.Pubkey)
_, err = tx.ExecContext(ctx, query, args...)
if err != nil {
return nil, err
}

batchIdx = 0
validatorsToInsert = validatorsToInsert[:0]
}
}

addedValidatorsMap := make(map[t.VDBValidator]uint64, len(addedValidators))
for _, addedValidatorInfo := range addedValidators {
addedValidatorsMap[addedValidatorInfo.ValidatorIndex] = addedValidatorInfo.GroupId
err = tx.Commit()
if err != nil {
return nil, fmt.Errorf("error committing tx to insert validators for a dashboard: %w", err)
}

result := []t.VDBPostValidatorsData{}
for _, validator := range validators {
result = append(result, t.VDBPostValidatorsData{
PublicKey: pubkeysMap[validator],
GroupId: addedValidatorsMap[validator],
Index: validator,
GroupId: groupId,
})
}

Expand All @@ -922,131 +906,151 @@ func (d *DataAccessService) AddValidatorDashboardValidators(ctx context.Context,
// Updates the group for validators already in the dashboard linked to the deposit address.
// Adds up to limit new validators associated with the deposit address, if not already in the dashboard.
func (d *DataAccessService) AddValidatorDashboardValidatorsByDepositAddress(ctx context.Context, dashboardId t.VDBIdPrimary, groupId uint64, address string, limit uint64) ([]t.VDBPostValidatorsData, error) {
result := []t.VDBPostValidatorsData{}

addressParsed, err := hex.DecodeString(strings.TrimPrefix(address, "0x"))
if err != nil {
return nil, err
}

g, gCtx := errgroup.WithContext(ctx)

// fetch validators that are already in the dashboard and associated with the deposit address
var validatorIndicesToUpdate []uint64

g.Go(func() error {
return d.readerDb.SelectContext(gCtx, &validatorIndicesToUpdate, `
SELECT DISTINCT uvdv.validator_index
FROM validators v
JOIN eth1_deposits d ON v.pubkey = d.publickey
JOIN users_val_dashboards_validators uvdv ON v.validatorindex = uvdv.validator_index
WHERE uvdv.dashboard_id = $1 AND d.from_address = $2;
`, dashboardId, addressParsed)
})

// fetch validators that are not yet in the dashboard and associated with the deposit address, up to the limit
var validatorIndicesToInsert []uint64
g.Go(func() error {
return d.readerDb.SelectContext(gCtx, &validatorIndicesToInsert, `
SELECT DISTINCT v.validatorindex
FROM validators v
JOIN eth1_deposits d ON v.pubkey = d.publickey
LEFT JOIN users_val_dashboards_validators uvdv ON v.validatorindex = uvdv.validator_index AND uvdv.dashboard_id = $1
WHERE d.from_address = $2 AND uvdv.validator_index IS NULL
ORDER BY v.validatorindex
LIMIT $3;
`, dashboardId, addressParsed, limit)
})

err = g.Wait()
uniqueValidatorIndexesQuery := `
(SELECT
DISTINCT uvdv.validator_index
FROM validators v
JOIN eth1_deposits d ON v.pubkey = d.publickey
JOIN users_val_dashboards_validators uvdv ON v.validatorindex = uvdv.validator_index
WHERE uvdv.dashboard_id = $1 AND d.from_address = $2)
UNION
(SELECT
DISTINCT v.validatorindex AS validator_index
FROM validators v
JOIN eth1_deposits d ON v.pubkey = d.publickey
LEFT JOIN users_val_dashboards_validators uvdv
ON v.validatorindex = uvdv.validator_index AND uvdv.dashboard_id = $1
WHERE d.from_address = $2 AND uvdv.validator_index IS NULL
ORDER BY validator_index
LIMIT $3)`

addValidatorsQuery := d.getAddValidatorsQuery(uniqueValidatorIndexesQuery)

var validators []uint64
err = d.alloyWriter.SelectContext(ctx, &validators, addValidatorsQuery, dashboardId, addressParsed, limit, groupId)
if err != nil {
return nil, err
}

validatorIndices := slices.Concat(validatorIndicesToUpdate, validatorIndicesToInsert)
for _, validator := range validators {
result = append(result, t.VDBPostValidatorsData{
Index: validator,
GroupId: groupId,
})
}

return d.AddValidatorDashboardValidators(ctx, dashboardId, groupId, validatorIndices)
return result, nil
}

// Updates the group for validators already in the dashboard linked to the withdrawal address.
// Adds up to limit new validators associated with the withdrawal address, if not already in the dashboard.
func (d *DataAccessService) AddValidatorDashboardValidatorsByWithdrawalAddress(ctx context.Context, dashboardId t.VDBIdPrimary, groupId uint64, address string, limit uint64) ([]t.VDBPostValidatorsData, error) {
result := []t.VDBPostValidatorsData{}

addressParsed, err := hex.DecodeString(strings.TrimPrefix(address, "0x"))
if err != nil {
return nil, err
}

g, gCtx := errgroup.WithContext(ctx)

// fetch validators that are already in the dashboard and associated with the withdrawal address
var validatorIndicesToUpdate []uint64
g.Go(func() error {
return d.readerDb.SelectContext(gCtx, &validatorIndicesToUpdate, `
SELECT DISTINCT uvdv.validator_index
uniqueValidatorIndexesQuery := `
(SELECT
DISTINCT uvdv.validator_index
FROM validators v
JOIN users_val_dashboards_validators uvdv ON v.validatorindex = uvdv.validator_index
WHERE uvdv.dashboard_id = $1 AND v.withdrawalcredentials = $2;
`, dashboardId, addressParsed)
})
WHERE uvdv.dashboard_id = $1 AND v.withdrawalcredentials = $2)
UNION
// fetch validators that are not yet in the dashboard and associated with the withdrawal address, up to the limit
var validatorIndicesToInsert []uint64
g.Go(func() error {
return d.readerDb.SelectContext(gCtx, &validatorIndicesToInsert, `
SELECT DISTINCT v.validatorindex
(SELECT
DISTINCT v.validatorindex AS validator_index
FROM validators v
LEFT JOIN users_val_dashboards_validators uvdv ON v.validatorindex = uvdv.validator_index AND uvdv.dashboard_id = $1
LEFT JOIN users_val_dashboards_validators uvdv
ON v.validatorindex = uvdv.validator_index AND uvdv.dashboard_id = $1
WHERE v.withdrawalcredentials = $2 AND uvdv.validator_index IS NULL
ORDER BY v.validatorindex
LIMIT $3;
`, dashboardId, addressParsed, limit)
})
LIMIT $3)`

addValidatorsQuery := d.getAddValidatorsQuery(uniqueValidatorIndexesQuery)

err = g.Wait()
var validators []uint64
err = d.alloyWriter.SelectContext(ctx, &validators, addValidatorsQuery, dashboardId, addressParsed, limit, groupId)
if err != nil {
return nil, err
}

validatorIndices := slices.Concat(validatorIndicesToUpdate, validatorIndicesToInsert)
for _, validator := range validators {
result = append(result, t.VDBPostValidatorsData{
Index: validator,
GroupId: groupId,
})
}

return d.AddValidatorDashboardValidators(ctx, dashboardId, groupId, validatorIndices)
return result, nil
}

// Update the group for validators already in the dashboard linked to the graffiti (via produced block).
// Add up to limit new validators associated with the graffiti, if not already in the dashboard.
func (d *DataAccessService) AddValidatorDashboardValidatorsByGraffiti(ctx context.Context, dashboardId t.VDBIdPrimary, groupId uint64, graffiti string, limit uint64) ([]t.VDBPostValidatorsData, error) {
g, gCtx := errgroup.WithContext(ctx)
result := []t.VDBPostValidatorsData{}

// fetch validators that are already in the dashboard and associated with the graffiti
var validatorIndicesToUpdate []uint64
g.Go(func() error {
return d.readerDb.SelectContext(gCtx, &validatorIndicesToUpdate, `
SELECT DISTINCT uvdv.validator_index
uniqueValidatorIndexesQuery := `
(SELECT
DISTINCT uvdv.validator_index
FROM blocks b
JOIN users_val_dashboards_validators uvdv ON b.proposer = uvdv.validator_index
WHERE uvdv.dashboard_id = $1 AND b.graffiti_text = $2;
`, dashboardId, graffiti)
})
WHERE uvdv.dashboard_id = $1 AND b.graffiti_text = $2)
// fetch validators that are not yet in the dashboard and associated with the graffiti, up to the limit
var validatorIndicesToInsert []uint64
g.Go(func() error {
return d.readerDb.SelectContext(gCtx, &validatorIndicesToInsert, `
SELECT DISTINCT b.proposer
UNION
(SELECT DISTINCT b.proposer AS validator_index
FROM blocks b
LEFT JOIN users_val_dashboards_validators uvdv ON b.proposer = uvdv.validator_index AND uvdv.dashboard_id = $1
LEFT JOIN users_val_dashboards_validators uvdv
ON b.proposer = uvdv.validator_index AND uvdv.dashboard_id = $1
WHERE b.graffiti_text = $2 AND uvdv.validator_index IS NULL
ORDER BY b.proposer
LIMIT $3;
`, dashboardId, graffiti, limit)
})
LIMIT $3)`

addValidatorsQuery := d.getAddValidatorsQuery(uniqueValidatorIndexesQuery)

err := g.Wait()
var validators []uint64
err := d.alloyWriter.SelectContext(ctx, &validators, addValidatorsQuery, dashboardId, graffiti, limit, groupId)
if err != nil {
return nil, err
}

validatorIndices := slices.Concat(validatorIndicesToUpdate, validatorIndicesToInsert)
for _, validator := range validators {
result = append(result, t.VDBPostValidatorsData{
Index: validator,
GroupId: groupId,
})
}

return d.AddValidatorDashboardValidators(ctx, dashboardId, groupId, validatorIndices)
return result, nil
}

func (d *DataAccessService) getAddValidatorsQuery(uniqueValidatorIndexesQuery string) string {
return fmt.Sprintf(`
WITH unique_validator_indexes AS (
%s
)
INSERT INTO users_val_dashboards_validators (dashboard_id, group_id, validator_index)
SELECT $1 AS dashboard_id, $4 AS group_id, validator_index
FROM unique_validator_indexes
ON CONFLICT (dashboard_id, validator_index) DO UPDATE
SET
dashboard_id = EXCLUDED.dashboard_id,
group_id = EXCLUDED.group_id,
validator_index = EXCLUDED.validator_index
RETURNING validator_index`, uniqueValidatorIndexesQuery)
}

func (d *DataAccessService) RemoveValidatorDashboardValidators(ctx context.Context, dashboardId t.VDBIdPrimary, validators []t.VDBValidator) error {
Expand Down
4 changes: 2 additions & 2 deletions backend/pkg/api/types/validator_dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,6 @@ type VDBPostCreateGroupData struct {
}

type VDBPostValidatorsData struct {
PublicKey string `json:"public_key"`
GroupId uint64 `json:"group_id"`
Index uint64 `json:"index"`
GroupId uint64 `json:"group_id"`
}
2 changes: 1 addition & 1 deletion frontend/types/api/validator_dashboard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,6 @@ export interface VDBPostCreateGroupData {
name: string;
}
export interface VDBPostValidatorsData {
public_key: string;
index: number /* uint64 */;
group_id: number /* uint64 */;
}

0 comments on commit bac8e1e

Please sign in to comment.