From 5013243850ad320ec6db4f7e1f62cfe76c15f214 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Mon, 21 Oct 2024 12:00:06 +0000 Subject: [PATCH 1/9] feat(notifications): add webhook config retrieval --- backend/pkg/notification/queuing.go | 99 +++++++++++++++++++++++------ 1 file changed, 79 insertions(+), 20 deletions(-) diff --git a/backend/pkg/notification/queuing.go b/backend/pkg/notification/queuing.go index 90cb78bca..7a86e0ea2 100644 --- a/backend/pkg/notification/queuing.go +++ b/backend/pkg/notification/queuing.go @@ -19,6 +19,7 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/types" "github.com/gobitfly/beaconchain/pkg/commons/utils" "github.com/jmoiron/sqlx" + "github.com/lib/pq" "golang.org/x/text/cases" "golang.org/x/text/language" ) @@ -583,29 +584,87 @@ func QueuePushNotification(epoch uint64, notificationsByUserID types.Notificatio } func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserId, tx *sqlx.Tx) error { + var webhooks []types.UserWebhook + userIds := slices.Collect(maps.Keys(notificationsByUserID)) + err := db.FrontendWriterDB.Select(&webhooks, ` + SELECT + id, + user_id, + url, + retries, + event_names, + last_sent, + destination + FROM + users_webhooks + WHERE + user_id = $1 AND user_id NOT IN (SELECT user_id from users_notification_channels WHERE active = false and channel = $2) + `, pq.Array(userIds), types.WebhookNotificationChannel) + + if err != nil { + return fmt.Errorf("error quering users_webhooks, err: %w", err) + } + webhooksMap := make(map[uint64][]types.UserWebhook) + for _, w := range webhooks { + if _, exists := webhooksMap[w.UserID]; !exists { + webhooksMap[w.UserID] = make([]types.UserWebhook, 0) + } + webhooksMap[w.UserID] = append(webhooksMap[w.UserID], w) + } + + // now fetch the webhooks for each dashboard config + var dashboardWebhooks []struct { + UserId types.UserId `db:"user_id"` + DashboardID types.DashboardId `db:"dashboard_id"` + GroupId types.DashboardGroupId `db:"id"` + WebhookTarget string `db:"webhook_target"` + WebhookFormat string `db:"webhook_format"` + WebhookRetries uint64 `db:"webhook_retries"` + } + + err = db.ReaderDb.Select(&dashboardWebhooks, ` + SELECT + users_val_dashboards_groups.id, + dashboard_id, + webhook_target, + webhook_format, + webhook_retries + FROM users_val_dashboards_groups + LEFT JOIN users_val_dashboards ON users_val_dashboards_groups.dashboard_id = users_val_dashboards.id + WHERE users_val_dashboards.user_id = ANY($1) + AND webhook_target IS NOT NULL + AND webhook_format IS NOT NULL; + `, pq.Array(userIds)) + if err != nil { + return fmt.Errorf("error quering users_val_dashboards_groups, err: %w", err) + } + dashboardWebhookMap := make(map[types.UserId]map[types.DashboardId]map[types.DashboardGroupId]types.UserWebhook) + for _, w := range dashboardWebhooks { + if _, exists := dashboardWebhookMap[w.UserId]; !exists { + dashboardWebhookMap[w.UserId] = make(map[types.DashboardId]map[types.DashboardGroupId]types.UserWebhook) + } + if _, exists := dashboardWebhookMap[w.UserId][w.DashboardID]; !exists { + dashboardWebhookMap[w.UserId][w.DashboardID] = make(map[types.DashboardGroupId]types.UserWebhook) + } + + uw := types.UserWebhook{ + UserID: uint64(w.UserId), + Url: w.WebhookTarget, + Retries: w.WebhookRetries, + } + if w.WebhookFormat == "discord" { + uw.Destination = sql.NullString{String: "webhook_discord", Valid: true} + } else { + uw.Destination = sql.NullString{String: "webhook", Valid: true} + } + dashboardWebhookMap[w.UserId][w.DashboardID][w.GroupId] = uw + } + for userID, userNotifications := range notificationsByUserID { - var webhooks []types.UserWebhook - err := db.FrontendWriterDB.Select(&webhooks, ` - SELECT - id, - user_id, - url, - retries, - event_names, - last_sent, - destination - FROM - users_webhooks - WHERE - user_id = $1 AND user_id NOT IN (SELECT user_id from users_notification_channels WHERE active = false and channel = $2) - `, userID, types.WebhookNotificationChannel) - // continue if the user does not have a webhook - if err == sql.ErrNoRows { + webhooks, exists := webhooksMap[uint64(userID)] + if !exists { continue } - if err != nil { - return fmt.Errorf("error quering users_webhooks, err: %w", err) - } // webhook => [] notifications discordNotifMap := make(map[uint64][]types.TransitDiscordContent) notifs := make([]types.TransitWebhook, 0) From 7a208e1f98f87c7f44638931404bdd7c5a10ba1d Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Tue, 22 Oct 2024 07:22:14 +0000 Subject: [PATCH 2/9] feat(notifications): improve webhook handling --- backend/pkg/api/data_access/notifications.go | 2 +- backend/pkg/commons/types/frontend.go | 20 +- backend/pkg/notification/queuing.go | 206 +++++++++---------- 3 files changed, 111 insertions(+), 117 deletions(-) diff --git a/backend/pkg/api/data_access/notifications.go b/backend/pkg/api/data_access/notifications.go index 2bf6d619f..5532c27d7 100644 --- a/backend/pkg/api/data_access/notifications.go +++ b/backend/pkg/api/data_access/notifications.go @@ -81,7 +81,7 @@ const ( ValidatorDashboardEventPrefix string = "vdb" AccountDashboardEventPrefix string = "adb" - DiscordWebhookFormat string = "discord" + DiscordWebhookFormat string = "webhook_discord" GroupOfflineThresholdDefault float64 = 0.1 MaxCollateralThresholdDefault float64 = 1.0 diff --git a/backend/pkg/commons/types/frontend.go b/backend/pkg/commons/types/frontend.go index 0ef4bde69..bb51cf0e6 100644 --- a/backend/pkg/commons/types/frontend.go +++ b/backend/pkg/commons/types/frontend.go @@ -636,15 +636,17 @@ type Email struct { } type UserWebhook struct { - ID uint64 `db:"id" json:"id"` - UserID uint64 `db:"user_id" json:"-"` - Url string `db:"url" json:"url"` - Retries uint64 `db:"retries" json:"retries"` - LastSent sql.NullTime `db:"last_sent" json:"lastRetry"` - Response sql.NullString `db:"response" json:"response"` - Request sql.NullString `db:"request" json:"request"` - Destination sql.NullString `db:"destination" json:"destination"` - EventNames pq.StringArray `db:"event_names" json:"-"` + ID uint64 `db:"id" json:"id"` + UserID uint64 `db:"user_id" json:"-"` + Url string `db:"url" json:"url"` + Retries uint64 `db:"retries" json:"retries"` + LastSent sql.NullTime `db:"last_sent" json:"lastRetry"` + Response sql.NullString `db:"response" json:"response"` + Request sql.NullString `db:"request" json:"request"` + Destination sql.NullString `db:"destination" json:"destination"` + EventNames pq.StringArray `db:"event_names" json:"-"` + DashboardId uint64 `db:"dashboard_id" json:"dashboardId"` + DashboardGroupId uint64 `db:"dashboard_group_id" json:"dashboardGroupId"` } type UserWebhookSubscriptions struct { diff --git a/backend/pkg/notification/queuing.go b/backend/pkg/notification/queuing.go index 7a86e0ea2..091acdfbb 100644 --- a/backend/pkg/notification/queuing.go +++ b/backend/pkg/notification/queuing.go @@ -3,7 +3,6 @@ package notification import ( "bytes" "compress/gzip" - "database/sql" "encoding/gob" "fmt" "html/template" @@ -613,22 +612,14 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI } // now fetch the webhooks for each dashboard config - var dashboardWebhooks []struct { - UserId types.UserId `db:"user_id"` - DashboardID types.DashboardId `db:"dashboard_id"` - GroupId types.DashboardGroupId `db:"id"` - WebhookTarget string `db:"webhook_target"` - WebhookFormat string `db:"webhook_format"` - WebhookRetries uint64 `db:"webhook_retries"` - } - - err = db.ReaderDb.Select(&dashboardWebhooks, ` + err = db.ReaderDb.Select(&webhooks, ` SELECT - users_val_dashboards_groups.id, - dashboard_id, - webhook_target, - webhook_format, - webhook_retries + users_val_dashboards_groups.id AS dashboard_group_id, + dashboard_id AS dashboard_id, + webhook_target AS url, + COALESCE(webhook_format, "webhook") AS destination, + webhook_retries AS retries, + webhook_last_sent AS last_sent FROM users_val_dashboards_groups LEFT JOIN users_val_dashboards ON users_val_dashboards_groups.dashboard_id = users_val_dashboards.id WHERE users_val_dashboards.user_id = ANY($1) @@ -639,25 +630,15 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI return fmt.Errorf("error quering users_val_dashboards_groups, err: %w", err) } dashboardWebhookMap := make(map[types.UserId]map[types.DashboardId]map[types.DashboardGroupId]types.UserWebhook) - for _, w := range dashboardWebhooks { - if _, exists := dashboardWebhookMap[w.UserId]; !exists { - dashboardWebhookMap[w.UserId] = make(map[types.DashboardId]map[types.DashboardGroupId]types.UserWebhook) + for _, w := range webhooks { + if _, exists := dashboardWebhookMap[types.UserId(w.UserID)]; !exists { + dashboardWebhookMap[types.UserId(w.UserID)] = make(map[types.DashboardId]map[types.DashboardGroupId]types.UserWebhook) } - if _, exists := dashboardWebhookMap[w.UserId][w.DashboardID]; !exists { - dashboardWebhookMap[w.UserId][w.DashboardID] = make(map[types.DashboardGroupId]types.UserWebhook) + if _, exists := dashboardWebhookMap[types.UserId(w.UserID)][types.DashboardId(w.DashboardId)]; !exists { + dashboardWebhookMap[types.UserId(w.UserID)][types.DashboardId(w.DashboardId)] = make(map[types.DashboardGroupId]types.UserWebhook) } - uw := types.UserWebhook{ - UserID: uint64(w.UserId), - Url: w.WebhookTarget, - Retries: w.WebhookRetries, - } - if w.WebhookFormat == "discord" { - uw.Destination = sql.NullString{String: "webhook_discord", Valid: true} - } else { - uw.Destination = sql.NullString{String: "webhook", Valid: true} - } - dashboardWebhookMap[w.UserId][w.DashboardID][w.GroupId] = uw + dashboardWebhookMap[types.UserId(w.UserID)][types.DashboardId(w.DashboardId)][types.DashboardGroupId(w.DashboardGroupId)] = w } for userID, userNotifications := range notificationsByUserID { @@ -671,90 +652,101 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI // send the notifications to each registered webhook for _, w := range webhooks { for dashboardId, notificationsPerDashboard := range userNotifications { - if dashboardId != 0 { // disable webhooks for dashboard notifications for now - continue - } for _, notificationsPerGroup := range notificationsPerDashboard { - for event, notifications := range notificationsPerGroup { - // check if the webhook is subscribed to the type of event - eventSubscribed := slices.Contains(w.EventNames, string(event)) - - if eventSubscribed { - if len(notifications) > 0 { - // reset Retries - if w.Retries > 5 && w.LastSent.Valid && w.LastSent.Time.Add(time.Hour).Before(time.Now()) { - _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = 0 WHERE id = $1;`, w.ID) - if err != nil { - log.Error(err, "error updating users_webhooks table; setting retries to zero", 0) + if dashboardId != 0 { // disable webhooks for dashboard notifications for now + // retrieve the associated webhook config from the map + if _, exists := dashboardWebhookMap[types.UserId(userID)]; !exists { + continue + } + if _, exists := dashboardWebhookMap[types.UserId(userID)][types.DashboardId(dashboardId)]; !exists { + continue + } + if _, exists := dashboardWebhookMap[types.UserId(userID)][types.DashboardId(dashboardId)][0]; !exists { + continue + } + w = dashboardWebhookMap[types.UserId(userID)][types.DashboardId(dashboardId)][0] + } else { + for event, notifications := range notificationsPerGroup { + // check if the webhook is subscribed to the type of event + eventSubscribed := slices.Contains(w.EventNames, string(event)) + + if eventSubscribed { + if len(notifications) > 0 { + // reset Retries + if w.Retries > 5 && w.LastSent.Valid && w.LastSent.Time.Add(time.Hour).Before(time.Now()) { + _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = 0 WHERE id = $1;`, w.ID) + if err != nil { + log.Error(err, "error updating users_webhooks table; setting retries to zero", 0) + continue + } + } else if w.Retries > 5 && !w.LastSent.Valid { + log.Warnf("webhook '%v' has more than 5 retries and does not have a valid last_sent timestamp", w.Url) continue } - } else if w.Retries > 5 && !w.LastSent.Valid { - log.Warnf("webhook '%v' has more than 5 retries and does not have a valid last_sent timestamp", w.Url) - continue - } - - if w.Retries >= 5 { - // early return - continue - } - } - for _, n := range notifications { - if w.Destination.Valid && w.Destination.String == "webhook_discord" { - if _, exists := discordNotifMap[w.ID]; !exists { - discordNotifMap[w.ID] = make([]types.TransitDiscordContent, 0) - } - l_notifs := len(discordNotifMap[w.ID]) - if l_notifs == 0 || len(discordNotifMap[w.ID][l_notifs-1].DiscordRequest.Embeds) >= 10 { - discordNotifMap[w.ID] = append(discordNotifMap[w.ID], types.TransitDiscordContent{ - Webhook: w, - DiscordRequest: types.DiscordReq{ - Username: utils.Config.Frontend.SiteDomain, - }, - UserId: userID, - }) - l_notifs++ + if w.Retries >= 5 { + // early return + continue } + } - fields := []types.DiscordEmbedField{ - { - Name: "Epoch", - Value: fmt.Sprintf("[%[1]v](https://%[2]s/%[1]v)", n.GetEpoch(), utils.Config.Frontend.SiteDomain+"/epoch"), - Inline: false, - }, - } + for _, n := range notifications { + if w.Destination.Valid && w.Destination.String == "webhook_discord" { + if _, exists := discordNotifMap[w.ID]; !exists { + discordNotifMap[w.ID] = make([]types.TransitDiscordContent, 0) + } + l_notifs := len(discordNotifMap[w.ID]) + if l_notifs == 0 || len(discordNotifMap[w.ID][l_notifs-1].DiscordRequest.Embeds) >= 10 { + discordNotifMap[w.ID] = append(discordNotifMap[w.ID], types.TransitDiscordContent{ + Webhook: w, + DiscordRequest: types.DiscordReq{ + Username: utils.Config.Frontend.SiteDomain, + }, + UserId: userID, + }) + l_notifs++ + } - if strings.HasPrefix(string(n.GetEventName()), "monitoring") || n.GetEventName() == types.EthClientUpdateEventName || n.GetEventName() == types.RocketpoolCollateralMaxReachedEventName || n.GetEventName() == types.RocketpoolCollateralMinReachedEventName { - fields = append(fields, - types.DiscordEmbedField{ - Name: "Target", - Value: fmt.Sprintf("%v", n.GetEventFilter()), + fields := []types.DiscordEmbedField{ + { + Name: "Epoch", + Value: fmt.Sprintf("[%[1]v](https://%[2]s/%[1]v)", n.GetEpoch(), utils.Config.Frontend.SiteDomain+"/epoch"), Inline: false, - }) - } - discordNotifMap[w.ID][l_notifs-1].DiscordRequest.Embeds = append(discordNotifMap[w.ID][l_notifs-1].DiscordRequest.Embeds, types.DiscordEmbed{ - Type: "rich", - Color: "16745472", - Description: n.GetLegacyInfo(), - Title: n.GetLegacyTitle(), - Fields: fields, - }) - } else { - notifs = append(notifs, types.TransitWebhook{ - Channel: w.Destination.String, - Content: types.TransitWebhookContent{ - Webhook: w, - Event: types.WebhookEvent{ - Network: utils.GetNetwork(), - Name: string(n.GetEventName()), - Title: n.GetLegacyTitle(), - Description: n.GetLegacyInfo(), - Epoch: n.GetEpoch(), - Target: n.GetEventFilter(), }, - UserId: userID, - }, - }) + } + + if strings.HasPrefix(string(n.GetEventName()), "monitoring") || n.GetEventName() == types.EthClientUpdateEventName || n.GetEventName() == types.RocketpoolCollateralMaxReachedEventName || n.GetEventName() == types.RocketpoolCollateralMinReachedEventName { + fields = append(fields, + types.DiscordEmbedField{ + Name: "Target", + Value: fmt.Sprintf("%v", n.GetEventFilter()), + Inline: false, + }) + } + discordNotifMap[w.ID][l_notifs-1].DiscordRequest.Embeds = append(discordNotifMap[w.ID][l_notifs-1].DiscordRequest.Embeds, types.DiscordEmbed{ + Type: "rich", + Color: "16745472", + Description: n.GetLegacyInfo(), + Title: n.GetLegacyTitle(), + Fields: fields, + }) + } else { + notifs = append(notifs, types.TransitWebhook{ + Channel: w.Destination.String, + Content: types.TransitWebhookContent{ + Webhook: w, + Event: types.WebhookEvent{ + Network: utils.GetNetwork(), + Name: string(n.GetEventName()), + Title: n.GetLegacyTitle(), + Description: n.GetLegacyInfo(), + Epoch: n.GetEpoch(), + Target: n.GetEventFilter(), + }, + UserId: userID, + }, + }) + } } } } From 4902c9b80f12263840ff9c266a00136fe85b69ac Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Thu, 24 Oct 2024 08:25:54 +0000 Subject: [PATCH 3/9] feat(notifications): wip dashboard discord webhooks --- backend/pkg/notification/queuing.go | 315 ++++++++++++++++++---------- backend/pkg/notification/sending.go | 11 +- 2 files changed, 214 insertions(+), 112 deletions(-) diff --git a/backend/pkg/notification/queuing.go b/backend/pkg/notification/queuing.go index 518687b44..fb03b41b3 100644 --- a/backend/pkg/notification/queuing.go +++ b/backend/pkg/notification/queuing.go @@ -644,7 +644,7 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI FROM users_webhooks WHERE - user_id = $1 AND user_id NOT IN (SELECT user_id from users_notification_channels WHERE active = false and channel = $2) + user_id = ANY($1) AND user_id NOT IN (SELECT user_id from users_notification_channels WHERE active = false and channel = $2) `, pq.Array(userIds), types.WebhookNotificationChannel) if err != nil { @@ -664,20 +664,24 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI users_val_dashboards_groups.id AS dashboard_group_id, dashboard_id AS dashboard_id, webhook_target AS url, - COALESCE(webhook_format, "webhook") AS destination, + COALESCE(webhook_format, 'webhook') AS destination, webhook_retries AS retries, webhook_last_sent AS last_sent FROM users_val_dashboards_groups LEFT JOIN users_val_dashboards ON users_val_dashboards_groups.dashboard_id = users_val_dashboards.id WHERE users_val_dashboards.user_id = ANY($1) AND webhook_target IS NOT NULL - AND webhook_format IS NOT NULL; - `, pq.Array(userIds)) + AND webhook_format IS NOT NULL + AND user_id NOT IN (SELECT user_id from users_notification_channels WHERE active = false and channel = $2); + `, pq.Array(userIds), types.WebhookNotificationChannel) if err != nil { return fmt.Errorf("error quering users_val_dashboards_groups, err: %w", err) } dashboardWebhookMap := make(map[types.UserId]map[types.DashboardId]map[types.DashboardGroupId]types.UserWebhook) for _, w := range webhooks { + if w.Destination.Valid && w.Destination.String == "discord" { + w.Destination.String = "webhook_discord" + } if _, exists := dashboardWebhookMap[types.UserId(w.UserID)]; !exists { dashboardWebhookMap[types.UserId(w.UserID)] = make(map[types.DashboardId]map[types.DashboardGroupId]types.UserWebhook) } @@ -689,136 +693,227 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI } for userID, userNotifications := range notificationsByUserID { - webhooks, exists := webhooksMap[uint64(userID)] - if !exists { - continue - } - // webhook => [] notifications discordNotifMap := make(map[uint64][]types.TransitDiscordContent) notifs := make([]types.TransitWebhook, 0) - // send the notifications to each registered webhook - for _, w := range webhooks { - for dashboardId, notificationsPerDashboard := range userNotifications { - for _, notificationsPerGroup := range notificationsPerDashboard { - if dashboardId != 0 { // disable webhooks for dashboard notifications for now - // retrieve the associated webhook config from the map - if _, exists := dashboardWebhookMap[types.UserId(userID)]; !exists { - continue - } - if _, exists := dashboardWebhookMap[types.UserId(userID)][types.DashboardId(dashboardId)]; !exists { - continue - } - if _, exists := dashboardWebhookMap[types.UserId(userID)][types.DashboardId(dashboardId)][0]; !exists { + webhooks, exists := webhooksMap[uint64(userID)] + if exists { + // webhook => [] notifications + // send the notifications to each registered webhook + for _, w := range webhooks { + for dashboardId, notificationsPerDashboard := range userNotifications { + for _, notificationsPerGroup := range notificationsPerDashboard { + if dashboardId != 0 { continue - } - w = dashboardWebhookMap[types.UserId(userID)][types.DashboardId(dashboardId)][0] - } else { - for event, notifications := range notificationsPerGroup { - // check if the webhook is subscribed to the type of event - eventSubscribed := slices.Contains(w.EventNames, string(event)) - - if eventSubscribed { - if len(notifications) > 0 { - // reset Retries - if w.Retries > 5 && w.LastSent.Valid && w.LastSent.Time.Add(time.Hour).Before(time.Now()) { - _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = 0 WHERE id = $1;`, w.ID) - if err != nil { - log.Error(err, "error updating users_webhooks table; setting retries to zero", 0) + } else { + for event, notifications := range notificationsPerGroup { + // check if the webhook is subscribed to the type of event + eventSubscribed := slices.Contains(w.EventNames, string(event)) + + if eventSubscribed { + if len(notifications) > 0 { + // reset Retries + if w.Retries > 5 && w.LastSent.Valid && w.LastSent.Time.Add(time.Hour).Before(time.Now()) { + _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = 0 WHERE id = $1;`, w.ID) + if err != nil { + log.Error(err, "error updating users_webhooks table; setting retries to zero", 0) + continue + } + } else if w.Retries > 5 && !w.LastSent.Valid { + log.Warnf("webhook '%v' has more than 5 retries and does not have a valid last_sent timestamp", w.Url) continue } - } else if w.Retries > 5 && !w.LastSent.Valid { - log.Warnf("webhook '%v' has more than 5 retries and does not have a valid last_sent timestamp", w.Url) - continue - } - if w.Retries >= 5 { - // early return - continue + if w.Retries >= 5 { + // early return + continue + } } - } - for _, n := range notifications { - if w.Destination.Valid && w.Destination.String == "webhook_discord" { - if _, exists := discordNotifMap[w.ID]; !exists { - discordNotifMap[w.ID] = make([]types.TransitDiscordContent, 0) - } - l_notifs := len(discordNotifMap[w.ID]) - if l_notifs == 0 || len(discordNotifMap[w.ID][l_notifs-1].DiscordRequest.Embeds) >= 10 { - discordNotifMap[w.ID] = append(discordNotifMap[w.ID], types.TransitDiscordContent{ - Webhook: w, - DiscordRequest: types.DiscordReq{ - Username: utils.Config.Frontend.SiteDomain, + for _, n := range notifications { + if w.Destination.Valid && w.Destination.String == "webhook_discord" { + if _, exists := discordNotifMap[w.ID]; !exists { + discordNotifMap[w.ID] = make([]types.TransitDiscordContent, 0) + } + l_notifs := len(discordNotifMap[w.ID]) + if l_notifs == 0 || len(discordNotifMap[w.ID][l_notifs-1].DiscordRequest.Embeds) >= 10 { + discordNotifMap[w.ID] = append(discordNotifMap[w.ID], types.TransitDiscordContent{ + Webhook: w, + DiscordRequest: types.DiscordReq{ + Username: utils.Config.Frontend.SiteDomain, + }, + UserId: userID, + }) + l_notifs++ + } + + fields := []types.DiscordEmbedField{ + { + Name: "Epoch", + Value: fmt.Sprintf("[%[1]v](https://%[2]s/%[1]v)", n.GetEpoch(), utils.Config.Frontend.SiteDomain+"/epoch"), + Inline: false, + }, + } + + if strings.HasPrefix(string(n.GetEventName()), "monitoring") || n.GetEventName() == types.EthClientUpdateEventName || n.GetEventName() == types.RocketpoolCollateralMaxReachedEventName || n.GetEventName() == types.RocketpoolCollateralMinReachedEventName { + fields = append(fields, + types.DiscordEmbedField{ + Name: "Target", + Value: fmt.Sprintf("%v", n.GetEventFilter()), + Inline: false, + }) + } + discordNotifMap[w.ID][l_notifs-1].DiscordRequest.Embeds = append(discordNotifMap[w.ID][l_notifs-1].DiscordRequest.Embeds, types.DiscordEmbed{ + Type: "rich", + Color: "16745472", + Description: n.GetLegacyInfo(), + Title: n.GetLegacyTitle(), + Fields: fields, + }) + } else { + notifs = append(notifs, types.TransitWebhook{ + Channel: w.Destination.String, + Content: types.TransitWebhookContent{ + Webhook: w, + Event: types.WebhookEvent{ + Network: utils.GetNetwork(), + Name: string(n.GetEventName()), + Title: n.GetLegacyTitle(), + Description: n.GetLegacyInfo(), + Epoch: n.GetEpoch(), + Target: n.GetEventFilter(), + }, + UserId: userID, }, - UserId: userID, }) - l_notifs++ } + } + } + } + } + } + } + } + } + // process dashboard webhooks + for dashboardId, notificationsPerDashboard := range userNotifications { + if dashboardId == 0 { + continue + } + for dashboardGroupId, notificationsPerGroup := range notificationsPerDashboard { + // retrieve the associated webhook config from the map + if _, exists := dashboardWebhookMap[userID]; !exists { + continue + } + if _, exists := dashboardWebhookMap[userID][dashboardId]; !exists { + continue + } + if _, exists := dashboardWebhookMap[userID][dashboardId][dashboardGroupId]; !exists { + continue + } + w := dashboardWebhookMap[userID][dashboardId][dashboardGroupId] - fields := []types.DiscordEmbedField{ - { - Name: "Epoch", - Value: fmt.Sprintf("[%[1]v](https://%[2]s/%[1]v)", n.GetEpoch(), utils.Config.Frontend.SiteDomain+"/epoch"), - Inline: false, - }, - } + // reset Retries + if w.Retries > 5 && w.LastSent.Valid && w.LastSent.Time.Add(time.Hour).Before(time.Now()) { + _, err = db.WriterDb.Exec(`UPDATE users_val_dashboards_groups SET webhook_retries = 0 WHERE id = $1 AND dashboard_id = $2;`, dashboardGroupId, dashboardId) + if err != nil { + log.Error(err, "error updating users_webhooks table; setting retries to zero", 0) + continue + } + } else if w.Retries > 5 && !w.LastSent.Valid { + log.Warnf("webhook '%v' for dashboard %d and group %d has more than 5 retries and does not have a valid last_sent timestamp", w.Url, dashboardId, dashboardGroupId) + continue + } - if strings.HasPrefix(string(n.GetEventName()), "monitoring") || n.GetEventName() == types.EthClientUpdateEventName || n.GetEventName() == types.RocketpoolCollateralMaxReachedEventName || n.GetEventName() == types.RocketpoolCollateralMinReachedEventName { - fields = append(fields, - types.DiscordEmbedField{ - Name: "Target", - Value: fmt.Sprintf("%v", n.GetEventFilter()), - Inline: false, - }) - } - discordNotifMap[w.ID][l_notifs-1].DiscordRequest.Embeds = append(discordNotifMap[w.ID][l_notifs-1].DiscordRequest.Embeds, types.DiscordEmbed{ - Type: "rich", - Color: "16745472", - Description: n.GetLegacyInfo(), - Title: n.GetLegacyTitle(), - Fields: fields, - }) - } else { - notifs = append(notifs, types.TransitWebhook{ - Channel: w.Destination.String, - Content: types.TransitWebhookContent{ - Webhook: w, - Event: types.WebhookEvent{ - Network: utils.GetNetwork(), - Name: string(n.GetEventName()), - Title: n.GetLegacyTitle(), - Description: n.GetLegacyInfo(), - Epoch: n.GetEpoch(), - Target: n.GetEventFilter(), - }, - UserId: userID, - }, - }) - } + if w.Retries >= 5 { + // early return + continue + } + + for event, notifications := range notificationsPerGroup { + if w.Destination.Valid && w.Destination.String == "webhook_discord" { + content := types.TransitDiscordContent{ + Webhook: w, + UserId: userID, + DiscordRequest: types.DiscordReq{ + Username: utils.Config.Frontend.SiteDomain, + }, + } + + totalBlockReward := float64(0) + details := "" + if event == types.ValidatorExecutedProposalEventName { + for _, n := range notifications { + proposalNotification, ok := n.(*ValidatorProposalNotification) + if !ok { + log.Error(fmt.Errorf("error casting proposal notification"), "", 0) + continue } + totalBlockReward += proposalNotification.Reward + + details += fmt.Sprintf("%s\n", n.GetInfo(types.NotifciationFormatMarkdown)) } } + + count := len(notifications) + summary := "" + plural := "" + if count > 1 { + plural = "s" + } + switch event { + case types.RocketpoolCollateralMaxReachedEventName, types.RocketpoolCollateralMinReachedEventName: + summary += fmt.Sprintf("%s: %d node%s", types.EventLabel[event], count, plural) + case types.TaxReportEventName, types.NetworkLivenessIncreasedEventName: + summary += fmt.Sprintf("%s: %d event%s", types.EventLabel[event], count, plural) + case types.EthClientUpdateEventName: + summary += fmt.Sprintf("%s: %d client%s", types.EventLabel[event], count, plural) + case types.MonitoringMachineCpuLoadEventName, types.MonitoringMachineMemoryUsageEventName, types.MonitoringMachineDiskAlmostFullEventName, types.MonitoringMachineOfflineEventName: + summary += fmt.Sprintf("%s: %d machine%s", types.EventLabel[event], count, plural) + case types.ValidatorExecutedProposalEventName: + summary += fmt.Sprintf("%s: %d validator%s, Reward: %.3f ETH", types.EventLabel[event], count, plural, totalBlockReward) + case types.ValidatorGroupEfficiencyEventName: + summary += fmt.Sprintf("%s: %d group%s", types.EventLabel[event], count, plural) + default: + summary += fmt.Sprintf("%s: %d validator%s", types.EventLabel[event], count, plural) + } + content.DiscordRequest.Content = summary + "\n" + details + if _, exists := discordNotifMap[w.ID]; !exists { + discordNotifMap[w.ID] = make([]types.TransitDiscordContent, 0) + } + log.Infof("adding discord notification for user %d, dashboard %d, group %d and type %s", userID, dashboardId, dashboardGroupId, event) + + discordNotifMap[w.ID] = append(discordNotifMap[w.ID], content) + } else { + // TODO: implement } } } } + // process notifs - for _, n := range notifs { - _, err = tx.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), $1, $2);`, n.Channel, n.Content) - if err != nil { - log.Error(err, "error inserting into webhooks_queue", 0) - } else { - metrics.NotificationsQueued.WithLabelValues(n.Channel, n.Content.Event.Name).Inc() + if len(notifs) > 0 { + log.Infof("queueing %v webhooks notifications", len(notifs)) + for _, n := range notifs { + _, err = tx.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), $1, $2);`, n.Channel, n.Content) + if err != nil { + log.Error(err, "error inserting into webhooks_queue", 0) + } else { + metrics.NotificationsQueued.WithLabelValues(n.Channel, n.Content.Event.Name).Inc() + } } } // process discord notifs - for _, dNotifs := range discordNotifMap { - for _, n := range dNotifs { - _, err = tx.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), 'webhook_discord', $1);`, n) - if err != nil { - log.Error(err, "error inserting into webhooks_queue (discord)", 0) - continue - } else { - metrics.NotificationsQueued.WithLabelValues("webhook_discord", "multi").Inc() + if len(discordNotifMap) > 0 { + log.Infof("queueing %v discord notifications", len(discordNotifMap)) + for _, dNotifs := range discordNotifMap { + for _, n := range dNotifs { + _, err = tx.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), 'webhook_discord', $1);`, n) + if err != nil { + log.Error(err, "error inserting into webhooks_queue (discord)", 0) + continue + } else { + metrics.NotificationsQueued.WithLabelValues("webhook_discord", "multi").Inc() + } } } } diff --git a/backend/pkg/notification/sending.go b/backend/pkg/notification/sending.go index 5d280bfae..440333333 100644 --- a/backend/pkg/notification/sending.go +++ b/backend/pkg/notification/sending.go @@ -362,7 +362,11 @@ func sendDiscordNotifications() error { go func(webhook types.UserWebhook, reqs []types.TransitDiscord) { defer func() { // update retries counters in db based on end result - _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = $1, last_sent = now() WHERE id = $2;`, webhook.Retries, webhook.ID) + if webhook.DashboardId == 0 && webhook.DashboardGroupId == 0 { + _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = $1, last_sent = now() WHERE id = $2;`, webhook.Retries, webhook.ID) + } else { + _, err = db.WriterDb.Exec(`UPDATE users_val_dashboards_groups SET webhook_retries = $1, webhook_last_sent = now() WHERE id = $2 AND dashboard_id = $3;`, webhook.Retries, webhook.DashboardGroupId, webhook.DashboardId) + } if err != nil { log.Warnf("failed to update retries counter to %v for webhook %v: %v", webhook.Retries, webhook.ID, err) } @@ -398,6 +402,7 @@ func sendDiscordNotifications() error { continue // skip } + log.Infof("sending discord webhook request to %s with: %v", webhook.Url, reqs[i].Content.DiscordRequest) resp, err := client.Post(webhook.Url, "application/json", reqBody) if err != nil { log.Warnf("failed sending discord webhook request %v: %v", webhook.ID, err) @@ -424,7 +429,9 @@ func sendDiscordNotifications() error { if resp.StatusCode != http.StatusOK { log.WarnWithFields(map[string]interface{}{"errResp.Body": utils.FirstN(errResp.Body, 1000), "webhook.Url": webhook.Url}, "error pushing discord webhook") } - _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET request = $2, response = $3 WHERE id = $1;`, webhook.ID, reqs[i].Content.DiscordRequest, errResp) + if webhook.DashboardId == 0 && webhook.DashboardGroupId == 0 { + _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET request = $2, response = $3 WHERE id = $1;`, webhook.ID, reqs[i].Content.DiscordRequest, errResp) + } if err != nil { log.Error(err, "error storing failure data in users_webhooks table", 0) } From 9e87e31ac5417a3772b975a2ec1074307b2edb63 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Fri, 25 Oct 2024 06:36:48 +0000 Subject: [PATCH 4/9] feat(notifications): working discord notifications --- backend/cmd/misc/main.go | 33 ++++++++++++++++------- backend/pkg/notification/collection.go | 4 +-- backend/pkg/notification/notifications.go | 15 ++++++++++- backend/pkg/notification/queuing.go | 33 ++++++++++++++++++++--- 4 files changed, 69 insertions(+), 16 deletions(-) diff --git a/backend/cmd/misc/main.go b/backend/cmd/misc/main.go index c567a0371..5134cc5e9 100644 --- a/backend/cmd/misc/main.go +++ b/backend/cmd/misc/main.go @@ -555,7 +555,7 @@ func Run() { func collectNotifications(startEpoch uint64) error { epoch := startEpoch - notifications, err := notification.GetNotificationsForEpoch(utils.Config.Notifications.PubkeyCachePath, epoch) + notifications, err := notification.GetHeadNotificationsForEpoch(utils.Config.Notifications.PubkeyCachePath, epoch) if err != nil { return err } @@ -565,19 +565,34 @@ func collectNotifications(startEpoch uint64) error { spew.Dump(notifications[0]) } - emails, err := notification.RenderEmailsForUserEvents(0, notifications) + tx, err := db.WriterDb.Beginx() if err != nil { return err } + defer tx.Rollback() - for _, email := range emails { - // if email.Address == "" { - log.Infof("to: %v", email.Address) - log.Infof("subject: %v", email.Subject) - log.Infof("body: %v", email.Email.Body) - log.Info("-----") - // } + err = notification.QueueWebhookNotifications(notifications, tx) + if err != nil { + return err } + err = tx.Commit() + if err != nil { + return err + } + + // emails, err := notification.RenderEmailsForUserEvents(0, notifications) + // if err != nil { + // return err + // } + + // for _, email := range emails { + // // if email.Address == "" { + // log.Infof("to: %v", email.Address) + // log.Infof("subject: %v", email.Subject) + // log.Infof("body: %v", email.Email.Body) + // log.Info("-----") + // // } + // } // pushMessages, err := notification.RenderPushMessagesForUserEvents(0, notifications) // if err != nil { diff --git a/backend/pkg/notification/collection.go b/backend/pkg/notification/collection.go index ba6b0568f..2c703e583 100644 --- a/backend/pkg/notification/collection.go +++ b/backend/pkg/notification/collection.go @@ -161,7 +161,7 @@ func notificationCollector() { log.Infof("collecting notifications for epoch %v", epoch) // Network DB Notifications (network related) - notifications, err := collectNotifications(epoch, mc) + notifications, err := collectNotifications(mc, epoch) if err != nil { log.Error(err, "error collection notifications", 0) @@ -280,7 +280,7 @@ func collectUpcomingBlockProposalNotifications(notificationsByUserID types.Notif return nil } -func collectNotifications(epoch uint64, mc modules.ModuleContext) (types.NotificationsPerUserId, error) { +func collectNotifications(mc modules.ModuleContext, epoch uint64) (types.NotificationsPerUserId, error) { notificationsByUserID := types.NotificationsPerUserId{} start := time.Now() var err error diff --git a/backend/pkg/notification/notifications.go b/backend/pkg/notification/notifications.go index 7bb71162a..ebddb2aad 100644 --- a/backend/pkg/notification/notifications.go +++ b/backend/pkg/notification/notifications.go @@ -17,7 +17,20 @@ func GetNotificationsForEpoch(pubkeyCachePath string, epoch uint64) (types.Notif if err != nil { log.Fatal(err, "error initializing pubkey cache path for notifications", 0) } - return collectNotifications(epoch, mc) + return collectNotifications(mc, epoch) +} + +func GetHeadNotificationsForEpoch(pubkeyCachePath string, epoch uint64) (types.NotificationsPerUserId, error) { + mc, err := modules.GetModuleContext() + if err != nil { + log.Fatal(err, "error getting module context", 0) + } + + err = initPubkeyCache(pubkeyCachePath) + if err != nil { + log.Fatal(err, "error initializing pubkey cache path for notifications", 0) + } + return collectHeadNotifications(mc, epoch) } // Used for isolated testing diff --git a/backend/pkg/notification/queuing.go b/backend/pkg/notification/queuing.go index fb03b41b3..0822a53f4 100644 --- a/backend/pkg/notification/queuing.go +++ b/backend/pkg/notification/queuing.go @@ -661,6 +661,7 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI // now fetch the webhooks for each dashboard config err = db.ReaderDb.Select(&webhooks, ` SELECT + users_val_dashboards.user_id AS user_id, users_val_dashboards_groups.id AS dashboard_group_id, dashboard_id AS dashboard_id, webhook_target AS url, @@ -840,18 +841,29 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI } totalBlockReward := float64(0) + epoch := uint64(0) details := "" - if event == types.ValidatorExecutedProposalEventName { - for _, n := range notifications { + i := 0 + for _, n := range notifications { + if event == types.ValidatorExecutedProposalEventName { proposalNotification, ok := n.(*ValidatorProposalNotification) if !ok { log.Error(fmt.Errorf("error casting proposal notification"), "", 0) continue } totalBlockReward += proposalNotification.Reward - + } + if i <= 10 { details += fmt.Sprintf("%s\n", n.GetInfo(types.NotifciationFormatMarkdown)) } + i++ + if i == 11 { + details += fmt.Sprintf("... and %d more notifications\n", len(notifications)-i) + continue + } + if epoch == 0 { + epoch = n.GetEpoch() + } } count := len(notifications) @@ -876,7 +888,20 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI default: summary += fmt.Sprintf("%s: %d validator%s", types.EventLabel[event], count, plural) } - content.DiscordRequest.Content = summary + "\n" + details + content.DiscordRequest.Embeds = append(content.DiscordRequest.Embeds, types.DiscordEmbed{ + Type: "rich", + Color: "16745472", + Description: details, + Title: summary, + Fields: []types.DiscordEmbedField{ + { + Name: "Epoch", + Value: fmt.Sprintf("[%[1]v](https://%[2]s/epoch/%[1]v)", epoch, utils.Config.Frontend.SiteDomain), + Inline: false, + }, + }, + }) + if _, exists := discordNotifMap[w.ID]; !exists { discordNotifMap[w.ID] = make([]types.TransitDiscordContent, 0) } From ff9221f99b464cc92212a03b0b75cc37cc606630 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Fri, 25 Oct 2024 07:11:03 +0000 Subject: [PATCH 5/9] feat(notifications): implement normal webhook handling --- backend/pkg/commons/types/frontend.go | 5 ++-- backend/pkg/notification/collection.go | 8 +++---- backend/pkg/notification/queuing.go | 32 ++++++++++++++++++++++---- 3 files changed, 35 insertions(+), 10 deletions(-) diff --git a/backend/pkg/commons/types/frontend.go b/backend/pkg/commons/types/frontend.go index 0089ed96b..b66a02f1a 100644 --- a/backend/pkg/commons/types/frontend.go +++ b/backend/pkg/commons/types/frontend.go @@ -549,8 +549,9 @@ type TransitWebhook struct { type TransitWebhookContent struct { Webhook UserWebhook - Event WebhookEvent `json:"event"` - UserId UserId `json:"userId"` + Event *WebhookEvent `json:"event,omitempty"` + Events []*WebhookEvent `json:"events,omitempty"` + UserId UserId `json:"userId"` } type WebhookEvent struct { diff --git a/backend/pkg/notification/collection.go b/backend/pkg/notification/collection.go index 2c703e583..56929acf7 100644 --- a/backend/pkg/notification/collection.go +++ b/backend/pkg/notification/collection.go @@ -230,10 +230,10 @@ func collectUpcomingBlockProposalNotifications(notificationsByUserID types.Notif nextEpoch := headEpoch + 1 log.Infof("collecting upcoming block proposal notifications for epoch %v (head epoch is %d)", nextEpoch, headEpoch) - if utils.EpochToTime(nextEpoch).Before(time.Now()) { - log.Error(fmt.Errorf("error upcoming block proposal notifications for epoch %v are already in the past", nextEpoch), "", 0) - return nil - } + // if utils.EpochToTime(nextEpoch).Before(time.Now()) { + // log.Error(fmt.Errorf("error upcoming block proposal notifications for epoch %v are already in the past", nextEpoch), "", 0) + // return nil + // } assignments, err := mc.CL.GetPropoalAssignments(nextEpoch) if err != nil { diff --git a/backend/pkg/notification/queuing.go b/backend/pkg/notification/queuing.go index 0822a53f4..7ca956298 100644 --- a/backend/pkg/notification/queuing.go +++ b/backend/pkg/notification/queuing.go @@ -775,7 +775,7 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI Channel: w.Destination.String, Content: types.TransitWebhookContent{ Webhook: w, - Event: types.WebhookEvent{ + Event: &types.WebhookEvent{ Network: utils.GetNetwork(), Name: string(n.GetEventName()), Title: n.GetLegacyTitle(), @@ -908,8 +908,26 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI log.Infof("adding discord notification for user %d, dashboard %d, group %d and type %s", userID, dashboardId, dashboardGroupId, event) discordNotifMap[w.ID] = append(discordNotifMap[w.ID], content) - } else { - // TODO: implement + } else if w.Destination.Valid && w.Destination.String == "webhook" { + events := []*types.WebhookEvent{} + for _, n := range notifications { + events = append(events, &types.WebhookEvent{ + Network: utils.GetNetwork(), + Name: string(n.GetEventName()), + Title: n.GetTitle(), + Description: n.GetInfo(types.NotifciationFormatText), + Epoch: n.GetEpoch(), + Target: n.GetEventFilter(), + }) + } + notifs = append(notifs, types.TransitWebhook{ + Channel: w.Destination.String, + Content: types.TransitWebhookContent{ + Webhook: w, + Events: events, + UserId: userID, + }, + }) } } } @@ -923,7 +941,13 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI if err != nil { log.Error(err, "error inserting into webhooks_queue", 0) } else { - metrics.NotificationsQueued.WithLabelValues(n.Channel, n.Content.Event.Name).Inc() + if n.Content.Event != nil { + metrics.NotificationsQueued.WithLabelValues(n.Channel, n.Content.Event.Name).Inc() + } else { + for _, e := range n.Content.Events { + metrics.NotificationsQueued.WithLabelValues(n.Channel, e.Name).Inc() + } + } } } } From 6832664d57ac0dda5aec000bb6c2f42cdd551a82 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Fri, 25 Oct 2024 07:28:33 +0000 Subject: [PATCH 6/9] feat(notifications): improve webhook data processing --- backend/pkg/notification/collection.go | 8 +-- backend/pkg/notification/queuing.go | 77 ++++++++++++++++---------- 2 files changed, 51 insertions(+), 34 deletions(-) diff --git a/backend/pkg/notification/collection.go b/backend/pkg/notification/collection.go index 56929acf7..2c703e583 100644 --- a/backend/pkg/notification/collection.go +++ b/backend/pkg/notification/collection.go @@ -230,10 +230,10 @@ func collectUpcomingBlockProposalNotifications(notificationsByUserID types.Notif nextEpoch := headEpoch + 1 log.Infof("collecting upcoming block proposal notifications for epoch %v (head epoch is %d)", nextEpoch, headEpoch) - // if utils.EpochToTime(nextEpoch).Before(time.Now()) { - // log.Error(fmt.Errorf("error upcoming block proposal notifications for epoch %v are already in the past", nextEpoch), "", 0) - // return nil - // } + if utils.EpochToTime(nextEpoch).Before(time.Now()) { + log.Error(fmt.Errorf("error upcoming block proposal notifications for epoch %v are already in the past", nextEpoch), "", 0) + return nil + } assignments, err := mc.CL.GetPropoalAssignments(nextEpoch) if err != nil { diff --git a/backend/pkg/notification/queuing.go b/backend/pkg/notification/queuing.go index 7ca956298..6787a03b3 100644 --- a/backend/pkg/notification/queuing.go +++ b/backend/pkg/notification/queuing.go @@ -693,9 +693,10 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI dashboardWebhookMap[types.UserId(w.UserID)][types.DashboardId(w.DashboardId)][types.DashboardGroupId(w.DashboardGroupId)] = w } + discordNotifMap := make(map[uint64][]types.TransitDiscordContent) + notifs := make([]types.TransitWebhook, 0) + for userID, userNotifications := range notificationsByUserID { - discordNotifMap := make(map[uint64][]types.TransitDiscordContent) - notifs := make([]types.TransitWebhook, 0) webhooks, exists := webhooksMap[uint64(userID)] if exists { // webhook => [] notifications @@ -932,41 +933,57 @@ func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI } } } + } - // process notifs - if len(notifs) > 0 { - log.Infof("queueing %v webhooks notifications", len(notifs)) - for _, n := range notifs { - _, err = tx.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), $1, $2);`, n.Channel, n.Content) - if err != nil { - log.Error(err, "error inserting into webhooks_queue", 0) - } else { - if n.Content.Event != nil { - metrics.NotificationsQueued.WithLabelValues(n.Channel, n.Content.Event.Name).Inc() - } else { - for _, e := range n.Content.Events { - metrics.NotificationsQueued.WithLabelValues(n.Channel, e.Name).Inc() - } - } + // process notifs + log.Infof("queueing %v webhooks notifications", len(notifs)) + if len(notifs) > 0 { + type insertData struct { + Content types.TransitWebhookContent `db:"content"` + } + insertRows := make([]insertData, 0, len(notifs)) + for _, n := range notifs { + if n.Content.Event != nil { + metrics.NotificationsQueued.WithLabelValues(n.Channel, n.Content.Event.Name).Inc() + } else { + for _, e := range n.Content.Events { + metrics.NotificationsQueued.WithLabelValues(n.Channel, e.Name).Inc() } } + + insertRows = append(insertRows, insertData{ + Content: n.Content, + }) } - // process discord notifs - if len(discordNotifMap) > 0 { - log.Infof("queueing %v discord notifications", len(discordNotifMap)) - for _, dNotifs := range discordNotifMap { - for _, n := range dNotifs { - _, err = tx.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), 'webhook_discord', $1);`, n) - if err != nil { - log.Error(err, "error inserting into webhooks_queue (discord)", 0) - continue - } else { - metrics.NotificationsQueued.WithLabelValues("webhook_discord", "multi").Inc() - } - } + _, err = tx.NamedExec(`INSERT INTO notification_queue (created, channel, content) VALUES (NOW(), 'webhook', :content)`, insertRows) + if err != nil { + return fmt.Errorf("error writing transit push to db: %w", err) + } + } + + // process discord notifs + log.Infof("queueing %v discord notifications", len(discordNotifMap)) + if len(discordNotifMap) > 0 { + type insertData struct { + Content types.TransitDiscordContent `db:"content"` + } + insertRows := make([]insertData, 0, len(discordNotifMap)) + + for _, dNotifs := range discordNotifMap { + for _, n := range dNotifs { + insertRows = append(insertRows, insertData{ + Content: n, + }) + metrics.NotificationsQueued.WithLabelValues("webhook_discord", "multi").Inc() } } + + _, err = tx.NamedExec(`INSERT INTO notification_queue (created, channel, content) VALUES (NOW(), 'webhook_discord', :content)`, insertRows) + if err != nil { + return fmt.Errorf("error writing transit push to db: %w", err) + } } + return nil } From 1553236aa20c8543a66a5a90c3848a92f676ff15 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Fri, 25 Oct 2024 08:08:56 +0000 Subject: [PATCH 7/9] chore(notifications): remove debug logging --- backend/pkg/notification/sending.go | 42 +++++++++++++++++++++-------- 1 file changed, 31 insertions(+), 11 deletions(-) diff --git a/backend/pkg/notification/sending.go b/backend/pkg/notification/sending.go index 440333333..95c7101fb 100644 --- a/backend/pkg/notification/sending.go +++ b/backend/pkg/notification/sending.go @@ -20,6 +20,7 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/utils" "github.com/jmoiron/sqlx" "github.com/lib/pq" + "golang.org/x/sync/errgroup" ) func InitNotificationSender() { @@ -233,11 +234,17 @@ func sendWebhookNotifications() error { if err != nil { return fmt.Errorf("error querying notification queue, err: %w", err) } - client := &http.Client{Timeout: time.Second * 30} + + // webhooks have 5 seconds to respond + client := &http.Client{Timeout: time.Second * 5} log.Infof("processing %v webhook notifications", len(notificationQueueItem)) + // use an error group to throttle webhook requests + g := &errgroup.Group{} + g.SetLimit(50) // issue at most 50 requests at a time for _, n := range notificationQueueItem { + n := n _, err := db.CountSentMessage("n_webhooks", n.Content.UserId) if err != nil { log.Error(err, "error counting sent webhook", 0) @@ -268,7 +275,7 @@ func sendWebhookNotifications() error { continue } - go func(n types.TransitWebhook) { + g.Go(func() error { if n.Content.Webhook.Retries > 0 { time.Sleep(time.Duration(n.Content.Webhook.Retries) * time.Second) } @@ -276,7 +283,7 @@ func sendWebhookNotifications() error { if err != nil { log.Warnf("error sending webhook request: %v", err) metrics.NotificationsSent.WithLabelValues("webhook", "error").Inc() - return + return nil } else { metrics.NotificationsSent.WithLabelValues("webhook", resp.Status).Inc() } @@ -285,14 +292,18 @@ func sendWebhookNotifications() error { _, err = db.WriterDb.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id) if err != nil { log.Error(err, "error updating notification_queue table", 0) - return + return nil } if resp != nil && resp.StatusCode < 400 { - _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = 0, last_sent = now() WHERE id = $1;`, n.Content.Webhook.ID) + // update retries counters in db based on end result + if n.Content.Webhook.DashboardId == 0 && n.Content.Webhook.DashboardGroupId == 0 { + _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = $1, last_sent = now() WHERE id = $2;`, n.Content.Webhook.Retries, n.Content.Webhook.ID) + } else { + _, err = db.WriterDb.Exec(`UPDATE users_val_dashboards_groups SET webhook_retries = $1, webhook_last_sent = now() WHERE id = $2 AND dashboard_id = $3;`, n.Content.Webhook.Retries, n.Content.Webhook.DashboardGroupId, n.Content.Webhook.DashboardId) + } if err != nil { - log.Error(err, "error updating users_webhooks table", 0) - return + log.Warnf("failed to update retries counter to %v for webhook %v: %v", n.Content.Webhook.Retries, n.Content.Webhook.ID, err) } } else { var errResp types.ErrorResponse @@ -307,13 +318,23 @@ func sendWebhookNotifications() error { errResp.Body = string(b) } - _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = retries + 1, last_sent = now(), request = $2, response = $3 WHERE id = $1;`, n.Content.Webhook.ID, n.Content, errResp) + if n.Content.Webhook.DashboardId == 0 && n.Content.Webhook.DashboardGroupId == 0 { + _, err = db.FrontendWriterDB.Exec(`UPDATE users_val_dashboards_groups SET webhook_retries = retries + 1, webhook_last_sent = now() WHERE id = $1 AND dashboard_id = $2;`, n.Content.Webhook.DashboardGroupId, n.Content.Webhook.DashboardId) + } else { + _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = retries + 1, last_sent = now(), request = $2, response = $3 WHERE id = $1;`, n.Content.Webhook.ID, n.Content, errResp) + } if err != nil { log.Error(err, "error updating users_webhooks table", 0) - return + return nil } } - }(n) + return nil + }) + } + + err = g.Wait() + if err != nil { + log.Error(err, "error waiting for errgroup", 0) } return nil } @@ -402,7 +423,6 @@ func sendDiscordNotifications() error { continue // skip } - log.Infof("sending discord webhook request to %s with: %v", webhook.Url, reqs[i].Content.DiscordRequest) resp, err := client.Post(webhook.Url, "application/json", reqBody) if err != nil { log.Warnf("failed sending discord webhook request %v: %v", webhook.ID, err) From 3adaf9defda3cae9e64460e4410c17c13edbaada Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Fri, 25 Oct 2024 11:20:57 +0000 Subject: [PATCH 8/9] chore(notifications): throttle discord webhook calls --- backend/pkg/notification/sending.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/backend/pkg/notification/sending.go b/backend/pkg/notification/sending.go index 95c7101fb..12cdd867b 100644 --- a/backend/pkg/notification/sending.go +++ b/backend/pkg/notification/sending.go @@ -377,10 +377,13 @@ func sendDiscordNotifications() error { } notifMap[n.Content.Webhook.ID] = append(notifMap[n.Content.Webhook.ID], n) } + // use an error group to throttle webhook requests + g := &errgroup.Group{} + g.SetLimit(50) // issue at most 50 requests at a time + for _, webhook := range webhookMap { - // todo: this has the potential to spin up thousands of go routines - // should use an errgroup instead if we decide to keep the aproach - go func(webhook types.UserWebhook, reqs []types.TransitDiscord) { + webhook := webhook + g.Go(func() error { defer func() { // update retries counters in db based on end result if webhook.DashboardId == 0 && webhook.DashboardGroupId == 0 { @@ -394,7 +397,7 @@ func sendDiscordNotifications() error { // mark notifcations as sent in db ids := make([]uint64, 0) - for _, req := range reqs { + for _, req := range notifMap[webhook.ID] { ids = append(ids, req.Id) } _, err = db.WriterDb.Exec(`UPDATE notification_queue SET sent = now() where id = ANY($1)`, pq.Array(ids)) @@ -406,10 +409,10 @@ func sendDiscordNotifications() error { _, err = url.Parse(webhook.Url) if err != nil { log.Error(err, "error parsing url", 0, log.Fields{"webhook_id": webhook.ID}) - return + return nil } - for i := 0; i < len(reqs); i++ { + for i := 0; i < len(notifMap[webhook.ID]); i++ { if webhook.Retries > 5 { break // stop } @@ -417,7 +420,7 @@ func sendDiscordNotifications() error { time.Sleep(time.Duration(webhook.Retries) * time.Second) reqBody := new(bytes.Buffer) - err := json.NewEncoder(reqBody).Encode(reqs[i].Content.DiscordRequest) + err := json.NewEncoder(reqBody).Encode(notifMap[webhook.ID][i].Content.DiscordRequest) if err != nil { log.Error(err, "error marshalling discord webhook event", 0) continue // skip @@ -450,7 +453,7 @@ func sendDiscordNotifications() error { log.WarnWithFields(map[string]interface{}{"errResp.Body": utils.FirstN(errResp.Body, 1000), "webhook.Url": webhook.Url}, "error pushing discord webhook") } if webhook.DashboardId == 0 && webhook.DashboardGroupId == 0 { - _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET request = $2, response = $3 WHERE id = $1;`, webhook.ID, reqs[i].Content.DiscordRequest, errResp) + _, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET request = $2, response = $3 WHERE id = $1;`, webhook.ID, notifMap[webhook.ID][i].Content.DiscordRequest, errResp) } if err != nil { log.Error(err, "error storing failure data in users_webhooks table", 0) @@ -460,7 +463,13 @@ func sendDiscordNotifications() error { i-- // retry, IMPORTANT to be at the END of the ELSE, otherwise the wrong index will be used in the commands above! } } - }(webhook, notifMap[webhook.ID]) + return nil + }) + } + + err = g.Wait() + if err != nil { + log.Error(err, "error waiting for errgroup", 0) } return nil From 6f52db8d5e64ba944911dddc73d379584e3552c2 Mon Sep 17 00:00:00 2001 From: peter <1674920+peterbitfly@users.noreply.github.com> Date: Thu, 31 Oct 2024 07:23:45 +0000 Subject: [PATCH 9/9] fix(notifications): properly initialize efficiency map in all cases --- backend/cmd/misc/main.go | 2 +- backend/pkg/notification/collection.go | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/backend/cmd/misc/main.go b/backend/cmd/misc/main.go index 5134cc5e9..2851ff268 100644 --- a/backend/cmd/misc/main.go +++ b/backend/cmd/misc/main.go @@ -569,7 +569,7 @@ func collectNotifications(startEpoch uint64) error { if err != nil { return err } - defer tx.Rollback() + defer utils.Rollback(tx) err = notification.QueueWebhookNotifications(notifications, tx) if err != nil { diff --git a/backend/pkg/notification/collection.go b/backend/pkg/notification/collection.go index 2c703e583..04089dec9 100644 --- a/backend/pkg/notification/collection.go +++ b/backend/pkg/notification/collection.go @@ -519,6 +519,13 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio return fmt.Errorf("error getting proposal assignments: %w", err) } for _, assignment := range proposalAssignments.Data { + if _, ok := efficiencyMap[types.ValidatorIndex(assignment.ValidatorIndex)]; !ok { + efficiencyMap[types.ValidatorIndex(assignment.ValidatorIndex)] = &dbResult{ + ValidatorIndex: assignment.ValidatorIndex, + AttestationReward: decimal.Decimal{}, + AttestationIdealReward: decimal.Decimal{}, + } + } efficiencyMap[types.ValidatorIndex(assignment.ValidatorIndex)].BlocksScheduled++ } @@ -538,6 +545,13 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio efficiencyMap[types.ValidatorIndex(s.Data.Message.ProposerIndex)].BlocksProposed++ for i, validatorIndex := range syncAssignments.Data.Validators { + if _, ok := efficiencyMap[types.ValidatorIndex(validatorIndex)]; !ok { + efficiencyMap[types.ValidatorIndex(validatorIndex)] = &dbResult{ + ValidatorIndex: uint64(validatorIndex), + AttestationReward: decimal.Decimal{}, + AttestationIdealReward: decimal.Decimal{}, + } + } efficiencyMap[types.ValidatorIndex(validatorIndex)].SyncScheduled++ if utils.BitAtVector(s.Data.Message.Body.SyncAggregate.SyncCommitteeBits, i) {