Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge staging->main #1134

Merged
merged 6 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
280 changes: 142 additions & 138 deletions backend/pkg/api/data_access/vdb_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/hex"
"fmt"
"math/big"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -836,83 +835,68 @@ func (d *DataAccessService) GetValidatorDashboardGroupExists(ctx context.Context
}

func (d *DataAccessService) AddValidatorDashboardValidators(ctx context.Context, dashboardId t.VDBIdPrimary, groupId uint64, validators []t.VDBValidator) ([]t.VDBPostValidatorsData, error) {
result := []t.VDBPostValidatorsData{}

if len(validators) == 0 {
// No validators to add
return nil, nil
}

pubkeys := []struct {
ValidatorIndex t.VDBValidator `db:"validatorindex"`
Pubkey []byte `db:"pubkey"`
}{}

addedValidators := []struct {
ValidatorIndex t.VDBValidator `db:"validator_index"`
GroupId uint64 `db:"group_id"`
}{}

// Query to find the pubkey for each validator index
pubkeysQuery := `
SELECT
validatorindex,
pubkey
FROM validators
WHERE validatorindex = ANY($1)
`

// Query to add the validators to the dashboard and group
addValidatorsQuery := `
INSERT INTO users_val_dashboards_validators (dashboard_id, group_id, validator_index)
VALUES
`

for idx := range validators {
addValidatorsQuery += fmt.Sprintf("($1, $2, $%d), ", idx+3)
}
addValidatorsQuery = addValidatorsQuery[:len(addValidatorsQuery)-2] // remove trailing comma

// If a validator is already in the dashboard, update the group
// If the validator is already in that group nothing changes but we will include it in the result anyway
addValidatorsQuery += `
ON CONFLICT (dashboard_id, validator_index) DO UPDATE SET
dashboard_id = EXCLUDED.dashboard_id,
group_id = EXCLUDED.group_id,
validator_index = EXCLUDED.validator_index
RETURNING validator_index, group_id
`

// Find all the pubkeys
err := d.alloyReader.SelectContext(ctx, &pubkeys, pubkeysQuery, pq.Array(validators))
tx, err := d.writerDb.BeginTxx(ctx, nil)
if err != nil {
return nil, err
return nil, fmt.Errorf("error starting db transactions to insert validators for a dashboard: %w", err)
}
defer utils.Rollback(tx)

// Add all the validators to the dashboard and group
addValidatorsArgsIntf := []interface{}{dashboardId, groupId}
for _, validatorIndex := range validators {
addValidatorsArgsIntf = append(addValidatorsArgsIntf, validatorIndex)
}
err = d.alloyWriter.SelectContext(ctx, &addedValidators, addValidatorsQuery, addValidatorsArgsIntf...)
if err != nil {
return nil, err
}
numArgs := 3
batchSize := 65535 / numArgs // max 65535 params per batch, since postgres uses int16 for binding input params
batchIdx, allIdx := 0, 0
var validatorsToInsert []goqu.Record
for _, validatorIdx := range validators {
validatorsToInsert = append(validatorsToInsert,
goqu.Record{"dashboard_id": dashboardId, "group_id": groupId, "validator_index": validatorIdx})

batchIdx++
allIdx++

if batchIdx >= batchSize || allIdx >= len(validators) {
insertDs := goqu.Dialect("postgres").
Insert("users_val_dashboards_validators").
Cols("dashboard_id", "group_id", "validator_index").
Rows(validatorsToInsert).
OnConflict(goqu.DoUpdate(
"dashboard_id, validator_index",
goqu.Record{
"dashboard_id": goqu.L("EXCLUDED.dashboard_id"),
"group_id": goqu.L("EXCLUDED.group_id"),
"validator_index": goqu.L("EXCLUDED.validator_index"),
},
))

query, args, err := insertDs.Prepared(true).ToSQL()
if err != nil {
return nil, fmt.Errorf("error preparing query: %w", err)
}

// Combine the pubkeys and group ids for the result
pubkeysMap := make(map[t.VDBValidator]string, len(pubkeys))
for _, pubKeyInfo := range pubkeys {
pubkeysMap[pubKeyInfo.ValidatorIndex] = fmt.Sprintf("%#x", pubKeyInfo.Pubkey)
_, err = tx.ExecContext(ctx, query, args...)
if err != nil {
return nil, err
}

batchIdx = 0
validatorsToInsert = validatorsToInsert[:0]
}
}

addedValidatorsMap := make(map[t.VDBValidator]uint64, len(addedValidators))
for _, addedValidatorInfo := range addedValidators {
addedValidatorsMap[addedValidatorInfo.ValidatorIndex] = addedValidatorInfo.GroupId
err = tx.Commit()
if err != nil {
return nil, fmt.Errorf("error committing tx to insert validators for a dashboard: %w", err)
}

result := []t.VDBPostValidatorsData{}
for _, validator := range validators {
result = append(result, t.VDBPostValidatorsData{
PublicKey: pubkeysMap[validator],
GroupId: addedValidatorsMap[validator],
Index: validator,
GroupId: groupId,
})
}

Expand All @@ -922,131 +906,151 @@ func (d *DataAccessService) AddValidatorDashboardValidators(ctx context.Context,
// Updates the group for validators already in the dashboard linked to the deposit address.
// Adds up to limit new validators associated with the deposit address, if not already in the dashboard.
func (d *DataAccessService) AddValidatorDashboardValidatorsByDepositAddress(ctx context.Context, dashboardId t.VDBIdPrimary, groupId uint64, address string, limit uint64) ([]t.VDBPostValidatorsData, error) {
result := []t.VDBPostValidatorsData{}

addressParsed, err := hex.DecodeString(strings.TrimPrefix(address, "0x"))
if err != nil {
return nil, err
}

g, gCtx := errgroup.WithContext(ctx)

// fetch validators that are already in the dashboard and associated with the deposit address
var validatorIndicesToUpdate []uint64

g.Go(func() error {
return d.readerDb.SelectContext(gCtx, &validatorIndicesToUpdate, `
SELECT DISTINCT uvdv.validator_index
FROM validators v
JOIN eth1_deposits d ON v.pubkey = d.publickey
JOIN users_val_dashboards_validators uvdv ON v.validatorindex = uvdv.validator_index
WHERE uvdv.dashboard_id = $1 AND d.from_address = $2;
`, dashboardId, addressParsed)
})

// fetch validators that are not yet in the dashboard and associated with the deposit address, up to the limit
var validatorIndicesToInsert []uint64
g.Go(func() error {
return d.readerDb.SelectContext(gCtx, &validatorIndicesToInsert, `
SELECT DISTINCT v.validatorindex
FROM validators v
JOIN eth1_deposits d ON v.pubkey = d.publickey
LEFT JOIN users_val_dashboards_validators uvdv ON v.validatorindex = uvdv.validator_index AND uvdv.dashboard_id = $1
WHERE d.from_address = $2 AND uvdv.validator_index IS NULL
ORDER BY v.validatorindex
LIMIT $3;
`, dashboardId, addressParsed, limit)
})

err = g.Wait()
uniqueValidatorIndexesQuery := `
(SELECT
DISTINCT uvdv.validator_index
FROM validators v
JOIN eth1_deposits d ON v.pubkey = d.publickey
JOIN users_val_dashboards_validators uvdv ON v.validatorindex = uvdv.validator_index
WHERE uvdv.dashboard_id = $1 AND d.from_address = $2)

UNION

(SELECT
DISTINCT v.validatorindex AS validator_index
FROM validators v
JOIN eth1_deposits d ON v.pubkey = d.publickey
LEFT JOIN users_val_dashboards_validators uvdv
ON v.validatorindex = uvdv.validator_index AND uvdv.dashboard_id = $1
WHERE d.from_address = $2 AND uvdv.validator_index IS NULL
ORDER BY validator_index
LIMIT $3)`

addValidatorsQuery := d.getAddValidatorsQuery(uniqueValidatorIndexesQuery)

var validators []uint64
err = d.alloyWriter.SelectContext(ctx, &validators, addValidatorsQuery, dashboardId, addressParsed, limit, groupId)
if err != nil {
return nil, err
}

validatorIndices := slices.Concat(validatorIndicesToUpdate, validatorIndicesToInsert)
for _, validator := range validators {
result = append(result, t.VDBPostValidatorsData{
Index: validator,
GroupId: groupId,
})
}

return d.AddValidatorDashboardValidators(ctx, dashboardId, groupId, validatorIndices)
return result, nil
}

// Updates the group for validators already in the dashboard linked to the withdrawal address.
// Adds up to limit new validators associated with the withdrawal address, if not already in the dashboard.
func (d *DataAccessService) AddValidatorDashboardValidatorsByWithdrawalAddress(ctx context.Context, dashboardId t.VDBIdPrimary, groupId uint64, address string, limit uint64) ([]t.VDBPostValidatorsData, error) {
result := []t.VDBPostValidatorsData{}

addressParsed, err := hex.DecodeString(strings.TrimPrefix(address, "0x"))
if err != nil {
return nil, err
}

g, gCtx := errgroup.WithContext(ctx)

// fetch validators that are already in the dashboard and associated with the withdrawal address
var validatorIndicesToUpdate []uint64
g.Go(func() error {
return d.readerDb.SelectContext(gCtx, &validatorIndicesToUpdate, `
SELECT DISTINCT uvdv.validator_index
uniqueValidatorIndexesQuery := `
(SELECT
DISTINCT uvdv.validator_index
FROM validators v
JOIN users_val_dashboards_validators uvdv ON v.validatorindex = uvdv.validator_index
WHERE uvdv.dashboard_id = $1 AND v.withdrawalcredentials = $2;
`, dashboardId, addressParsed)
})
WHERE uvdv.dashboard_id = $1 AND v.withdrawalcredentials = $2)

UNION

// fetch validators that are not yet in the dashboard and associated with the withdrawal address, up to the limit
var validatorIndicesToInsert []uint64
g.Go(func() error {
return d.readerDb.SelectContext(gCtx, &validatorIndicesToInsert, `
SELECT DISTINCT v.validatorindex
(SELECT
DISTINCT v.validatorindex AS validator_index
FROM validators v
LEFT JOIN users_val_dashboards_validators uvdv ON v.validatorindex = uvdv.validator_index AND uvdv.dashboard_id = $1
LEFT JOIN users_val_dashboards_validators uvdv
ON v.validatorindex = uvdv.validator_index AND uvdv.dashboard_id = $1
WHERE v.withdrawalcredentials = $2 AND uvdv.validator_index IS NULL
ORDER BY v.validatorindex
LIMIT $3;
`, dashboardId, addressParsed, limit)
})
LIMIT $3)`

addValidatorsQuery := d.getAddValidatorsQuery(uniqueValidatorIndexesQuery)

err = g.Wait()
var validators []uint64
err = d.alloyWriter.SelectContext(ctx, &validators, addValidatorsQuery, dashboardId, addressParsed, limit, groupId)
if err != nil {
return nil, err
}

validatorIndices := slices.Concat(validatorIndicesToUpdate, validatorIndicesToInsert)
for _, validator := range validators {
result = append(result, t.VDBPostValidatorsData{
Index: validator,
GroupId: groupId,
})
}

return d.AddValidatorDashboardValidators(ctx, dashboardId, groupId, validatorIndices)
return result, nil
}

// Update the group for validators already in the dashboard linked to the graffiti (via produced block).
// Add up to limit new validators associated with the graffiti, if not already in the dashboard.
func (d *DataAccessService) AddValidatorDashboardValidatorsByGraffiti(ctx context.Context, dashboardId t.VDBIdPrimary, groupId uint64, graffiti string, limit uint64) ([]t.VDBPostValidatorsData, error) {
g, gCtx := errgroup.WithContext(ctx)
result := []t.VDBPostValidatorsData{}

// fetch validators that are already in the dashboard and associated with the graffiti
var validatorIndicesToUpdate []uint64
g.Go(func() error {
return d.readerDb.SelectContext(gCtx, &validatorIndicesToUpdate, `
SELECT DISTINCT uvdv.validator_index
uniqueValidatorIndexesQuery := `
(SELECT
DISTINCT uvdv.validator_index
FROM blocks b
JOIN users_val_dashboards_validators uvdv ON b.proposer = uvdv.validator_index
WHERE uvdv.dashboard_id = $1 AND b.graffiti_text = $2;
`, dashboardId, graffiti)
})
WHERE uvdv.dashboard_id = $1 AND b.graffiti_text = $2)

// fetch validators that are not yet in the dashboard and associated with the graffiti, up to the limit
var validatorIndicesToInsert []uint64
g.Go(func() error {
return d.readerDb.SelectContext(gCtx, &validatorIndicesToInsert, `
SELECT DISTINCT b.proposer
UNION

(SELECT DISTINCT b.proposer AS validator_index
FROM blocks b
LEFT JOIN users_val_dashboards_validators uvdv ON b.proposer = uvdv.validator_index AND uvdv.dashboard_id = $1
LEFT JOIN users_val_dashboards_validators uvdv
ON b.proposer = uvdv.validator_index AND uvdv.dashboard_id = $1
WHERE b.graffiti_text = $2 AND uvdv.validator_index IS NULL
ORDER BY b.proposer
LIMIT $3;
`, dashboardId, graffiti, limit)
})
LIMIT $3)`

addValidatorsQuery := d.getAddValidatorsQuery(uniqueValidatorIndexesQuery)

err := g.Wait()
var validators []uint64
err := d.alloyWriter.SelectContext(ctx, &validators, addValidatorsQuery, dashboardId, graffiti, limit, groupId)
if err != nil {
return nil, err
}

validatorIndices := slices.Concat(validatorIndicesToUpdate, validatorIndicesToInsert)
for _, validator := range validators {
result = append(result, t.VDBPostValidatorsData{
Index: validator,
GroupId: groupId,
})
}

return d.AddValidatorDashboardValidators(ctx, dashboardId, groupId, validatorIndices)
return result, nil
}

func (d *DataAccessService) getAddValidatorsQuery(uniqueValidatorIndexesQuery string) string {
return fmt.Sprintf(`
WITH unique_validator_indexes AS (
%s
)
INSERT INTO users_val_dashboards_validators (dashboard_id, group_id, validator_index)
SELECT $1 AS dashboard_id, $4 AS group_id, validator_index
FROM unique_validator_indexes
ON CONFLICT (dashboard_id, validator_index) DO UPDATE
SET
dashboard_id = EXCLUDED.dashboard_id,
group_id = EXCLUDED.group_id,
validator_index = EXCLUDED.validator_index
RETURNING validator_index`, uniqueValidatorIndexesQuery)
}

func (d *DataAccessService) RemoveValidatorDashboardValidators(ctx context.Context, dashboardId t.VDBIdPrimary, validators []t.VDBValidator) error {
Expand Down
4 changes: 2 additions & 2 deletions backend/pkg/api/types/validator_dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,6 @@ type VDBPostCreateGroupData struct {
}

type VDBPostValidatorsData struct {
PublicKey string `json:"public_key"`
GroupId uint64 `json:"group_id"`
Index uint64 `json:"index"`
GroupId uint64 `json:"group_id"`
}
8 changes: 5 additions & 3 deletions backend/pkg/commons/ethclients/ethclients.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ type gitAPIResponse struct {
}

type clientUpdateInfo struct {
Name string
Date time.Time
Name string
Url string
Version string
Date time.Time
}

type EthClients struct {
Expand Down Expand Up @@ -177,7 +179,7 @@ func prepareEthClientData(repo string, name string, curTime time.Time) (string,
timeDiff := (curTime.Sub(rTime).Hours() / 24.0)

if timeDiff < 1 { // add recent releases for notification collector to be collected
update := clientUpdateInfo{Name: name, Date: rTime}
update := clientUpdateInfo{Name: name, Date: rTime, Url: client.HTMLURL, Version: client.TagName}
bannerClients = append(bannerClients, update)
}
return client.Name, rTime.Unix()
Expand Down
Loading
Loading