Skip to content

Commit

Permalink
fix(notifications): fix discord notification sending
Browse files Browse the repository at this point in the history
fix(notifications): adapt discord webhook payload
  • Loading branch information
peterbitfly authored and guybrush committed Dec 5, 2024
1 parent 8f0fc88 commit ccbd929
Show file tree
Hide file tree
Showing 2 changed files with 72 additions and 91 deletions.
8 changes: 5 additions & 3 deletions backend/pkg/commons/types/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,9 +591,11 @@ type TransitDiscord struct {
}

type TransitDiscordContent struct {
Webhook UserWebhook
DiscordRequest DiscordReq `json:"discordRequest"`
UserId UserId `json:"userId"`
Webhook UserWebhook
DiscordRequest DiscordReq `json:"discordRequest"`
UserId UserId `json:"userId"`
DashboardId uint64 `json:"dashboardId"`
DashboardGroupId uint64 `json:"dashboardGroupId"`
}

func (e *TransitDiscordContent) Scan(value interface{}) error {
Expand Down
155 changes: 67 additions & 88 deletions backend/pkg/notification/sending.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/metrics"
"github.com/gobitfly/beaconchain/pkg/commons/services"
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
"golang.org/x/sync/errgroup"
)

Expand Down Expand Up @@ -361,115 +359,97 @@ func sendDiscordNotifications() error {
if err != nil {
return fmt.Errorf("error querying notification queue, err: %w", err)
}
client := &http.Client{Timeout: time.Second * 30}
// webhooks have 5 seconds to respond
client := &http.Client{Timeout: time.Second * 5}

log.Infof("processing %v discord webhook notifications", len(notificationQueueItem))
webhookMap := make(map[uint64]types.UserWebhook)

notifMap := make(map[uint64][]types.TransitDiscord)
// generate webhook id => discord req
// while mapping. aggregate embeds while doing so, up to 10 per req can be sent
// use an error group to throttle webhook requests
g := &errgroup.Group{}
g.SetLimit(50) // issue at most 50 requests at a time
for _, n := range notificationQueueItem {
// purge the event from existence if the retry counter is over 5
n := n
_, err := db.CountSentMessage(NOTIFICAION_WEBHOOK_RATE_LIMIT_BUCKET, n.Content.UserId)
if err != nil {
log.Error(err, "error counting sent webhook", 0)
}

// do not retry after 5 attempts
if n.Content.Webhook.Retries > 5 {
_, err = db.WriterDb.Exec(`DELETE FROM notification_queue where id = $1`, n.Id)
_, err := db.WriterDb.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id)
if err != nil {
log.Warnf("failed to delete notification from queue: %v", err)
return fmt.Errorf("error deleting from notification queue: %w", err)
}
continue
}
if _, exists := webhookMap[n.Content.Webhook.ID]; !exists {
webhookMap[n.Content.Webhook.ID] = n.Content.Webhook

reqBody := new(bytes.Buffer)

err = json.NewEncoder(reqBody).Encode(n.Content.DiscordRequest)
if err != nil {
log.Error(err, "error marshalling webhook event", 0)
}
if _, exists := notifMap[n.Content.Webhook.ID]; !exists {
notifMap[n.Content.Webhook.ID] = make([]types.TransitDiscord, 0)

_, err = url.Parse(n.Content.Webhook.Url)
if err != nil {
_, err := db.WriterDb.Exec(`DELETE FROM notification_queue WHERE id = $1`, n.Id)
if err != nil {
return fmt.Errorf("error deleting from notification queue: %w", err)
}
continue
}
notifMap[n.Content.Webhook.ID] = append(notifMap[n.Content.Webhook.ID], n)
}
// use an error group to throttle webhook requests
g := &errgroup.Group{}
g.SetLimit(50) // issue at most 50 requests at a time

for _, webhook := range webhookMap {
webhook := webhook
g.Go(func() error {
defer func() {
// update retries counters in db based on end result
if webhook.DashboardId == 0 && webhook.DashboardGroupId == 0 {
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = $1, last_sent = now() WHERE id = $2;`, webhook.Retries, webhook.ID)
} else {
_, err = db.WriterDb.Exec(`UPDATE users_val_dashboards_groups SET webhook_retries = $1, webhook_last_sent = now() WHERE id = $2 AND dashboard_id = $3;`, webhook.Retries, webhook.DashboardGroupId, webhook.DashboardId)
}
if err != nil {
log.Warnf("failed to update retries counter to %v for webhook %v: %v", webhook.Retries, webhook.ID, err)
}

// mark notifcations as sent in db
ids := make([]uint64, 0)
for _, req := range notifMap[webhook.ID] {
ids = append(ids, req.Id)
}
_, err = db.WriterDb.Exec(`UPDATE notification_queue SET sent = now() where id = ANY($1)`, pq.Array(ids))
if err != nil {
log.Warnf("failed to update sent for notifcations in queue: %v", err)
}
}()
if n.Content.Webhook.Retries > 0 {
time.Sleep(time.Duration(n.Content.Webhook.Retries) * time.Second)
}
resp, err := client.Post(n.Content.Webhook.Url, "application/json", reqBody)
if err != nil {
log.Warnf("error sending discord webhook request: %v", err)
metrics.NotificationsSent.WithLabelValues("webhook_discord", "error").Inc()
return nil
} else {
metrics.NotificationsSent.WithLabelValues("webhook_discord", resp.Status).Inc()
}
defer resp.Body.Close()

_, err = url.Parse(webhook.Url)
_, err = db.WriterDb.Exec(`UPDATE notification_queue SET sent = now() WHERE id = $1`, n.Id)
if err != nil {
log.Error(err, "error parsing url", 0, log.Fields{"webhook_id": webhook.ID})
log.Error(err, "error updating notification_queue table", 0)
return nil
}

for i := 0; i < len(notifMap[webhook.ID]); i++ {
if webhook.Retries > 5 {
break // stop
if resp != nil && resp.StatusCode < 400 {
// update retries counters in db based on end result
if n.Content.Webhook.DashboardId == 0 && n.Content.Webhook.DashboardGroupId == 0 {
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = $1, last_sent = now() WHERE id = $2;`, n.Content.Webhook.Retries, n.Content.Webhook.ID)
} else {
_, err = db.WriterDb.Exec(`UPDATE users_val_dashboards_groups SET webhook_retries = $1, webhook_last_sent = now() WHERE id = $2 AND dashboard_id = $3;`, n.Content.Webhook.Retries, n.Content.Webhook.DashboardGroupId, n.Content.Webhook.DashboardId)
}
// sleep between retries
time.Sleep(time.Duration(webhook.Retries) * time.Second)

reqBody := new(bytes.Buffer)
err := json.NewEncoder(reqBody).Encode(notifMap[webhook.ID][i].Content.DiscordRequest)
if err != nil {
log.Error(err, "error marshalling discord webhook event", 0)
continue // skip
log.Warnf("failed to update retries counter to %v for webhook %v: %v", n.Content.Webhook.Retries, n.Content.Webhook.ID, err)
}
} else {
var errResp types.ErrorResponse

resp, err := client.Post(webhook.Url, "application/json", reqBody)
if err != nil {
log.Warnf("failed sending discord webhook request %v: %v", webhook.ID, err)
metrics.NotificationsSent.WithLabelValues("webhook_discord", "error").Inc()
} else {
metrics.NotificationsSent.WithLabelValues("webhook_discord", resp.Status).Inc()
}
if resp != nil && resp.StatusCode < 400 {
webhook.Retries = 0
} else {
webhook.Retries++
var errResp types.ErrorResponse

if resp != nil {
b, err := io.ReadAll(resp.Body)
if err != nil {
log.Error(err, "error reading body", 0)
} else {
errResp.Body = string(b)
}
errResp.Status = resp.Status
resp.Body.Close()

if resp.StatusCode != http.StatusOK {
log.WarnWithFields(map[string]interface{}{"errResp.Body": utils.FirstN(errResp.Body, 1000), "webhook.Url": webhook.Url}, "error pushing discord webhook")
}
if webhook.DashboardId == 0 && webhook.DashboardGroupId == 0 {
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET request = $2, response = $3 WHERE id = $1;`, webhook.ID, notifMap[webhook.ID][i].Content.DiscordRequest, errResp)
}
if err != nil {
log.Error(err, "error storing failure data in users_webhooks table", 0)
}
if resp != nil {
b, err := io.ReadAll(resp.Body)
if err != nil {
log.Error(err, "error reading body", 0)
}

i-- // retry, IMPORTANT to be at the END of the ELSE, otherwise the wrong index will be used in the commands above!
errResp.Status = resp.Status
errResp.Body = string(b)
}

if n.Content.Webhook.DashboardId == 0 && n.Content.Webhook.DashboardGroupId == 0 {
_, err = db.FrontendWriterDB.Exec(`UPDATE users_webhooks SET retries = retries + 1, last_sent = now(), request = $2, response = $3 WHERE id = $1;`, n.Content.Webhook.ID, n.Content, errResp)
} else {
_, err = db.WriterDb.Exec(`UPDATE users_val_dashboards_groups SET webhook_retries = webhook_retries + 1, webhook_last_sent = now() WHERE id = $1 AND dashboard_id = $2;`, n.Content.Webhook.DashboardGroupId, n.Content.Webhook.DashboardId)
}
if err != nil {
log.Error(err, "error updating users_webhooks table", 0)
return nil
}
}
return nil
Expand All @@ -480,7 +460,6 @@ func sendDiscordNotifications() error {
if err != nil {
log.Error(err, "error waiting for errgroup", 0)
}

return nil
}

Expand Down

0 comments on commit ccbd929

Please sign in to comment.