Skip to content

Commit

Permalink
feat: impl porter states notification
Browse files Browse the repository at this point in the history
  • Loading branch information
MuZhou233 committed Jun 15, 2024
1 parent e9e2d34 commit c66f20c
Show file tree
Hide file tree
Showing 16 changed files with 305 additions and 233 deletions.
43 changes: 24 additions & 19 deletions app/sephirah/cmd/sephirah/wire_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions app/sephirah/internal/biz/biz.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ var ProviderSet = wire.NewSet(
bizangela.ProviderSet,
biztiphereth.ProviderSet,
bizgebura.NewGebura,
bizbinah.NewBinah,
bizbinah.NewControlBlock,
bizbinah.ProviderSet,
bizyesod.ProviderSet,
biznetzach.NewNetzach,
biznetzach.ProviderSet,
bizchesed.ProviderSet,
)
7 changes: 0 additions & 7 deletions app/sephirah/internal/biz/bizangela/angela.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"github.com/tuihub/librarian/app/sephirah/internal/client"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelangela"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelgebura"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach"
"github.com/tuihub/librarian/app/sephirah/internal/model/modeltiphereth"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelyesod"
"github.com/tuihub/librarian/app/sephirah/internal/supervisor"
Expand All @@ -34,7 +33,6 @@ var ProviderSet = wire.NewSet(
NewNotifyTargetCache,
NewParseFeedItemDigestTopic,
NewUpdateAppInfoIndexTopic,
NewSystemNotificationTopic,
)

