diff --git a/backend/pkg/api/data_access/general.go b/backend/pkg/api/data_access/general.go index 3c94db8a1..9d7ee0d97 100644 --- a/backend/pkg/api/data_access/general.go +++ b/backend/pkg/api/data_access/general.go @@ -74,9 +74,9 @@ func applySortAndPagination(defaultColumns []types.SortColumn, primary types.Sor if cursor.IsReverse() { column.Desc = !column.Desc } - colOrder := goqu.C(column.Column).Asc() + colOrder := column.Column.Asc().NullsFirst() if column.Desc { - colOrder = goqu.C(column.Column).Desc() + colOrder = column.Column.Desc().NullsLast() } queryOrder = append(queryOrder, colOrder) } @@ -87,15 +87,21 @@ func applySortAndPagination(defaultColumns []types.SortColumn, primary types.Sor // reverse order to nest conditions for i := len(queryOrderColumns) - 1; i >= 0; i-- { column := queryOrderColumns[i] - colWhere := goqu.C(column.Column).Gt(column.Offset) - if column.Desc { - colWhere = goqu.C(column.Column).Lt(column.Offset) + var colWhere exp.Expression + + // current convention is opposite of the psql default (ASC: nulls first, DESC: nulls last) + colWhere = goqu.Or(column.Column.Lt(column.Offset), column.Column.IsNull()) + if !column.Desc { + colWhere = column.Column.Gt(column.Offset) + if column.Offset == nil { + colWhere = goqu.Or(colWhere, column.Column.IsNull()) + } } if queryWhere == nil { queryWhere = colWhere } else { - queryWhere = goqu.And(goqu.C(column.Column).Eq(column.Offset), queryWhere) + queryWhere = goqu.And(column.Column.Eq(column.Offset), queryWhere) queryWhere = goqu.Or(colWhere, queryWhere) } } diff --git a/backend/pkg/api/data_access/notifications.go b/backend/pkg/api/data_access/notifications.go index 39b10eec9..4774fed0c 100644 --- a/backend/pkg/api/data_access/notifications.go +++ b/backend/pkg/api/data_access/notifications.go @@ -344,14 +344,14 @@ func (d *DataAccessService) GetDashboardNotifications(ctx context.Context, userI // sorting defaultColumns := []t.SortColumn{ - {Column: enums.NotificationsDashboardsColumns.Timestamp.ToString(), Desc: true, Offset: currentCursor.Epoch}, - {Column: enums.NotificationsDashboardsColumns.DashboardName.ToString(), Desc: false, Offset: currentCursor.DashboardName}, - {Column: enums.NotificationsDashboardsColumns.DashboardId.ToString(), Desc: false, Offset: currentCursor.DashboardId}, - {Column: enums.NotificationsDashboardsColumns.GroupName.ToString(), Desc: false, Offset: currentCursor.GroupName}, - {Column: enums.NotificationsDashboardsColumns.GroupId.ToString(), Desc: false, Offset: currentCursor.GroupId}, - {Column: enums.NotificationsDashboardsColumns.ChainId.ToString(), Desc: true, Offset: currentCursor.ChainId}, - } - order, directions := applySortAndPagination(defaultColumns, t.SortColumn{Column: colSort.Column.ToString(), Desc: colSort.Desc}, currentCursor.GenericCursor) + {Column: enums.NotificationsDashboardsColumns.Timestamp.ToExpr(), Desc: true, Offset: currentCursor.Epoch}, + {Column: enums.NotificationsDashboardsColumns.DashboardName.ToExpr(), Desc: false, Offset: currentCursor.DashboardName}, + {Column: enums.NotificationsDashboardsColumns.DashboardId.ToExpr(), Desc: false, Offset: currentCursor.DashboardId}, + {Column: enums.NotificationsDashboardsColumns.GroupName.ToExpr(), Desc: false, Offset: currentCursor.GroupName}, + {Column: enums.NotificationsDashboardsColumns.GroupId.ToExpr(), Desc: false, Offset: currentCursor.GroupId}, + {Column: enums.NotificationsDashboardsColumns.ChainId.ToExpr(), Desc: true, Offset: currentCursor.ChainId}, + } + order, directions := applySortAndPagination(defaultColumns, t.SortColumn{Column: colSort.Column.ToExpr(), Desc: colSort.Desc}, currentCursor.GenericCursor) unionQuery = unionQuery.Order(order...) if directions != nil { unionQuery = unionQuery.Where(directions) @@ -726,9 +726,9 @@ func (d *DataAccessService) GetMachineNotifications(ctx context.Context, userId // Sorting and limiting if cursor is present defaultColumns := []t.SortColumn{ - {Column: enums.NotificationsMachinesColumns.Timestamp.ToString(), Desc: true, Offset: currentCursor.Epoch}, - {Column: enums.NotificationsMachinesColumns.MachineId.ToString(), Desc: false, Offset: currentCursor.MachineId}, - {Column: enums.NotificationsMachinesColumns.EventType.ToString(), Desc: false, Offset: currentCursor.EventType}, + {Column: enums.NotificationsMachinesColumns.Timestamp.ToExpr(), Desc: true, Offset: currentCursor.Epoch}, + {Column: enums.NotificationsMachinesColumns.MachineId.ToExpr(), Desc: false, Offset: currentCursor.MachineId}, + {Column: enums.NotificationsMachinesColumns.EventType.ToExpr(), Desc: false, Offset: currentCursor.EventType}, } var offset interface{} switch colSort.Column { @@ -738,7 +738,7 @@ func (d *DataAccessService) GetMachineNotifications(ctx context.Context, userId offset = currentCursor.EventThreshold } - order, directions := applySortAndPagination(defaultColumns, t.SortColumn{Column: colSort.Column.ToString(), Desc: colSort.Desc, Offset: offset}, currentCursor.GenericCursor) + order, directions := applySortAndPagination(defaultColumns, t.SortColumn{Column: colSort.Column.ToExpr(), Desc: colSort.Desc, Offset: offset}, currentCursor.GenericCursor) ds = ds.Order(order...) if directions != nil { ds = ds.Where(directions) @@ -847,10 +847,10 @@ func (d *DataAccessService) GetClientNotifications(ctx context.Context, userId u // Sorting and limiting if cursor is present // Rows can be uniquely identified by (epoch, client) defaultColumns := []t.SortColumn{ - {Column: enums.NotificationsClientsColumns.Timestamp.ToString(), Desc: true, Offset: currentCursor.Epoch}, - {Column: enums.NotificationsClientsColumns.ClientName.ToString(), Desc: false, Offset: currentCursor.Client}, + {Column: enums.NotificationsClientsColumns.Timestamp.ToExpr(), Desc: true, Offset: currentCursor.Epoch}, + {Column: enums.NotificationsClientsColumns.ClientName.ToExpr(), Desc: false, Offset: currentCursor.Client}, } - order, directions := applySortAndPagination(defaultColumns, t.SortColumn{Column: colSort.Column.ToString(), Desc: colSort.Desc}, currentCursor.GenericCursor) + order, directions := applySortAndPagination(defaultColumns, t.SortColumn{Column: colSort.Column.ToExpr(), Desc: colSort.Desc}, currentCursor.GenericCursor) ds = ds.Order(order...) if directions != nil { ds = ds.Where(directions) @@ -1138,11 +1138,11 @@ func (d *DataAccessService) GetNetworkNotifications(ctx context.Context, userId // Sorting and limiting if cursor is present // Rows can be uniquely identified by (epoch, network, event_type) defaultColumns := []t.SortColumn{ - {Column: enums.NotificationNetworksColumns.Timestamp.ToString(), Desc: true, Offset: currentCursor.Epoch}, - {Column: enums.NotificationNetworksColumns.Network.ToString(), Desc: false, Offset: currentCursor.Network}, - {Column: enums.NotificationNetworksColumns.EventType.ToString(), Desc: false, Offset: currentCursor.EventType}, + {Column: enums.NotificationNetworksColumns.Timestamp.ToExpr(), Desc: true, Offset: currentCursor.Epoch}, + {Column: enums.NotificationNetworksColumns.Network.ToExpr(), Desc: false, Offset: currentCursor.Network}, + {Column: enums.NotificationNetworksColumns.EventType.ToExpr(), Desc: false, Offset: currentCursor.EventType}, } - order, directions := applySortAndPagination(defaultColumns, t.SortColumn{Column: colSort.Column.ToString(), Desc: colSort.Desc}, currentCursor.GenericCursor) + order, directions := applySortAndPagination(defaultColumns, t.SortColumn{Column: colSort.Column.ToExpr(), Desc: colSort.Desc}, currentCursor.GenericCursor) ds = ds.Order(order...) if directions != nil { ds = ds.Where(directions) diff --git a/backend/pkg/api/data_access/vdb_blocks.go b/backend/pkg/api/data_access/vdb_blocks.go index ca4a06f88..47bbd651f 100644 --- a/backend/pkg/api/data_access/vdb_blocks.go +++ b/backend/pkg/api/data_access/vdb_blocks.go @@ -9,6 +9,8 @@ import ( "strings" "time" + "github.com/doug-martin/goqu/v9" + "github.com/doug-martin/goqu/v9/exp" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/gobitfly/beaconchain/pkg/api/enums" t "github.com/gobitfly/beaconchain/pkg/api/types" @@ -17,13 +19,21 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/log" "github.com/gobitfly/beaconchain/pkg/commons/types" "github.com/gobitfly/beaconchain/pkg/commons/utils" + "github.com/lib/pq" "github.com/shopspring/decimal" ) func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, dashboardId t.VDBId, cursor string, colSort t.Sort[enums.VDBBlocksColumn], search string, limit uint64, protocolModes t.VDBProtocolModes) ([]t.VDBBlocksTableRow, *t.Paging, error) { // @DATA-ACCESS incorporate protocolModes + + // ------------------------------------- + // Setup var err error var currentCursor t.BlocksCursor + validatorMapping, err := d.services.GetCurrentValidatorMapping() + if err != nil { + return nil, nil, err + } // TODO @LuccaBitfly move validation to handler? if cursor != "" { @@ -32,312 +42,279 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das } } - // regexes taken from api handler common.go searchPubkey := regexp.MustCompile(`^0x[0-9a-fA-F]{96}$`).MatchString(search) searchGroup := regexp.MustCompile(`^[a-zA-Z0-9_\-.\ ]+$`).MatchString(search) searchIndex := regexp.MustCompile(`^[0-9]+$`).MatchString(search) - validatorMap := make(map[t.VDBValidator]bool) - params := []interface{}{} - filteredValidatorsQuery := "" - validatorMapping, err := d.services.GetCurrentValidatorMapping() - if err != nil { - return nil, nil, err + validators := goqu.T("validators") // could adapt data type to make handling as table/alias less confusing + blocks := goqu.T("blocks") + groups := goqu.T("groups") + + type validatorGroup struct { + Validator t.VDBValidator `db:"validator_index"` + Group uint64 `db:"group_id"` } - // determine validators of interest first - if dashboardId.Validators == nil { - // could also optimize this for the average and/or the whale case; will go with some middle-ground, needs testing - // (query validators twice: once without search applied (fast) to pre-filter scheduled proposals (which are sent to db, want to minimize), - // again for blocks query with search applied to not having to send potentially huge validator-list) - startTime := time.Now() - valis, err := d.getDashboardValidators(ctx, dashboardId, nil) - log.Debugf("=== getting validators took %s", time.Since(startTime)) - if err != nil { - return nil, nil, err - } - for _, v := range valis { - validatorMap[v] = true - } + // ------------------------------------- + // Goqu Query to determine validators filtered by search + var filteredValidatorsDs *goqu.SelectDataset + var filteredValidators []validatorGroup - // create a subquery to get the (potentially filtered) validators and their groups for later - params = append(params, dashboardId.Id) - selectStr := `SELECT validator_index, group_id ` - from := `FROM users_val_dashboards_validators validators ` - where := `WHERE validators.dashboard_id = $1` - extraConds := make([]string, 0, 3) + filteredValidatorsDs = goqu.Dialect("postgres"). + Select( + "validator_index", + ) + if dashboardId.Validators == nil { + filteredValidatorsDs = filteredValidatorsDs. + From(goqu.T("users_val_dashboards_validators").As(validators.GetTable())). + Where(validators.Col("dashboard_id").Eq(dashboardId.Id)) + // apply search filters + searches := []exp.Expression{} if searchIndex { - params = append(params, search) - extraConds = append(extraConds, fmt.Sprintf(`validator_index = $%d`, len(params))) + searches = append(searches, validators.Col("validator_index").Eq(search)) } if searchGroup { - from += `INNER JOIN users_val_dashboards_groups groups ON validators.dashboard_id = groups.dashboard_id AND validators.group_id = groups.id ` - // escape the psql single character wildcard "_"; apply prefix-search - params = append(params, strings.Replace(search, "_", "\\_", -1)+"%") - extraConds = append(extraConds, fmt.Sprintf(`LOWER(name) LIKE LOWER($%d)`, len(params))) + filteredValidatorsDs = filteredValidatorsDs. + InnerJoin(goqu.T("users_val_dashboards_groups").As(groups), goqu.On( + validators.Col("group_id").Eq(groups.Col("id")), + validators.Col("dashboard_id").Eq(groups.Col("dashboard_id")), + )) + searches = append(searches, + goqu.L("LOWER(?)", groups.Col("name")).Like(strings.Replace(strings.ToLower(search), "_", "\\_", -1)+"%"), + ) } if searchPubkey { index, ok := validatorMapping.ValidatorIndices[search] - if !ok && len(extraConds) == 0 { - // don't even need to query + if !ok && !searchGroup && !searchIndex { + // searched pubkey doesn't exist, don't even need to query anything return make([]t.VDBBlocksTableRow, 0), &t.Paging{}, nil } - params = append(params, index) - extraConds = append(extraConds, fmt.Sprintf(`validator_index = $%d`, len(params))) + searches = append(searches, + validators.Col("validator_index").Eq(index), + ) } - if len(extraConds) > 0 { - where += ` AND (` + strings.Join(extraConds, ` OR `) + `)` + if len(searches) > 0 { + filteredValidatorsDs = filteredValidatorsDs.Where(goqu.Or(searches...)) } - - filteredValidatorsQuery = selectStr + from + where } else { - validators := make([]t.VDBValidator, 0, len(dashboardId.Validators)) + validatorList := make([]t.VDBValidator, 0, len(dashboardId.Validators)) for _, validator := range dashboardId.Validators { if searchIndex && fmt.Sprint(validator) != search || searchPubkey && validator != validatorMapping.ValidatorIndices[search] { continue } - validatorMap[validator] = true - validators = append(validators, validator) + filteredValidators = append(filteredValidators, validatorGroup{ + Validator: validator, + Group: t.DefaultGroupId, + }) + validatorList = append(validatorList, validator) if searchIndex || searchPubkey { break } } - if len(validators) == 0 { - return make([]t.VDBBlocksTableRow, 0), &t.Paging{}, nil - } - params = append(params, validators) + filteredValidatorsDs = filteredValidatorsDs. + From( + goqu.Dialect("postgres"). + From( + goqu.L("unnest(?::int[])", pq.Array(validatorList)).As("validator_index"), + ). + As(validators.GetTable()), + ) } - var proposals []struct { - Proposer t.VDBValidator `db:"proposer"` - Group uint64 `db:"group_id"` - Epoch uint64 `db:"epoch"` - Slot uint64 `db:"slot"` - Status uint64 `db:"status"` - Block sql.NullInt64 `db:"exec_block_number"` - FeeRecipient []byte `db:"fee_recipient"` - ElReward decimal.NullDecimal `db:"el_reward"` - ClReward decimal.NullDecimal `db:"cl_reward"` - GraffitiText string `db:"graffiti_text"` + // ------------------------------------- + // Constuct final query + var blocksDs *goqu.SelectDataset - // for cursor only - Reward decimal.Decimal + // 1. Tables + blocksDs = filteredValidatorsDs. + InnerJoin(blocks, goqu.On( + blocks.Col("proposer").Eq(validators.Col("validator_index")), + )). + LeftJoin(goqu.T("consensus_payloads").As("cp"), goqu.On( + blocks.Col("slot").Eq(goqu.I("cp.slot")), + )). + LeftJoin(goqu.T("execution_payloads").As("ep"), goqu.On( + blocks.Col("exec_block_hash").Eq(goqu.I("ep.block_hash")), + )). + LeftJoin( + // relay bribe deduplication; select most likely (=max) relay bribe value for each block + goqu.Lateral(goqu.Dialect("postgres"). + From(goqu.T("relays_blocks")). + Select( + goqu.I("relays_blocks.exec_block_hash"), + goqu.I("relays_blocks.proposer_fee_recipient"), + goqu.MAX(goqu.I("relays_blocks.value")).As("value")). + GroupBy( + "exec_block_hash", + "proposer_fee_recipient", + )).As("rb"), + goqu.On( + goqu.I("rb.exec_block_hash").Eq(blocks.Col("exec_block_hash")), + ), + ) + + // 2. Selects + groupIdQ := goqu.C("group_id").(exp.Aliaseable) + if dashboardId.Validators != nil { + groupIdQ = exp.NewLiteralExpression("?::int", t.DefaultGroupId) } + groupId := groupIdQ.As("group_id") - // handle sorting - where := `` - orderBy := `ORDER BY ` - sortOrder := ` ASC` - if colSort.Desc { - sortOrder = ` DESC` + blocksDs = blocksDs. + SelectAppend( + blocks.Col("epoch"), + blocks.Col("slot"), + groupId, + blocks.Col("status"), + blocks.Col("exec_block_number"), + blocks.Col("graffiti_text"), + goqu.COALESCE(goqu.I("rb.proposer_fee_recipient"), blocks.Col("exec_fee_recipient")).As("fee_recipient"), + goqu.COALESCE(goqu.L("rb.value / 1e18"), goqu.I("ep.fee_recipient_reward")).As("el_reward"), + goqu.L("cp.cl_attestations_reward / 1e9 + cp.cl_sync_aggregate_reward / 1e9 + cp.cl_slashing_inclusion_reward / 1e9").As("cl_reward"), + ) + + // 3. Sorting and pagination + defaultColumns := []t.SortColumn{ + {Column: enums.VDBBlocksColumns.Slot.ToExpr(), Desc: true, Offset: currentCursor.Slot}, } - var val any - sortColName := `slot` + var offset any switch colSort.Column { - case enums.VDBBlockProposer: - sortColName = `proposer` - val = currentCursor.Proposer - case enums.VDBBlockStatus: - sortColName = `status` - val = currentCursor.Status - case enums.VDBBlockProposerReward: - sortColName = `el_reward + cl_reward` - val = currentCursor.Reward - } - onlyPrimarySort := sortColName == `slot` - if currentCursor.IsValid() { - sign := ` > ` - if colSort.Desc && !currentCursor.IsReverse() || !colSort.Desc && currentCursor.IsReverse() { - sign = ` < ` + case enums.VDBBlocksColumns.Proposer: + offset = currentCursor.Proposer + case enums.VDBBlocksColumns.Block: + offset = currentCursor.Block + if !currentCursor.Block.Valid { + offset = nil } - if currentCursor.IsReverse() { - if sortOrder == ` ASC` { - sortOrder = ` DESC` - } else { - sortOrder = ` ASC` - } - } - params = append(params, currentCursor.Slot) - where += `WHERE (` - if onlyPrimarySort { - where += `slot` + sign + fmt.Sprintf(`$%d`, len(params)) - } else { - params = append(params, val) - secSign := ` < ` - if currentCursor.IsReverse() { - secSign = ` > ` - } - if sortColName == "status" { - // explicit cast to int because type of 'status' column is text for some reason - sortColName += "::int" - } - where += fmt.Sprintf(`(slot`+secSign+`$%d AND `+sortColName+` = $%d) OR `+sortColName+sign+`$%d`, len(params)-1, len(params), len(params)) - } - where += `) ` - } - if sortOrder == ` ASC` { - sortOrder += ` NULLS FIRST` - } else { - sortOrder += ` NULLS LAST` + case enums.VDBBlocksColumns.Status: + offset = fmt.Sprintf("%d", currentCursor.Status) // type of 'status' column is text for some reason + case enums.VDBBlocksColumns.ProposerReward: + offset = currentCursor.Reward } - orderBy += sortColName + sortOrder - secSort := `DESC` - if !onlyPrimarySort { - if currentCursor.IsReverse() { - secSort = `ASC` - } - orderBy += `, slot ` + secSort + + order, directions := applySortAndPagination(defaultColumns, t.SortColumn{Column: colSort.Column.ToExpr(), Desc: colSort.Desc, Offset: offset}, currentCursor.GenericCursor) + blocksDs = goqu.Dialect("postgres").From(goqu.T("past_blocks_cte")). + With("past_blocks_cte", blocksDs). // encapsulate so we can use selected fields + Order(order...) + if directions != nil { + blocksDs = blocksDs.Where(directions) } - // Get scheduled blocks. They aren't written to blocks table, get from duties - // Will just pass scheduled proposals to query and let db do the sorting etc - var scheduledProposers []t.VDBValidator - var scheduledEpochs []uint64 - var scheduledSlots []uint64 - // don't need to query if requested slots are in the past + // 4. Limit + blocksDs = blocksDs.Limit(uint(limit + 1)) + + // 5. Gather and supply scheduled blocks to let db do the sorting etc latestSlot := cache.LatestSlot.Get() - if !onlyPrimarySort || !currentCursor.IsValid() || - currentCursor.Slot > latestSlot+1 && currentCursor.Reverse != colSort.Desc || - currentCursor.Slot < latestSlot+1 && currentCursor.Reverse == colSort.Desc { + onlyPrimarySort := colSort.Column == enums.VDBBlockSlot + if !(onlyPrimarySort || colSort.Column == enums.VDBBlockBlock) || + !currentCursor.IsValid() || + currentCursor.Slot > latestSlot+1 || + colSort.Desc == currentCursor.Reverse { dutiesInfo, err := d.services.GetCurrentDutiesInfo() if err == nil { + if dashboardId.Validators == nil { + // fetch filtered validators if not done yet + filteredValidatorsDs = filteredValidatorsDs. + SelectAppend(groupIdQ) + validatorsQuery, validatorsArgs, err := filteredValidatorsDs.Prepared(true).ToSQL() + if err != nil { + return nil, nil, err + } + if err = d.alloyReader.SelectContext(ctx, &filteredValidators, validatorsQuery, validatorsArgs...); err != nil { + return nil, nil, err + } + } + if len(filteredValidators) == 0 { + return make([]t.VDBBlocksTableRow, 0), &t.Paging{}, nil + } + + validatorSet := make(map[t.VDBValidator]uint64) + for _, v := range filteredValidators { + validatorSet[v.Validator] = v.Group + } + var scheduledProposers []t.VDBValidator + var scheduledGroups []uint64 + var scheduledEpochs []uint64 + var scheduledSlots []uint64 + // don't need if requested slots are in the past for slot, vali := range dutiesInfo.PropAssignmentsForSlot { // only gather scheduled slots if _, ok := dutiesInfo.SlotStatus[slot]; ok { continue } // only gather slots scheduled for our validators - if _, ok := validatorMap[vali]; !ok { + if _, ok := validatorSet[vali]; !ok { continue } scheduledProposers = append(scheduledProposers, dutiesInfo.PropAssignmentsForSlot[slot]) + scheduledGroups = append(scheduledGroups, validatorSet[vali]) scheduledEpochs = append(scheduledEpochs, slot/utils.Config.Chain.ClConfig.SlotsPerEpoch) scheduledSlots = append(scheduledSlots, slot) } - } else { - log.Debugf("duties info not available, skipping scheduled slots: %s", err) - } - } - groupIdCol := "group_id" - if dashboardId.Validators != nil { - groupIdCol = fmt.Sprintf("%d AS %s", t.DefaultGroupId, groupIdCol) - } - selectFields := fmt.Sprintf(` - blocks.proposer, - blocks.epoch, - blocks.slot, - %s, - blocks.status, - exec_block_number, - COALESCE(rb.proposer_fee_recipient, blocks.exec_fee_recipient) AS fee_recipient, - COALESCE(rb.value / 1e18, ep.fee_recipient_reward) AS el_reward, - cp.cl_attestations_reward / 1e9 + cp.cl_sync_aggregate_reward / 1e9 + cp.cl_slashing_inclusion_reward / 1e9 as cl_reward, - blocks.graffiti_text`, groupIdCol) - cte := fmt.Sprintf(`WITH past_blocks AS (SELECT - %s - FROM blocks - `, selectFields) - /*if dashboardId.Validators == nil { - query += ` - LEFT JOIN cached_proposal_rewards ON cached_proposal_rewards.dashboard_id = $1 AND blocks.slot = cached_proposal_rewards.slot - ` - } else { - query += ` - LEFT JOIN execution_payloads ep ON ep.block_hash = blocks.exec_block_hash - LEFT JOIN relays_blocks rb ON rb.exec_block_hash = blocks.exec_block_hash - ` - } + scheduledDs := goqu.Dialect("postgres"). + From( + goqu.L("unnest(?::int[], ?::int[], ?::int[], ?::int[]) AS prov(validator_index, group_id, epoch, slot)", pq.Array(scheduledProposers), pq.Array(scheduledGroups), pq.Array(scheduledEpochs), pq.Array(scheduledSlots)), + ). + Select( + goqu.C("validator_index"), + goqu.C("epoch"), + goqu.C("slot"), + groupId, + goqu.V("0").As("status"), + goqu.V(nil).As("exec_block_number"), + goqu.V(nil).As("graffiti_text"), + goqu.V(nil).As("fee_recipient"), + goqu.V(nil).As("el_reward"), + goqu.V(nil).As("cl_reward"), + ). + As("scheduled_blocks") - // shrink selection to our filtered validators - if len(scheduledProposers) > 0 { - query += `)` - } - query += `) as u `*/ - if dashboardId.Validators == nil { - cte += fmt.Sprintf(` - INNER JOIN (%s) validators ON validators.validator_index = proposer - `, filteredValidatorsQuery) - } else { - if len(where) == 0 { - where += `WHERE ` + // Supply to result query + // distinct + block number ordering to filter out duplicates in an edge case (if dutiesInfo didn't update yet after a block was proposed, but the blocks table was) + // might be possible to remove this once the TODO in service_slot_viz.go:startSlotVizDataService is resolved + blocksDs = goqu.Dialect("Postgres"). + From(blocksDs.Union(scheduledDs)). // wrap union to apply order + Order(order...). + OrderAppend(goqu.C("exec_block_number").Desc().NullsLast()). + Limit(uint(limit + 1)). + Distinct(enums.VDBBlocksColumns.Slot.ToExpr()) + if directions != nil { + blocksDs = blocksDs.Where(directions) + } + if !onlyPrimarySort { + blocksDs = blocksDs. + Distinct(colSort.Column.ToExpr(), enums.VDBBlocksColumns.Slot.ToExpr()) + } } else { - where += `AND ` + log.Warnf("Error getting scheduled proposals, DutiesInfo not available in Redis: %s", err) } - where += `proposer = ANY($1) ` } - params = append(params, limit+1) - limitStr := fmt.Sprintf(` - LIMIT $%d - `, len(params)) - // relay bribe deduplication; select most likely (=max) relay bribe value for each block - cte += ` - LEFT JOIN consensus_payloads cp on blocks.slot = cp.slot - LEFT JOIN execution_payloads ep ON ep.block_hash = blocks.exec_block_hash - LEFT JOIN LATERAL (SELECT exec_block_hash, proposer_fee_recipient, max(value) as value - FROM relays_blocks - WHERE relays_blocks.exec_block_hash = blocks.exec_block_hash - GROUP BY exec_block_hash, proposer_fee_recipient - ) rb ON rb.exec_block_hash = blocks.exec_block_hash - ) - ` - - from := `past_blocks ` - selectStr := `SELECT * FROM ` + // ------------------------------------- + // Execute query + var proposals []struct { + Proposer t.VDBValidator `db:"validator_index"` + Group uint64 `db:"group_id"` + Epoch uint64 `db:"epoch"` + Slot uint64 `db:"slot"` + Status uint64 `db:"status"` + Block sql.NullInt64 `db:"exec_block_number"` + FeeRecipient []byte `db:"fee_recipient"` + ElReward decimal.NullDecimal `db:"el_reward"` + ClReward decimal.NullDecimal `db:"cl_reward"` + GraffitiText sql.NullString `db:"graffiti_text"` - query := selectStr + from + where + orderBy + limitStr - // supply scheduled proposals, if any - if len(scheduledProposers) > 0 { - // distinct to filter out duplicates in an edge case (if dutiesInfo didn't update yet after a block was proposed, but the blocks table was) - // might be possible to remove this once the TODO in service_slot_viz.go:startSlotVizDataService is resolved - params = append(params, scheduledProposers) - params = append(params, scheduledEpochs) - params = append(params, scheduledSlots) - cte += fmt.Sprintf(`, - scheduled_blocks as ( - SELECT - prov.proposer, - prov.epoch, - prov.slot, - %s, - '0'::text AS status, - NULL::int AS exec_block_number, - ''::bytea AS fee_recipient, - NULL::float AS el_reward, - NULL::float AS cl_reward, - ''::text AS graffiti_text - FROM unnest($%d::int[], $%d::int[], $%d::int[]) AS prov(proposer, epoch, slot) - `, groupIdCol, len(params)-2, len(params)-1, len(params)) - if dashboardId.Validators == nil { - // add group id - cte += fmt.Sprintf(`INNER JOIN users_val_dashboards_validators validators - ON validators.dashboard_id = $1 - AND validators.validator_index = ANY($%d::int[]) - `, len(params)-2) - } - cte += `) ` - distinct := "slot" - if !onlyPrimarySort { - distinct = sortColName + ", " + distinct - } - // keep all ordering, sorting etc - selectStr = `SELECT DISTINCT ON (` + distinct + `) * FROM ` - // encapsulate past blocks query to ensure performance - from = `( - ( ` + query + ` ) - UNION ALL - SELECT * FROM scheduled_blocks - ) as combined - ` - // make sure the distinct clause filters out the correct duplicated row (e.g. block=nil) - orderBy += `, exec_block_number NULLS LAST` - query = selectStr + from + where + orderBy + limitStr + // for cursor only + Reward decimal.Decimal } - startTime := time.Now() - err = d.alloyReader.SelectContext(ctx, &proposals, cte+query, params...) + query, args, err := blocksDs.Prepared(true).ToSQL() + if err != nil { + return nil, nil, err + } + err = d.alloyReader.SelectContext(ctx, &proposals, query, args...) log.Debugf("=== getting past blocks took %s", time.Since(startTime)) if err != nil { return nil, nil, err @@ -345,6 +322,9 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das if len(proposals) == 0 { return make([]t.VDBBlocksTableRow, 0), &t.Paging{}, nil } + + // ------------------------------------- + // Prepare result moreDataFlag := len(proposals) > int(limit) if moreDataFlag { proposals = proposals[:len(proposals)-1] @@ -379,10 +359,14 @@ func (d *DataAccessService) GetValidatorDashboardBlocks(ctx context.Context, das if proposal.Status == 0 || proposal.Status == 2 { continue } - graffiti := proposal.GraffitiText - data[i].Graffiti = &graffiti - block := uint64(proposal.Block.Int64) - data[i].Block = &block + if proposal.GraffitiText.Valid { + graffiti := proposal.GraffitiText.String + data[i].Graffiti = &graffiti + } + if proposal.Block.Valid { + block := uint64(proposal.Block.Int64) + data[i].Block = &block + } if proposal.Status == 3 { continue } diff --git a/backend/pkg/api/enums/notifications_enums.go b/backend/pkg/api/enums/notifications_enums.go index 033141edd..9470b8caa 100644 --- a/backend/pkg/api/enums/notifications_enums.go +++ b/backend/pkg/api/enums/notifications_enums.go @@ -1,5 +1,7 @@ package enums +import "github.com/doug-martin/goqu/v9" + // ------------------------------------------------------------ // Notifications Dashboard Table Columns @@ -34,22 +36,22 @@ func (NotificationDashboardsColumn) NewFromString(s string) NotificationDashboar } // internal use, used to map to query column names -func (c NotificationDashboardsColumn) ToString() string { +func (c NotificationDashboardsColumn) ToExpr() OrderableSortable { switch c { case NotificationDashboardChainId: - return "chain_id" + return goqu.C("chain_id") case NotificationDashboardEpoch: - return "epoch" + return goqu.C("epoch") case NotificationDashboardDashboardName: - return "dashboard_name" + return goqu.C("dashboard_name") case NotificationDashboardDashboardId: - return "dashboard_id" + return goqu.C("dashboard_id") case NotificationDashboardGroupName: - return "group_name" + return goqu.C("group_name") case NotificationDashboardGroupId: - return "group_id" + return goqu.C("group_id") default: - return "" + return nil } } @@ -104,20 +106,20 @@ func (NotificationMachinesColumn) NewFromString(s string) NotificationMachinesCo } // internal use, used to map to query column names -func (c NotificationMachinesColumn) ToString() string { +func (c NotificationMachinesColumn) ToExpr() OrderableSortable { switch c { case NotificationMachineId: - return "machine_id" + return goqu.C("machine_id") case NotificationMachineName: - return "machine_name" + return goqu.C("machine_name") case NotificationMachineThreshold: - return "threshold" + return goqu.C("threshold") case NotificationMachineEventType: - return "event_type" + return goqu.C("event_type") case NotificationMachineTimestamp: - return "epoch" + return goqu.C("epoch") default: - return "" + return nil } } @@ -163,14 +165,14 @@ func (NotificationClientsColumn) NewFromString(s string) NotificationClientsColu } // internal use, used to map to query column names -func (c NotificationClientsColumn) ToString() string { +func (c NotificationClientsColumn) ToExpr() OrderableSortable { switch c { case NotificationClientName: - return "client" + return goqu.C("client") case NotificationClientTimestamp: - return "epoch" + return goqu.C("epoch") default: - return "" + return nil } } @@ -251,16 +253,16 @@ func (NotificationNetworksColumn) NewFromString(s string) NotificationNetworksCo } // internal use, used to map to query column names -func (c NotificationNetworksColumn) ToString() string { +func (c NotificationNetworksColumn) ToExpr() OrderableSortable { switch c { case NotificationNetworkTimestamp: - return "epoch" + return goqu.C("epoch") case NotificationNetworkNetwork: - return "network" + return goqu.C("network") case NotificationNetworkEventType: - return "event_type" + return goqu.C("event_type") default: - return "" + return nil } } diff --git a/backend/pkg/api/enums/validator_dashboard_enums.go b/backend/pkg/api/enums/validator_dashboard_enums.go index 58e87f337..c241ef98e 100644 --- a/backend/pkg/api/enums/validator_dashboard_enums.go +++ b/backend/pkg/api/enums/validator_dashboard_enums.go @@ -1,5 +1,10 @@ package enums +import ( + "github.com/doug-martin/goqu/v9" + "github.com/doug-martin/goqu/v9/exp" +) + // ---------------- // Validator Dashboard Summary Table @@ -156,6 +161,29 @@ func (VDBBlocksColumn) NewFromString(s string) VDBBlocksColumn { } } +type OrderableSortable interface { + exp.Orderable + exp.Comparable + exp.Isable +} + +func (c VDBBlocksColumn) ToExpr() OrderableSortable { + switch c { + case VDBBlockProposer: + return goqu.C("validator_index") + case VDBBlockSlot: + return goqu.C("slot") + case VDBBlockBlock: + return goqu.C("exec_block_number") + case VDBBlockStatus: + return goqu.C("status") + case VDBBlockProposerReward: + return goqu.L("el_reward + cl_reward") + default: + return nil + } +} + var VDBBlocksColumns = struct { Proposer VDBBlocksColumn Slot VDBBlocksColumn diff --git a/backend/pkg/api/types/data_access.go b/backend/pkg/api/types/data_access.go index 8cf95256e..5a21aa0f9 100644 --- a/backend/pkg/api/types/data_access.go +++ b/backend/pkg/api/types/data_access.go @@ -1,6 +1,7 @@ package types import ( + "database/sql" "time" "github.com/gobitfly/beaconchain/pkg/api/enums" @@ -24,10 +25,10 @@ type Sort[T enums.Enum] struct { } type SortColumn struct { - Column string + // defaults + Column enums.OrderableSortable Desc bool - // represents value from cursor - Offset any + Offset any // nil to indicate null value } type VDBIdPrimary int @@ -165,11 +166,10 @@ type UserCredentialInfo struct { type BlocksCursor struct { GenericCursor - Slot uint64 // basically the same as Block, Epoch, Age; mandatory, used to index - // optional, max one of those (for now) Proposer uint64 - Group uint64 + Slot uint64 // same as Age + Block sql.NullInt64 Status uint64 Reward decimal.Decimal } diff --git a/frontend/components/dashboard/table/DashboardTableBlocks.vue b/frontend/components/dashboard/table/DashboardTableBlocks.vue index daa3a6e2f..01227c95f 100644 --- a/frontend/components/dashboard/table/DashboardTableBlocks.vue +++ b/frontend/components/dashboard/table/DashboardTableBlocks.vue @@ -52,7 +52,7 @@ const loadData = (query?: TableQueryParams) => { if (!query) { query = { limit: pageSize.value, - sort: 'block:desc', + sort: 'slot:desc', } } setQuery(query, true, true)