From 355f1fb3aa637751c47e82eb924210fec4849ae9 Mon Sep 17 00:00:00 2001
From: peter <1674920+peterbitfly@users.noreply.github.com>
Date: Tue, 22 Oct 2024 10:16:31 +0000
Subject: [PATCH 1/8] feat(notifications): implement group efficiency
notifications
---
backend/pkg/api/data_access/vdb_helpers.go | 24 +---
backend/pkg/api/data_access/vdb_summary.go | 6 +-
backend/pkg/commons/utils/efficiency.go | 25 ++++
backend/pkg/notification/collection.go | 160 +++++++++++++++++++++
backend/pkg/notification/types.go | 78 ++++------
5 files changed, 218 insertions(+), 75 deletions(-)
create mode 100644 backend/pkg/commons/utils/efficiency.go
diff --git a/backend/pkg/api/data_access/vdb_helpers.go b/backend/pkg/api/data_access/vdb_helpers.go
index 12bfe2c46..30fd72f61 100644
--- a/backend/pkg/api/data_access/vdb_helpers.go
+++ b/backend/pkg/api/data_access/vdb_helpers.go
@@ -41,28 +41,6 @@ func (d DataAccessService) getDashboardValidators(ctx context.Context, dashboard
return dashboardId.Validators, nil
}
-func (d DataAccessService) calculateTotalEfficiency(attestationEff, proposalEff, syncEff sql.NullFloat64) float64 {
- efficiency := float64(0)
-
- if !attestationEff.Valid && !proposalEff.Valid && !syncEff.Valid {
- efficiency = 0
- } else if attestationEff.Valid && !proposalEff.Valid && !syncEff.Valid {
- efficiency = attestationEff.Float64 * 100.0
- } else if attestationEff.Valid && proposalEff.Valid && !syncEff.Valid {
- efficiency = ((56.0 / 64.0 * attestationEff.Float64) + (8.0 / 64.0 * proposalEff.Float64)) * 100.0
- } else if attestationEff.Valid && !proposalEff.Valid && syncEff.Valid {
- efficiency = ((62.0 / 64.0 * attestationEff.Float64) + (2.0 / 64.0 * syncEff.Float64)) * 100.0
- } else {
- efficiency = (((54.0 / 64.0) * attestationEff.Float64) + ((8.0 / 64.0) * proposalEff.Float64) + ((2.0 / 64.0) * syncEff.Float64)) * 100.0
- }
-
- if efficiency < 0 {
- efficiency = 0
- }
-
- return efficiency
-}
-
func (d DataAccessService) calculateChartEfficiency(efficiencyType enums.VDBSummaryChartEfficiencyType, row *t.VDBValidatorSummaryChartRow) (float64, error) {
efficiency := float64(0)
switch efficiencyType {
@@ -81,7 +59,7 @@ func (d DataAccessService) calculateChartEfficiency(efficiencyType enums.VDBSumm
syncEfficiency.Valid = true
}
- efficiency = d.calculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency)
+ efficiency = utils.CalculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency)
case enums.VDBSummaryChartAttestation:
if row.AttestationIdealReward > 0 {
efficiency = (row.AttestationReward / row.AttestationIdealReward) * 100
diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go
index 7f51b5bed..0c83b0ddc 100644
--- a/backend/pkg/api/data_access/vdb_summary.go
+++ b/backend/pkg/api/data_access/vdb_summary.go
@@ -90,7 +90,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da
if err != nil {
return nil, nil, err
}
- averageNetworkEfficiency := d.calculateTotalEfficiency(
+ averageNetworkEfficiency := utils.CalculateTotalEfficiency(
efficiency.AttestationEfficiency[period], efficiency.ProposalEfficiency[period], efficiency.SyncEfficiency[period])
// ------------------------------------------------------------------------------------------------------------------
@@ -366,7 +366,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da
syncEfficiency.Float64 = float64(queryEntry.SyncExecuted) / float64(queryEntry.SyncScheduled)
syncEfficiency.Valid = true
}
- resultEntry.Efficiency = d.calculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency)
+ resultEntry.Efficiency = utils.CalculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency)
// Add the duties info to the total
total.AttestationReward = total.AttestationReward.Add(queryEntry.AttestationReward)
@@ -486,7 +486,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da
totalSyncEfficiency.Float64 = float64(total.SyncExecuted) / float64(total.SyncScheduled)
totalSyncEfficiency.Valid = true
}
- totalEntry.Efficiency = d.calculateTotalEfficiency(totalAttestationEfficiency, totalProposerEfficiency, totalSyncEfficiency)
+ totalEntry.Efficiency = utils.CalculateTotalEfficiency(totalAttestationEfficiency, totalProposerEfficiency, totalSyncEfficiency)
result = append([]t.VDBSummaryTableRow{totalEntry}, result...)
}
diff --git a/backend/pkg/commons/utils/efficiency.go b/backend/pkg/commons/utils/efficiency.go
new file mode 100644
index 000000000..5bb7cd57c
--- /dev/null
+++ b/backend/pkg/commons/utils/efficiency.go
@@ -0,0 +1,25 @@
+package utils
+
+import "database/sql"
+
+func CalculateTotalEfficiency(attestationEff, proposalEff, syncEff sql.NullFloat64) float64 {
+ efficiency := float64(0)
+
+ if !attestationEff.Valid && !proposalEff.Valid && !syncEff.Valid {
+ efficiency = 0
+ } else if attestationEff.Valid && !proposalEff.Valid && !syncEff.Valid {
+ efficiency = attestationEff.Float64 * 100.0
+ } else if attestationEff.Valid && proposalEff.Valid && !syncEff.Valid {
+ efficiency = ((56.0 / 64.0 * attestationEff.Float64) + (8.0 / 64.0 * proposalEff.Float64)) * 100.0
+ } else if attestationEff.Valid && !proposalEff.Valid && syncEff.Valid {
+ efficiency = ((62.0 / 64.0 * attestationEff.Float64) + (2.0 / 64.0 * syncEff.Float64)) * 100.0
+ } else {
+ efficiency = (((54.0 / 64.0) * attestationEff.Float64) + ((8.0 / 64.0) * proposalEff.Float64) + ((2.0 / 64.0) * syncEff.Float64)) * 100.0
+ }
+
+ if efficiency < 0 {
+ efficiency = 0
+ }
+
+ return efficiency
+}
diff --git a/backend/pkg/notification/collection.go b/backend/pkg/notification/collection.go
index 7633e786a..5bbb3322a 100644
--- a/backend/pkg/notification/collection.go
+++ b/backend/pkg/notification/collection.go
@@ -12,6 +12,7 @@ import (
"time"
gcp_bigtable "cloud.google.com/go/bigtable"
+ "github.com/doug-martin/goqu/v9"
"github.com/ethereum/go-ethereum/common"
"github.com/gobitfly/beaconchain/pkg/commons/cache"
"github.com/gobitfly/beaconchain/pkg/commons/db"
@@ -24,6 +25,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/exporter/modules"
"github.com/lib/pq"
"github.com/rocket-pool/rocketpool-go/utils/eth"
+ "github.com/shopspring/decimal"
)
func InitNotificationCollector(pubkeyCachePath string) {
@@ -308,6 +310,13 @@ func collectNotifications(epoch uint64) (types.NotificationsPerUserId, error) {
// The following functions will collect the notifications and add them to the
// notificationsByUserID map. The notifications will be queued and sent later
// by the notification sender process
+ err = collectGroupEfficiencyNotifications(notificationsByUserID, epoch)
+ if err != nil {
+ metrics.Errors.WithLabelValues("notifications_collect_group_efficiency").Inc()
+ return nil, fmt.Errorf("error collecting validator_group_efficiency notifications: %v", err)
+ }
+ log.Infof("collecting attestation & offline notifications took: %v", time.Since(start))
+
err = collectAttestationAndOfflineValidatorNotifications(notificationsByUserID, epoch)
if err != nil {
metrics.Errors.WithLabelValues("notifications_collect_missed_attestation").Inc()
@@ -460,6 +469,157 @@ func collectUserDbNotifications(epoch uint64) (types.NotificationsPerUserId, err
return notificationsByUserID, nil
}
+func collectGroupEfficiencyNotifications(notificationsByUserID types.NotificationsPerUserId, epoch uint64) error {
+ subMap, err := GetSubsForEventFilter("group_efficiency", "", nil, nil)
+ if err != nil {
+ return fmt.Errorf("error getting subscriptions for (missed) block proposals %w", err)
+ }
+
+ // create a lookup map for the dashboard & groups
+ type groupDetails struct {
+ Validators []types.ValidatorIndex
+ Subscription *types.Subscription
+ }
+ dashboardMap := make(map[types.UserId]map[types.DashboardId]map[types.DashboardGroupId]*groupDetails)
+
+ for _, subs := range subMap {
+ for _, sub := range subs {
+ if sub.DashboardId == nil || sub.DashboardGroupId == nil {
+ continue
+ }
+ userId := *sub.UserID
+ dashboardId := types.DashboardId(*sub.DashboardId)
+ groupId := types.DashboardGroupId(*sub.DashboardGroupId)
+ if _, ok := dashboardMap[userId]; !ok {
+ dashboardMap[userId] = make(map[types.DashboardId]map[types.DashboardGroupId]*groupDetails)
+ }
+ if _, ok := dashboardMap[userId][dashboardId]; !ok {
+ dashboardMap[userId][dashboardId] = make(map[types.DashboardGroupId]*groupDetails)
+ }
+ if _, ok := dashboardMap[userId][dashboardId][groupId]; !ok {
+ dashboardMap[userId][dashboardId][groupId] = &groupDetails{
+ Validators: []types.ValidatorIndex{},
+ }
+ }
+ if sub.EventFilter != "" {
+ pubkeyDecoded, err := hex.DecodeString(sub.EventFilter)
+ if err != nil {
+ return fmt.Errorf("error decoding pubkey %v: %w", sub.EventFilter, err)
+ }
+ validatorIndex, err := GetIndexForPubkey(pubkeyDecoded)
+ if err != nil {
+ return fmt.Errorf("error getting validator index for pubkey %v: %w", sub.EventFilter, err)
+ }
+ dashboardMap[userId][dashboardId][groupId].Validators = append(dashboardMap[*sub.UserID][dashboardId][groupId].Validators, types.ValidatorIndex(validatorIndex))
+ }
+ dashboardMap[userId][dashboardId][groupId].Subscription = sub
+ }
+ }
+
+ type dbResult struct {
+ ValidatorIndex uint64 `db:"validator_index"`
+ AttestationReward decimal.Decimal `db:"attestations_reward"`
+ AttestationIdealReward decimal.Decimal `db:"attestations_ideal_reward"`
+ BlocksProposed uint64 `db:"blocks_proposed"`
+ BlocksScheduled uint64 `db:"blocks_scheduled"`
+ SyncExecuted uint64 `db:"sync_executed"`
+ SyncScheduled uint64 `db:"sync_scheduled"`
+ }
+
+ var queryResult []*dbResult
+ clickhouseTable := "validator_dashboard_data_epoch"
+ // retrieve efficiency data for the epoch
+ ds := goqu.Dialect("postgres").
+ From(goqu.L(fmt.Sprintf(`%s AS r FINAL`, clickhouseTable))).
+ Select(
+ goqu.L("validator_index"),
+ goqu.L("COALESCE(r.attestations_reward, 0) AS attestations_reward"),
+ goqu.L("COALESCE(r.attestations_ideal_reward, 0) AS attestations_ideal_reward"),
+ goqu.L("COALESCE(r.blocks_proposed, 0) AS blocks_proposed"),
+ goqu.L("COALESCE(r.blocks_scheduled, 0) AS blocks_scheduled"),
+ goqu.L("COALESCE(r.sync_executed, 0) AS sync_executed"),
+ goqu.L("COALESCE(r.sync_scheduled, 0) AS sync_scheduled")).
+ Where(goqu.L("r.epoch = ?", epoch))
+ query, args, err := ds.Prepared(true).ToSQL()
+ if err != nil {
+ return fmt.Errorf("error preparing query: %v", err)
+ }
+
+ err = db.ClickHouseReader.Select(&queryResult, query, args...)
+ if err != nil {
+ return fmt.Errorf("error retrieving data from table %s: %v", clickhouseTable, err)
+ }
+
+ efficiencyMap := make(map[types.ValidatorIndex]*dbResult)
+ for _, row := range queryResult {
+ efficiencyMap[types.ValidatorIndex(row.ValidatorIndex)] = row
+ }
+
+ for userId, dashboards := range dashboardMap {
+ for dashboardId, groups := range dashboards {
+ for groupId, groupDetails := range groups {
+
+ attestationReward := decimal.Decimal{}
+ attestationIdealReward := decimal.Decimal{}
+ blocksProposed := uint64(0)
+ blocksScheduled := uint64(0)
+ syncExecuted := uint64(0)
+ syncScheduled := uint64(0)
+
+ for _, validatorIndex := range groupDetails.Validators {
+ if row, ok := efficiencyMap[validatorIndex]; ok {
+ attestationReward = attestationReward.Add(row.AttestationReward)
+ attestationIdealReward = attestationIdealReward.Add(row.AttestationIdealReward)
+ blocksProposed += row.BlocksProposed
+ blocksScheduled += row.BlocksScheduled
+ syncExecuted += row.SyncExecuted
+ syncScheduled += row.SyncScheduled
+ }
+ }
+
+ var attestationEfficiency, proposerEfficiency, syncEfficiency sql.NullFloat64
+
+ if !attestationIdealReward.IsZero() {
+ attestationEfficiency.Float64 = attestationReward.Div(attestationIdealReward).InexactFloat64()
+ attestationEfficiency.Valid = true
+ }
+ if blocksScheduled > 0 {
+ proposerEfficiency.Float64 = float64(blocksProposed) / float64(blocksScheduled)
+ proposerEfficiency.Valid = true
+ }
+ if syncScheduled > 0 {
+ syncEfficiency.Float64 = float64(syncExecuted) / float64(syncScheduled)
+ syncEfficiency.Valid = true
+ }
+
+ efficiency := utils.CalculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency)
+
+ if efficiency < groupDetails.Subscription.EventThreshold {
+ log.Infof("creating group efficiency notification for user %v, dashboard %v, group %v in epoch %v", userId, dashboardId, groupId, epoch)
+ n := &ValidatorGroupEfficiencyNotification{
+ NotificationBaseImpl: types.NotificationBaseImpl{
+ SubscriptionID: *groupDetails.Subscription.ID,
+ UserID: *groupDetails.Subscription.UserID,
+ Epoch: epoch,
+ EventName: groupDetails.Subscription.EventName,
+ EventFilter: "-",
+ DashboardId: groupDetails.Subscription.DashboardId,
+ DashboardName: groupDetails.Subscription.DashboardName,
+ DashboardGroupId: groupDetails.Subscription.DashboardGroupId,
+ DashboardGroupName: groupDetails.Subscription.DashboardGroupName,
+ },
+ Threshold: groupDetails.Subscription.EventThreshold,
+ Efficiency: efficiency,
+ }
+ notificationsByUserID.AddNotification(n)
+ metrics.NotificationsCollected.WithLabelValues(string(n.GetEventName())).Inc()
+ }
+ }
+ }
+ }
+
+ return nil
+}
func collectBlockProposalNotifications(notificationsByUserID types.NotificationsPerUserId, status uint64, eventName types.EventName, epoch uint64) error {
type dbResult struct {
Proposer uint64 `db:"proposer"`
diff --git a/backend/pkg/notification/types.go b/backend/pkg/notification/types.go
index b3dcc8377..baed697c2 100644
--- a/backend/pkg/notification/types.go
+++ b/backend/pkg/notification/types.go
@@ -230,55 +230,35 @@ func (n *ValidatorIsOnlineNotification) GetLegacyTitle() string {
return "Validator Back Online"
}
-// type validatorGroupIsOfflineNotification struct {
-// types.NotificationBaseImpl
-
-// IsOffline bool
-// }
-
-// func (n *validatorGroupIsOfflineNotification) GetEntitiyId() string {
-// return fmt.Sprintf("%s - %s", n.GetDashboardName(), n.GetDashboardGroupName())
-// }
-
-// // Overwrite specific methods
-// func (n *validatorGroupIsOfflineNotification) GetInfo(format types.NotificationFormat) string {
-// epoch := ""
-// if n.IsOffline {
-// epoch = formatEpochLink(format, n.LatestState)
-// } else {
-// epoch = formatEpochLink(format, n.Epoch)
-// }
-
-// if n.IsOffline {
-// return fmt.Sprintf(`Group %s is offline since epoch %s.`, n.DashboardGroupName, epoch)
-// } else {
-// return fmt.Sprintf(`Group %s is back online since epoch %v.`, n.DashboardGroupName, epoch)
-// }
-// }
-
-// func (n *validatorGroupIsOfflineNotification) GetTitle() string {
-// if n.IsOffline {
-// return "Group is offline"
-// } else {
-// return "Group is back online"
-// }
-// }
-
-// func (n *validatorGroupIsOfflineNotification) GetLegacyInfo() string {
-// if n.IsOffline {
-// return fmt.Sprintf(`Group %s is offline since epoch %s.`, n.DashboardGroupName, n.LatestState)
-// } else {
-// return fmt.Sprintf(`Group %s is back online since epoch %v.`, n.DashboardGroupName, n.Epoch)
-// }
-// }
-
-// func (n *validatorGroupIsOfflineNotification) GetLegacyTitle() string {
-// if n.IsOffline {
-// return "Group is offline"
-// } else {
-// return "Group is back online"
-// }
-// }
+type ValidatorGroupEfficiencyNotification struct {
+ types.NotificationBaseImpl
+
+ Threshold float64
+ Efficiency float64
+}
+
+func (n *ValidatorGroupEfficiencyNotification) GetEntitiyId() string {
+ return fmt.Sprintf("%s - %s", n.GetDashboardName(), n.GetDashboardGroupName())
+}
+
+// Overwrite specific methods
+func (n *ValidatorGroupEfficiencyNotification) GetInfo(format types.NotificationFormat) string {
+ dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n)
+ epoch := formatEpochLink(format, n.Epoch)
+ return fmt.Sprintf(`%s%s efficiency of %.2f is below the threhold of %.2f in epoch %s.`, dashboardAndGroupInfo, n.Efficiency, n.Threshold epoch)
+}
+
+func (n *ValidatorGroupEfficiencyNotification) GetTitle() string {
+ return "Low group efficiency"
+}
+
+func (n *ValidatorGroupEfficiencyNotification) GetLegacyInfo() string {
+ return n.GetInfo(types.NotifciationFormatText)
+}
+
+func (n *ValidatorGroupEfficiencyNotification) GetLegacyTitle() string {
+ return n.GetTitle()
+}
type ValidatorAttestationNotification struct {
types.NotificationBaseImpl
From 691035595647e550ea8b2ca26b9aeb355814bb0e Mon Sep 17 00:00:00 2001
From: peter <1674920+peterbitfly@users.noreply.github.com>
Date: Tue, 22 Oct 2024 10:30:22 +0000
Subject: [PATCH 2/8] feat(notifications): add clickhouse db connection
---
backend/cmd/notification_collector/main.go | 27 ++++++++
backend/pkg/notification/collection.go | 73 +++++++++++-----------
backend/pkg/notification/types.go | 2 +-
3 files changed, 66 insertions(+), 36 deletions(-)
diff --git a/backend/cmd/notification_collector/main.go b/backend/cmd/notification_collector/main.go
index d6556eaa7..feeecf98b 100644
--- a/backend/cmd/notification_collector/main.go
+++ b/backend/cmd/notification_collector/main.go
@@ -150,6 +150,31 @@ func Run() {
}, "pgx", "postgres")
}()
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ // clickhouse
+ db.ClickHouseWriter, db.ClickHouseReader = db.MustInitDB(&types.DatabaseConfig{
+ Username: cfg.ClickHouse.WriterDatabase.Username,
+ Password: cfg.ClickHouse.WriterDatabase.Password,
+ Name: cfg.ClickHouse.WriterDatabase.Name,
+ Host: cfg.ClickHouse.WriterDatabase.Host,
+ Port: cfg.ClickHouse.WriterDatabase.Port,
+ MaxOpenConns: cfg.ClickHouse.WriterDatabase.MaxOpenConns,
+ SSL: true,
+ MaxIdleConns: cfg.ClickHouse.WriterDatabase.MaxIdleConns,
+ }, &types.DatabaseConfig{
+ Username: cfg.ClickHouse.ReaderDatabase.Username,
+ Password: cfg.ClickHouse.ReaderDatabase.Password,
+ Name: cfg.ClickHouse.ReaderDatabase.Name,
+ Host: cfg.ClickHouse.ReaderDatabase.Host,
+ Port: cfg.ClickHouse.ReaderDatabase.Port,
+ MaxOpenConns: cfg.ClickHouse.ReaderDatabase.MaxOpenConns,
+ SSL: true,
+ MaxIdleConns: cfg.ClickHouse.ReaderDatabase.MaxIdleConns,
+ }, "clickhouse", "clickhouse")
+ }()
+
wg.Add(1)
go func() {
defer wg.Done()
@@ -184,6 +209,8 @@ func Run() {
defer db.FrontendWriterDB.Close()
defer db.AlloyReader.Close()
defer db.AlloyWriter.Close()
+ defer db.ClickHouseReader.Close()
+ defer db.ClickHouseWriter.Close()
defer db.BigtableClient.Close()
log.Infof("database connection established")
diff --git a/backend/pkg/notification/collection.go b/backend/pkg/notification/collection.go
index 5bbb3322a..145b45a1b 100644
--- a/backend/pkg/notification/collection.go
+++ b/backend/pkg/notification/collection.go
@@ -470,6 +470,44 @@ func collectUserDbNotifications(epoch uint64) (types.NotificationsPerUserId, err
}
func collectGroupEfficiencyNotifications(notificationsByUserID types.NotificationsPerUserId, epoch uint64) error {
+ type dbResult struct {
+ ValidatorIndex uint64 `db:"validator_index"`
+ AttestationReward decimal.Decimal `db:"attestations_reward"`
+ AttestationIdealReward decimal.Decimal `db:"attestations_ideal_reward"`
+ BlocksProposed uint64 `db:"blocks_proposed"`
+ BlocksScheduled uint64 `db:"blocks_scheduled"`
+ SyncExecuted uint64 `db:"sync_executed"`
+ SyncScheduled uint64 `db:"sync_scheduled"`
+ }
+
+ var queryResult []*dbResult
+ clickhouseTable := "validator_dashboard_data_epoch"
+ // retrieve efficiency data for the epoch
+ ds := goqu.Dialect("postgres").
+ From(goqu.L(fmt.Sprintf(`%s AS r FINAL`, clickhouseTable))).
+ Select(
+ goqu.L("validator_index"),
+ goqu.L("COALESCE(r.attestations_reward, 0) AS attestations_reward"),
+ goqu.L("COALESCE(r.attestations_ideal_reward, 0) AS attestations_ideal_reward"),
+ goqu.L("COALESCE(r.blocks_proposed, 0) AS blocks_proposed"),
+ goqu.L("COALESCE(r.blocks_scheduled, 0) AS blocks_scheduled"),
+ goqu.L("COALESCE(r.sync_executed, 0) AS sync_executed"),
+ goqu.L("COALESCE(r.sync_scheduled, 0) AS sync_scheduled")).
+ Where(goqu.L("r.epoch = ?", epoch))
+ query, args, err := ds.Prepared(true).ToSQL()
+ if err != nil {
+ return fmt.Errorf("error preparing query: %v", err)
+ }
+
+ err = db.ClickHouseReader.Select(&queryResult, query, args...)
+ if err != nil {
+ return fmt.Errorf("error retrieving data from table %s: %v", clickhouseTable, err)
+ }
+
+ if len(queryResult) == 0 {
+ return fmt.Errorf("no efficiency data found for epoch %v", epoch)
+ }
+
subMap, err := GetSubsForEventFilter("group_efficiency", "", nil, nil)
if err != nil {
return fmt.Errorf("error getting subscriptions for (missed) block proposals %w", err)
@@ -516,40 +554,6 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
}
}
- type dbResult struct {
- ValidatorIndex uint64 `db:"validator_index"`
- AttestationReward decimal.Decimal `db:"attestations_reward"`
- AttestationIdealReward decimal.Decimal `db:"attestations_ideal_reward"`
- BlocksProposed uint64 `db:"blocks_proposed"`
- BlocksScheduled uint64 `db:"blocks_scheduled"`
- SyncExecuted uint64 `db:"sync_executed"`
- SyncScheduled uint64 `db:"sync_scheduled"`
- }
-
- var queryResult []*dbResult
- clickhouseTable := "validator_dashboard_data_epoch"
- // retrieve efficiency data for the epoch
- ds := goqu.Dialect("postgres").
- From(goqu.L(fmt.Sprintf(`%s AS r FINAL`, clickhouseTable))).
- Select(
- goqu.L("validator_index"),
- goqu.L("COALESCE(r.attestations_reward, 0) AS attestations_reward"),
- goqu.L("COALESCE(r.attestations_ideal_reward, 0) AS attestations_ideal_reward"),
- goqu.L("COALESCE(r.blocks_proposed, 0) AS blocks_proposed"),
- goqu.L("COALESCE(r.blocks_scheduled, 0) AS blocks_scheduled"),
- goqu.L("COALESCE(r.sync_executed, 0) AS sync_executed"),
- goqu.L("COALESCE(r.sync_scheduled, 0) AS sync_scheduled")).
- Where(goqu.L("r.epoch = ?", epoch))
- query, args, err := ds.Prepared(true).ToSQL()
- if err != nil {
- return fmt.Errorf("error preparing query: %v", err)
- }
-
- err = db.ClickHouseReader.Select(&queryResult, query, args...)
- if err != nil {
- return fmt.Errorf("error retrieving data from table %s: %v", clickhouseTable, err)
- }
-
efficiencyMap := make(map[types.ValidatorIndex]*dbResult)
for _, row := range queryResult {
efficiencyMap[types.ValidatorIndex(row.ValidatorIndex)] = row
@@ -558,7 +562,6 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
for userId, dashboards := range dashboardMap {
for dashboardId, groups := range dashboards {
for groupId, groupDetails := range groups {
-
attestationReward := decimal.Decimal{}
attestationIdealReward := decimal.Decimal{}
blocksProposed := uint64(0)
diff --git a/backend/pkg/notification/types.go b/backend/pkg/notification/types.go
index baed697c2..d7b146649 100644
--- a/backend/pkg/notification/types.go
+++ b/backend/pkg/notification/types.go
@@ -245,7 +245,7 @@ func (n *ValidatorGroupEfficiencyNotification) GetEntitiyId() string {
func (n *ValidatorGroupEfficiencyNotification) GetInfo(format types.NotificationFormat) string {
dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n)
epoch := formatEpochLink(format, n.Epoch)
- return fmt.Sprintf(`%s%s efficiency of %.2f is below the threhold of %.2f in epoch %s.`, dashboardAndGroupInfo, n.Efficiency, n.Threshold epoch)
+ return fmt.Sprintf(`%s%s efficiency of %.2f is below the threhold of %.2f in epoch %s.`, dashboardAndGroupInfo, n.Efficiency, n.Threshold, epoch)
}
func (n *ValidatorGroupEfficiencyNotification) GetTitle() string {
From b2a1c718f1eccdad8aad46ef4e9a3ba40ca91659 Mon Sep 17 00:00:00 2001
From: peter <1674920+peterbitfly@users.noreply.github.com>
Date: Tue, 22 Oct 2024 10:34:40 +0000
Subject: [PATCH 3/8] feat(notifications): add efficiency threshold types
---
backend/pkg/api/data_access/notifications.go | 30 +++++++++-----------
backend/pkg/api/data_access/vdb_summary.go | 2 +-
backend/pkg/commons/types/frontend.go | 9 ++++--
backend/pkg/notification/collection.go | 2 +-
4 files changed, 21 insertions(+), 22 deletions(-)
diff --git a/backend/pkg/api/data_access/notifications.go b/backend/pkg/api/data_access/notifications.go
index 01d54b860..f37948ac0 100644
--- a/backend/pkg/api/data_access/notifications.go
+++ b/backend/pkg/api/data_access/notifications.go
@@ -568,19 +568,19 @@ func (d *DataAccessService) GetValidatorDashboardNotificationDetails(ctx context
continue
}
notificationDetails.ValidatorBackOnline = append(notificationDetails.ValidatorBackOnline, t.NotificationEventValidatorBackOnline{Index: curNotification.ValidatorIndex, EpochCount: curNotification.Epoch})
- case types.ValidatorGroupIsOfflineEventName:
- // TODO type / collection not present yet, skipping
- /*curNotification, ok := not.(*notification.validatorGroupIsOfflineNotification)
- if !ok {
- return nil, fmt.Errorf("failed to cast notification to validatorGroupIsOfflineNotification")
- }
- if curNotification.Status == 0 {
- notificationDetails.GroupOffline = ...
- notificationDetails.GroupOfflineReminder = ...
- } else {
- notificationDetails.GroupBackOnline = ...
- }
- */
+ // case types.ValidatorGroupIsOfflineEventName:
+ // TODO type / collection not present yet, skipping
+ /*curNotification, ok := not.(*notification.validatorGroupIsOfflineNotification)
+ if !ok {
+ return nil, fmt.Errorf("failed to cast notification to validatorGroupIsOfflineNotification")
+ }
+ if curNotification.Status == 0 {
+ notificationDetails.GroupOffline = ...
+ notificationDetails.GroupOfflineReminder = ...
+ } else {
+ notificationDetails.GroupBackOnline = ...
+ }
+ */
case types.ValidatorReceivedWithdrawalEventName:
curNotification, ok := notification.(*n.ValidatorWithdrawalNotification)
if !ok {
@@ -1878,9 +1878,6 @@ func (d *DataAccessService) GetNotificationSettingsDashboards(ctx context.Contex
switch eventName {
case types.ValidatorIsOfflineEventName:
settings.IsValidatorOfflineSubscribed = true
- case types.GroupIsOfflineEventName:
- settings.IsGroupOfflineSubscribed = true
- settings.GroupOfflineThreshold = event.Threshold
case types.ValidatorMissedAttestationEventName:
settings.IsAttestationsMissedSubscribed = true
case types.ValidatorMissedProposalEventName, types.ValidatorExecutedProposalEventName:
@@ -2110,7 +2107,6 @@ func (d *DataAccessService) UpdateNotificationSettingsValidatorDashboard(ctx con
eventFilter := fmt.Sprintf("%s:%d:%d", ValidatorDashboardEventPrefix, dashboardId, groupId)
d.AddOrRemoveEvent(&eventsToInsert, &eventsToDelete, settings.IsValidatorOfflineSubscribed, userId, types.ValidatorIsOfflineEventName, networkName, eventFilter, epoch, 0)
- d.AddOrRemoveEvent(&eventsToInsert, &eventsToDelete, settings.IsGroupOfflineSubscribed, userId, types.GroupIsOfflineEventName, networkName, eventFilter, epoch, settings.GroupOfflineThreshold)
d.AddOrRemoveEvent(&eventsToInsert, &eventsToDelete, settings.IsAttestationsMissedSubscribed, userId, types.ValidatorMissedAttestationEventName, networkName, eventFilter, epoch, 0)
d.AddOrRemoveEvent(&eventsToInsert, &eventsToDelete, settings.IsUpcomingBlockProposalSubscribed, userId, types.ValidatorUpcomingProposalEventName, networkName, eventFilter, epoch, 0)
d.AddOrRemoveEvent(&eventsToInsert, &eventsToDelete, settings.IsSyncSubscribed, userId, types.SyncCommitteeSoon, networkName, eventFilter, epoch, 0)
diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go
index 0c83b0ddc..f4216ec7b 100644
--- a/backend/pkg/api/data_access/vdb_summary.go
+++ b/backend/pkg/api/data_access/vdb_summary.go
@@ -1021,7 +1021,7 @@ func (d *DataAccessService) GetValidatorDashboardSummaryChart(ctx context.Contex
if err != nil {
return nil, err
}
- averageNetworkEfficiency := d.calculateTotalEfficiency(
+ averageNetworkEfficiency := utils.CalculateTotalEfficiency(
efficiency.AttestationEfficiency[enums.Last24h], efficiency.ProposalEfficiency[enums.Last24h], efficiency.SyncEfficiency[enums.Last24h])
for ts := range tsMap {
diff --git a/backend/pkg/commons/types/frontend.go b/backend/pkg/commons/types/frontend.go
index b45f1b06d..9bb2b4e8a 100644
--- a/backend/pkg/commons/types/frontend.go
+++ b/backend/pkg/commons/types/frontend.go
@@ -68,8 +68,8 @@ const (
ValidatorMissedProposalEventName EventName = "validator_proposal_missed"
ValidatorExecutedProposalEventName EventName = "validator_proposal_submitted"
- ValidatorDidSlashEventName EventName = "validator_did_slash"
- ValidatorGroupIsOfflineEventName EventName = "validator_group_is_offline"
+ ValidatorDidSlashEventName EventName = "validator_did_slash"
+ ValidatorGroupEfficiencyEventName EventName = "validator_group_efficiency"
ValidatorReceivedDepositEventName EventName = "validator_received_deposit"
NetworkSlashingEventName EventName = "network_slashing"
@@ -86,7 +86,6 @@ const (
// Validator dashboard events
ValidatorIsOfflineEventName EventName = "validator_is_offline"
ValidatorIsOnlineEventName EventName = "validator_is_online"
- GroupIsOfflineEventName EventName = "group_is_offline"
ValidatorMissedAttestationEventName EventName = "validator_attestation_missed"
ValidatorProposalEventName EventName = "validator_proposal"
ValidatorUpcomingProposalEventName EventName = "validator_proposal_upcoming"
@@ -132,6 +131,7 @@ var EventSortOrder = []EventName{
SyncCommitteeSoonEventName,
ValidatorIsOfflineEventName,
ValidatorIsOnlineEventName,
+ ValidatorGroupEfficiencyEventName,
ValidatorReceivedWithdrawalEventName,
NetworkLivenessIncreasedEventName,
EthClientUpdateEventName,
@@ -175,6 +175,7 @@ var MachineEventsMap = map[EventName]struct{}{
var LegacyEventLabel map[EventName]string = map[EventName]string{
ValidatorUpcomingProposalEventName: "Your validator(s) will soon propose a block",
+ ValidatorGroupEfficiencyEventName: "Your validator group efficiency is low",
ValidatorMissedProposalEventName: "Your validator(s) missed a proposal",
ValidatorExecutedProposalEventName: "Your validator(s) submitted a proposal",
ValidatorMissedAttestationEventName: "Your validator(s) missed an attestation",
@@ -199,6 +200,7 @@ var LegacyEventLabel map[EventName]string = map[EventName]string{
var EventLabel map[EventName]string = map[EventName]string{
ValidatorUpcomingProposalEventName: "Upcoming block proposal",
+ ValidatorGroupEfficiencyEventName: "Low validator group efficiency",
ValidatorMissedProposalEventName: "Block proposal missed",
ValidatorExecutedProposalEventName: "Block proposal submitted",
ValidatorMissedAttestationEventName: "Attestation missed",
@@ -233,6 +235,7 @@ func IsMachineNotification(event EventName) bool {
var EventNames = []EventName{
ValidatorExecutedProposalEventName,
+ ValidatorGroupEfficiencyEventName,
ValidatorMissedProposalEventName,
ValidatorMissedAttestationEventName,
ValidatorGotSlashedEventName,
diff --git a/backend/pkg/notification/collection.go b/backend/pkg/notification/collection.go
index 145b45a1b..d5f4b3b8a 100644
--- a/backend/pkg/notification/collection.go
+++ b/backend/pkg/notification/collection.go
@@ -508,7 +508,7 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
return fmt.Errorf("no efficiency data found for epoch %v", epoch)
}
- subMap, err := GetSubsForEventFilter("group_efficiency", "", nil, nil)
+ subMap, err := GetSubsForEventFilter(types.ValidatorGroupEfficiencyEventName, "", nil, nil)
if err != nil {
return fmt.Errorf("error getting subscriptions for (missed) block proposals %w", err)
}
From bb1ba290ae7551fd98b78943c02d3e59654fa035 Mon Sep 17 00:00:00 2001
From: peter <1674920+peterbitfly@users.noreply.github.com>
Date: Tue, 22 Oct 2024 10:35:33 +0000
Subject: [PATCH 4/8] fix(notifications): resolve compiler errors
---
backend/pkg/api/data_access/mobile.go | 4 ++--
backend/pkg/api/data_access/vdb_management.go | 2 +-
2 files changed, 3 insertions(+), 3 deletions(-)
diff --git a/backend/pkg/api/data_access/mobile.go b/backend/pkg/api/data_access/mobile.go
index 4d6165b20..dc526746b 100644
--- a/backend/pkg/api/data_access/mobile.go
+++ b/backend/pkg/api/data_access/mobile.go
@@ -171,7 +171,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex
if err != nil {
return nil, fmt.Errorf("error retrieving validator dashboard overview data: %w", err)
}
- data.NetworkEfficiency = d.calculateTotalEfficiency(
+ data.NetworkEfficiency = utils.CalculateTotalEfficiency(
efficiency.AttestationEfficiency[enums.AllTime], efficiency.ProposalEfficiency[enums.AllTime], efficiency.SyncEfficiency[enums.AllTime])
// Validator status
@@ -327,7 +327,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex
syncEfficiency.Float64 = float64(queryResult.SyncExecuted) / float64(queryResult.SyncScheduled)
syncEfficiency.Valid = true
}
- *efficiency = d.calculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency)
+ *efficiency = utils.CalculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency)
return nil
})
diff --git a/backend/pkg/api/data_access/vdb_management.go b/backend/pkg/api/data_access/vdb_management.go
index cbf0a4b64..d152d6cac 100644
--- a/backend/pkg/api/data_access/vdb_management.go
+++ b/backend/pkg/api/data_access/vdb_management.go
@@ -520,7 +520,7 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d
syncEfficiency.Float64 = float64(queryResult.SyncExecuted) / float64(queryResult.SyncScheduled)
syncEfficiency.Valid = true
}
- *efficiency = d.calculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency)
+ *efficiency = utils.CalculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency)
return nil
})
From daa4c3edf032dad1e44ad537c70c0a32e44b7bcb Mon Sep 17 00:00:00 2001
From: peter <1674920+peterbitfly@users.noreply.github.com>
Date: Tue, 22 Oct 2024 14:06:45 +0000
Subject: [PATCH 5/8] feat(notification): implement group efficiency
notification collection
---
backend/cmd/misc/main.go | 42 ++---
backend/pkg/api/data_access/notifications.go | 1 +
backend/pkg/notification/collection.go | 176 +++++++++++++++----
backend/pkg/notification/notifications.go | 10 +-
backend/pkg/notification/pubkey_cache.go | 2 +-
backend/pkg/notification/types.go | 35 ++--
6 files changed, 195 insertions(+), 71 deletions(-)
diff --git a/backend/cmd/misc/main.go b/backend/cmd/misc/main.go
index 5555c2e7e..c567a0371 100644
--- a/backend/cmd/misc/main.go
+++ b/backend/cmd/misc/main.go
@@ -247,27 +247,27 @@ func Run() {
}
// clickhouse
- // db.ClickHouseWriter, db.ClickHouseReader = db.MustInitDB(&types.DatabaseConfig{
- // Username: cfg.ClickHouse.WriterDatabase.Username,
- // Password: cfg.ClickHouse.WriterDatabase.Password,
- // Name: cfg.ClickHouse.WriterDatabase.Name,
- // Host: cfg.ClickHouse.WriterDatabase.Host,
- // Port: cfg.ClickHouse.WriterDatabase.Port,
- // MaxOpenConns: cfg.ClickHouse.WriterDatabase.MaxOpenConns,
- // SSL: true,
- // MaxIdleConns: cfg.ClickHouse.WriterDatabase.MaxIdleConns,
- // }, &types.DatabaseConfig{
- // Username: cfg.ClickHouse.ReaderDatabase.Username,
- // Password: cfg.ClickHouse.ReaderDatabase.Password,
- // Name: cfg.ClickHouse.ReaderDatabase.Name,
- // Host: cfg.ClickHouse.ReaderDatabase.Host,
- // Port: cfg.ClickHouse.ReaderDatabase.Port,
- // MaxOpenConns: cfg.ClickHouse.ReaderDatabase.MaxOpenConns,
- // SSL: true,
- // MaxIdleConns: cfg.ClickHouse.ReaderDatabase.MaxIdleConns,
- // }, "clickhouse", "clickhouse")
- // defer db.ClickHouseReader.Close()
- // defer db.ClickHouseWriter.Close()
+ db.ClickHouseWriter, db.ClickHouseReader = db.MustInitDB(&types.DatabaseConfig{
+ Username: cfg.ClickHouse.WriterDatabase.Username,
+ Password: cfg.ClickHouse.WriterDatabase.Password,
+ Name: cfg.ClickHouse.WriterDatabase.Name,
+ Host: cfg.ClickHouse.WriterDatabase.Host,
+ Port: cfg.ClickHouse.WriterDatabase.Port,
+ MaxOpenConns: cfg.ClickHouse.WriterDatabase.MaxOpenConns,
+ SSL: true,
+ MaxIdleConns: cfg.ClickHouse.WriterDatabase.MaxIdleConns,
+ }, &types.DatabaseConfig{
+ Username: cfg.ClickHouse.ReaderDatabase.Username,
+ Password: cfg.ClickHouse.ReaderDatabase.Password,
+ Name: cfg.ClickHouse.ReaderDatabase.Name,
+ Host: cfg.ClickHouse.ReaderDatabase.Host,
+ Port: cfg.ClickHouse.ReaderDatabase.Port,
+ MaxOpenConns: cfg.ClickHouse.ReaderDatabase.MaxOpenConns,
+ SSL: true,
+ MaxIdleConns: cfg.ClickHouse.ReaderDatabase.MaxIdleConns,
+ }, "clickhouse", "clickhouse")
+ defer db.ClickHouseReader.Close()
+ defer db.ClickHouseWriter.Close()
// Initialize the persistent redis client
if requires.Redis {
diff --git a/backend/pkg/api/data_access/notifications.go b/backend/pkg/api/data_access/notifications.go
index f37948ac0..21f3e0ebb 100644
--- a/backend/pkg/api/data_access/notifications.go
+++ b/backend/pkg/api/data_access/notifications.go
@@ -63,6 +63,7 @@ func (*DataAccessService) registerNotificationInterfaceTypes() {
once.Do(func() {
gob.Register(&n.ValidatorProposalNotification{})
gob.Register(&n.ValidatorUpcomingProposalNotification{})
+ gob.Register(&n.ValidatorGroupEfficiencyNotification{})
gob.Register(&n.ValidatorAttestationNotification{})
gob.Register(&n.ValidatorIsOfflineNotification{})
gob.Register(&n.ValidatorIsOnlineNotification{})
diff --git a/backend/pkg/notification/collection.go b/backend/pkg/notification/collection.go
index d5f4b3b8a..3093e9bcb 100644
--- a/backend/pkg/notification/collection.go
+++ b/backend/pkg/notification/collection.go
@@ -12,7 +12,6 @@ import (
"time"
gcp_bigtable "cloud.google.com/go/bigtable"
- "github.com/doug-martin/goqu/v9"
"github.com/ethereum/go-ethereum/common"
"github.com/gobitfly/beaconchain/pkg/commons/cache"
"github.com/gobitfly/beaconchain/pkg/commons/db"
@@ -22,6 +21,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/services"
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
+ cTypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
"github.com/gobitfly/beaconchain/pkg/exporter/modules"
"github.com/lib/pq"
"github.com/rocket-pool/rocketpool-go/utils/eth"
@@ -49,6 +49,7 @@ func notificationCollector() {
once.Do(func() {
gob.Register(&ValidatorProposalNotification{})
gob.Register(&ValidatorUpcomingProposalNotification{})
+ gob.Register(&ValidatorGroupEfficiencyNotification{})
gob.Register(&ValidatorAttestationNotification{})
gob.Register(&ValidatorIsOfflineNotification{})
gob.Register(&ValidatorIsOnlineNotification{})
@@ -62,13 +63,13 @@ func notificationCollector() {
gob.Register(&SyncCommitteeSoonNotification{})
})
+ mc, err := modules.GetModuleContext()
+ if err != nil {
+ log.Fatal(err, "error getting module context", 0)
+ }
+
go func() {
log.Infof("starting head notification collector")
- mc, err := modules.GetModuleContext()
- if err != nil {
- log.Fatal(err, "error getting module context", 0)
- }
-
for ; ; time.Sleep(time.Second * 30) {
// get the head epoch
head, err := mc.ConsClient.GetChainHead()
@@ -160,7 +161,7 @@ func notificationCollector() {
log.Infof("collecting notifications for epoch %v", epoch)
// Network DB Notifications (network related)
- notifications, err := collectNotifications(epoch)
+ notifications, err := collectNotifications(epoch, mc)
if err != nil {
log.Error(err, "error collection notifications", 0)
@@ -279,7 +280,7 @@ func collectUpcomingBlockProposalNotifications(notificationsByUserID types.Notif
return nil
}
-func collectNotifications(epoch uint64) (types.NotificationsPerUserId, error) {
+func collectNotifications(epoch uint64, mc modules.ModuleContext) (types.NotificationsPerUserId, error) {
notificationsByUserID := types.NotificationsPerUserId{}
start := time.Now()
var err error
@@ -310,12 +311,12 @@ func collectNotifications(epoch uint64) (types.NotificationsPerUserId, error) {
// The following functions will collect the notifications and add them to the
// notificationsByUserID map. The notifications will be queued and sent later
// by the notification sender process
- err = collectGroupEfficiencyNotifications(notificationsByUserID, epoch)
+ err = collectGroupEfficiencyNotifications(notificationsByUserID, epoch, mc)
if err != nil {
metrics.Errors.WithLabelValues("notifications_collect_group_efficiency").Inc()
return nil, fmt.Errorf("error collecting validator_group_efficiency notifications: %v", err)
}
- log.Infof("collecting attestation & offline notifications took: %v", time.Since(start))
+ log.Infof("collecting group efficiency notifications took: %v", time.Since(start))
err = collectAttestationAndOfflineValidatorNotifications(notificationsByUserID, epoch)
if err != nil {
@@ -469,7 +470,7 @@ func collectUserDbNotifications(epoch uint64) (types.NotificationsPerUserId, err
return notificationsByUserID, nil
}
-func collectGroupEfficiencyNotifications(notificationsByUserID types.NotificationsPerUserId, epoch uint64) error {
+func collectGroupEfficiencyNotifications(notificationsByUserID types.NotificationsPerUserId, epoch uint64, mc modules.ModuleContext) error {
type dbResult struct {
ValidatorIndex uint64 `db:"validator_index"`
AttestationReward decimal.Decimal `db:"attestations_reward"`
@@ -480,32 +481,67 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
SyncScheduled uint64 `db:"sync_scheduled"`
}
- var queryResult []*dbResult
- clickhouseTable := "validator_dashboard_data_epoch"
- // retrieve efficiency data for the epoch
- ds := goqu.Dialect("postgres").
- From(goqu.L(fmt.Sprintf(`%s AS r FINAL`, clickhouseTable))).
- Select(
- goqu.L("validator_index"),
- goqu.L("COALESCE(r.attestations_reward, 0) AS attestations_reward"),
- goqu.L("COALESCE(r.attestations_ideal_reward, 0) AS attestations_ideal_reward"),
- goqu.L("COALESCE(r.blocks_proposed, 0) AS blocks_proposed"),
- goqu.L("COALESCE(r.blocks_scheduled, 0) AS blocks_scheduled"),
- goqu.L("COALESCE(r.sync_executed, 0) AS sync_executed"),
- goqu.L("COALESCE(r.sync_scheduled, 0) AS sync_scheduled")).
- Where(goqu.L("r.epoch = ?", epoch))
- query, args, err := ds.Prepared(true).ToSQL()
+ // retrieve rewards for the epoch
+ log.Info("retrieving validator metadata")
+ validators, err := mc.CL.GetValidators(epoch*utils.Config.Chain.ClConfig.SlotsPerEpoch, nil, []cTypes.ValidatorStatus{cTypes.Active})
+ if err != nil {
+ return fmt.Errorf("error getting validators: %w", err)
+ }
+ effectiveBalanceMap := make(map[uint64]uint64)
+ activeValidatorsMap := make(map[uint64]struct{})
+ for _, validator := range validators.Data {
+ effectiveBalanceMap[validator.Index] = validator.Validator.EffectiveBalance
+ activeValidatorsMap[validator.Index] = struct{}{}
+ }
+ log.Info("retrieving attestation reward data")
+ attestationRewards, err := mc.CL.GetAttestationRewards(epoch)
if err != nil {
- return fmt.Errorf("error preparing query: %v", err)
+ return fmt.Errorf("error getting attestation rewards: %w", err)
+ }
+
+ efficiencyMap := make(map[types.ValidatorIndex]*dbResult, len(attestationRewards.Data.TotalRewards))
+
+ idealRewardsMap := make(map[uint64]decimal.Decimal)
+ for _, reward := range attestationRewards.Data.IdealRewards {
+ idealRewardsMap[uint64(reward.EffectiveBalance)] = decimal.NewFromInt(int64(reward.Head) + int64(reward.Target) + int64(reward.Source) + int64(reward.InclusionDelay) + int64(reward.Inactivity))
+ }
+ for _, reward := range attestationRewards.Data.TotalRewards {
+ efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)] = &dbResult{
+ ValidatorIndex: reward.ValidatorIndex,
+ AttestationReward: decimal.NewFromInt(int64(reward.Head) + int64(reward.Target) + int64(reward.Source) + int64(reward.InclusionDelay) + int64(reward.Inactivity)),
+ AttestationIdealReward: idealRewardsMap[effectiveBalanceMap[reward.ValidatorIndex]],
+ }
}
- err = db.ClickHouseReader.Select(&queryResult, query, args...)
+ log.Info("retrieving block proposal data")
+ proposalAssignments, err := mc.CL.GetPropoalAssignments(epoch)
if err != nil {
- return fmt.Errorf("error retrieving data from table %s: %v", clickhouseTable, err)
+ return fmt.Errorf("error getting proposal assignments: %w", err)
+ }
+ for _, assignment := range proposalAssignments.Data {
+ efficiencyMap[types.ValidatorIndex(assignment.ValidatorIndex)].BlocksScheduled++
}
- if len(queryResult) == 0 {
- return fmt.Errorf("no efficiency data found for epoch %v", epoch)
+ for slot := epoch * utils.Config.Chain.ClConfig.SlotsPerEpoch; slot < (epoch+1)*utils.Config.Chain.ClConfig.SlotsPerEpoch; slot++ {
+ header, err := mc.CL.GetBlockHeader(slot)
+ log.Infof("retrieving data for slot %v", slot)
+ if err != nil && strings.Contains(err.Error(), "NOT_FOUND") {
+ continue
+ } else if err != nil {
+ return fmt.Errorf("error getting block header for slot %v: %w", slot, err)
+ }
+ efficiencyMap[types.ValidatorIndex(header.Data.Header.Message.ProposerIndex)].BlocksProposed++
+
+ syncRewards, err := mc.CL.GetSyncRewards(slot)
+ if err != nil {
+ return fmt.Errorf("error getting sync rewards for slot %v: %w", slot, err)
+ }
+ for _, reward := range syncRewards.Data {
+ efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)].SyncScheduled++
+ if reward.Reward > 0 {
+ efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)].SyncExecuted++
+ }
+ }
}
subMap, err := GetSubsForEventFilter(types.ValidatorGroupEfficiencyEventName, "", nil, nil)
@@ -554,10 +590,72 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
}
}
- efficiencyMap := make(map[types.ValidatorIndex]*dbResult)
- for _, row := range queryResult {
- efficiencyMap[types.ValidatorIndex(row.ValidatorIndex)] = row
- }
+ // The commented code below can be used to validate data retrieved from the node against
+ // data in clickhouse
+ // var queryResult []*dbResult
+ // clickhouseTable := "validator_dashboard_data_epoch"
+ // // retrieve efficiency data for the epoch
+ // log.Infof("retrieving efficiency data for epoch %v", epoch)
+ // ds := goqu.Dialect("postgres").
+ // From(goqu.L(fmt.Sprintf(`%s AS r`, clickhouseTable))).
+ // Select(
+ // goqu.L("validator_index"),
+ // goqu.L("COALESCE(r.attestations_reward, 0) AS attestations_reward"),
+ // goqu.L("COALESCE(r.attestations_ideal_reward, 0) AS attestations_ideal_reward"),
+ // goqu.L("COALESCE(r.blocks_proposed, 0) AS blocks_proposed"),
+ // goqu.L("COALESCE(r.blocks_scheduled, 0) AS blocks_scheduled"),
+ // goqu.L("COALESCE(r.sync_executed, 0) AS sync_executed"),
+ // goqu.L("COALESCE(r.sync_scheduled, 0) AS sync_scheduled")).
+ // Where(goqu.L("r.epoch_timestamp = ?", utils.EpochToTime(epoch)))
+ // query, args, err := ds.Prepared(true).ToSQL()
+ // if err != nil {
+ // return fmt.Errorf("error preparing query: %v", err)
+ // }
+
+ // err = db.ClickHouseReader.Select(&queryResult, query, args...)
+ // if err != nil {
+ // return fmt.Errorf("error retrieving data from table %s: %v", clickhouseTable, err)
+ // }
+
+ // if len(queryResult) == 0 {
+ // return fmt.Errorf("no efficiency data found for epoch %v", epoch)
+ // }
+
+ // log.Infof("retrieved %v efficiency data rows", len(queryResult))
+
+ // for _, row := range queryResult {
+ // if _, ok := activeValidatorsMap[row.ValidatorIndex]; !ok {
+ // continue
+ // }
+ // existing := efficiencyMap[types.ValidatorIndex(row.ValidatorIndex)]
+
+ // if existing == nil {
+ // existing = &dbResult{
+ // ValidatorIndex: row.ValidatorIndex,
+ // AttestationReward: decimal.Decimal{},
+ // AttestationIdealReward: decimal.Decimal{},
+ // }
+ // }
+ // if !existing.AttestationIdealReward.Equal(row.AttestationIdealReward) {
+ // log.Fatal(fmt.Errorf("ideal reward mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.AttestationIdealReward, row.AttestationIdealReward), "ideal reward mismatch", 0)
+ // }
+ // if !existing.AttestationReward.Equal(row.AttestationReward) {
+ // log.Fatal(fmt.Errorf("attestation reward mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.AttestationReward, row.AttestationReward), "attestation reward mismatch", 0)
+ // }
+ // if existing.BlocksProposed != row.BlocksProposed {
+ // log.Fatal(fmt.Errorf("blocks proposed mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.BlocksProposed, row.BlocksProposed), "blocks proposed mismatch", 0)
+ // }
+ // if existing.BlocksScheduled != row.BlocksScheduled {
+ // log.Fatal(fmt.Errorf("blocks scheduled mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.BlocksScheduled, row.BlocksScheduled), "blocks scheduled mismatch", 0)
+ // }
+ // if existing.SyncExecuted != row.SyncExecuted {
+ // log.Fatal(fmt.Errorf("sync executed mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.SyncExecuted, row.SyncExecuted), "sync executed mismatch", 0)
+ // }
+ // if existing.SyncScheduled != row.SyncScheduled {
+ // log.Fatal(fmt.Errorf("sync scheduled mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.SyncScheduled, row.SyncScheduled), "sync scheduled mismatch", 0)
+ // }
+ // efficiencyMap[types.ValidatorIndex(row.ValidatorIndex)] = row
+ // }
for userId, dashboards := range dashboardMap {
for dashboardId, groups := range dashboards {
@@ -597,7 +695,9 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
efficiency := utils.CalculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency)
- if efficiency < groupDetails.Subscription.EventThreshold {
+ log.Infof("efficiency: %v, threshold: %v", efficiency, groupDetails.Subscription.EventThreshold*100)
+
+ if efficiency < groupDetails.Subscription.EventThreshold*100 {
log.Infof("creating group efficiency notification for user %v, dashboard %v, group %v in epoch %v", userId, dashboardId, groupId, epoch)
n := &ValidatorGroupEfficiencyNotification{
NotificationBaseImpl: types.NotificationBaseImpl{
@@ -611,7 +711,7 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
DashboardGroupId: groupDetails.Subscription.DashboardGroupId,
DashboardGroupName: groupDetails.Subscription.DashboardGroupName,
},
- Threshold: groupDetails.Subscription.EventThreshold,
+ Threshold: groupDetails.Subscription.EventThreshold * 100,
Efficiency: efficiency,
}
notificationsByUserID.AddNotification(n)
@@ -621,6 +721,8 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
}
}
+ log.Info("done collecting group efficiency notifications")
+
return nil
}
func collectBlockProposalNotifications(notificationsByUserID types.NotificationsPerUserId, status uint64, eventName types.EventName, epoch uint64) error {
diff --git a/backend/pkg/notification/notifications.go b/backend/pkg/notification/notifications.go
index 084bdf4f2..7bb71162a 100644
--- a/backend/pkg/notification/notifications.go
+++ b/backend/pkg/notification/notifications.go
@@ -3,15 +3,21 @@ package notification
import (
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/types"
+ "github.com/gobitfly/beaconchain/pkg/exporter/modules"
)
// Used for isolated testing
func GetNotificationsForEpoch(pubkeyCachePath string, epoch uint64) (types.NotificationsPerUserId, error) {
- err := initPubkeyCache(pubkeyCachePath)
+ mc, err := modules.GetModuleContext()
+ if err != nil {
+ log.Fatal(err, "error getting module context", 0)
+ }
+
+ err = initPubkeyCache(pubkeyCachePath)
if err != nil {
log.Fatal(err, "error initializing pubkey cache path for notifications", 0)
}
- return collectNotifications(epoch)
+ return collectNotifications(epoch, mc)
}
// Used for isolated testing
diff --git a/backend/pkg/notification/pubkey_cache.go b/backend/pkg/notification/pubkey_cache.go
index 483884cf5..37f3f5ea2 100644
--- a/backend/pkg/notification/pubkey_cache.go
+++ b/backend/pkg/notification/pubkey_cache.go
@@ -72,7 +72,7 @@ func GetIndexForPubkey(pubkey []byte) (uint64, error) {
if err != nil {
return 0, err
}
- log.Infof("serving index %d for validator %x from db", index, pubkey)
+ // log.Infof("serving index %d for validator %x from db", index, pubkey)
return index, nil
} else if err != nil {
return 0, err
diff --git a/backend/pkg/notification/types.go b/backend/pkg/notification/types.go
index d7b146649..02289ecca 100644
--- a/backend/pkg/notification/types.go
+++ b/backend/pkg/notification/types.go
@@ -51,7 +51,7 @@ func formatSlotLink(format types.NotificationFormat, slot interface{}) string {
return ""
}
-func formatDashboardAndGroupLink(format types.NotificationFormat, n types.Notification) string {
+func formatValidatorPrefixedDashboardAndGroupLink(format types.NotificationFormat, n types.Notification) string {
dashboardAndGroupInfo := ""
if n.GetDashboardId() != nil {
switch format {
@@ -66,6 +66,21 @@ func formatDashboardAndGroupLink(format types.NotificationFormat, n types.Notifi
return dashboardAndGroupInfo
}
+func formatPureDashboardAndGroupLink(format types.NotificationFormat, n types.Notification) string {
+ dashboardAndGroupInfo := ""
+ if n.GetDashboardId() != nil {
+ switch format {
+ case types.NotifciationFormatHtml:
+ dashboardAndGroupInfo = fmt.Sprintf(`Group %[2]v in Dashboard %[3]v`, utils.Config.Frontend.SiteDomain, n.GetDashboardGroupName(), n.GetDashboardName(), *n.GetDashboardId())
+ case types.NotifciationFormatText:
+ dashboardAndGroupInfo = fmt.Sprintf(`Group %[1]v in Dashboard %[2]v`, n.GetDashboardGroupName(), n.GetDashboardName())
+ case types.NotifciationFormatMarkdown:
+ dashboardAndGroupInfo = fmt.Sprintf(`Group **%[1]v** in Dashboard [%[2]v](https://%[3]v/dashboard/%[4]v)`, n.GetDashboardGroupName(), n.GetDashboardName(), utils.Config.Frontend.SiteDomain, *n.GetDashboardId())
+ }
+ }
+ return dashboardAndGroupInfo
+}
+
type ValidatorProposalNotification struct {
types.NotificationBaseImpl
@@ -83,7 +98,7 @@ func (n *ValidatorProposalNotification) GetEntitiyId() string {
func (n *ValidatorProposalNotification) GetInfo(format types.NotificationFormat) string {
vali := formatValidatorLink(format, n.ValidatorIndex)
slot := formatSlotLink(format, n.Slot)
- dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n)
+ dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n)
switch n.Status {
case 0:
@@ -149,7 +164,7 @@ func (n *ValidatorUpcomingProposalNotification) GetEntitiyId() string {
func (n *ValidatorUpcomingProposalNotification) GetInfo(format types.NotificationFormat) string {
vali := formatValidatorLink(format, n.ValidatorIndex)
slot := formatSlotLink(format, n.Slot)
- dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n)
+ dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n)
return fmt.Sprintf(`New scheduled block proposal at slot %s for Validator %s%s.`, slot, vali, dashboardAndGroupInfo)
}
@@ -184,7 +199,7 @@ func (n *ValidatorIsOfflineNotification) GetEntitiyId() string {
func (n *ValidatorIsOfflineNotification) GetInfo(format types.NotificationFormat) string {
vali := formatValidatorLink(format, n.ValidatorIndex)
epoch := formatEpochLink(format, n.LatestState)
- dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n)
+ dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n)
return fmt.Sprintf(`Validator %v%v is offline since epoch %s.`, vali, dashboardAndGroupInfo, epoch)
}
@@ -214,7 +229,7 @@ func (n *ValidatorIsOnlineNotification) GetEntitiyId() string {
func (n *ValidatorIsOnlineNotification) GetInfo(format types.NotificationFormat) string {
vali := formatValidatorLink(format, n.ValidatorIndex)
epoch := formatEpochLink(format, n.Epoch)
- dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n)
+ dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n)
return fmt.Sprintf(`Validator %v%v is back online since epoch %v.`, vali, dashboardAndGroupInfo, epoch)
}
@@ -243,9 +258,9 @@ func (n *ValidatorGroupEfficiencyNotification) GetEntitiyId() string {
// Overwrite specific methods
func (n *ValidatorGroupEfficiencyNotification) GetInfo(format types.NotificationFormat) string {
- dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n)
+ dashboardAndGroupInfo := formatPureDashboardAndGroupLink(format, n)
epoch := formatEpochLink(format, n.Epoch)
- return fmt.Sprintf(`%s%s efficiency of %.2f is below the threhold of %.2f in epoch %s.`, dashboardAndGroupInfo, n.Efficiency, n.Threshold, epoch)
+ return fmt.Sprintf(`%s efficiency of %.2f%% is below the threhold of %.2f%% in epoch %s.`, dashboardAndGroupInfo, n.Efficiency, n.Threshold, epoch)
}
func (n *ValidatorGroupEfficiencyNotification) GetTitle() string {
@@ -273,7 +288,7 @@ func (n *ValidatorAttestationNotification) GetEntitiyId() string {
}
func (n *ValidatorAttestationNotification) GetInfo(format types.NotificationFormat) string {
- dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n)
+ dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n)
vali := formatValidatorLink(format, n.ValidatorIndex)
epoch := formatEpochLink(format, n.Epoch)
@@ -347,7 +362,7 @@ func (n *ValidatorGotSlashedNotification) GetEntitiyId() string {
}
func (n *ValidatorGotSlashedNotification) GetInfo(format types.NotificationFormat) string {
- dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n)
+ dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n)
vali := formatValidatorLink(format, n.ValidatorIndex)
epoch := formatEpochLink(format, n.Epoch)
slasher := formatValidatorLink(format, n.Slasher)
@@ -383,7 +398,7 @@ func (n *ValidatorWithdrawalNotification) GetEntitiyId() string {
}
func (n *ValidatorWithdrawalNotification) GetInfo(format types.NotificationFormat) string {
- dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n)
+ dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n)
vali := formatValidatorLink(format, n.ValidatorIndex)
amount := utils.FormatClCurrencyString(n.Amount, utils.Config.Frontend.MainCurrency, 6, true, false, false)
generalPart := fmt.Sprintf(`An automatic withdrawal of %s has been processed for validator %s%s.`, amount, vali, dashboardAndGroupInfo)
From 2d23a28dcb26e275a84741c33452480ee83f32e5 Mon Sep 17 00:00:00 2001
From: peter <1674920+peterbitfly@users.noreply.github.com>
Date: Tue, 22 Oct 2024 14:12:32 +0000
Subject: [PATCH 6/8] fix(notifications): correct title for efficiency
notifications
---
backend/pkg/notification/queuing.go | 5 +++++
1 file changed, 5 insertions(+)
diff --git a/backend/pkg/notification/queuing.go b/backend/pkg/notification/queuing.go
index 90cb78bca..803a618c8 100644
--- a/backend/pkg/notification/queuing.go
+++ b/backend/pkg/notification/queuing.go
@@ -380,6 +380,9 @@ func RenderEmailsForUserEvents(epoch uint64, notificationsByUserID types.Notific
case types.ValidatorExecutedProposalEventName:
//nolint:gosec // this is a static string
bodySummary += template.HTML(fmt.Sprintf("%s: %d validator%s, Reward: %.3f ETH", types.EventLabel[event], count, plural, totalBlockReward))
+ case types.ValidatorGroupEfficiencyEventName:
+ //nolint:gosec // this is a static string
+ bodySummary += template.HTML(fmt.Sprintf("%s: %d Group%s", types.EventLabel[event], count, plural))
default:
//nolint:gosec // this is a static string
bodySummary += template.HTML(fmt.Sprintf("%s: %d Validator%s", types.EventLabel[event], count, plural))
@@ -513,6 +516,8 @@ func RenderPushMessagesForUserEvents(epoch uint64, notificationsByUserID types.N
bodySummary += fmt.Sprintf("%s: %d machine%s", types.EventLabel[event], count, plural)
case types.ValidatorExecutedProposalEventName:
bodySummary += fmt.Sprintf("%s: %d validator%s, Reward: %.3f ETH", types.EventLabel[event], count, plural, totalBlockReward)
+ case types.ValidatorGroupEfficiencyEventName:
+ bodySummary += fmt.Sprintf("%s: %d group%s", types.EventLabel[event], count, plural)
default:
bodySummary += fmt.Sprintf("%s: %d validator%s", types.EventLabel[event], count, plural)
}
From e711eda731d393612f32cf3d8f41b72c757224d2 Mon Sep 17 00:00:00 2001
From: peter <1674920+peterbitfly@users.noreply.github.com>
Date: Tue, 22 Oct 2024 14:28:12 +0000
Subject: [PATCH 7/8] fix(notifications): improve efficiency message
---
backend/pkg/notification/types.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/backend/pkg/notification/types.go b/backend/pkg/notification/types.go
index 02289ecca..754955cda 100644
--- a/backend/pkg/notification/types.go
+++ b/backend/pkg/notification/types.go
@@ -260,7 +260,7 @@ func (n *ValidatorGroupEfficiencyNotification) GetEntitiyId() string {
func (n *ValidatorGroupEfficiencyNotification) GetInfo(format types.NotificationFormat) string {
dashboardAndGroupInfo := formatPureDashboardAndGroupLink(format, n)
epoch := formatEpochLink(format, n.Epoch)
- return fmt.Sprintf(`%s efficiency of %.2f%% is below the threhold of %.2f%% in epoch %s.`, dashboardAndGroupInfo, n.Efficiency, n.Threshold, epoch)
+ return fmt.Sprintf(`%s efficiency of %.2f%% was below the threhold of %.2f%% in epoch %s.`, dashboardAndGroupInfo, n.Efficiency, n.Threshold, epoch)
}
func (n *ValidatorGroupEfficiencyNotification) GetTitle() string {
From c5f360f220b2d9d8de7ce67c012cb33995e352c0 Mon Sep 17 00:00:00 2001
From: peter <1674920+peterbitfly@users.noreply.github.com>
Date: Tue, 22 Oct 2024 14:46:16 +0000
Subject: [PATCH 8/8] feat(notifications): speed up efficiency notifications
---
backend/pkg/notification/collection.go | 26 ++++++++++++++------------
1 file changed, 14 insertions(+), 12 deletions(-)
diff --git a/backend/pkg/notification/collection.go b/backend/pkg/notification/collection.go
index 3093e9bcb..ba6b0568f 100644
--- a/backend/pkg/notification/collection.go
+++ b/backend/pkg/notification/collection.go
@@ -21,7 +21,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/services"
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
- cTypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
+ constypes "github.com/gobitfly/beaconchain/pkg/consapi/types"
"github.com/gobitfly/beaconchain/pkg/exporter/modules"
"github.com/lib/pq"
"github.com/rocket-pool/rocketpool-go/utils/eth"
@@ -483,7 +483,7 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
// retrieve rewards for the epoch
log.Info("retrieving validator metadata")
- validators, err := mc.CL.GetValidators(epoch*utils.Config.Chain.ClConfig.SlotsPerEpoch, nil, []cTypes.ValidatorStatus{cTypes.Active})
+ validators, err := mc.CL.GetValidators(epoch*utils.Config.Chain.ClConfig.SlotsPerEpoch, nil, []constypes.ValidatorStatus{constypes.Active})
if err != nil {
return fmt.Errorf("error getting validators: %w", err)
}
@@ -522,24 +522,26 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
efficiencyMap[types.ValidatorIndex(assignment.ValidatorIndex)].BlocksScheduled++
}
+ syncAssignments, err := mc.CL.GetSyncCommitteesAssignments(nil, epoch*utils.Config.Chain.ClConfig.SlotsPerEpoch)
+ if err != nil {
+ return fmt.Errorf("error getting sync committee assignments: %w", err)
+ }
+
for slot := epoch * utils.Config.Chain.ClConfig.SlotsPerEpoch; slot < (epoch+1)*utils.Config.Chain.ClConfig.SlotsPerEpoch; slot++ {
- header, err := mc.CL.GetBlockHeader(slot)
log.Infof("retrieving data for slot %v", slot)
+ s, err := mc.CL.GetSlot(slot)
if err != nil && strings.Contains(err.Error(), "NOT_FOUND") {
continue
} else if err != nil {
return fmt.Errorf("error getting block header for slot %v: %w", slot, err)
}
- efficiencyMap[types.ValidatorIndex(header.Data.Header.Message.ProposerIndex)].BlocksProposed++
+ efficiencyMap[types.ValidatorIndex(s.Data.Message.ProposerIndex)].BlocksProposed++
- syncRewards, err := mc.CL.GetSyncRewards(slot)
- if err != nil {
- return fmt.Errorf("error getting sync rewards for slot %v: %w", slot, err)
- }
- for _, reward := range syncRewards.Data {
- efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)].SyncScheduled++
- if reward.Reward > 0 {
- efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)].SyncExecuted++
+ for i, validatorIndex := range syncAssignments.Data.Validators {
+ efficiencyMap[types.ValidatorIndex(validatorIndex)].SyncScheduled++
+
+ if utils.BitAtVector(s.Data.Message.Body.SyncAggregate.SyncCommitteeBits, i) {
+ efficiencyMap[types.ValidatorIndex(validatorIndex)].SyncExecuted++
}
}
}