type Angela struct {
Expand All @@ -59,7 +57,6 @@ type AngelaRepo interface {
UpdateFeedPullStatus(context.Context, *modelyesod.FeedConfig) error
GetFeedItem(context.Context, model.InternalID) (*modelfeed.Item, error)
UpdateFeedItemDigest(context.Context, *modelfeed.Item) error
UpsertSystemNotification(context.Context, model.InternalID, *modelnetzach.SystemNotification) error
}

func NewAngelaBase(
Expand Down Expand Up @@ -90,7 +87,6 @@ func NewAngela(
notifyPush *libmq.Topic[modelangela.NotifyPush],
parseFeedItem *libmq.Topic[modelangela.ParseFeedItemDigest],
updateAppIndex *libmq.Topic[modelangela.UpdateAppInfoIndex],
systemNotification *libmq.Topic[modelangela.SystemNotify],
) (*Angela, error) {
if err := mq.RegisterTopic(pullAccountInfo); err != nil {
return nil, err
Expand All @@ -116,9 +112,6 @@ func NewAngela(
if err := mq.RegisterTopic(updateAppIndex); err != nil {
return nil, err
}
if err := mq.RegisterTopic(systemNotification); err != nil {
return nil, err
}
return &Angela{
mq: mq,
}, nil
Expand Down
2 changes: 1 addition & 1 deletion app/sephirah/internal/biz/bizangela/feed.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func NewPullFeedTopic( //nolint:gocognit // TODO
a *AngelaBase,
notify *libmq.Topic[modelangela.NotifyRouter],
parse *libmq.Topic[modelangela.ParseFeedItemDigest],
systemNotify *libmq.Topic[modelangela.SystemNotify],
systemNotify *libmq.Topic[modelnetzach.SystemNotify],
) *libmq.Topic[modelyesod.PullFeed] {
return libmq.NewTopic[modelyesod.PullFeed](
"PullFeed",
Expand Down
26 changes: 0 additions & 26 deletions app/sephirah/internal/biz/bizangela/system.go

This file was deleted.

7 changes: 7 additions & 0 deletions app/sephirah/internal/biz/bizbinah/binah.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ import (
"github.com/tuihub/librarian/app/sephirah/internal/model/modelbinah"
"github.com/tuihub/librarian/internal/lib/libauth"
searcher "github.com/tuihub/protos/pkg/librarian/searcher/v1"

"github.com/google/wire"
)

var ProviderSet = wire.NewSet(
NewBinah,
NewControlBlock,
)

type Binah struct {
Expand Down
18 changes: 16 additions & 2 deletions app/sephirah/internal/biz/biznetzach/netzach.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,18 @@ import (
"github.com/tuihub/librarian/app/sephirah/internal/supervisor"
"github.com/tuihub/librarian/internal/lib/libauth"
"github.com/tuihub/librarian/internal/lib/libcache"
"github.com/tuihub/librarian/internal/lib/libmq"
"github.com/tuihub/librarian/internal/lib/logger"
"github.com/tuihub/librarian/internal/model"
pb "github.com/tuihub/protos/pkg/librarian/sephirah/v1"

"github.com/go-kratos/kratos/v2/errors"
"github.com/google/wire"
)

var ProviderSet = wire.NewSet(
NewNetzach,
NewSystemNotificationTopic,
)

type NetzachRepo interface {
Expand All @@ -30,6 +37,8 @@ type NetzachRepo interface {
[]*modelnetzach.NotifyFlow, int64, error)
GetNotifyFlow(context.Context, model.InternalID) (*modelnetzach.NotifyFlow, error)
GetNotifyFlowIDsWithFeed(context.Context, model.InternalID) ([]model.InternalID, error)

UpsertSystemNotification(context.Context, model.InternalID, *modelnetzach.SystemNotification) error
ListSystemNotifications(context.Context, model.Paging, *model.InternalID, []modelnetzach.SystemNotificationType,
[]modelnetzach.SystemNotificationLevel, []modelnetzach.SystemNotificationStatus) (
[]*modelnetzach.SystemNotification, int64, error)
Expand All @@ -48,10 +57,15 @@ func NewNetzach(
repo NetzachRepo,
supv *supervisor.Supervisor,
sClient *client.Searcher,
mq *libmq.MQ,
notifySourceCache *libcache.Map[model.InternalID, modelangela.FeedToNotifyFlowValue],
notifyFlowCache *libcache.Map[model.InternalID, modelnetzach.NotifyFlow],
notifyTargetCache *libcache.Map[model.InternalID, modelnetzach.NotifyTarget],
) *Netzach {
systemNotification *libmq.Topic[modelnetzach.SystemNotify],
) (*Netzach, error) {
if err := mq.RegisterTopic(systemNotification); err != nil {
return nil, err
}
y := &Netzach{
repo,
supv,
Expand All @@ -60,7 +74,7 @@ func NewNetzach(
notifyFlowCache,
notifyTargetCache,
}
return y
return y, nil
}

func (n *Netzach) CreateNotifyTarget(ctx context.Context, target *modelnetzach.NotifyTarget) (
Expand Down
28 changes: 28 additions & 0 deletions app/sephirah/internal/biz/biznetzach/system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package biznetzach

import (
"context"

"github.com/tuihub/librarian/app/sephirah/internal/client"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach"
"github.com/tuihub/librarian/internal/lib/libmq"
)

func NewSystemNotificationTopic(
repo NetzachRepo,
searcher *client.Searcher,
) *libmq.Topic[modelnetzach.SystemNotify] {
return libmq.NewTopic[modelnetzach.SystemNotify](
"SystemNotify",
func(ctx context.Context, r *modelnetzach.SystemNotify) error {
if r.Notification.ID == 0 {
id, err := searcher.NewID(ctx)
if err != nil {
return err
}
r.Notification.ID = id
}
return repo.UpsertSystemNotification(ctx, r.UserID, &r.Notification)
},
)
}
13 changes: 6 additions & 7 deletions app/sephirah/internal/biz/bizyesod/yesod.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

"github.com/tuihub/librarian/app/sephirah/internal/client"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelangela"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach"
"github.com/tuihub/librarian/app/sephirah/internal/model/modeltiphereth"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelyesod"
Expand Down Expand Up @@ -62,7 +61,7 @@ type Yesod struct {
// mapper mapper.LibrarianMapperServiceClient
searcher *client.Searcher
pullFeed *libmq.Topic[modelyesod.PullFeed]
systemNotify *libmq.Topic[modelangela.SystemNotify]
systemNotify *libmq.Topic[modelnetzach.SystemNotify]
feedOwner *libcache.Map[modelyesod.FeedConfig, modeltiphereth.User]
}

Expand All @@ -73,7 +72,7 @@ func NewYesod(
// mClient mapper.LibrarianMapperServiceClient,
sClient *client.Searcher,
pullFeed *libmq.Topic[modelyesod.PullFeed],
systemNotify *libmq.Topic[modelangela.SystemNotify],
systemNotify *libmq.Topic[modelnetzach.SystemNotify],
feedOwner *libcache.Map[modelyesod.FeedConfig, modeltiphereth.User],
) (*Yesod, error) {
y := &Yesod{
Expand Down Expand Up @@ -102,23 +101,23 @@ func (y *Yesod) PullFeeds(ctx context.Context) error {
}
var errRes error
for _, c := range configs {
doNotify := func() *modelangela.SystemNotify {
doNotify := func() *modelnetzach.SystemNotify {
var owner *modeltiphereth.User
owner, err = y.feedOwner.GetWithFallBack(ctx, *c, nil)
if err != nil {
return nil
}
un := modelangela.NewUserNotify(
un := modelnetzach.NewUserNotify(
owner.ID,
modelnetzach.SystemNotificationLevelOngoing,
fmt.Sprintf("Scheduled Server Task: Update Feed %s", c.Name),
fmt.Sprintf("%s: Update Feed %s", modelnetzach.SystemNotifyTitleCronJob, c.Name),
"Queued",
)
un.Notification.ID, err = y.searcher.NewID(ctx)
if err != nil {
return nil
}
err = y.systemNotify.Publish(ctx, un)
err = y.systemNotify.PublishFallsLocalCall(ctx, un)
if err != nil {
return nil
}
Expand Down
32 changes: 0 additions & 32 deletions app/sephirah/internal/data/angela.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package data

import (
"context"
"fmt"
"time"

"github.com/tuihub/librarian/app/sephirah/internal/biz/bizangela"
Expand All @@ -14,9 +13,7 @@ import (
"github.com/tuihub/librarian/app/sephirah/internal/data/internal/ent/feed"
"github.com/tuihub/librarian/app/sephirah/internal/data/internal/ent/feedconfig"
"github.com/tuihub/librarian/app/sephirah/internal/data/internal/ent/feeditem"
"github.com/tuihub/librarian/app/sephirah/internal/data/internal/ent/systemnotification"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelgebura"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach"
"github.com/tuihub/librarian/app/sephirah/internal/model/modeltiphereth"
"github.com/tuihub/librarian/app/sephirah/internal/model/modelyesod"
"github.com/tuihub/librarian/internal/model"
Expand Down Expand Up @@ -313,32 +310,3 @@ func (a *angelaRepo) UpdateFeedItemDigest(ctx context.Context, item *modelfeed.I
Exec(ctx)
return err
}

func (a *angelaRepo) UpsertSystemNotification(
ctx context.Context,
userID model.InternalID,
notification *modelnetzach.SystemNotification,
) error {
n, err := a.data.db.SystemNotification.Get(ctx, notification.ID)
if err == nil && n != nil && len(n.Content) > 0 {
notification.Content = fmt.Sprintf("%s\n%s", n.Content, notification.Content)
}
q := a.data.db.SystemNotification.Create().
SetID(notification.ID).
SetType(converter.ToEntSystemNotificationType(notification.Type)).
SetLevel(converter.ToEntSystemNotificationLevel(notification.Level)).
SetStatus(converter.ToEntSystemNotificationStatus(notification.Status)).
SetTitle(notification.Title).
SetContent(notification.Content)
if notification.Type == modelnetzach.SystemNotificationTypeUser {
q.SetUserID(userID)
}
return q.OnConflict(
sql.ConflictColumns(systemnotification.FieldID),
resolveWithIgnores([]string{
systemnotification.FieldID,
systemnotification.FieldUserID,
systemnotification.FieldType,
}),
).Exec(ctx)
}
Loading

0 comments on commit c66f20c

Please sign in to comment.