Skip to content

Commit

Permalink
Merge pull request #1028 from gobitfly/BEDS-865/implement_test_notifi…
Browse files Browse the repository at this point in the history
…cation_funcs

Beds 865/implement test notification funcs
  • Loading branch information
peterbitfly authored Oct 24, 2024
2 parents 76f315a + 5395522 commit 40801a8
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 18 deletions.
10 changes: 4 additions & 6 deletions backend/pkg/api/data_access/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/gobitfly/beaconchain/pkg/notification"
n "github.com/gobitfly/beaconchain/pkg/notification"
"github.com/lib/pq"
"github.com/shopspring/decimal"
Expand Down Expand Up @@ -2269,14 +2270,11 @@ func (d *DataAccessService) AddOrRemoveEvent(eventsToInsert *[]goqu.Record, even
}

func (d *DataAccessService) QueueTestEmailNotification(ctx context.Context, userId uint64) error {
// TODO: @Data Access
return nil
return notification.SendTestEmail(ctx, types.UserId(userId), d.userReader)
}
func (d *DataAccessService) QueueTestPushNotification(ctx context.Context, userId uint64) error {
// TODO: @Data Access
return nil
return notification.QueueTestPushNotification(ctx, types.UserId(userId), d.userReader, d.readerDb)
}
func (d *DataAccessService) QueueTestWebhookNotification(ctx context.Context, userId uint64, webhookUrl string, isDiscordWebhook bool) error {
// TODO: @Data Access
return nil
return notification.SendTestWebhookNotification(ctx, types.UserId(userId), webhookUrl, isDiscordWebhook)
}
10 changes: 2 additions & 8 deletions backend/pkg/commons/mail/mail.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,15 +72,9 @@ func createTextMessage(msg types.Email) string {

// SendMailRateLimited sends an email to a given address with the given message.
// It will return a ratelimit-error if the configured ratelimit is exceeded.
func SendMailRateLimited(content types.TransitEmailContent) error {
func SendMailRateLimited(content types.TransitEmailContent, maxEmailsPerDay int64, bucket string) error {
sendThresholdReachedMail := false
maxEmailsPerDay := int64(0)
userInfo, err := db.GetUserInfo(context.Background(), uint64(content.UserId), db.FrontendReaderDB)
if err != nil {
return err
}
maxEmailsPerDay = int64(userInfo.PremiumPerks.EmailNotificationsPerDay)
count, err := db.CountSentMessage("n_mails", content.UserId)
count, err := db.CountSentMessage(bucket, content.UserId)
if err != nil {
return err
}
Expand Down
5 changes: 3 additions & 2 deletions backend/pkg/notification/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/log"
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
)

Expand Down Expand Up @@ -282,7 +283,7 @@ func GetSubsForEventFilter(eventName types.EventName, lastSentFilter string, las
return subMap, nil
}

func GetUserPushTokenByIds(ids []types.UserId) (map[types.UserId][]string, error) {
func GetUserPushTokenByIds(ids []types.UserId, userDbConn *sqlx.DB) (map[types.UserId][]string, error) {
pushByID := map[types.UserId][]string{}
if len(ids) == 0 {
return pushByID, nil
Expand All @@ -292,7 +293,7 @@ func GetUserPushTokenByIds(ids []types.UserId) (map[types.UserId][]string, error
Token string `db:"notification_token"`
}

err := db.FrontendWriterDB.Select(&rows, "SELECT DISTINCT ON (user_id, notification_token) user_id, notification_token FROM users_devices WHERE (user_id = ANY($1) AND user_id NOT IN (SELECT user_id from users_notification_channels WHERE active = false and channel = $2)) AND notify_enabled = true AND active = true AND notification_token IS NOT NULL AND LENGTH(notification_token) > 20 ORDER BY user_id, notification_token, id DESC", pq.Array(ids), types.PushNotificationChannel)
err := userDbConn.Select(&rows, "SELECT DISTINCT ON (user_id, notification_token) user_id, notification_token FROM users_devices WHERE (user_id = ANY($1) AND user_id NOT IN (SELECT user_id from users_notification_channels WHERE active = false and channel = $2)) AND notify_enabled = true AND active = true AND notification_token IS NOT NULL AND LENGTH(notification_token) > 20 ORDER BY user_id, notification_token, id DESC", pq.Array(ids), types.PushNotificationChannel)
if err != nil {
return nil, err
}
Expand Down
44 changes: 43 additions & 1 deletion backend/pkg/notification/queuing.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package notification
import (
"bytes"
"compress/gzip"
"context"
"database/sql"
"encoding/gob"
"fmt"
Expand Down Expand Up @@ -450,7 +451,7 @@ func RenderPushMessagesForUserEvents(epoch uint64, notificationsByUserID types.N

userIDs := slices.Collect(maps.Keys(notificationsByUserID))

tokensByUserID, err := GetUserPushTokenByIds(userIDs)
tokensByUserID, err := GetUserPushTokenByIds(userIDs, db.FrontendReaderDB)
if err != nil {
metrics.Errors.WithLabelValues("notifications_send_push_notifications").Inc()
return nil, fmt.Errorf("error when sending push-notifications: could not get tokens: %w", err)
Expand Down Expand Up @@ -587,6 +588,47 @@ func QueuePushNotification(epoch uint64, notificationsByUserID types.Notificatio
return nil
}

func QueueTestPushNotification(ctx context.Context, userId types.UserId, userDbConn *sqlx.DB, networkDbConn *sqlx.DB) error {
count, err := db.CountSentMessage("n_test_push", userId)
if err != nil {
return err
}
if count > 10 {
return fmt.Errorf("rate limit has been exceeded")
}
tokens, err := GetUserPushTokenByIds([]types.UserId{userId}, userDbConn)
if err != nil {
return err
}

messages := []*messaging.Message{}
for _, tokensOfUser := range tokens {
for _, token := range tokensOfUser {
log.Infof("sending test push to user %d with token %v", userId, token)
messages = append(messages, &messaging.Message{
Notification: &messaging.Notification{
Title: "Test Push",
Body: "This is a test push from beaconcha.in",
},
Token: token,
})
}
}

if len(messages) == 0 {
return fmt.Errorf("no push tokens found for user %v", userId)
}

transit := types.TransitPushContent{
Messages: messages,
UserId: userId,
}

_, err = networkDbConn.ExecContext(ctx, `INSERT INTO notification_queue (created, channel, content) VALUES (NOW(), 'push', $1)`, transit)

return err
}

func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserId, tx *sqlx.Tx) error {
for userID, userNotifications := range notificationsByUserID {
var webhooks []types.UserWebhook
Expand Down
73 changes: 72 additions & 1 deletion backend/pkg/notification/sending.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/gobitfly/beaconchain/pkg/commons/services"
"github.com/gobitfly/beaconchain/pkg/commons/types"
"github.com/gobitfly/beaconchain/pkg/commons/utils"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
)

Expand Down Expand Up @@ -154,7 +155,11 @@ func sendEmailNotifications() error {
log.Infof("processing %v email notifications", len(notificationQueueItem))

for _, n := range notificationQueueItem {
err = mail.SendMailRateLimited(n.Content)
userInfo, err := db.GetUserInfo(context.Background(), uint64(n.Content.UserId), db.FrontendReaderDB)
if err != nil {
return err
}
err = mail.SendMailRateLimited(n.Content, int64(userInfo.PremiumPerks.EmailNotificationsPerDay), "n_emails")
if err != nil {
if !strings.Contains(err.Error(), "rate limit has been exceeded") {
metrics.Errors.WithLabelValues("notifications_send_email").Inc()
Expand Down Expand Up @@ -433,3 +438,69 @@ func sendDiscordNotifications() error {

return nil
}

func SendTestEmail(ctx context.Context, userId types.UserId, dbConn *sqlx.DB) error {
var email string
err := dbConn.GetContext(ctx, &email, `SELECT email FROM users WHERE id = $1`, userId)
if err != nil {
return err
}
content := types.TransitEmailContent{
UserId: userId,
Address: email,
Subject: "Test Email",
Email: types.Email{
Title: "beaconcha.in - Test Email",
Body: "This is a test email from beaconcha.in",
},
Attachments: []types.EmailAttachment{},
CreatedTs: time.Now(),
}
err = mail.SendMailRateLimited(content, 10, "n_test_emails")
if err != nil {
return fmt.Errorf("error sending test email, err: %w", err)
}

return nil
}

func SendTestWebhookNotification(ctx context.Context, userId types.UserId, webhookUrl string, isDiscordWebhook bool) error {
count, err := db.CountSentMessage("n_test_push", userId)
if err != nil {
return err
}
if count > 10 {
return fmt.Errorf("rate limit has been exceeded")
}

client := http.Client{Timeout: time.Second * 5}

if isDiscordWebhook {
req := types.DiscordReq{
Content: "This is a test notification from beaconcha.in",
}
reqBody := new(bytes.Buffer)
err := json.NewEncoder(reqBody).Encode(req)
if err != nil {
return fmt.Errorf("error marshalling discord webhook event: %w", err)
}
resp, err := client.Post(webhookUrl, "application/json", reqBody)
if err != nil {
return fmt.Errorf("error sending discord webhook request: %w", err)
}
defer resp.Body.Close()
} else {
// send a test webhook notification with the text "TEST" in the post body
reqBody := new(bytes.Buffer)
err := json.NewEncoder(reqBody).Encode(`{data: "TEST"}`)
if err != nil {
return fmt.Errorf("error marshalling webhook event: %w", err)
}
resp, err := client.Post(webhookUrl, "application/json", reqBody)
if err != nil {
return fmt.Errorf("error sending webhook request: %w", err)
}
defer resp.Body.Close()
}
return nil
}

0 comments on commit 40801a8

Please sign in to comment.