diff --git a/backend/cmd/misc/main.go b/backend/cmd/misc/main.go index 5555c2e7e..c567a0371 100644 --- a/backend/cmd/misc/main.go +++ b/backend/cmd/misc/main.go @@ -247,27 +247,27 @@ func Run() { } // clickhouse - // db.ClickHouseWriter, db.ClickHouseReader = db.MustInitDB(&types.DatabaseConfig{ - // Username: cfg.ClickHouse.WriterDatabase.Username, - // Password: cfg.ClickHouse.WriterDatabase.Password, - // Name: cfg.ClickHouse.WriterDatabase.Name, - // Host: cfg.ClickHouse.WriterDatabase.Host, - // Port: cfg.ClickHouse.WriterDatabase.Port, - // MaxOpenConns: cfg.ClickHouse.WriterDatabase.MaxOpenConns, - // SSL: true, - // MaxIdleConns: cfg.ClickHouse.WriterDatabase.MaxIdleConns, - // }, &types.DatabaseConfig{ - // Username: cfg.ClickHouse.ReaderDatabase.Username, - // Password: cfg.ClickHouse.ReaderDatabase.Password, - // Name: cfg.ClickHouse.ReaderDatabase.Name, - // Host: cfg.ClickHouse.ReaderDatabase.Host, - // Port: cfg.ClickHouse.ReaderDatabase.Port, - // MaxOpenConns: cfg.ClickHouse.ReaderDatabase.MaxOpenConns, - // SSL: true, - // MaxIdleConns: cfg.ClickHouse.ReaderDatabase.MaxIdleConns, - // }, "clickhouse", "clickhouse") - // defer db.ClickHouseReader.Close() - // defer db.ClickHouseWriter.Close() + db.ClickHouseWriter, db.ClickHouseReader = db.MustInitDB(&types.DatabaseConfig{ + Username: cfg.ClickHouse.WriterDatabase.Username, + Password: cfg.ClickHouse.WriterDatabase.Password, + Name: cfg.ClickHouse.WriterDatabase.Name, + Host: cfg.ClickHouse.WriterDatabase.Host, + Port: cfg.ClickHouse.WriterDatabase.Port, + MaxOpenConns: cfg.ClickHouse.WriterDatabase.MaxOpenConns, + SSL: true, + MaxIdleConns: cfg.ClickHouse.WriterDatabase.MaxIdleConns, + }, &types.DatabaseConfig{ + Username: cfg.ClickHouse.ReaderDatabase.Username, + Password: cfg.ClickHouse.ReaderDatabase.Password, + Name: cfg.ClickHouse.ReaderDatabase.Name, + Host: cfg.ClickHouse.ReaderDatabase.Host, + Port: cfg.ClickHouse.ReaderDatabase.Port, + MaxOpenConns: cfg.ClickHouse.ReaderDatabase.MaxOpenConns, + SSL: true, + MaxIdleConns: cfg.ClickHouse.ReaderDatabase.MaxIdleConns, + }, "clickhouse", "clickhouse") + defer db.ClickHouseReader.Close() + defer db.ClickHouseWriter.Close() // Initialize the persistent redis client if requires.Redis { diff --git a/backend/cmd/notification_collector/main.go b/backend/cmd/notification_collector/main.go index d6556eaa7..feeecf98b 100644 --- a/backend/cmd/notification_collector/main.go +++ b/backend/cmd/notification_collector/main.go @@ -150,6 +150,31 @@ func Run() { }, "pgx", "postgres") }() + wg.Add(1) + go func() { + defer wg.Done() + // clickhouse + db.ClickHouseWriter, db.ClickHouseReader = db.MustInitDB(&types.DatabaseConfig{ + Username: cfg.ClickHouse.WriterDatabase.Username, + Password: cfg.ClickHouse.WriterDatabase.Password, + Name: cfg.ClickHouse.WriterDatabase.Name, + Host: cfg.ClickHouse.WriterDatabase.Host, + Port: cfg.ClickHouse.WriterDatabase.Port, + MaxOpenConns: cfg.ClickHouse.WriterDatabase.MaxOpenConns, + SSL: true, + MaxIdleConns: cfg.ClickHouse.WriterDatabase.MaxIdleConns, + }, &types.DatabaseConfig{ + Username: cfg.ClickHouse.ReaderDatabase.Username, + Password: cfg.ClickHouse.ReaderDatabase.Password, + Name: cfg.ClickHouse.ReaderDatabase.Name, + Host: cfg.ClickHouse.ReaderDatabase.Host, + Port: cfg.ClickHouse.ReaderDatabase.Port, + MaxOpenConns: cfg.ClickHouse.ReaderDatabase.MaxOpenConns, + SSL: true, + MaxIdleConns: cfg.ClickHouse.ReaderDatabase.MaxIdleConns, + }, "clickhouse", "clickhouse") + }() + wg.Add(1) go func() { defer wg.Done() @@ -184,6 +209,8 @@ func Run() { defer db.FrontendWriterDB.Close() defer db.AlloyReader.Close() defer db.AlloyWriter.Close() + defer db.ClickHouseReader.Close() + defer db.ClickHouseWriter.Close() defer db.BigtableClient.Close() log.Infof("database connection established") diff --git a/backend/pkg/api/data_access/mobile.go b/backend/pkg/api/data_access/mobile.go index 4d6165b20..dc526746b 100644 --- a/backend/pkg/api/data_access/mobile.go +++ b/backend/pkg/api/data_access/mobile.go @@ -171,7 +171,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex if err != nil { return nil, fmt.Errorf("error retrieving validator dashboard overview data: %w", err) } - data.NetworkEfficiency = d.calculateTotalEfficiency( + data.NetworkEfficiency = utils.CalculateTotalEfficiency( efficiency.AttestationEfficiency[enums.AllTime], efficiency.ProposalEfficiency[enums.AllTime], efficiency.SyncEfficiency[enums.AllTime]) // Validator status @@ -327,7 +327,7 @@ func (d *DataAccessService) GetValidatorDashboardMobileWidget(ctx context.Contex syncEfficiency.Float64 = float64(queryResult.SyncExecuted) / float64(queryResult.SyncScheduled) syncEfficiency.Valid = true } - *efficiency = d.calculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency) + *efficiency = utils.CalculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency) return nil }) diff --git a/backend/pkg/api/data_access/notifications.go b/backend/pkg/api/data_access/notifications.go index d9c9f71fa..74f777c3b 100644 --- a/backend/pkg/api/data_access/notifications.go +++ b/backend/pkg/api/data_access/notifications.go @@ -63,6 +63,7 @@ func (*DataAccessService) registerNotificationInterfaceTypes() { once.Do(func() { gob.Register(&n.ValidatorProposalNotification{}) gob.Register(&n.ValidatorUpcomingProposalNotification{}) + gob.Register(&n.ValidatorGroupEfficiencyNotification{}) gob.Register(&n.ValidatorAttestationNotification{}) gob.Register(&n.ValidatorIsOfflineNotification{}) gob.Register(&n.ValidatorIsOnlineNotification{}) diff --git a/backend/pkg/api/data_access/vdb_helpers.go b/backend/pkg/api/data_access/vdb_helpers.go index 12bfe2c46..30fd72f61 100644 --- a/backend/pkg/api/data_access/vdb_helpers.go +++ b/backend/pkg/api/data_access/vdb_helpers.go @@ -41,28 +41,6 @@ func (d DataAccessService) getDashboardValidators(ctx context.Context, dashboard return dashboardId.Validators, nil } -func (d DataAccessService) calculateTotalEfficiency(attestationEff, proposalEff, syncEff sql.NullFloat64) float64 { - efficiency := float64(0) - - if !attestationEff.Valid && !proposalEff.Valid && !syncEff.Valid { - efficiency = 0 - } else if attestationEff.Valid && !proposalEff.Valid && !syncEff.Valid { - efficiency = attestationEff.Float64 * 100.0 - } else if attestationEff.Valid && proposalEff.Valid && !syncEff.Valid { - efficiency = ((56.0 / 64.0 * attestationEff.Float64) + (8.0 / 64.0 * proposalEff.Float64)) * 100.0 - } else if attestationEff.Valid && !proposalEff.Valid && syncEff.Valid { - efficiency = ((62.0 / 64.0 * attestationEff.Float64) + (2.0 / 64.0 * syncEff.Float64)) * 100.0 - } else { - efficiency = (((54.0 / 64.0) * attestationEff.Float64) + ((8.0 / 64.0) * proposalEff.Float64) + ((2.0 / 64.0) * syncEff.Float64)) * 100.0 - } - - if efficiency < 0 { - efficiency = 0 - } - - return efficiency -} - func (d DataAccessService) calculateChartEfficiency(efficiencyType enums.VDBSummaryChartEfficiencyType, row *t.VDBValidatorSummaryChartRow) (float64, error) { efficiency := float64(0) switch efficiencyType { @@ -81,7 +59,7 @@ func (d DataAccessService) calculateChartEfficiency(efficiencyType enums.VDBSumm syncEfficiency.Valid = true } - efficiency = d.calculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency) + efficiency = utils.CalculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency) case enums.VDBSummaryChartAttestation: if row.AttestationIdealReward > 0 { efficiency = (row.AttestationReward / row.AttestationIdealReward) * 100 diff --git a/backend/pkg/api/data_access/vdb_management.go b/backend/pkg/api/data_access/vdb_management.go index cbf0a4b64..d152d6cac 100644 --- a/backend/pkg/api/data_access/vdb_management.go +++ b/backend/pkg/api/data_access/vdb_management.go @@ -520,7 +520,7 @@ func (d *DataAccessService) GetValidatorDashboardOverview(ctx context.Context, d syncEfficiency.Float64 = float64(queryResult.SyncExecuted) / float64(queryResult.SyncScheduled) syncEfficiency.Valid = true } - *efficiency = d.calculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency) + *efficiency = utils.CalculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency) return nil }) diff --git a/backend/pkg/api/data_access/vdb_summary.go b/backend/pkg/api/data_access/vdb_summary.go index 7f51b5bed..f4216ec7b 100644 --- a/backend/pkg/api/data_access/vdb_summary.go +++ b/backend/pkg/api/data_access/vdb_summary.go @@ -90,7 +90,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da if err != nil { return nil, nil, err } - averageNetworkEfficiency := d.calculateTotalEfficiency( + averageNetworkEfficiency := utils.CalculateTotalEfficiency( efficiency.AttestationEfficiency[period], efficiency.ProposalEfficiency[period], efficiency.SyncEfficiency[period]) // ------------------------------------------------------------------------------------------------------------------ @@ -366,7 +366,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da syncEfficiency.Float64 = float64(queryEntry.SyncExecuted) / float64(queryEntry.SyncScheduled) syncEfficiency.Valid = true } - resultEntry.Efficiency = d.calculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency) + resultEntry.Efficiency = utils.CalculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency) // Add the duties info to the total total.AttestationReward = total.AttestationReward.Add(queryEntry.AttestationReward) @@ -486,7 +486,7 @@ func (d *DataAccessService) GetValidatorDashboardSummary(ctx context.Context, da totalSyncEfficiency.Float64 = float64(total.SyncExecuted) / float64(total.SyncScheduled) totalSyncEfficiency.Valid = true } - totalEntry.Efficiency = d.calculateTotalEfficiency(totalAttestationEfficiency, totalProposerEfficiency, totalSyncEfficiency) + totalEntry.Efficiency = utils.CalculateTotalEfficiency(totalAttestationEfficiency, totalProposerEfficiency, totalSyncEfficiency) result = append([]t.VDBSummaryTableRow{totalEntry}, result...) } @@ -1021,7 +1021,7 @@ func (d *DataAccessService) GetValidatorDashboardSummaryChart(ctx context.Contex if err != nil { return nil, err } - averageNetworkEfficiency := d.calculateTotalEfficiency( + averageNetworkEfficiency := utils.CalculateTotalEfficiency( efficiency.AttestationEfficiency[enums.Last24h], efficiency.ProposalEfficiency[enums.Last24h], efficiency.SyncEfficiency[enums.Last24h]) for ts := range tsMap { diff --git a/backend/pkg/commons/types/frontend.go b/backend/pkg/commons/types/frontend.go index 7fafe28be..60e0a7f48 100644 --- a/backend/pkg/commons/types/frontend.go +++ b/backend/pkg/commons/types/frontend.go @@ -131,6 +131,7 @@ var EventSortOrder = []EventName{ SyncCommitteeSoonEventName, ValidatorIsOfflineEventName, ValidatorIsOnlineEventName, + ValidatorGroupEfficiencyEventName, ValidatorReceivedWithdrawalEventName, NetworkLivenessIncreasedEventName, EthClientUpdateEventName, @@ -174,6 +175,7 @@ var MachineEventsMap = map[EventName]struct{}{ var LegacyEventLabel map[EventName]string = map[EventName]string{ ValidatorUpcomingProposalEventName: "Your validator(s) will soon propose a block", + ValidatorGroupEfficiencyEventName: "Your validator group efficiency is low", ValidatorMissedProposalEventName: "Your validator(s) missed a proposal", ValidatorExecutedProposalEventName: "Your validator(s) submitted a proposal", ValidatorMissedAttestationEventName: "Your validator(s) missed an attestation", @@ -198,6 +200,7 @@ var LegacyEventLabel map[EventName]string = map[EventName]string{ var EventLabel map[EventName]string = map[EventName]string{ ValidatorUpcomingProposalEventName: "Upcoming block proposal", + ValidatorGroupEfficiencyEventName: "Low validator group efficiency", ValidatorMissedProposalEventName: "Block proposal missed", ValidatorExecutedProposalEventName: "Block proposal submitted", ValidatorMissedAttestationEventName: "Attestation missed", @@ -232,6 +235,7 @@ func IsMachineNotification(event EventName) bool { var EventNames = []EventName{ ValidatorExecutedProposalEventName, + ValidatorGroupEfficiencyEventName, ValidatorMissedProposalEventName, ValidatorMissedAttestationEventName, ValidatorGotSlashedEventName, diff --git a/backend/pkg/commons/utils/efficiency.go b/backend/pkg/commons/utils/efficiency.go new file mode 100644 index 000000000..5bb7cd57c --- /dev/null +++ b/backend/pkg/commons/utils/efficiency.go @@ -0,0 +1,25 @@ +package utils + +import "database/sql" + +func CalculateTotalEfficiency(attestationEff, proposalEff, syncEff sql.NullFloat64) float64 { + efficiency := float64(0) + + if !attestationEff.Valid && !proposalEff.Valid && !syncEff.Valid { + efficiency = 0 + } else if attestationEff.Valid && !proposalEff.Valid && !syncEff.Valid { + efficiency = attestationEff.Float64 * 100.0 + } else if attestationEff.Valid && proposalEff.Valid && !syncEff.Valid { + efficiency = ((56.0 / 64.0 * attestationEff.Float64) + (8.0 / 64.0 * proposalEff.Float64)) * 100.0 + } else if attestationEff.Valid && !proposalEff.Valid && syncEff.Valid { + efficiency = ((62.0 / 64.0 * attestationEff.Float64) + (2.0 / 64.0 * syncEff.Float64)) * 100.0 + } else { + efficiency = (((54.0 / 64.0) * attestationEff.Float64) + ((8.0 / 64.0) * proposalEff.Float64) + ((2.0 / 64.0) * syncEff.Float64)) * 100.0 + } + + if efficiency < 0 { + efficiency = 0 + } + + return efficiency +} diff --git a/backend/pkg/notification/collection.go b/backend/pkg/notification/collection.go index 7633e786a..ba6b0568f 100644 --- a/backend/pkg/notification/collection.go +++ b/backend/pkg/notification/collection.go @@ -21,9 +21,11 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/services" "github.com/gobitfly/beaconchain/pkg/commons/types" "github.com/gobitfly/beaconchain/pkg/commons/utils" + constypes "github.com/gobitfly/beaconchain/pkg/consapi/types" "github.com/gobitfly/beaconchain/pkg/exporter/modules" "github.com/lib/pq" "github.com/rocket-pool/rocketpool-go/utils/eth" + "github.com/shopspring/decimal" ) func InitNotificationCollector(pubkeyCachePath string) { @@ -47,6 +49,7 @@ func notificationCollector() { once.Do(func() { gob.Register(&ValidatorProposalNotification{}) gob.Register(&ValidatorUpcomingProposalNotification{}) + gob.Register(&ValidatorGroupEfficiencyNotification{}) gob.Register(&ValidatorAttestationNotification{}) gob.Register(&ValidatorIsOfflineNotification{}) gob.Register(&ValidatorIsOnlineNotification{}) @@ -60,13 +63,13 @@ func notificationCollector() { gob.Register(&SyncCommitteeSoonNotification{}) }) + mc, err := modules.GetModuleContext() + if err != nil { + log.Fatal(err, "error getting module context", 0) + } + go func() { log.Infof("starting head notification collector") - mc, err := modules.GetModuleContext() - if err != nil { - log.Fatal(err, "error getting module context", 0) - } - for ; ; time.Sleep(time.Second * 30) { // get the head epoch head, err := mc.ConsClient.GetChainHead() @@ -158,7 +161,7 @@ func notificationCollector() { log.Infof("collecting notifications for epoch %v", epoch) // Network DB Notifications (network related) - notifications, err := collectNotifications(epoch) + notifications, err := collectNotifications(epoch, mc) if err != nil { log.Error(err, "error collection notifications", 0) @@ -277,7 +280,7 @@ func collectUpcomingBlockProposalNotifications(notificationsByUserID types.Notif return nil } -func collectNotifications(epoch uint64) (types.NotificationsPerUserId, error) { +func collectNotifications(epoch uint64, mc modules.ModuleContext) (types.NotificationsPerUserId, error) { notificationsByUserID := types.NotificationsPerUserId{} start := time.Now() var err error @@ -308,6 +311,13 @@ func collectNotifications(epoch uint64) (types.NotificationsPerUserId, error) { // The following functions will collect the notifications and add them to the // notificationsByUserID map. The notifications will be queued and sent later // by the notification sender process + err = collectGroupEfficiencyNotifications(notificationsByUserID, epoch, mc) + if err != nil { + metrics.Errors.WithLabelValues("notifications_collect_group_efficiency").Inc() + return nil, fmt.Errorf("error collecting validator_group_efficiency notifications: %v", err) + } + log.Infof("collecting group efficiency notifications took: %v", time.Since(start)) + err = collectAttestationAndOfflineValidatorNotifications(notificationsByUserID, epoch) if err != nil { metrics.Errors.WithLabelValues("notifications_collect_missed_attestation").Inc() @@ -460,6 +470,263 @@ func collectUserDbNotifications(epoch uint64) (types.NotificationsPerUserId, err return notificationsByUserID, nil } +func collectGroupEfficiencyNotifications(notificationsByUserID types.NotificationsPerUserId, epoch uint64, mc modules.ModuleContext) error { + type dbResult struct { + ValidatorIndex uint64 `db:"validator_index"` + AttestationReward decimal.Decimal `db:"attestations_reward"` + AttestationIdealReward decimal.Decimal `db:"attestations_ideal_reward"` + BlocksProposed uint64 `db:"blocks_proposed"` + BlocksScheduled uint64 `db:"blocks_scheduled"` + SyncExecuted uint64 `db:"sync_executed"` + SyncScheduled uint64 `db:"sync_scheduled"` + } + + // retrieve rewards for the epoch + log.Info("retrieving validator metadata") + validators, err := mc.CL.GetValidators(epoch*utils.Config.Chain.ClConfig.SlotsPerEpoch, nil, []constypes.ValidatorStatus{constypes.Active}) + if err != nil { + return fmt.Errorf("error getting validators: %w", err) + } + effectiveBalanceMap := make(map[uint64]uint64) + activeValidatorsMap := make(map[uint64]struct{}) + for _, validator := range validators.Data { + effectiveBalanceMap[validator.Index] = validator.Validator.EffectiveBalance + activeValidatorsMap[validator.Index] = struct{}{} + } + log.Info("retrieving attestation reward data") + attestationRewards, err := mc.CL.GetAttestationRewards(epoch) + if err != nil { + return fmt.Errorf("error getting attestation rewards: %w", err) + } + + efficiencyMap := make(map[types.ValidatorIndex]*dbResult, len(attestationRewards.Data.TotalRewards)) + + idealRewardsMap := make(map[uint64]decimal.Decimal) + for _, reward := range attestationRewards.Data.IdealRewards { + idealRewardsMap[uint64(reward.EffectiveBalance)] = decimal.NewFromInt(int64(reward.Head) + int64(reward.Target) + int64(reward.Source) + int64(reward.InclusionDelay) + int64(reward.Inactivity)) + } + for _, reward := range attestationRewards.Data.TotalRewards { + efficiencyMap[types.ValidatorIndex(reward.ValidatorIndex)] = &dbResult{ + ValidatorIndex: reward.ValidatorIndex, + AttestationReward: decimal.NewFromInt(int64(reward.Head) + int64(reward.Target) + int64(reward.Source) + int64(reward.InclusionDelay) + int64(reward.Inactivity)), + AttestationIdealReward: idealRewardsMap[effectiveBalanceMap[reward.ValidatorIndex]], + } + } + + log.Info("retrieving block proposal data") + proposalAssignments, err := mc.CL.GetPropoalAssignments(epoch) + if err != nil { + return fmt.Errorf("error getting proposal assignments: %w", err) + } + for _, assignment := range proposalAssignments.Data { + efficiencyMap[types.ValidatorIndex(assignment.ValidatorIndex)].BlocksScheduled++ + } + + syncAssignments, err := mc.CL.GetSyncCommitteesAssignments(nil, epoch*utils.Config.Chain.ClConfig.SlotsPerEpoch) + if err != nil { + return fmt.Errorf("error getting sync committee assignments: %w", err) + } + + for slot := epoch * utils.Config.Chain.ClConfig.SlotsPerEpoch; slot < (epoch+1)*utils.Config.Chain.ClConfig.SlotsPerEpoch; slot++ { + log.Infof("retrieving data for slot %v", slot) + s, err := mc.CL.GetSlot(slot) + if err != nil && strings.Contains(err.Error(), "NOT_FOUND") { + continue + } else if err != nil { + return fmt.Errorf("error getting block header for slot %v: %w", slot, err) + } + efficiencyMap[types.ValidatorIndex(s.Data.Message.ProposerIndex)].BlocksProposed++ + + for i, validatorIndex := range syncAssignments.Data.Validators { + efficiencyMap[types.ValidatorIndex(validatorIndex)].SyncScheduled++ + + if utils.BitAtVector(s.Data.Message.Body.SyncAggregate.SyncCommitteeBits, i) { + efficiencyMap[types.ValidatorIndex(validatorIndex)].SyncExecuted++ + } + } + } + + subMap, err := GetSubsForEventFilter(types.ValidatorGroupEfficiencyEventName, "", nil, nil) + if err != nil { + return fmt.Errorf("error getting subscriptions for (missed) block proposals %w", err) + } + + // create a lookup map for the dashboard & groups + type groupDetails struct { + Validators []types.ValidatorIndex + Subscription *types.Subscription + } + dashboardMap := make(map[types.UserId]map[types.DashboardId]map[types.DashboardGroupId]*groupDetails) + + for _, subs := range subMap { + for _, sub := range subs { + if sub.DashboardId == nil || sub.DashboardGroupId == nil { + continue + } + userId := *sub.UserID + dashboardId := types.DashboardId(*sub.DashboardId) + groupId := types.DashboardGroupId(*sub.DashboardGroupId) + if _, ok := dashboardMap[userId]; !ok { + dashboardMap[userId] = make(map[types.DashboardId]map[types.DashboardGroupId]*groupDetails) + } + if _, ok := dashboardMap[userId][dashboardId]; !ok { + dashboardMap[userId][dashboardId] = make(map[types.DashboardGroupId]*groupDetails) + } + if _, ok := dashboardMap[userId][dashboardId][groupId]; !ok { + dashboardMap[userId][dashboardId][groupId] = &groupDetails{ + Validators: []types.ValidatorIndex{}, + } + } + if sub.EventFilter != "" { + pubkeyDecoded, err := hex.DecodeString(sub.EventFilter) + if err != nil { + return fmt.Errorf("error decoding pubkey %v: %w", sub.EventFilter, err) + } + validatorIndex, err := GetIndexForPubkey(pubkeyDecoded) + if err != nil { + return fmt.Errorf("error getting validator index for pubkey %v: %w", sub.EventFilter, err) + } + dashboardMap[userId][dashboardId][groupId].Validators = append(dashboardMap[*sub.UserID][dashboardId][groupId].Validators, types.ValidatorIndex(validatorIndex)) + } + dashboardMap[userId][dashboardId][groupId].Subscription = sub + } + } + + // The commented code below can be used to validate data retrieved from the node against + // data in clickhouse + // var queryResult []*dbResult + // clickhouseTable := "validator_dashboard_data_epoch" + // // retrieve efficiency data for the epoch + // log.Infof("retrieving efficiency data for epoch %v", epoch) + // ds := goqu.Dialect("postgres"). + // From(goqu.L(fmt.Sprintf(`%s AS r`, clickhouseTable))). + // Select( + // goqu.L("validator_index"), + // goqu.L("COALESCE(r.attestations_reward, 0) AS attestations_reward"), + // goqu.L("COALESCE(r.attestations_ideal_reward, 0) AS attestations_ideal_reward"), + // goqu.L("COALESCE(r.blocks_proposed, 0) AS blocks_proposed"), + // goqu.L("COALESCE(r.blocks_scheduled, 0) AS blocks_scheduled"), + // goqu.L("COALESCE(r.sync_executed, 0) AS sync_executed"), + // goqu.L("COALESCE(r.sync_scheduled, 0) AS sync_scheduled")). + // Where(goqu.L("r.epoch_timestamp = ?", utils.EpochToTime(epoch))) + // query, args, err := ds.Prepared(true).ToSQL() + // if err != nil { + // return fmt.Errorf("error preparing query: %v", err) + // } + + // err = db.ClickHouseReader.Select(&queryResult, query, args...) + // if err != nil { + // return fmt.Errorf("error retrieving data from table %s: %v", clickhouseTable, err) + // } + + // if len(queryResult) == 0 { + // return fmt.Errorf("no efficiency data found for epoch %v", epoch) + // } + + // log.Infof("retrieved %v efficiency data rows", len(queryResult)) + + // for _, row := range queryResult { + // if _, ok := activeValidatorsMap[row.ValidatorIndex]; !ok { + // continue + // } + // existing := efficiencyMap[types.ValidatorIndex(row.ValidatorIndex)] + + // if existing == nil { + // existing = &dbResult{ + // ValidatorIndex: row.ValidatorIndex, + // AttestationReward: decimal.Decimal{}, + // AttestationIdealReward: decimal.Decimal{}, + // } + // } + // if !existing.AttestationIdealReward.Equal(row.AttestationIdealReward) { + // log.Fatal(fmt.Errorf("ideal reward mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.AttestationIdealReward, row.AttestationIdealReward), "ideal reward mismatch", 0) + // } + // if !existing.AttestationReward.Equal(row.AttestationReward) { + // log.Fatal(fmt.Errorf("attestation reward mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.AttestationReward, row.AttestationReward), "attestation reward mismatch", 0) + // } + // if existing.BlocksProposed != row.BlocksProposed { + // log.Fatal(fmt.Errorf("blocks proposed mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.BlocksProposed, row.BlocksProposed), "blocks proposed mismatch", 0) + // } + // if existing.BlocksScheduled != row.BlocksScheduled { + // log.Fatal(fmt.Errorf("blocks scheduled mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.BlocksScheduled, row.BlocksScheduled), "blocks scheduled mismatch", 0) + // } + // if existing.SyncExecuted != row.SyncExecuted { + // log.Fatal(fmt.Errorf("sync executed mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.SyncExecuted, row.SyncExecuted), "sync executed mismatch", 0) + // } + // if existing.SyncScheduled != row.SyncScheduled { + // log.Fatal(fmt.Errorf("sync scheduled mismatch for validator %v: %v != %v", row.ValidatorIndex, existing.SyncScheduled, row.SyncScheduled), "sync scheduled mismatch", 0) + // } + // efficiencyMap[types.ValidatorIndex(row.ValidatorIndex)] = row + // } + + for userId, dashboards := range dashboardMap { + for dashboardId, groups := range dashboards { + for groupId, groupDetails := range groups { + attestationReward := decimal.Decimal{} + attestationIdealReward := decimal.Decimal{} + blocksProposed := uint64(0) + blocksScheduled := uint64(0) + syncExecuted := uint64(0) + syncScheduled := uint64(0) + + for _, validatorIndex := range groupDetails.Validators { + if row, ok := efficiencyMap[validatorIndex]; ok { + attestationReward = attestationReward.Add(row.AttestationReward) + attestationIdealReward = attestationIdealReward.Add(row.AttestationIdealReward) + blocksProposed += row.BlocksProposed + blocksScheduled += row.BlocksScheduled + syncExecuted += row.SyncExecuted + syncScheduled += row.SyncScheduled + } + } + + var attestationEfficiency, proposerEfficiency, syncEfficiency sql.NullFloat64 + + if !attestationIdealReward.IsZero() { + attestationEfficiency.Float64 = attestationReward.Div(attestationIdealReward).InexactFloat64() + attestationEfficiency.Valid = true + } + if blocksScheduled > 0 { + proposerEfficiency.Float64 = float64(blocksProposed) / float64(blocksScheduled) + proposerEfficiency.Valid = true + } + if syncScheduled > 0 { + syncEfficiency.Float64 = float64(syncExecuted) / float64(syncScheduled) + syncEfficiency.Valid = true + } + + efficiency := utils.CalculateTotalEfficiency(attestationEfficiency, proposerEfficiency, syncEfficiency) + + log.Infof("efficiency: %v, threshold: %v", efficiency, groupDetails.Subscription.EventThreshold*100) + + if efficiency < groupDetails.Subscription.EventThreshold*100 { + log.Infof("creating group efficiency notification for user %v, dashboard %v, group %v in epoch %v", userId, dashboardId, groupId, epoch) + n := &ValidatorGroupEfficiencyNotification{ + NotificationBaseImpl: types.NotificationBaseImpl{ + SubscriptionID: *groupDetails.Subscription.ID, + UserID: *groupDetails.Subscription.UserID, + Epoch: epoch, + EventName: groupDetails.Subscription.EventName, + EventFilter: "-", + DashboardId: groupDetails.Subscription.DashboardId, + DashboardName: groupDetails.Subscription.DashboardName, + DashboardGroupId: groupDetails.Subscription.DashboardGroupId, + DashboardGroupName: groupDetails.Subscription.DashboardGroupName, + }, + Threshold: groupDetails.Subscription.EventThreshold * 100, + Efficiency: efficiency, + } + notificationsByUserID.AddNotification(n) + metrics.NotificationsCollected.WithLabelValues(string(n.GetEventName())).Inc() + } + } + } + } + + log.Info("done collecting group efficiency notifications") + + return nil +} func collectBlockProposalNotifications(notificationsByUserID types.NotificationsPerUserId, status uint64, eventName types.EventName, epoch uint64) error { type dbResult struct { Proposer uint64 `db:"proposer"` diff --git a/backend/pkg/notification/notifications.go b/backend/pkg/notification/notifications.go index 084bdf4f2..7bb71162a 100644 --- a/backend/pkg/notification/notifications.go +++ b/backend/pkg/notification/notifications.go @@ -3,15 +3,21 @@ package notification import ( "github.com/gobitfly/beaconchain/pkg/commons/log" "github.com/gobitfly/beaconchain/pkg/commons/types" + "github.com/gobitfly/beaconchain/pkg/exporter/modules" ) // Used for isolated testing func GetNotificationsForEpoch(pubkeyCachePath string, epoch uint64) (types.NotificationsPerUserId, error) { - err := initPubkeyCache(pubkeyCachePath) + mc, err := modules.GetModuleContext() + if err != nil { + log.Fatal(err, "error getting module context", 0) + } + + err = initPubkeyCache(pubkeyCachePath) if err != nil { log.Fatal(err, "error initializing pubkey cache path for notifications", 0) } - return collectNotifications(epoch) + return collectNotifications(epoch, mc) } // Used for isolated testing diff --git a/backend/pkg/notification/pubkey_cache.go b/backend/pkg/notification/pubkey_cache.go index 483884cf5..37f3f5ea2 100644 --- a/backend/pkg/notification/pubkey_cache.go +++ b/backend/pkg/notification/pubkey_cache.go @@ -72,7 +72,7 @@ func GetIndexForPubkey(pubkey []byte) (uint64, error) { if err != nil { return 0, err } - log.Infof("serving index %d for validator %x from db", index, pubkey) + // log.Infof("serving index %d for validator %x from db", index, pubkey) return index, nil } else if err != nil { return 0, err diff --git a/backend/pkg/notification/queuing.go b/backend/pkg/notification/queuing.go index 90cb78bca..803a618c8 100644 --- a/backend/pkg/notification/queuing.go +++ b/backend/pkg/notification/queuing.go @@ -380,6 +380,9 @@ func RenderEmailsForUserEvents(epoch uint64, notificationsByUserID types.Notific case types.ValidatorExecutedProposalEventName: //nolint:gosec // this is a static string bodySummary += template.HTML(fmt.Sprintf("%s: %d validator%s, Reward: %.3f ETH", types.EventLabel[event], count, plural, totalBlockReward)) + case types.ValidatorGroupEfficiencyEventName: + //nolint:gosec // this is a static string + bodySummary += template.HTML(fmt.Sprintf("%s: %d Group%s", types.EventLabel[event], count, plural)) default: //nolint:gosec // this is a static string bodySummary += template.HTML(fmt.Sprintf("%s: %d Validator%s", types.EventLabel[event], count, plural)) @@ -513,6 +516,8 @@ func RenderPushMessagesForUserEvents(epoch uint64, notificationsByUserID types.N bodySummary += fmt.Sprintf("%s: %d machine%s", types.EventLabel[event], count, plural) case types.ValidatorExecutedProposalEventName: bodySummary += fmt.Sprintf("%s: %d validator%s, Reward: %.3f ETH", types.EventLabel[event], count, plural, totalBlockReward) + case types.ValidatorGroupEfficiencyEventName: + bodySummary += fmt.Sprintf("%s: %d group%s", types.EventLabel[event], count, plural) default: bodySummary += fmt.Sprintf("%s: %d validator%s", types.EventLabel[event], count, plural) } diff --git a/backend/pkg/notification/types.go b/backend/pkg/notification/types.go index b3dcc8377..754955cda 100644 --- a/backend/pkg/notification/types.go +++ b/backend/pkg/notification/types.go @@ -51,7 +51,7 @@ func formatSlotLink(format types.NotificationFormat, slot interface{}) string { return "" } -func formatDashboardAndGroupLink(format types.NotificationFormat, n types.Notification) string { +func formatValidatorPrefixedDashboardAndGroupLink(format types.NotificationFormat, n types.Notification) string { dashboardAndGroupInfo := "" if n.GetDashboardId() != nil { switch format { @@ -66,6 +66,21 @@ func formatDashboardAndGroupLink(format types.NotificationFormat, n types.Notifi return dashboardAndGroupInfo } +func formatPureDashboardAndGroupLink(format types.NotificationFormat, n types.Notification) string { + dashboardAndGroupInfo := "" + if n.GetDashboardId() != nil { + switch format { + case types.NotifciationFormatHtml: + dashboardAndGroupInfo = fmt.Sprintf(`Group %[2]v in Dashboard %[3]v`, utils.Config.Frontend.SiteDomain, n.GetDashboardGroupName(), n.GetDashboardName(), *n.GetDashboardId()) + case types.NotifciationFormatText: + dashboardAndGroupInfo = fmt.Sprintf(`Group %[1]v in Dashboard %[2]v`, n.GetDashboardGroupName(), n.GetDashboardName()) + case types.NotifciationFormatMarkdown: + dashboardAndGroupInfo = fmt.Sprintf(`Group **%[1]v** in Dashboard [%[2]v](https://%[3]v/dashboard/%[4]v)`, n.GetDashboardGroupName(), n.GetDashboardName(), utils.Config.Frontend.SiteDomain, *n.GetDashboardId()) + } + } + return dashboardAndGroupInfo +} + type ValidatorProposalNotification struct { types.NotificationBaseImpl @@ -83,7 +98,7 @@ func (n *ValidatorProposalNotification) GetEntitiyId() string { func (n *ValidatorProposalNotification) GetInfo(format types.NotificationFormat) string { vali := formatValidatorLink(format, n.ValidatorIndex) slot := formatSlotLink(format, n.Slot) - dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n) + dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n) switch n.Status { case 0: @@ -149,7 +164,7 @@ func (n *ValidatorUpcomingProposalNotification) GetEntitiyId() string { func (n *ValidatorUpcomingProposalNotification) GetInfo(format types.NotificationFormat) string { vali := formatValidatorLink(format, n.ValidatorIndex) slot := formatSlotLink(format, n.Slot) - dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n) + dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n) return fmt.Sprintf(`New scheduled block proposal at slot %s for Validator %s%s.`, slot, vali, dashboardAndGroupInfo) } @@ -184,7 +199,7 @@ func (n *ValidatorIsOfflineNotification) GetEntitiyId() string { func (n *ValidatorIsOfflineNotification) GetInfo(format types.NotificationFormat) string { vali := formatValidatorLink(format, n.ValidatorIndex) epoch := formatEpochLink(format, n.LatestState) - dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n) + dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n) return fmt.Sprintf(`Validator %v%v is offline since epoch %s.`, vali, dashboardAndGroupInfo, epoch) } @@ -214,7 +229,7 @@ func (n *ValidatorIsOnlineNotification) GetEntitiyId() string { func (n *ValidatorIsOnlineNotification) GetInfo(format types.NotificationFormat) string { vali := formatValidatorLink(format, n.ValidatorIndex) epoch := formatEpochLink(format, n.Epoch) - dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n) + dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n) return fmt.Sprintf(`Validator %v%v is back online since epoch %v.`, vali, dashboardAndGroupInfo, epoch) } @@ -230,55 +245,35 @@ func (n *ValidatorIsOnlineNotification) GetLegacyTitle() string { return "Validator Back Online" } -// type validatorGroupIsOfflineNotification struct { -// types.NotificationBaseImpl - -// IsOffline bool -// } - -// func (n *validatorGroupIsOfflineNotification) GetEntitiyId() string { -// return fmt.Sprintf("%s - %s", n.GetDashboardName(), n.GetDashboardGroupName()) -// } - -// // Overwrite specific methods -// func (n *validatorGroupIsOfflineNotification) GetInfo(format types.NotificationFormat) string { -// epoch := "" -// if n.IsOffline { -// epoch = formatEpochLink(format, n.LatestState) -// } else { -// epoch = formatEpochLink(format, n.Epoch) -// } - -// if n.IsOffline { -// return fmt.Sprintf(`Group %s is offline since epoch %s.`, n.DashboardGroupName, epoch) -// } else { -// return fmt.Sprintf(`Group %s is back online since epoch %v.`, n.DashboardGroupName, epoch) -// } -// } - -// func (n *validatorGroupIsOfflineNotification) GetTitle() string { -// if n.IsOffline { -// return "Group is offline" -// } else { -// return "Group is back online" -// } -// } - -// func (n *validatorGroupIsOfflineNotification) GetLegacyInfo() string { -// if n.IsOffline { -// return fmt.Sprintf(`Group %s is offline since epoch %s.`, n.DashboardGroupName, n.LatestState) -// } else { -// return fmt.Sprintf(`Group %s is back online since epoch %v.`, n.DashboardGroupName, n.Epoch) -// } -// } - -// func (n *validatorGroupIsOfflineNotification) GetLegacyTitle() string { -// if n.IsOffline { -// return "Group is offline" -// } else { -// return "Group is back online" -// } -// } +type ValidatorGroupEfficiencyNotification struct { + types.NotificationBaseImpl + + Threshold float64 + Efficiency float64 +} + +func (n *ValidatorGroupEfficiencyNotification) GetEntitiyId() string { + return fmt.Sprintf("%s - %s", n.GetDashboardName(), n.GetDashboardGroupName()) +} + +// Overwrite specific methods +func (n *ValidatorGroupEfficiencyNotification) GetInfo(format types.NotificationFormat) string { + dashboardAndGroupInfo := formatPureDashboardAndGroupLink(format, n) + epoch := formatEpochLink(format, n.Epoch) + return fmt.Sprintf(`%s efficiency of %.2f%% was below the threhold of %.2f%% in epoch %s.`, dashboardAndGroupInfo, n.Efficiency, n.Threshold, epoch) +} + +func (n *ValidatorGroupEfficiencyNotification) GetTitle() string { + return "Low group efficiency" +} + +func (n *ValidatorGroupEfficiencyNotification) GetLegacyInfo() string { + return n.GetInfo(types.NotifciationFormatText) +} + +func (n *ValidatorGroupEfficiencyNotification) GetLegacyTitle() string { + return n.GetTitle() +} type ValidatorAttestationNotification struct { types.NotificationBaseImpl @@ -293,7 +288,7 @@ func (n *ValidatorAttestationNotification) GetEntitiyId() string { } func (n *ValidatorAttestationNotification) GetInfo(format types.NotificationFormat) string { - dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n) + dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n) vali := formatValidatorLink(format, n.ValidatorIndex) epoch := formatEpochLink(format, n.Epoch) @@ -367,7 +362,7 @@ func (n *ValidatorGotSlashedNotification) GetEntitiyId() string { } func (n *ValidatorGotSlashedNotification) GetInfo(format types.NotificationFormat) string { - dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n) + dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n) vali := formatValidatorLink(format, n.ValidatorIndex) epoch := formatEpochLink(format, n.Epoch) slasher := formatValidatorLink(format, n.Slasher) @@ -403,7 +398,7 @@ func (n *ValidatorWithdrawalNotification) GetEntitiyId() string { } func (n *ValidatorWithdrawalNotification) GetInfo(format types.NotificationFormat) string { - dashboardAndGroupInfo := formatDashboardAndGroupLink(format, n) + dashboardAndGroupInfo := formatValidatorPrefixedDashboardAndGroupLink(format, n) vali := formatValidatorLink(format, n.ValidatorIndex) amount := utils.FormatClCurrencyString(n.Amount, utils.Config.Frontend.MainCurrency, 6, true, false, false) generalPart := fmt.Sprintf(`An automatic withdrawal of %s has been processed for validator %s%s.`, amount, vali, dashboardAndGroupInfo)