diff --git a/backend/pkg/api/data_access/notifications.go b/backend/pkg/api/data_access/notifications.go index 868efb171..7ae5c2515 100644 --- a/backend/pkg/api/data_access/notifications.go +++ b/backend/pkg/api/data_access/notifications.go @@ -27,6 +27,7 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/log" "github.com/gobitfly/beaconchain/pkg/commons/types" "github.com/gobitfly/beaconchain/pkg/commons/utils" + "github.com/gobitfly/beaconchain/pkg/notification" n "github.com/gobitfly/beaconchain/pkg/notification" "github.com/lib/pq" "github.com/shopspring/decimal" @@ -2269,14 +2270,11 @@ func (d *DataAccessService) AddOrRemoveEvent(eventsToInsert *[]goqu.Record, even } func (d *DataAccessService) QueueTestEmailNotification(ctx context.Context, userId uint64) error { - // TODO: @Data Access - return nil + return notification.SendTestEmail(ctx, types.UserId(userId), d.userReader) } func (d *DataAccessService) QueueTestPushNotification(ctx context.Context, userId uint64) error { - // TODO: @Data Access - return nil + return notification.QueueTestPushNotification(ctx, types.UserId(userId), d.userReader, d.readerDb) } func (d *DataAccessService) QueueTestWebhookNotification(ctx context.Context, userId uint64, webhookUrl string, isDiscordWebhook bool) error { - // TODO: @Data Access - return nil + return notification.SendTestWebhookNotification(ctx, types.UserId(userId), webhookUrl, isDiscordWebhook) } diff --git a/backend/pkg/commons/mail/mail.go b/backend/pkg/commons/mail/mail.go index 60f27a14c..7e2491514 100644 --- a/backend/pkg/commons/mail/mail.go +++ b/backend/pkg/commons/mail/mail.go @@ -72,15 +72,9 @@ func createTextMessage(msg types.Email) string { // SendMailRateLimited sends an email to a given address with the given message. // It will return a ratelimit-error if the configured ratelimit is exceeded. -func SendMailRateLimited(content types.TransitEmailContent) error { +func SendMailRateLimited(content types.TransitEmailContent, maxEmailsPerDay int64, bucket string) error { sendThresholdReachedMail := false - maxEmailsPerDay := int64(0) - userInfo, err := db.GetUserInfo(context.Background(), uint64(content.UserId), db.FrontendReaderDB) - if err != nil { - return err - } - maxEmailsPerDay = int64(userInfo.PremiumPerks.EmailNotificationsPerDay) - count, err := db.CountSentMessage("n_mails", content.UserId) + count, err := db.CountSentMessage(bucket, content.UserId) if err != nil { return err } diff --git a/backend/pkg/notification/db.go b/backend/pkg/notification/db.go index 56c216fb5..d7ab55394 100644 --- a/backend/pkg/notification/db.go +++ b/backend/pkg/notification/db.go @@ -12,6 +12,7 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/log" "github.com/gobitfly/beaconchain/pkg/commons/types" "github.com/gobitfly/beaconchain/pkg/commons/utils" + "github.com/jmoiron/sqlx" "github.com/lib/pq" ) @@ -282,7 +283,7 @@ func GetSubsForEventFilter(eventName types.EventName, lastSentFilter string, las return subMap, nil } -func GetUserPushTokenByIds(ids []types.UserId) (map[types.UserId][]string, error) { +func GetUserPushTokenByIds(ids []types.UserId, userDbConn *sqlx.DB) (map[types.UserId][]string, error) { pushByID := map[types.UserId][]string{} if len(ids) == 0 { return pushByID, nil @@ -292,7 +293,7 @@ func GetUserPushTokenByIds(ids []types.UserId) (map[types.UserId][]string, error Token string `db:"notification_token"` } - err := db.FrontendWriterDB.Select(&rows, "SELECT DISTINCT ON (user_id, notification_token) user_id, notification_token FROM users_devices WHERE (user_id = ANY($1) AND user_id NOT IN (SELECT user_id from users_notification_channels WHERE active = false and channel = $2)) AND notify_enabled = true AND active = true AND notification_token IS NOT NULL AND LENGTH(notification_token) > 20 ORDER BY user_id, notification_token, id DESC", pq.Array(ids), types.PushNotificationChannel) + err := userDbConn.Select(&rows, "SELECT DISTINCT ON (user_id, notification_token) user_id, notification_token FROM users_devices WHERE (user_id = ANY($1) AND user_id NOT IN (SELECT user_id from users_notification_channels WHERE active = false and channel = $2)) AND notify_enabled = true AND active = true AND notification_token IS NOT NULL AND LENGTH(notification_token) > 20 ORDER BY user_id, notification_token, id DESC", pq.Array(ids), types.PushNotificationChannel) if err != nil { return nil, err } diff --git a/backend/pkg/notification/queuing.go b/backend/pkg/notification/queuing.go index 803a618c8..0a93b7846 100644 --- a/backend/pkg/notification/queuing.go +++ b/backend/pkg/notification/queuing.go @@ -3,6 +3,7 @@ package notification import ( "bytes" "compress/gzip" + "context" "database/sql" "encoding/gob" "fmt" @@ -450,7 +451,7 @@ func RenderPushMessagesForUserEvents(epoch uint64, notificationsByUserID types.N userIDs := slices.Collect(maps.Keys(notificationsByUserID)) - tokensByUserID, err := GetUserPushTokenByIds(userIDs) + tokensByUserID, err := GetUserPushTokenByIds(userIDs, db.FrontendReaderDB) if err != nil { metrics.Errors.WithLabelValues("notifications_send_push_notifications").Inc() return nil, fmt.Errorf("error when sending push-notifications: could not get tokens: %w", err) @@ -587,6 +588,47 @@ func QueuePushNotification(epoch uint64, notificationsByUserID types.Notificatio return nil } +func QueueTestPushNotification(ctx context.Context, userId types.UserId, userDbConn *sqlx.DB, networkDbConn *sqlx.DB) error { + count, err := db.CountSentMessage("n_test_push", userId) + if err != nil { + return err + } + if count > 10 { + return fmt.Errorf("rate limit has been exceeded") + } + tokens, err := GetUserPushTokenByIds([]types.UserId{userId}, userDbConn) + if err != nil { + return err + } + + messages := []*messaging.Message{} + for _, tokensOfUser := range tokens { + for _, token := range tokensOfUser { + log.Infof("sending test push to user %d with token %v", userId, token) + messages = append(messages, &messaging.Message{ + Notification: &messaging.Notification{ + Title: "Test Push", + Body: "This is a test push from beaconcha.in", + }, + Token: token, + }) + } + } + + if len(messages) == 0 { + return fmt.Errorf("no push tokens found for user %v", userId) + } + + transit := types.TransitPushContent{ + Messages: messages, + UserId: userId, + } + + _, err = networkDbConn.ExecContext(ctx, `INSERT INTO notification_queue (created, channel, content) VALUES (NOW(), 'push', $1)`, transit) + + return err +} + func QueueWebhookNotifications(notificationsByUserID types.NotificationsPerUserId, tx *sqlx.Tx) error { for userID, userNotifications := range notificationsByUserID { var webhooks []types.UserWebhook diff --git a/backend/pkg/notification/sending.go b/backend/pkg/notification/sending.go index 699f4c3f2..5d280bfae 100644 --- a/backend/pkg/notification/sending.go +++ b/backend/pkg/notification/sending.go @@ -18,6 +18,7 @@ import ( "github.com/gobitfly/beaconchain/pkg/commons/services" "github.com/gobitfly/beaconchain/pkg/commons/types" "github.com/gobitfly/beaconchain/pkg/commons/utils" + "github.com/jmoiron/sqlx" "github.com/lib/pq" ) @@ -154,7 +155,11 @@ func sendEmailNotifications() error { log.Infof("processing %v email notifications", len(notificationQueueItem)) for _, n := range notificationQueueItem { - err = mail.SendMailRateLimited(n.Content) + userInfo, err := db.GetUserInfo(context.Background(), uint64(n.Content.UserId), db.FrontendReaderDB) + if err != nil { + return err + } + err = mail.SendMailRateLimited(n.Content, int64(userInfo.PremiumPerks.EmailNotificationsPerDay), "n_emails") if err != nil { if !strings.Contains(err.Error(), "rate limit has been exceeded") { metrics.Errors.WithLabelValues("notifications_send_email").Inc() @@ -433,3 +438,69 @@ func sendDiscordNotifications() error { return nil } + +func SendTestEmail(ctx context.Context, userId types.UserId, dbConn *sqlx.DB) error { + var email string + err := dbConn.GetContext(ctx, &email, `SELECT email FROM users WHERE id = $1`, userId) + if err != nil { + return err + } + content := types.TransitEmailContent{ + UserId: userId, + Address: email, + Subject: "Test Email", + Email: types.Email{ + Title: "beaconcha.in - Test Email", + Body: "This is a test email from beaconcha.in", + }, + Attachments: []types.EmailAttachment{}, + CreatedTs: time.Now(), + } + err = mail.SendMailRateLimited(content, 10, "n_test_emails") + if err != nil { + return fmt.Errorf("error sending test email, err: %w", err) + } + + return nil +} + +func SendTestWebhookNotification(ctx context.Context, userId types.UserId, webhookUrl string, isDiscordWebhook bool) error { + count, err := db.CountSentMessage("n_test_push", userId) + if err != nil { + return err + } + if count > 10 { + return fmt.Errorf("rate limit has been exceeded") + } + + client := http.Client{Timeout: time.Second * 5} + + if isDiscordWebhook { + req := types.DiscordReq{ + Content: "This is a test notification from beaconcha.in", + } + reqBody := new(bytes.Buffer) + err := json.NewEncoder(reqBody).Encode(req) + if err != nil { + return fmt.Errorf("error marshalling discord webhook event: %w", err) + } + resp, err := client.Post(webhookUrl, "application/json", reqBody) + if err != nil { + return fmt.Errorf("error sending discord webhook request: %w", err) + } + defer resp.Body.Close() + } else { + // send a test webhook notification with the text "TEST" in the post body + reqBody := new(bytes.Buffer) + err := json.NewEncoder(reqBody).Encode(`{data: "TEST"}`) + if err != nil { + return fmt.Errorf("error marshalling webhook event: %w", err) + } + resp, err := client.Post(webhookUrl, "application/json", reqBody) + if err != nil { + return fmt.Errorf("error sending webhook request: %w", err) + } + defer resp.Body.Close() + } + return nil +}