Skip to content

Commit

Permalink
Merge pull request #1051 from gobitfly/BEDS-397/initialize_efficiency…
Browse files Browse the repository at this point in the history
…_map

Beds 397/initialize efficiency map
  • Loading branch information
peterbitfly authored Oct 31, 2024
2 parents a5628cc + 3f1a5fa commit 966e5f8
Show file tree
Hide file tree
Showing 6 changed files with 454 additions and 161 deletions.
33 changes: 24 additions & 9 deletions backend/cmd/misc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ func Run() {
func collectNotifications(startEpoch uint64) error {
epoch := startEpoch

notifications, err := notification.GetNotificationsForEpoch(utils.Config.Notifications.PubkeyCachePath, epoch)
notifications, err := notification.GetHeadNotificationsForEpoch(utils.Config.Notifications.PubkeyCachePath, epoch)
if err != nil {
return err
}
Expand All @@ -565,20 +565,35 @@ func collectNotifications(startEpoch uint64) error {
spew.Dump(notifications[0])
}

emails, err := notification.RenderEmailsForUserEvents(0, notifications)
tx, err := db.WriterDb.Beginx()
if err != nil {
return err
}
defer utils.Rollback(tx)

for _, email := range emails {
// if email.Address == "" {
log.Infof("to: %v", email.Address)
log.Infof("subject: %v", email.Subject)
log.Infof("body: %v", email.Email.Body)
log.Info("-----")
// }
err = notification.QueueWebhookNotifications(notifications, tx)
if err != nil {
return err
}
err = tx.Commit()
if err != nil {
return err
}

// emails, err := notification.RenderEmailsForUserEvents(0, notifications)
// if err != nil {
// return err
// }

// for _, email := range emails {
// // if email.Address == "" {
// log.Infof("to: %v", email.Address)
// log.Infof("subject: %v", email.Subject)
// log.Infof("body: %v", email.Email.Body)
// log.Info("-----")
// // }
// }

// pushMessages, err := notification.RenderPushMessagesForUserEvents(0, notifications)
// if err != nil {
// return err
Expand Down
25 changes: 14 additions & 11 deletions backend/pkg/commons/types/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -546,8 +546,9 @@ type TransitWebhook struct {

type TransitWebhookContent struct {
Webhook UserWebhook
Event WebhookEvent `json:"event"`
UserId UserId `json:"userId"`
Event *WebhookEvent `json:"event,omitempty"`
Events []*WebhookEvent `json:"events,omitempty"`
UserId UserId `json:"userId"`
}

type WebhookEvent struct {
Expand Down Expand Up @@ -639,15 +640,17 @@ type Email struct {
}

type UserWebhook struct {
ID uint64 `db:"id" json:"id"`
UserID uint64 `db:"user_id" json:"-"`
Url string `db:"url" json:"url"`
Retries uint64 `db:"retries" json:"retries"`
LastSent sql.NullTime `db:"last_sent" json:"lastRetry"`
Response sql.NullString `db:"response" json:"response"`
Request sql.NullString `db:"request" json:"request"`
Destination sql.NullString `db:"destination" json:"destination"`
EventNames pq.StringArray `db:"event_names" json:"-"`
ID uint64 `db:"id" json:"id"`
UserID uint64 `db:"user_id" json:"-"`
Url string `db:"url" json:"url"`
Retries uint64 `db:"retries" json:"retries"`
LastSent sql.NullTime `db:"last_sent" json:"lastRetry"`
Response sql.NullString `db:"response" json:"response"`
Request sql.NullString `db:"request" json:"request"`
Destination sql.NullString `db:"destination" json:"destination"`
EventNames pq.StringArray `db:"event_names" json:"-"`
DashboardId uint64 `db:"dashboard_id" json:"dashboardId"`
DashboardGroupId uint64 `db:"dashboard_group_id" json:"dashboardGroupId"`
}

type UserWebhookSubscriptions struct {
Expand Down
18 changes: 16 additions & 2 deletions backend/pkg/notification/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func notificationCollector() {
log.Infof("collecting notifications for epoch %v", epoch)

// Network DB Notifications (network related)
notifications, err := collectNotifications(epoch, mc)
notifications, err := collectNotifications(mc, epoch)

if err != nil {
log.Error(err, "error collection notifications", 0)
Expand Down Expand Up @@ -280,7 +280,7 @@ func collectUpcomingBlockProposalNotifications(notificationsByUserID types.Notif
return nil
}

func collectNotifications(epoch uint64, mc modules.ModuleContext) (types.NotificationsPerUserId, error) {
func collectNotifications(mc modules.ModuleContext, epoch uint64) (types.NotificationsPerUserId, error) {
notificationsByUserID := types.NotificationsPerUserId{}
start := time.Now()
var err error
Expand Down Expand Up @@ -519,6 +519,13 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
return fmt.Errorf("error getting proposal assignments: %w", err)
}
for _, assignment := range proposalAssignments.Data {
if _, ok := efficiencyMap[types.ValidatorIndex(assignment.ValidatorIndex)]; !ok {
efficiencyMap[types.ValidatorIndex(assignment.ValidatorIndex)] = &dbResult{
ValidatorIndex: assignment.ValidatorIndex,
AttestationReward: decimal.Decimal{},
AttestationIdealReward: decimal.Decimal{},
}
}
efficiencyMap[types.ValidatorIndex(assignment.ValidatorIndex)].BlocksScheduled++
}

Expand All @@ -538,6 +545,13 @@ func collectGroupEfficiencyNotifications(notificationsByUserID types.Notificatio
efficiencyMap[types.ValidatorIndex(s.Data.Message.ProposerIndex)].BlocksProposed++

for i, validatorIndex := range syncAssignments.Data.Validators {
if _, ok := efficiencyMap[types.ValidatorIndex(validatorIndex)]; !ok {
efficiencyMap[types.ValidatorIndex(validatorIndex)] = &dbResult{
ValidatorIndex: uint64(validatorIndex),
AttestationReward: decimal.Decimal{},
AttestationIdealReward: decimal.Decimal{},
}
}
efficiencyMap[types.ValidatorIndex(validatorIndex)].SyncScheduled++

if utils.BitAtVector(s.Data.Message.Body.SyncAggregate.SyncCommitteeBits, i) {
Expand Down
15 changes: 14 additions & 1 deletion backend/pkg/notification/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,20 @@ func GetNotificationsForEpoch(pubkeyCachePath string, epoch uint64) (types.Notif
if err != nil {
log.Fatal(err, "error initializing pubkey cache path for notifications", 0)
}
return collectNotifications(epoch, mc)
return collectNotifications(mc, epoch)
}

func GetHeadNotificationsForEpoch(pubkeyCachePath string, epoch uint64) (types.NotificationsPerUserId, error) {
mc, err := modules.GetModuleContext()
if err != nil {
log.Fatal(err, "error getting module context", 0)
}

err = initPubkeyCache(pubkeyCachePath)
if err != nil {
log.Fatal(err, "error initializing pubkey cache path for notifications", 0)
}
return collectHeadNotifications(mc, epoch)
}

// Used for isolated testing
Expand Down
Loading

0 comments on commit 966e5f8

Please sign in to comment.