From d6ed420fc57f4fb8faef44ce113040d7f2d1c651 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Mon, 23 Sep 2024 06:01:41 +0000 Subject: [PATCH] feat(notifications): make user db based notifications work --- backend/cmd/misc/main.go | 25 ++++++++++++++++++++++- backend/pkg/commons/types/frontend.go | 10 +++++++++ backend/pkg/notification/db.go | 9 +++++--- backend/pkg/notification/notifications.go | 10 ++++++++- 4 files changed, 49 insertions(+), 5 deletions(-) diff --git a/backend/cmd/misc/main.go b/backend/cmd/misc/main.go index fd77d8217..abca7750f 100644 --- a/backend/cmd/misc/main.go +++ b/backend/cmd/misc/main.go @@ -76,7 +76,7 @@ func Run() { } configPath := fs.String("config", "config/default.config.yml", "Path to the config file") - fs.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases, collect-notifications") + fs.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases, collect-notifications, collect-user-db-notifications") fs.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch") fs.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch") fs.Uint64Var(&opts.User, "user", 0, "user id") @@ -217,6 +217,14 @@ func Run() { db.PersistentRedisDbClient = rdc defer db.PersistentRedisDbClient.Close() + if utils.Config.TieredCacheProvider != "redis" { + log.Fatal(nil, "no cache provider set, please set TierdCacheProvider (redis)", 0) + } + if utils.Config.TieredCacheProvider == "redis" || len(utils.Config.RedisCacheEndpoint) != 0 { + cache.MustInitTieredCache(utils.Config.RedisCacheEndpoint) + log.Infof("tiered Cache initialized, latest finalized epoch: %v", cache.LatestFinalizedEpoch.Get()) + } + switch opts.Command { case "nameValidatorsByRanges": err := nameValidatorsByRanges(opts.ValidatorNameRanges) @@ -459,6 +467,8 @@ func Run() { err = fixEnsAddresses(erigonClient) case "collect-notifications": err = collectNotifications(opts.StartEpoch) + case "collect-user-db-notifications": + err = collectUserDbNotifications(opts.StartEpoch) default: log.Fatal(nil, fmt.Sprintf("unknown command %s", opts.Command), 0) } @@ -483,6 +493,19 @@ func collectNotifications(startEpoch uint64) error { return nil } +func collectUserDbNotifications(startEpoch uint64) error { + epoch := startEpoch + + log.Infof("collecting notifications for epoch %v", epoch) + notifications, err := notification.GetUserNotificationsForEpoch(utils.Config.Notifications.PubkeyCachePath, epoch) + if err != nil { + return err + } + + log.Infof("found %v notifications for epoch %v", len(notifications), epoch) + return nil +} + func fixEns(erigonClient *rpc.ErigonClient) error { log.Infof("command: fix-ens") addrs := []struct { diff --git a/backend/pkg/commons/types/frontend.go b/backend/pkg/commons/types/frontend.go index a7b637d8f..83de47bd7 100644 --- a/backend/pkg/commons/types/frontend.go +++ b/backend/pkg/commons/types/frontend.go @@ -103,6 +103,16 @@ var UserIndexEvents = []EventName{ MonitoringMachineSwitchedToETH1FallbackEventName, } +var UserIndexEventsMap = map[EventName]struct{}{ + EthClientUpdateEventName: {}, + MonitoringMachineCpuLoadEventName: {}, + MonitoringMachineOfflineEventName: {}, + MonitoringMachineDiskAlmostFullEventName: {}, + MonitoringMachineMemoryUsageEventName: {}, + MonitoringMachineSwitchedToETH2FallbackEventName: {}, + MonitoringMachineSwitchedToETH1FallbackEventName: {}, +} + var EventLabel map[EventName]string = map[EventName]string{ ValidatorBalanceDecreasedEventName: "Your validator(s) balance decreased", ValidatorMissedProposalEventName: "Your validator(s) missed a proposal", diff --git a/backend/pkg/notification/db.go b/backend/pkg/notification/db.go index d065f4e87..7d68b125d 100644 --- a/backend/pkg/notification/db.go +++ b/backend/pkg/notification/db.go @@ -33,6 +33,11 @@ func GetSubsForEventFilter(eventName types.EventName, lastSentFilter string, las // where event_name = $1 // ` + eventNameForQuery := utils.GetNetwork() + ":" + string(eventName) + + if _, ok := types.UserIndexEventsMap[eventName]; ok { + eventNameForQuery = string(eventName) + } ds := goqu.Dialect("postgres").From("users_subscriptions").Select( goqu.C("id"), goqu.C("user_id"), @@ -41,7 +46,7 @@ func GetSubsForEventFilter(eventName types.EventName, lastSentFilter string, las goqu.C("created_epoch"), goqu.C("event_threshold"), goqu.C("event_name"), - ).Where(goqu.L("(event_name = ? AND user_id <> 0)", utils.GetNetwork()+":"+string(eventName))) + ).Where(goqu.L("(event_name = ? AND user_id <> 0)", eventNameForQuery)) if lastSentFilter != "" { if len(lastSentFilterArgs) > 0 { @@ -59,8 +64,6 @@ func GetSubsForEventFilter(eventName types.EventName, lastSentFilter string, las return nil, err } - log.Info(query) - subMap := make(map[string][]types.Subscription, 0) err = db.FrontendWriterDB.Select(&subs, query, args...) if err != nil { diff --git a/backend/pkg/notification/notifications.go b/backend/pkg/notification/notifications.go index 89ec6c858..47fc8491d 100644 --- a/backend/pkg/notification/notifications.go +++ b/backend/pkg/notification/notifications.go @@ -53,6 +53,14 @@ func GetNotificationsForEpoch(pubkeyCachePath string, epoch uint64) (types.Notif return collectNotifications(epoch) } +func GetUserNotificationsForEpoch(pubkeyCachePath string, epoch uint64) (types.NotificationsPerUserId, error) { + err := initPubkeyCache(pubkeyCachePath) + if err != nil { + log.Fatal(err, "error initializing pubkey cache path for notifications", 0) + } + return collectUserDbNotifications(epoch) +} + func InitNotificationCollector(pubkeyCachePath string) { err := initPubkeyCache(pubkeyCachePath) if err != nil { @@ -2075,7 +2083,7 @@ func collectMonitoringMachine( dbResult, err := GetSubsForEventFilter( eventName, - "(created_epoch <= ? AND (last_sent_epoch < (? - ?) OR last_sent_epoch IS NULL))", + "(created_epoch <= ? AND (last_sent_epoch < (?::int - ?::int) OR last_sent_epoch IS NULL))", // ::int is required here otherwise the generated goose query throw an error []interface{}{epoch, epoch, epochWaitInBetween}, nil, )