Skip to content

Commit

Permalink
feat(notifications): store queued notifications in network db
Browse files Browse the repository at this point in the history
  • Loading branch information
peterbitfly committed Sep 9, 2024
1 parent 23ed6e2 commit c8d0e0f
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 47 deletions.
4 changes: 2 additions & 2 deletions backend/pkg/commons/db/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ func GetSubscriptions(filter GetSubscriptionsFilter) ([]*types.Subscription, err
}

// UpdateSubscriptionsLastSent updates `last_sent_ts` column of the `users_subscriptions` table.
func UpdateSubscriptionsLastSent(subscriptionIDs []uint64, sent time.Time, epoch uint64, useDB *sqlx.DB) error {
_, err := useDB.Exec(`
func UpdateSubscriptionsLastSent(subscriptionIDs []uint64, sent time.Time, epoch uint64) error {
_, err := FrontendWriterDB.Exec(`
UPDATE users_subscriptions
SET last_sent_ts = TO_TIMESTAMP($1), last_sent_epoch = $2
WHERE id = ANY($3)`, sent.Unix(), epoch, pq.Array(subscriptionIDs))
Expand Down
89 changes: 44 additions & 45 deletions backend/pkg/notification/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"

"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"github.com/rocket-pool/rocketpool-go/utils/eth"
"golang.org/x/text/cases"
Expand Down Expand Up @@ -126,7 +125,7 @@ func notificationCollector() {
break
}

queueNotifications(notifications, db.FrontendWriterDB) // this caused the collected notifications to be queued and sent
queueNotifications(notifications) // this caused the collected notifications to be queued and sent

// Network DB Notifications (user related, must only run on one instance ever!!!!)
if utils.Config.Notifications.UserDBNotifications {
Expand All @@ -139,7 +138,7 @@ func notificationCollector() {
continue
}

queueNotifications(userNotifications, db.FrontendWriterDB)
queueNotifications(userNotifications)
}

log.InfoWithFields(log.Fields{"notifications": len(notifications), "duration": time.Since(start), "epoch": epoch}, "notifications completed")
Expand Down Expand Up @@ -177,12 +176,12 @@ func notificationSender() {
}

log.Infof("lock obtained")
err = dispatchNotifications(db.FrontendWriterDB)
err = dispatchNotifications()
if err != nil {
log.Error(err, "error dispatching notifications", 0)
}

err = garbageCollectNotificationQueue(db.FrontendWriterDB)
err = garbageCollectNotificationQueue()
if err != nil {
log.Error(err, "error garbage collecting notification queue", 0)
}
Expand Down Expand Up @@ -464,20 +463,20 @@ func collectUserDbNotifications(epoch uint64) (types.NotificationsPerUserId, err
return notificationsByUserID, nil
}

func queueNotifications(notificationsByUserID types.NotificationsPerUserId, useDB *sqlx.DB) {
func queueNotifications(notificationsByUserID types.NotificationsPerUserId) {
subByEpoch := map[uint64][]uint64{}

err := queueEmailNotifications(notificationsByUserID, useDB)
err := queueEmailNotifications(notificationsByUserID)
if err != nil {
log.Error(err, "error queuing email notifications", 0)
}

err = queuePushNotification(notificationsByUserID, useDB)
err = queuePushNotification(notificationsByUserID)
if err != nil {
log.Error(err, "error queuing push notifications", 0)
}

err = queueWebhookNotifications(notificationsByUserID, useDB)
err = queueWebhookNotifications(notificationsByUserID)
if err != nil {
log.Error(err, "error queuing webhook notifications", 0)
}
Expand All @@ -498,7 +497,7 @@ func queueNotifications(notificationsByUserID types.NotificationsPerUserId, useD
// obsolete as notifications are anyway sent on a per-epoch basis
for epoch, subIDs := range subByEpoch {
// update that we've queued the subscription (last sent rather means last queued)
err := db.UpdateSubscriptionsLastSent(subIDs, time.Now(), epoch, useDB)
err := db.UpdateSubscriptionsLastSent(subIDs, time.Now(), epoch)
if err != nil {
log.Error(err, "error updating sent-time of sent notifications", 0)
metrics.Errors.WithLabelValues("notifications_updating_sent_time").Inc()
Expand Down Expand Up @@ -537,23 +536,23 @@ func queueNotifications(notificationsByUserID types.NotificationsPerUserId, useD
}
}

func dispatchNotifications(useDB *sqlx.DB) error {
err := sendEmailNotifications(useDB)
func dispatchNotifications() error {
err := sendEmailNotifications()
if err != nil {
return fmt.Errorf("error sending email notifications, err: %w", err)
}

err = sendPushNotifications(useDB)
err = sendPushNotifications()
if err != nil {
return fmt.Errorf("error sending push notifications, err: %w", err)
}

err = sendWebhookNotifications(useDB)
err = sendWebhookNotifications()
if err != nil {
return fmt.Errorf("error sending webhook notifications, err: %w", err)
}

err = sendDiscordNotifications(useDB)
err = sendDiscordNotifications()
if err != nil {
return fmt.Errorf("error sending webhook discord notifications, err: %w", err)
}
Expand All @@ -562,8 +561,8 @@ func dispatchNotifications(useDB *sqlx.DB) error {
}

// garbageCollectNotificationQueue deletes entries from the notification queue that have been processed
func garbageCollectNotificationQueue(useDB *sqlx.DB) error {
rows, err := useDB.Exec(`DELETE FROM notification_queue WHERE (sent < now() - INTERVAL '30 minutes') OR (created < now() - INTERVAL '1 hour')`)
func garbageCollectNotificationQueue() error {
rows, err := db.WriterDb.Exec(`DELETE FROM notification_queue WHERE (sent < now() - INTERVAL '30 minutes') OR (created < now() - INTERVAL '1 hour')`)
if err != nil {
return fmt.Errorf("error deleting from notification_queue %w", err)
}
Expand All @@ -583,7 +582,7 @@ func getNetwork() string {
return ""
}

func queuePushNotification(notificationsByUserID types.NotificationsPerUserId, useDB *sqlx.DB) error {
func queuePushNotification(notificationsByUserID types.NotificationsPerUserId) error {
userIDs := slices.Collect(maps.Keys(notificationsByUserID))

tokensByUserID, err := GetUserPushTokenByIds(userIDs)
Expand Down Expand Up @@ -636,7 +635,7 @@ func queuePushNotification(notificationsByUserID types.NotificationsPerUserId, u
Messages: batch,
}

_, err = useDB.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES ($1, 'push', $2)`, time.Now(), transitPushContent)
_, err = db.WriterDb.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES ($1, 'push', $2)`, time.Now(), transitPushContent)
if err != nil {
log.Error(err, "error writing transit push notification to db", 0)
return
Expand All @@ -646,10 +645,10 @@ func queuePushNotification(notificationsByUserID types.NotificationsPerUserId, u
return nil
}

func sendPushNotifications(useDB *sqlx.DB) error {
func sendPushNotifications() error {
var notificationQueueItem []types.TransitPush

err := useDB.Select(&notificationQueueItem, `SELECT
err := db.WriterDb.Select(&notificationQueueItem, `SELECT
id,
created,
sent,
Expand Down Expand Up @@ -679,7 +678,7 @@ func sendPushNotifications(useDB *sqlx.DB) error {
metrics.NotificationsSent.WithLabelValues("push", "200").Add(float64(len(n.Content.Messages)))
}

_, err = useDB.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id)
_, err = db.WriterDb.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id)
if err != nil {
return fmt.Errorf("error updating sent status for push notification with id: %v, err: %w", n.Id, err)
}
Expand All @@ -688,7 +687,7 @@ func sendPushNotifications(useDB *sqlx.DB) error {
return nil
}

func queueEmailNotifications(notificationsByUserID types.NotificationsPerUserId, useDB *sqlx.DB) error {
func queueEmailNotifications(notificationsByUserID types.NotificationsPerUserId) error {
userIDs := slices.Collect(maps.Keys(notificationsByUserID))

emailsByUserID, err := GetUserEmailsByIds(userIDs)
Expand Down Expand Up @@ -777,7 +776,7 @@ func queueEmailNotifications(notificationsByUserID types.NotificationsPerUserId,
Attachments: attachments,
}

_, err = useDB.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES ($1, 'email', $2)`, time.Now(), transitEmailContent)
_, err = db.WriterDb.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES ($1, 'email', $2)`, time.Now(), transitEmailContent)
if err != nil {
log.Error(err, "error writing transit email to db", 0)
}
Expand All @@ -786,10 +785,10 @@ func queueEmailNotifications(notificationsByUserID types.NotificationsPerUserId,
return nil
}

func sendEmailNotifications(useDb *sqlx.DB) error {
func sendEmailNotifications() error {
var notificationQueueItem []types.TransitEmail

err := useDb.Select(&notificationQueueItem, `SELECT
err := db.WriterDb.Select(&notificationQueueItem, `SELECT
id,
created,
sent,
Expand All @@ -812,18 +811,18 @@ func sendEmailNotifications(useDb *sqlx.DB) error {
metrics.NotificationsSent.WithLabelValues("email", "200").Inc()
}
}
_, err = useDb.Exec(`UPDATE notification_queue set sent = now() where id = $1`, n.Id)
_, err = db.WriterDb.Exec(`UPDATE notification_queue set sent = now() where id = $1`, n.Id)
if err != nil {
return fmt.Errorf("error updating sent status for email notification with id: %v, err: %w", n.Id, err)
}
}
return nil
}

func queueWebhookNotifications(notificationsByUserID types.NotificationsPerUserId, useDB *sqlx.DB) error {
func queueWebhookNotifications(notificationsByUserID types.NotificationsPerUserId) error {
for userID, userNotifications := range notificationsByUserID {
var webhooks []types.UserWebhook
err := useDB.Select(&webhooks, `
err := db.FrontendWriterDB.Select(&webhooks, `
SELECT
id,
user_id,
Expand Down Expand Up @@ -861,7 +860,7 @@ func queueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI
if len(notifications) > 0 {
// reset Retries
if w.Retries > 5 && w.LastSent.Valid && w.LastSent.Time.Add(time.Hour).Before(time.Now()) {
_, err = useDB.Exec(`UPDATE users_webhooks SET retries = 0 WHERE id = $1;`, w.ID)
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = 0 WHERE id = $1;`, w.ID)
if err != nil {
log.Error(err, "error updating users_webhooks table; setting retries to zero", 0)
continue
Expand Down Expand Up @@ -938,7 +937,7 @@ func queueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI
}
// process notifs
for _, n := range notifs {
_, err = useDB.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), $1, $2);`, n.Channel, n.Content)
_, err = db.WriterDb.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), $1, $2);`, n.Channel, n.Content)
if err != nil {
log.Error(err, "error inserting into webhooks_queue", 0)
} else {
Expand All @@ -948,7 +947,7 @@ func queueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI
// process discord notifs
for _, dNotifs := range discordNotifMap {
for _, n := range dNotifs {
_, err = useDB.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), 'webhook_discord', $1);`, n)
_, err = db.WriterDb.Exec(`INSERT INTO notification_queue (created, channel, content) VALUES (now(), 'webhook_discord', $1);`, n)
if err != nil {
log.Error(err, "error inserting into webhooks_queue (discord)", 0)
continue
Expand All @@ -961,10 +960,10 @@ func queueWebhookNotifications(notificationsByUserID types.NotificationsPerUserI
return nil
}

func sendWebhookNotifications(useDB *sqlx.DB) error {
func sendWebhookNotifications() error {
var notificationQueueItem []types.TransitWebhook

err := useDB.Select(&notificationQueueItem, `SELECT
err := db.WriterDb.Select(&notificationQueueItem, `SELECT
id,
created,
sent,
Expand All @@ -981,7 +980,7 @@ func sendWebhookNotifications(useDB *sqlx.DB) error {
for _, n := range notificationQueueItem {
// do not retry after 5 attempts
if n.Content.Webhook.Retries > 5 {
_, err := db.FrontendWriterDB.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id)
_, err := db.WriterDb.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id)
if err != nil {
return fmt.Errorf("error deleting from notification queue: %w", err)
}
Expand All @@ -997,7 +996,7 @@ func sendWebhookNotifications(useDB *sqlx.DB) error {

_, err = url.Parse(n.Content.Webhook.Url)
if err != nil {
_, err := db.FrontendWriterDB.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id)
_, err := db.WriterDb.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id)
if err != nil {
return fmt.Errorf("error deleting from notification queue: %w", err)
}
Expand All @@ -1017,14 +1016,14 @@ func sendWebhookNotifications(useDB *sqlx.DB) error {
}
defer resp.Body.Close()

_, err = useDB.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id)
_, err = db.WriterDb.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id)
if err != nil {
log.Error(err, "error updating notification_queue table", 0)
return
}

if resp != nil && resp.StatusCode < 400 {
_, err = useDB.Exec(`UPDATE users_webhooks SET retries = 0, last_sent = now() WHERE id = $1;`, n.Content.Webhook.ID)
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = 0, last_sent = now() WHERE id = $1;`, n.Content.Webhook.ID)
if err != nil {
log.Error(err, "error updating users_webhooks table", 0)
return
Expand All @@ -1042,7 +1041,7 @@ func sendWebhookNotifications(useDB *sqlx.DB) error {
errResp.Body = string(b)
}

_, err = useDB.Exec(`UPDATE users_webhooks SET retries = retries + 1, last_sent = now(), request = $2, response = $3 WHERE id = $1;`, n.Content.Webhook.ID, n.Content, errResp)
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = retries + 1, last_sent = now(), request = $2, response = $3 WHERE id = $1;`, n.Content.Webhook.ID, n.Content, errResp)
if err != nil {
log.Error(err, "error updating users_webhooks table", 0)
return
Expand All @@ -1053,10 +1052,10 @@ func sendWebhookNotifications(useDB *sqlx.DB) error {
return nil
}

func sendDiscordNotifications(useDB *sqlx.DB) error {
func sendDiscordNotifications() error {
var notificationQueueItem []types.TransitDiscord

err := useDB.Select(&notificationQueueItem, `SELECT
err := db.WriterDb.Select(&notificationQueueItem, `SELECT
id,
created,
sent,
Expand All @@ -1077,7 +1076,7 @@ func sendDiscordNotifications(useDB *sqlx.DB) error {
for _, n := range notificationQueueItem {
// purge the event from existence if the retry counter is over 5
if n.Content.Webhook.Retries > 5 {
_, err = db.FrontendWriterDB.Exec(`DELETE FROM notification_queue where id = $1`, n.Id)
_, err = db.WriterDb.Exec(`DELETE FROM notification_queue where id = $1`, n.Id)
if err != nil {
log.Warnf("failed to delete notification from queue: %v", err)
}
Expand All @@ -1097,7 +1096,7 @@ func sendDiscordNotifications(useDB *sqlx.DB) error {
go func(webhook types.UserWebhook, reqs []types.TransitDiscord) {
defer func() {
// update retries counters in db based on end result
_, err = useDB.Exec(`UPDATE users_webhooks SET retries = $1, last_sent = now() WHERE id = $2;`, webhook.Retries, webhook.ID)
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = $1, last_sent = now() WHERE id = $2;`, webhook.Retries, webhook.ID)
if err != nil {
log.Warnf("failed to update retries counter to %v for webhook %v: %v", webhook.Retries, webhook.ID, err)
}
Expand All @@ -1107,7 +1106,7 @@ func sendDiscordNotifications(useDB *sqlx.DB) error {
for _, req := range reqs {
ids = append(ids, req.Id)
}
_, err = db.FrontendWriterDB.Exec(`UPDATE notification_queue SET sent = now() where id = ANY($1)`, pq.Array(ids))
_, err = db.WriterDb.Exec(`UPDATE notification_queue SET sent = now() where id = ANY($1)`, pq.Array(ids))
if err != nil {
log.Warnf("failed to update sent for notifcations in queue: %v", err)
}
Expand Down Expand Up @@ -1161,7 +1160,7 @@ func sendDiscordNotifications(useDB *sqlx.DB) error {
} else {
log.Error(nil, "error pushing discord webhook", 0, map[string]interface{}{"errResp.Body": errResp.Body, "webhook.Url": webhook.Url})
}
_, err = useDB.Exec(`UPDATE users_webhooks SET request = $2, response = $3 WHERE id = $1;`, webhook.ID, reqs[i].Content.DiscordRequest, errResp)
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET request = $2, response = $3 WHERE id = $1;`, webhook.ID, reqs[i].Content.DiscordRequest, errResp)
if err != nil {
log.Error(err, "error storing failure data in users_webhooks table", 0)
}
Expand Down

0 comments on commit c8d0e0f

Please sign in to comment.