Skip to content

Commit

Permalink
Merge pull request #428 from gobitfly/NOBIDS/implement_validator_bulk…
Browse files Browse the repository at this point in the history
…_add_methods

prevent exceeding the dashboard validator limit during batch inserts
  • Loading branch information
peterbitfly authored Jun 11, 2024
2 parents 7482e78 + 9f673c2 commit e5e5d98
Showing 1 changed file with 136 additions and 9 deletions.
145 changes: 136 additions & 9 deletions backend/pkg/api/data_access/vdb_management.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/gobitfly/beaconchain/pkg/api/enums"
t "github.com/gobitfly/beaconchain/pkg/api/types"
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
"github.com/lib/pq"
Expand Down Expand Up @@ -790,13 +791,55 @@ func (d *DataAccessService) AddValidatorDashboardValidatorsByDepositAddress(dash
if len(addressParsed) != 20 {
return nil, fmt.Errorf("invalid deposit address: %s", address)
}
var validatorIndices []uint64
err = d.readerDb.Select(&validatorIndices, "SELECT validatorindex FROM validators WHERE pubkey IN (SELECT publickey FROM eth1_deposits WHERE from_address = $1) ORDER BY validatorindex LIMIT $2;", addressParsed, limit)
var validatorIndicesToAdd []uint64
err = d.readerDb.Select(&validatorIndicesToAdd, "SELECT validatorindex FROM validators WHERE pubkey IN (SELECT publickey FROM eth1_deposits WHERE from_address = $1) ORDER BY validatorindex LIMIT $2;", addressParsed, limit)
if err != nil {
return nil, err
}

// AddValidatorDashboardValidators will handle updating groups for validators already in the dashboard
// retrieve the existing validators
var existingValidators []uint64
err = d.alloyWriter.Select(&existingValidators, "SELECT validator_index FROM users_val_dashboards_validators WHERE dashboard_id = $1", dashboardId)
if err != nil {
return nil, err
}
existingValidatorsMap := make(map[uint64]bool, len(existingValidators))
for _, validatorIndex := range existingValidators {
existingValidatorsMap[validatorIndex] = true
}

// filter out the validators that are already in the dashboard
var validatorIndicesToUpdate []uint64
var validatorIndicesToInsert []uint64
for _, validatorIndex := range validatorIndicesToAdd {
if _, ok := existingValidatorsMap[validatorIndex]; ok {
validatorIndicesToUpdate = append(validatorIndicesToUpdate, validatorIndex)
} else {
validatorIndicesToInsert = append(validatorIndicesToInsert, validatorIndex)
}
}

// update the group for all existing validators
validatorIndices := make([]uint64, 0, int(limit))
validatorIndices = append(validatorIndices, validatorIndicesToUpdate...)

// insert the new validators up to the allowed user max limit taking into account how many validators are already in the dashboard
if len(validatorIndicesToInsert) > 0 {
freeSpace := int(limit) - len(existingValidators)
if freeSpace > 0 {
if len(validatorIndicesToInsert) > freeSpace { // cap inserts to the amount of free space available
log.Infof("limiting the number of validators to insert to %d", freeSpace)
validatorIndicesToInsert = validatorIndicesToInsert[:freeSpace]
}
validatorIndices = append(validatorIndices, validatorIndicesToInsert...)
}
}

if len(validatorIndices) == 0 {
// no validators to add
return []t.VDBPostValidatorsData{}, nil
}
log.Infof("inserting %d new validators and updating %d validators of dashboard %d, limit is %d", len(validatorIndicesToInsert), len(validatorIndicesToUpdate), dashboardId, limit)
return d.AddValidatorDashboardValidators(dashboardId, groupId, validatorIndices)
}

