Skip to content

Commit

Permalink
feat(notifications): make user db based notifications work
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbitfly committed Sep 23, 2024
1 parent 28b0e56 commit d6ed420
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 5 deletions.
25 changes: 24 additions & 1 deletion backend/cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func Run() {
}

configPath := fs.String("config", "config/default.config.yml", "Path to the config file")
fs.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases, collect-notifications")
fs.StringVar(&opts.Command, "command", "", "command to run, available: updateAPIKey, applyDbSchema, initBigtableSchema, epoch-export, debug-rewards, debug-blocks, clear-bigtable, index-old-eth1-blocks, update-aggregation-bits, historic-prices-export, index-missing-blocks, export-epoch-missed-slots, migrate-last-attestation-slot-bigtable, export-genesis-validators, update-block-finalization-sequentially, nameValidatorsByRanges, export-stats-totals, export-sync-committee-periods, export-sync-committee-validator-stats, partition-validator-stats, migrate-app-purchases, collect-notifications, collect-user-db-notifications")
fs.Uint64Var(&opts.StartEpoch, "start-epoch", 0, "start epoch")
fs.Uint64Var(&opts.EndEpoch, "end-epoch", 0, "end epoch")
fs.Uint64Var(&opts.User, "user", 0, "user id")
Expand Down Expand Up @@ -217,6 +217,14 @@ func Run() {
db.PersistentRedisDbClient = rdc
defer db.PersistentRedisDbClient.Close()

if utils.Config.TieredCacheProvider != "redis" {
log.Fatal(nil, "no cache provider set, please set TierdCacheProvider (redis)", 0)
}
if utils.Config.TieredCacheProvider == "redis" || len(utils.Config.RedisCacheEndpoint) != 0 {
cache.MustInitTieredCache(utils.Config.RedisCacheEndpoint)
log.Infof("tiered Cache initialized, latest finalized epoch: %v", cache.LatestFinalizedEpoch.Get())
}

switch opts.Command {
case "nameValidatorsByRanges":
err := nameValidatorsByRanges(opts.ValidatorNameRanges)
Expand Down Expand Up @@ -459,6 +467,8 @@ func Run() {
err = fixEnsAddresses(erigonClient)
case "collect-notifications":
err = collectNotifications(opts.StartEpoch)
case "collect-user-db-notifications":
err = collectUserDbNotifications(opts.StartEpoch)
default:
log.Fatal(nil, fmt.Sprintf("unknown command %s", opts.Command), 0)
}
Expand All @@ -483,6 +493,19 @@ func collectNotifications(startEpoch uint64) error {
return nil
}

func collectUserDbNotifications(startEpoch uint64) error {
epoch := startEpoch

log.Infof("collecting notifications for epoch %v", epoch)
notifications, err := notification.GetUserNotificationsForEpoch(utils.Config.Notifications.PubkeyCachePath, epoch)
if err != nil {
return err
}

log.Infof("found %v notifications for epoch %v", len(notifications), epoch)
return nil
}

func fixEns(erigonClient *rpc.ErigonClient) error {
log.Infof("command: fix-ens")
addrs := []struct {
Expand Down
10 changes: 10 additions & 0 deletions backend/pkg/commons/types/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,16 @@ var UserIndexEvents = []EventName{
MonitoringMachineSwitchedToETH1FallbackEventName,
}

var UserIndexEventsMap = map[EventName]struct{}{
EthClientUpdateEventName: {},
MonitoringMachineCpuLoadEventName: {},
MonitoringMachineOfflineEventName: {},
MonitoringMachineDiskAlmostFullEventName: {},
MonitoringMachineMemoryUsageEventName: {},
MonitoringMachineSwitchedToETH2FallbackEventName: {},
MonitoringMachineSwitchedToETH1FallbackEventName: {},
}

var EventLabel map[EventName]string = map[EventName]string{
ValidatorBalanceDecreasedEventName: "Your validator(s) balance decreased",
ValidatorMissedProposalEventName: "Your validator(s) missed a proposal",
Expand Down
9 changes: 6 additions & 3 deletions backend/pkg/notification/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func GetSubsForEventFilter(eventName types.EventName, lastSentFilter string, las
// where event_name = $1
// `

eventNameForQuery := utils.GetNetwork() + ":" + string(eventName)

if _, ok := types.UserIndexEventsMap[eventName]; ok {
eventNameForQuery = string(eventName)
}
ds := goqu.Dialect("postgres").From("users_subscriptions").Select(
goqu.C("id"),
goqu.C("user_id"),
Expand All @@ -41,7 +46,7 @@ func GetSubsForEventFilter(eventName types.EventName, lastSentFilter string, las
goqu.C("created_epoch"),
goqu.C("event_threshold"),
goqu.C("event_name"),
).Where(goqu.L("(event_name = ? AND user_id <> 0)", utils.GetNetwork()+":"+string(eventName)))
).Where(goqu.L("(event_name = ? AND user_id <> 0)", eventNameForQuery))

if lastSentFilter != "" {
if len(lastSentFilterArgs) > 0 {
Expand All @@ -59,8 +64,6 @@ func GetSubsForEventFilter(eventName types.EventName, lastSentFilter string, las
return nil, err
}

log.Info(query)

subMap := make(map[string][]types.Subscription, 0)
err = db.FrontendWriterDB.Select(&subs, query, args...)
if err != nil {
Expand Down
10 changes: 9 additions & 1 deletion backend/pkg/notification/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,14 @@ func GetNotificationsForEpoch(pubkeyCachePath string, epoch uint64) (types.Notif
return collectNotifications(epoch)
}

func GetUserNotificationsForEpoch(pubkeyCachePath string, epoch uint64) (types.NotificationsPerUserId, error) {
err := initPubkeyCache(pubkeyCachePath)
if err != nil {
log.Fatal(err, "error initializing pubkey cache path for notifications", 0)
}
return collectUserDbNotifications(epoch)
}

func InitNotificationCollector(pubkeyCachePath string) {
err := initPubkeyCache(pubkeyCachePath)
if err != nil {
Expand Down Expand Up @@ -2075,7 +2083,7 @@ func collectMonitoringMachine(

dbResult, err := GetSubsForEventFilter(
eventName,
"(created_epoch <= ? AND (last_sent_epoch < (? - ?) OR last_sent_epoch IS NULL))",
"(created_epoch <= ? AND (last_sent_epoch < (?::int - ?::int) OR last_sent_epoch IS NULL))", // ::int is required here otherwise the generated goose query throw an error
[]interface{}{epoch, epoch, epochWaitInBetween},
nil,
)
Expand Down

0 comments on commit d6ed420

Please sign in to comment.