Expand All @@ -807,26 +850,110 @@ func (d *DataAccessService) AddValidatorDashboardValidatorsByWithdrawalAddress(d
if err != nil {
return nil, err
}
var validatorIndices []uint64
err = d.readerDb.Select(&validatorIndices, "SELECT validatorindex FROM validators WHERE withdrawalcredentials = $1 ORDER BY validatorindex LIMIT $2;", addressParsed, limit)
var validatorIndicesToAdd []uint64
err = d.readerDb.Select(&validatorIndicesToAdd, "SELECT validatorindex FROM validators WHERE withdrawalcredentials = $1 ORDER BY validatorindex LIMIT $2;", addressParsed, limit)
if err != nil {
return nil, err
}

// AddValidatorDashboardValidators will handle updating groups for validators already in the dashboard
// retrieve the existing validators
var existingValidators []uint64
err = d.alloyWriter.Select(&existingValidators, "SELECT validator_index FROM users_val_dashboards_validators WHERE dashboard_id = $1", dashboardId)
if err != nil {
return nil, err
}
existingValidatorsMap := make(map[uint64]bool, len(existingValidators))
for _, validatorIndex := range existingValidators {
existingValidatorsMap[validatorIndex] = true
}

// filter out the validators that are already in the dashboard
var validatorIndicesToUpdate []uint64
var validatorIndicesToInsert []uint64
for _, validatorIndex := range validatorIndicesToAdd {
if _, ok := existingValidatorsMap[validatorIndex]; ok {
validatorIndicesToUpdate = append(validatorIndicesToUpdate, validatorIndex)
} else {
validatorIndicesToInsert = append(validatorIndicesToInsert, validatorIndex)
}
}

// update the group for all existing validators
validatorIndices := make([]uint64, 0, int(limit))
validatorIndices = append(validatorIndices, validatorIndicesToUpdate...)

// insert the new validators up to the allowed user max limit taking into account how many validators are already in the dashboard
if len(validatorIndicesToInsert) > 0 {
freeSpace := int(limit) - len(existingValidators)
if freeSpace > 0 {
if len(validatorIndicesToInsert) > freeSpace { // cap inserts to the amount of free space available
log.Infof("limiting the number of validators to insert to %d", freeSpace)
validatorIndicesToInsert = validatorIndicesToInsert[:freeSpace]
}
validatorIndices = append(validatorIndices, validatorIndicesToInsert...)
}
}

if len(validatorIndices) == 0 {
// no validators to add
return []t.VDBPostValidatorsData{}, nil
}
log.Infof("inserting %d new validators and updating %d validators of dashboard %d, limit is %d", len(validatorIndicesToInsert), len(validatorIndicesToUpdate), dashboardId, limit)
return d.AddValidatorDashboardValidators(dashboardId, groupId, validatorIndices)
}

func (d *DataAccessService) AddValidatorDashboardValidatorsByGraffiti(dashboardId t.VDBIdPrimary, groupId uint64, graffiti string, limit uint64) ([]t.VDBPostValidatorsData, error) {
// for all validators already in the dashboard that are associated with the graffiti (by produced block), update the group
// then add no more than `limit` validators associated with the deposit address to the dashboard
var validatorIndices []uint64
err := d.readerDb.Select(&validatorIndices, "SELECT DISTINCT proposer FROM blocks WHERE graffiti_text = $1 ORDER BY proposer LIMIT $2;", graffiti, limit)
var validatorIndicesToAdd []uint64
err := d.readerDb.Select(&validatorIndicesToAdd, "SELECT DISTINCT proposer FROM blocks WHERE graffiti_text = $1 ORDER BY proposer LIMIT $2;", graffiti, limit)
if err != nil {
return nil, err
}

// retrieve the existing validators
var existingValidators []uint64
err = d.alloyWriter.Select(&existingValidators, "SELECT validator_index FROM users_val_dashboards_validators WHERE dashboard_id = $1", dashboardId)
if err != nil {
return nil, err
}
existingValidatorsMap := make(map[uint64]bool, len(existingValidators))
for _, validatorIndex := range existingValidators {
existingValidatorsMap[validatorIndex] = true
}

// filter out the validators that are already in the dashboard
var validatorIndicesToUpdate []uint64
var validatorIndicesToInsert []uint64
for _, validatorIndex := range validatorIndicesToAdd {
if _, ok := existingValidatorsMap[validatorIndex]; ok {
validatorIndicesToUpdate = append(validatorIndicesToUpdate, validatorIndex)
} else {
validatorIndicesToInsert = append(validatorIndicesToInsert, validatorIndex)
}
}

// update the group for all existing validators
validatorIndices := make([]uint64, 0, int(limit))
validatorIndices = append(validatorIndices, validatorIndicesToUpdate...)

// AddValidatorDashboardValidators will handle updating groups for validators already in the dashboard
// insert the new validators up to the allowed user max limit taking into account how many validators are already in the dashboard
if len(validatorIndicesToInsert) > 0 {
freeSpace := int(limit) - len(existingValidators)
if freeSpace > 0 {
if len(validatorIndicesToInsert) > freeSpace { // cap inserts to the amount of free space available
log.Infof("limiting the number of validators to insert to %d", freeSpace)
validatorIndicesToInsert = validatorIndicesToInsert[:freeSpace]
}
validatorIndices = append(validatorIndices, validatorIndicesToInsert...)
}
}

if len(validatorIndices) == 0 {
// no validators to add
return []t.VDBPostValidatorsData{}, nil
}
log.Infof("inserting %d new validators and updating %d validators of dashboard %d, limit is %d", len(validatorIndicesToInsert), len(validatorIndicesToUpdate), dashboardId, limit)
return d.AddValidatorDashboardValidators(dashboardId, groupId, validatorIndices)
}

Expand Down

0 comments on commit e5e5d98

Please sign in to comment.