diff --git a/app/sephirah/cmd/sephirah/wire_gen.go b/app/sephirah/cmd/sephirah/wire_gen.go index 11e88a2..33bddd2 100644 --- a/app/sephirah/cmd/sephirah/wire_gen.go +++ b/app/sephirah/cmd/sephirah/wire_gen.go @@ -65,20 +65,22 @@ func wireApp(sephirahServer *conf.SephirahServer, database *conf.Database, s3 *c cleanup() return nil, nil, err } - supervisorSupervisor, err := supervisor.NewSupervisor(porter, libauthAuth, clientPorter) + netzachRepo := data.NewNetzachRepo(dataData) + librarianSearcherServiceClient, err := client2.NewSearcherClient(consul) if err != nil { cleanup2() cleanup() return nil, nil, err } - geburaRepo := data.NewGeburaRepo(dataData) - librarianSearcherServiceClient, err := client2.NewSearcherClient(consul) + searcher := client.NewSearcher(librarianSearcherServiceClient) + topic := biznetzach.NewSystemNotificationTopic(netzachRepo, searcher) + supervisorSupervisor, err := supervisor.NewSupervisor(porter, libauthAuth, clientPorter, topic) if err != nil { cleanup2() cleanup() return nil, nil, err } - searcher := client.NewSearcher(librarianSearcherServiceClient) + geburaRepo := data.NewGeburaRepo(dataData) angelaBase, err := bizangela.NewAngelaBase(angelaRepo, supervisorSupervisor, geburaRepo, librarianPorterServiceClient, searcher) if err != nil { cleanup2() @@ -92,20 +94,18 @@ func wireApp(sephirahServer *conf.SephirahServer, database *conf.Database, s3 *c return nil, nil, err } libcacheMap := bizangela.NewAppInfoCache(geburaRepo, store) - topic := bizangela.NewUpdateAppInfoIndexTopic(angelaBase) - libmqTopic := bizangela.NewPullAppInfoTopic(angelaBase, libcacheMap, topic) - topic2 := bizangela.NewPullAccountAppInfoRelationTopic(angelaBase, libmqTopic) - topic3 := bizangela.NewPullAccountTopic(angelaBase, topic2) - netzachRepo := data.NewNetzachRepo(dataData) + libmqTopic := bizangela.NewUpdateAppInfoIndexTopic(angelaBase) + topic2 := bizangela.NewPullAppInfoTopic(angelaBase, libcacheMap, libmqTopic) + topic3 := bizangela.NewPullAccountAppInfoRelationTopic(angelaBase, topic2) + topic4 := bizangela.NewPullAccountTopic(angelaBase, topic3) map2 := bizangela.NewNotifyFlowCache(netzachRepo, store) map3 := bizangela.NewFeedToNotifyFlowCache(netzachRepo, store) map4 := bizangela.NewNotifyTargetCache(netzachRepo, store) - topic4 := bizangela.NewNotifyPushTopic(angelaBase, map4) - topic5 := bizangela.NewNotifyRouterTopic(angelaBase, map2, map3, topic4) - topic6 := bizangela.NewParseFeedItemDigestTopic(angelaBase) - topic7 := bizangela.NewSystemNotificationTopic(angelaBase) - topic8 := bizangela.NewPullFeedTopic(angelaBase, topic5, topic6, topic7) - angela, err := bizangela.NewAngela(libmqMQ, topic3, topic2, libmqTopic, topic8, topic5, topic4, topic6, topic, topic7) + topic5 := bizangela.NewNotifyPushTopic(angelaBase, map4) + topic6 := bizangela.NewNotifyRouterTopic(angelaBase, map2, map3, topic5) + topic7 := bizangela.NewParseFeedItemDigestTopic(angelaBase) + topic8 := bizangela.NewPullFeedTopic(angelaBase, topic6, topic7, topic) + angela, err := bizangela.NewAngela(libmqMQ, topic4, topic3, topic2, topic8, topic6, topic5, topic7, libmqTopic) if err != nil { cleanup2() cleanup() @@ -119,13 +119,13 @@ func wireApp(sephirahServer *conf.SephirahServer, database *conf.Database, s3 *c return nil, nil, err } key := biztiphereth.NewUserCountCache(tipherethRepo, store) - tiphereth, err := biztiphereth.NewTiphereth(settings, tipherethRepo, libauthAuth, supervisorSupervisor, searcher, topic3, cron, key) + tiphereth, err := biztiphereth.NewTiphereth(settings, tipherethRepo, libauthAuth, supervisorSupervisor, searcher, topic4, cron, key) if err != nil { cleanup2() cleanup() return nil, nil, err } - gebura := bizgebura.NewGebura(geburaRepo, libauthAuth, searcher, librarianPorterServiceClient, supervisorSupervisor, topic, libmqTopic, libcacheMap) + gebura := bizgebura.NewGebura(geburaRepo, libauthAuth, searcher, librarianPorterServiceClient, supervisorSupervisor, libmqTopic, topic2, libcacheMap) binahRepo, err := data.NewBinahRepo(s3) if err != nil { cleanup2() @@ -136,13 +136,18 @@ func wireApp(sephirahServer *conf.SephirahServer, database *conf.Database, s3 *c binah := bizbinah.NewBinah(binahRepo, controlBlock, libauthAuth, librarianSearcherServiceClient) yesodRepo := data.NewYesodRepo(dataData) map5 := bizyesod.NewFeedOwnerCache(yesodRepo, store) - yesod, err := bizyesod.NewYesod(yesodRepo, supervisorSupervisor, cron, searcher, topic8, topic7, map5) + yesod, err := bizyesod.NewYesod(yesodRepo, supervisorSupervisor, cron, searcher, topic8, topic, map5) + if err != nil { + cleanup2() + cleanup() + return nil, nil, err + } + netzach, err := biznetzach.NewNetzach(netzachRepo, supervisorSupervisor, searcher, libmqMQ, map3, map2, map4, topic) if err != nil { cleanup2() cleanup() return nil, nil, err } - netzach := biznetzach.NewNetzach(netzachRepo, supervisorSupervisor, searcher, map3, map2, map4) chesedRepo := data.NewChesedRepo(dataData) librarianMinerServiceClient, err := client2.NewMinerClient(consul) if err != nil { diff --git a/app/sephirah/internal/biz/biz.go b/app/sephirah/internal/biz/biz.go index 43004d6..2fa0676 100644 --- a/app/sephirah/internal/biz/biz.go +++ b/app/sephirah/internal/biz/biz.go @@ -17,9 +17,8 @@ var ProviderSet = wire.NewSet( bizangela.ProviderSet, biztiphereth.ProviderSet, bizgebura.NewGebura, - bizbinah.NewBinah, - bizbinah.NewControlBlock, + bizbinah.ProviderSet, bizyesod.ProviderSet, - biznetzach.NewNetzach, + biznetzach.ProviderSet, bizchesed.ProviderSet, ) diff --git a/app/sephirah/internal/biz/bizangela/angela.go b/app/sephirah/internal/biz/bizangela/angela.go index db6e9fc..c9fc5e0 100644 --- a/app/sephirah/internal/biz/bizangela/angela.go +++ b/app/sephirah/internal/biz/bizangela/angela.go @@ -7,7 +7,6 @@ import ( "github.com/tuihub/librarian/app/sephirah/internal/client" "github.com/tuihub/librarian/app/sephirah/internal/model/modelangela" "github.com/tuihub/librarian/app/sephirah/internal/model/modelgebura" - "github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach" "github.com/tuihub/librarian/app/sephirah/internal/model/modeltiphereth" "github.com/tuihub/librarian/app/sephirah/internal/model/modelyesod" "github.com/tuihub/librarian/app/sephirah/internal/supervisor" @@ -34,7 +33,6 @@ var ProviderSet = wire.NewSet( NewNotifyTargetCache, NewParseFeedItemDigestTopic, NewUpdateAppInfoIndexTopic, - NewSystemNotificationTopic, ) type Angela struct { @@ -59,7 +57,6 @@ type AngelaRepo interface { UpdateFeedPullStatus(context.Context, *modelyesod.FeedConfig) error GetFeedItem(context.Context, model.InternalID) (*modelfeed.Item, error) UpdateFeedItemDigest(context.Context, *modelfeed.Item) error - UpsertSystemNotification(context.Context, model.InternalID, *modelnetzach.SystemNotification) error } func NewAngelaBase( @@ -90,7 +87,6 @@ func NewAngela( notifyPush *libmq.Topic[modelangela.NotifyPush], parseFeedItem *libmq.Topic[modelangela.ParseFeedItemDigest], updateAppIndex *libmq.Topic[modelangela.UpdateAppInfoIndex], - systemNotification *libmq.Topic[modelangela.SystemNotify], ) (*Angela, error) { if err := mq.RegisterTopic(pullAccountInfo); err != nil { return nil, err @@ -116,9 +112,6 @@ func NewAngela( if err := mq.RegisterTopic(updateAppIndex); err != nil { return nil, err } - if err := mq.RegisterTopic(systemNotification); err != nil { - return nil, err - } return &Angela{ mq: mq, }, nil diff --git a/app/sephirah/internal/biz/bizangela/feed.go b/app/sephirah/internal/biz/bizangela/feed.go index 1ecb13a..fff31e0 100644 --- a/app/sephirah/internal/biz/bizangela/feed.go +++ b/app/sephirah/internal/biz/bizangela/feed.go @@ -23,7 +23,7 @@ func NewPullFeedTopic( //nolint:gocognit // TODO a *AngelaBase, notify *libmq.Topic[modelangela.NotifyRouter], parse *libmq.Topic[modelangela.ParseFeedItemDigest], - systemNotify *libmq.Topic[modelangela.SystemNotify], + systemNotify *libmq.Topic[modelnetzach.SystemNotify], ) *libmq.Topic[modelyesod.PullFeed] { return libmq.NewTopic[modelyesod.PullFeed]( "PullFeed", diff --git a/app/sephirah/internal/biz/bizangela/system.go b/app/sephirah/internal/biz/bizangela/system.go deleted file mode 100644 index 8bff19c..0000000 --- a/app/sephirah/internal/biz/bizangela/system.go +++ /dev/null @@ -1,26 +0,0 @@ -package bizangela - -import ( - "context" - - "github.com/tuihub/librarian/app/sephirah/internal/model/modelangela" - "github.com/tuihub/librarian/internal/lib/libmq" -) - -func NewSystemNotificationTopic( - a *AngelaBase, -) *libmq.Topic[modelangela.SystemNotify] { - return libmq.NewTopic[modelangela.SystemNotify]( - "SystemNotify", - func(ctx context.Context, r *modelangela.SystemNotify) error { - if r.Notification.ID == 0 { - id, err := a.searcher.NewID(ctx) - if err != nil { - return err - } - r.Notification.ID = id - } - return a.repo.UpsertSystemNotification(ctx, r.UserID, &r.Notification) - }, - ) -} diff --git a/app/sephirah/internal/biz/bizbinah/binah.go b/app/sephirah/internal/biz/bizbinah/binah.go index ba81bbc..8ab1c07 100644 --- a/app/sephirah/internal/biz/bizbinah/binah.go +++ b/app/sephirah/internal/biz/bizbinah/binah.go @@ -8,6 +8,13 @@ import ( "github.com/tuihub/librarian/app/sephirah/internal/model/modelbinah" "github.com/tuihub/librarian/internal/lib/libauth" searcher "github.com/tuihub/protos/pkg/librarian/searcher/v1" + + "github.com/google/wire" +) + +var ProviderSet = wire.NewSet( + NewBinah, + NewControlBlock, ) type Binah struct { diff --git a/app/sephirah/internal/biz/biznetzach/netzach.go b/app/sephirah/internal/biz/biznetzach/netzach.go index 06698a7..dd0afa9 100644 --- a/app/sephirah/internal/biz/biznetzach/netzach.go +++ b/app/sephirah/internal/biz/biznetzach/netzach.go @@ -10,11 +10,18 @@ import ( "github.com/tuihub/librarian/app/sephirah/internal/supervisor" "github.com/tuihub/librarian/internal/lib/libauth" "github.com/tuihub/librarian/internal/lib/libcache" + "github.com/tuihub/librarian/internal/lib/libmq" "github.com/tuihub/librarian/internal/lib/logger" "github.com/tuihub/librarian/internal/model" pb "github.com/tuihub/protos/pkg/librarian/sephirah/v1" "github.com/go-kratos/kratos/v2/errors" + "github.com/google/wire" +) + +var ProviderSet = wire.NewSet( + NewNetzach, + NewSystemNotificationTopic, ) type NetzachRepo interface { @@ -30,6 +37,8 @@ type NetzachRepo interface { []*modelnetzach.NotifyFlow, int64, error) GetNotifyFlow(context.Context, model.InternalID) (*modelnetzach.NotifyFlow, error) GetNotifyFlowIDsWithFeed(context.Context, model.InternalID) ([]model.InternalID, error) + + UpsertSystemNotification(context.Context, model.InternalID, *modelnetzach.SystemNotification) error ListSystemNotifications(context.Context, model.Paging, *model.InternalID, []modelnetzach.SystemNotificationType, []modelnetzach.SystemNotificationLevel, []modelnetzach.SystemNotificationStatus) ( []*modelnetzach.SystemNotification, int64, error) @@ -48,10 +57,15 @@ func NewNetzach( repo NetzachRepo, supv *supervisor.Supervisor, sClient *client.Searcher, + mq *libmq.MQ, notifySourceCache *libcache.Map[model.InternalID, modelangela.FeedToNotifyFlowValue], notifyFlowCache *libcache.Map[model.InternalID, modelnetzach.NotifyFlow], notifyTargetCache *libcache.Map[model.InternalID, modelnetzach.NotifyTarget], -) *Netzach { + systemNotification *libmq.Topic[modelnetzach.SystemNotify], +) (*Netzach, error) { + if err := mq.RegisterTopic(systemNotification); err != nil { + return nil, err + } y := &Netzach{ repo, supv, @@ -60,7 +74,7 @@ func NewNetzach( notifyFlowCache, notifyTargetCache, } - return y + return y, nil } func (n *Netzach) CreateNotifyTarget(ctx context.Context, target *modelnetzach.NotifyTarget) ( diff --git a/app/sephirah/internal/biz/biznetzach/system.go b/app/sephirah/internal/biz/biznetzach/system.go new file mode 100644 index 0000000..0a8a901 --- /dev/null +++ b/app/sephirah/internal/biz/biznetzach/system.go @@ -0,0 +1,28 @@ +package biznetzach + +import ( + "context" + + "github.com/tuihub/librarian/app/sephirah/internal/client" + "github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach" + "github.com/tuihub/librarian/internal/lib/libmq" +) + +func NewSystemNotificationTopic( + repo NetzachRepo, + searcher *client.Searcher, +) *libmq.Topic[modelnetzach.SystemNotify] { + return libmq.NewTopic[modelnetzach.SystemNotify]( + "SystemNotify", + func(ctx context.Context, r *modelnetzach.SystemNotify) error { + if r.Notification.ID == 0 { + id, err := searcher.NewID(ctx) + if err != nil { + return err + } + r.Notification.ID = id + } + return repo.UpsertSystemNotification(ctx, r.UserID, &r.Notification) + }, + ) +} diff --git a/app/sephirah/internal/biz/bizyesod/yesod.go b/app/sephirah/internal/biz/bizyesod/yesod.go index 98c5492..1a71193 100644 --- a/app/sephirah/internal/biz/bizyesod/yesod.go +++ b/app/sephirah/internal/biz/bizyesod/yesod.go @@ -7,7 +7,6 @@ import ( "time" "github.com/tuihub/librarian/app/sephirah/internal/client" - "github.com/tuihub/librarian/app/sephirah/internal/model/modelangela" "github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach" "github.com/tuihub/librarian/app/sephirah/internal/model/modeltiphereth" "github.com/tuihub/librarian/app/sephirah/internal/model/modelyesod" @@ -62,7 +61,7 @@ type Yesod struct { // mapper mapper.LibrarianMapperServiceClient searcher *client.Searcher pullFeed *libmq.Topic[modelyesod.PullFeed] - systemNotify *libmq.Topic[modelangela.SystemNotify] + systemNotify *libmq.Topic[modelnetzach.SystemNotify] feedOwner *libcache.Map[modelyesod.FeedConfig, modeltiphereth.User] } @@ -73,7 +72,7 @@ func NewYesod( // mClient mapper.LibrarianMapperServiceClient, sClient *client.Searcher, pullFeed *libmq.Topic[modelyesod.PullFeed], - systemNotify *libmq.Topic[modelangela.SystemNotify], + systemNotify *libmq.Topic[modelnetzach.SystemNotify], feedOwner *libcache.Map[modelyesod.FeedConfig, modeltiphereth.User], ) (*Yesod, error) { y := &Yesod{ @@ -102,23 +101,23 @@ func (y *Yesod) PullFeeds(ctx context.Context) error { } var errRes error for _, c := range configs { - doNotify := func() *modelangela.SystemNotify { + doNotify := func() *modelnetzach.SystemNotify { var owner *modeltiphereth.User owner, err = y.feedOwner.GetWithFallBack(ctx, *c, nil) if err != nil { return nil } - un := modelangela.NewUserNotify( + un := modelnetzach.NewUserNotify( owner.ID, modelnetzach.SystemNotificationLevelOngoing, - fmt.Sprintf("Scheduled Server Task: Update Feed %s", c.Name), + fmt.Sprintf("%s: Update Feed %s", modelnetzach.SystemNotifyTitleCronJob, c.Name), "Queued", ) un.Notification.ID, err = y.searcher.NewID(ctx) if err != nil { return nil } - err = y.systemNotify.Publish(ctx, un) + err = y.systemNotify.PublishFallsLocalCall(ctx, un) if err != nil { return nil } diff --git a/app/sephirah/internal/data/angela.go b/app/sephirah/internal/data/angela.go index d835291..135bdb2 100644 --- a/app/sephirah/internal/data/angela.go +++ b/app/sephirah/internal/data/angela.go @@ -2,7 +2,6 @@ package data import ( "context" - "fmt" "time" "github.com/tuihub/librarian/app/sephirah/internal/biz/bizangela" @@ -14,9 +13,7 @@ import ( "github.com/tuihub/librarian/app/sephirah/internal/data/internal/ent/feed" "github.com/tuihub/librarian/app/sephirah/internal/data/internal/ent/feedconfig" "github.com/tuihub/librarian/app/sephirah/internal/data/internal/ent/feeditem" - "github.com/tuihub/librarian/app/sephirah/internal/data/internal/ent/systemnotification" "github.com/tuihub/librarian/app/sephirah/internal/model/modelgebura" - "github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach" "github.com/tuihub/librarian/app/sephirah/internal/model/modeltiphereth" "github.com/tuihub/librarian/app/sephirah/internal/model/modelyesod" "github.com/tuihub/librarian/internal/model" @@ -313,32 +310,3 @@ func (a *angelaRepo) UpdateFeedItemDigest(ctx context.Context, item *modelfeed.I Exec(ctx) return err } - -func (a *angelaRepo) UpsertSystemNotification( - ctx context.Context, - userID model.InternalID, - notification *modelnetzach.SystemNotification, -) error { - n, err := a.data.db.SystemNotification.Get(ctx, notification.ID) - if err == nil && n != nil && len(n.Content) > 0 { - notification.Content = fmt.Sprintf("%s\n%s", n.Content, notification.Content) - } - q := a.data.db.SystemNotification.Create(). - SetID(notification.ID). - SetType(converter.ToEntSystemNotificationType(notification.Type)). - SetLevel(converter.ToEntSystemNotificationLevel(notification.Level)). - SetStatus(converter.ToEntSystemNotificationStatus(notification.Status)). - SetTitle(notification.Title). - SetContent(notification.Content) - if notification.Type == modelnetzach.SystemNotificationTypeUser { - q.SetUserID(userID) - } - return q.OnConflict( - sql.ConflictColumns(systemnotification.FieldID), - resolveWithIgnores([]string{ - systemnotification.FieldID, - systemnotification.FieldUserID, - systemnotification.FieldType, - }), - ).Exec(ctx) -} diff --git a/app/sephirah/internal/data/netzach.go b/app/sephirah/internal/data/netzach.go index 2b0fd5d..4a185e7 100644 --- a/app/sephirah/internal/data/netzach.go +++ b/app/sephirah/internal/data/netzach.go @@ -2,6 +2,7 @@ package data import ( "context" + "fmt" "github.com/tuihub/librarian/app/sephirah/internal/biz/biznetzach" "github.com/tuihub/librarian/app/sephirah/internal/data/internal/converter" @@ -15,6 +16,8 @@ import ( "github.com/tuihub/librarian/app/sephirah/internal/data/internal/ent/user" "github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach" "github.com/tuihub/librarian/internal/model" + + "entgo.io/ent/dialect/sql" ) type netzachRepo struct { @@ -279,6 +282,35 @@ func (n *netzachRepo) GetNotifyFlowIDsWithFeed(ctx context.Context, id model.Int return ids, nil } +func (n *netzachRepo) UpsertSystemNotification( + ctx context.Context, + userID model.InternalID, + notification *modelnetzach.SystemNotification, +) error { + old, err := n.data.db.SystemNotification.Get(ctx, notification.ID) + if err == nil && n != nil && len(old.Content) > 0 { + notification.Content = fmt.Sprintf("%s\n%s", old.Content, notification.Content) + } + q := n.data.db.SystemNotification.Create(). + SetID(notification.ID). + SetType(converter.ToEntSystemNotificationType(notification.Type)). + SetLevel(converter.ToEntSystemNotificationLevel(notification.Level)). + SetStatus(converter.ToEntSystemNotificationStatus(notification.Status)). + SetTitle(notification.Title). + SetContent(notification.Content) + if notification.Type == modelnetzach.SystemNotificationTypeUser { + q.SetUserID(userID) + } + return q.OnConflict( + sql.ConflictColumns(systemnotification.FieldID), + resolveWithIgnores([]string{ + systemnotification.FieldID, + systemnotification.FieldUserID, + systemnotification.FieldType, + }), + ).Exec(ctx) +} + func (n *netzachRepo) ListSystemNotifications(ctx context.Context, paging model.Paging, userID *model.InternalID, types []modelnetzach.SystemNotificationType, levels []modelnetzach.SystemNotificationLevel, statuses []modelnetzach.SystemNotificationStatus) ([]*modelnetzach.SystemNotification, int64, error) { q := n.data.db.SystemNotification.Query(). Order(ent.Desc(systemnotification.FieldUpdatedAt)) diff --git a/app/sephirah/internal/model/modelangela/modelangela.go b/app/sephirah/internal/model/modelangela/modelangela.go index 65cace3..2a88821 100644 --- a/app/sephirah/internal/model/modelangela/modelangela.go +++ b/app/sephirah/internal/model/modelangela/modelangela.go @@ -1,8 +1,6 @@ package modelangela import ( - "time" - "github.com/tuihub/librarian/app/sephirah/internal/model/modelgebura" "github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach" "github.com/tuihub/librarian/internal/model" @@ -40,47 +38,3 @@ type ParseFeedItemDigest struct { type UpdateAppInfoIndex struct { IDs []model.InternalID } - -type SystemNotify struct { - UserID model.InternalID - Notification modelnetzach.SystemNotification -} - -func NewSystemNotify( - level modelnetzach.SystemNotificationLevel, - title string, - content string, -) SystemNotify { - return SystemNotify{ - UserID: 0, - Notification: modelnetzach.SystemNotification{ - ID: 0, - Type: modelnetzach.SystemNotificationTypeSystem, - Level: level, - Status: modelnetzach.SystemNotificationStatusUnread, - Title: title, - Content: content, - CreateTime: time.Now(), - }, - } -} - -func NewUserNotify( - userID model.InternalID, - level modelnetzach.SystemNotificationLevel, - title string, - content string, -) SystemNotify { - return SystemNotify{ - UserID: userID, - Notification: modelnetzach.SystemNotification{ - ID: 0, - Type: modelnetzach.SystemNotificationTypeUser, - Level: level, - Status: modelnetzach.SystemNotificationStatusUnread, - Title: title, - Content: content, - CreateTime: time.Now(), - }, - } -} diff --git a/app/sephirah/internal/model/modelnetzach/modelnetzach.go b/app/sephirah/internal/model/modelnetzach/modelnetzach.go index e771841..8179821 100644 --- a/app/sephirah/internal/model/modelnetzach/modelnetzach.go +++ b/app/sephirah/internal/model/modelnetzach/modelnetzach.go @@ -92,3 +92,51 @@ const ( SystemNotificationStatusRead SystemNotificationStatusDismissed ) + +type SystemNotify struct { + UserID model.InternalID + Notification SystemNotification +} + +const ( + SystemNotifyTitleCronJob = "Server Scheduled Task" +) + +func NewSystemNotify( + level SystemNotificationLevel, + title string, + content string, +) SystemNotify { + return SystemNotify{ + UserID: 0, + Notification: SystemNotification{ + ID: 0, + Type: SystemNotificationTypeSystem, + Level: level, + Status: SystemNotificationStatusUnread, + Title: title, + Content: content, + CreateTime: time.Now(), + }, + } +} + +func NewUserNotify( + userID model.InternalID, + level SystemNotificationLevel, + title string, + content string, +) SystemNotify { + return SystemNotify{ + UserID: userID, + Notification: SystemNotification{ + ID: 0, + Type: SystemNotificationTypeUser, + Level: level, + Status: SystemNotificationStatusUnread, + Title: title, + Content: content, + CreateTime: time.Now(), + }, + } +} diff --git a/app/sephirah/internal/model/modelyesod/modelyesod.go b/app/sephirah/internal/model/modelyesod/modelyesod.go index 836b5af..f28663c 100644 --- a/app/sephirah/internal/model/modelyesod/modelyesod.go +++ b/app/sephirah/internal/model/modelyesod/modelyesod.go @@ -3,7 +3,7 @@ package modelyesod import ( "time" - "github.com/tuihub/librarian/app/sephirah/internal/model/modelangela" + "github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach" "github.com/tuihub/librarian/internal/model" "github.com/tuihub/librarian/internal/model/modelfeed" ) @@ -71,7 +71,7 @@ type PullFeed struct { InternalID model.InternalID URL string Source string - SystemNotify *modelangela.SystemNotify + SystemNotify *modelnetzach.SystemNotify } type GroupFeedItemsBy int diff --git a/app/sephirah/internal/supervisor/supervisor.go b/app/sephirah/internal/supervisor/supervisor.go index e05f8bc..f4d9209 100644 --- a/app/sephirah/internal/supervisor/supervisor.go +++ b/app/sephirah/internal/supervisor/supervisor.go @@ -3,14 +3,17 @@ package supervisor import ( "context" "errors" + "fmt" "reflect" "sync" "github.com/tuihub/librarian/app/sephirah/internal/client" "github.com/tuihub/librarian/app/sephirah/internal/model/converter" + "github.com/tuihub/librarian/app/sephirah/internal/model/modelnetzach" "github.com/tuihub/librarian/app/sephirah/internal/model/modeltiphereth" "github.com/tuihub/librarian/internal/conf" "github.com/tuihub/librarian/internal/lib/libauth" + "github.com/tuihub/librarian/internal/lib/libmq" "github.com/tuihub/librarian/internal/lib/libtime" "github.com/tuihub/librarian/internal/lib/logger" porter "github.com/tuihub/protos/pkg/librarian/porter/v1" @@ -26,15 +29,18 @@ type Supervisor struct { aliveInstances map[string]*modeltiphereth.PorterInstance knownInstances map[string]*modeltiphereth.PorterInstance knownInstancesOutdated bool + refreshMu sync.Mutex featureSummary *modeltiphereth.ServerFeatureSummary muFeatureSummary sync.RWMutex trustedAddresses []string + systemNotify *libmq.Topic[modelnetzach.SystemNotify] } func NewSupervisor( c *conf.Porter, auth *libauth.Auth, porter *client.Porter, + systemNotify *libmq.Topic[modelnetzach.SystemNotify], ) (*Supervisor, error) { if c == nil { c = new(conf.Porter) @@ -45,9 +51,11 @@ func NewSupervisor( aliveInstances: map[string]*modeltiphereth.PorterInstance{}, knownInstances: nil, // init in UpdateKnownInstances knownInstancesOutdated: true, + refreshMu: sync.Mutex{}, featureSummary: new(modeltiphereth.ServerFeatureSummary), muFeatureSummary: sync.RWMutex{}, trustedAddresses: c.GetTrustedAddress(), + systemNotify: systemNotify, }, nil } @@ -69,9 +77,12 @@ func (s *Supervisor) UpdateKnownInstances(instances []*modeltiphereth.PorterInst go s.updateFeatureSummary() } -func (s *Supervisor) RefreshAliveInstances( //nolint:gocognit // TODO +func (s *Supervisor) RefreshAliveInstances( ctx context.Context, ) ([]*modeltiphereth.PorterInstance, error) { + s.refreshMu.Lock() + defer s.refreshMu.Unlock() + if s.knownInstances == nil { return nil, errors.New("known instances not set") } @@ -82,69 +93,35 @@ func (s *Supervisor) RefreshAliveInstances( //nolint:gocognit // TODO } newInstances := make([]*modeltiphereth.PorterInstance, 0, len(addresses)) aliveInstanceMap := make(map[string]*modeltiphereth.PorterInstance, len(addresses)) - var info *porter.GetPorterInformationResponse + hasError := false + notification := modelnetzach.NewSystemNotify( + modelnetzach.SystemNotificationLevelOngoing, + fmt.Sprintf("%s: Refresh Porter Instances", modelnetzach.SystemNotifyTitleCronJob), + fmt.Sprintf("Found %d Porter Instances", len(addresses)), + ) + for _, address := range addresses { - if address == "" { - // bad address - continue - } - info, err = s.porter.GetPorterInformation( - client.WithPorterAddress(ctx, address), - &porter.GetPorterInformationRequest{}, - ) + var ins *modeltiphereth.PorterInstance + var isNew bool + ins, isNew, err = s.evaluatePorterInstance(ctx, address) if err != nil { - // bad instance - logger.Infof("%s", err.Error()) - continue - } - if info == nil { - // bad instance - continue + logger.Errorf("%s", err.Error()) + hasError = true + notification.Notification.Content += "\n" + fmt.Sprintf("Error on %s: %s", address, err.Error()) } - feature := converter.ToBizPorterFeatureSummary(info.GetFeatureSummary()) - var ins *modeltiphereth.PorterInstance - if s.knownInstances[address] != nil { //nolint:nestif // TODO - // known instance - if s.knownInstances[address].GlobalName != info.GetGlobalName() { - // bad instance, global name changed - continue - } - ins = s.knownInstances[address] - ins.ConnectionStatus = modeltiphereth.PorterConnectionStatusConnected - if ins.Status == modeltiphereth.PorterInstanceStatusActive { - // enable & check ownership - if err2 := s.enablePorterInstance(ctx, ins); err2 != nil { - logger.Errorf("%s", err2.Error()) - ins.ConnectionStatus = modeltiphereth.PorterConnectionStatusActivationFailed - // bad instance, can't enable - aliveInstanceMap[address] = ins - continue - } - ins.ConnectionStatus = modeltiphereth.PorterConnectionStatusActive - } - if reflect.DeepEqual(ins.FeatureSummary, feature) { - // no change, but alive - aliveInstanceMap[address] = ins - continue - } - ins.FeatureSummary = feature - } else { - // new instance - ins = &modeltiphereth.PorterInstance{ - ID: 0, - Name: info.GetName(), - Version: info.GetVersion(), - GlobalName: info.GetGlobalName(), - Address: address, - FeatureSummary: feature, - Status: modeltiphereth.PorterInstanceStatusUnspecified, - ConnectionStatus: modeltiphereth.PorterConnectionStatusConnected, + if ins != nil { + aliveInstanceMap[address] = ins + if isNew { + newInstances = append(newInstances, ins) } } - // new instance or feature changed - newInstances = append(newInstances, ins) - aliveInstanceMap[address] = ins } + + if hasError { + notification.Notification.Level = modelnetzach.SystemNotificationLevelError + _ = s.systemNotify.PublishFallsLocalCall(ctx, notification) + } + s.aliveInstances = aliveInstanceMap if len(newInstances) > 0 { go s.updateFeatureSummary() @@ -162,25 +139,95 @@ func (s *Supervisor) GetInstanceConnectionStatus( return s.aliveInstances[address].ConnectionStatus } +func (s *Supervisor) evaluatePorterInstance( + ctx context.Context, + address string, +) (*modeltiphereth.PorterInstance, bool, error) { + if address == "" { + // bad address + return nil, false, errors.New("address is empty") + } + info, err := s.porter.GetPorterInformation( + client.WithPorterAddress(ctx, address), + &porter.GetPorterInformationRequest{}, + ) + if err != nil { + // bad instance + logger.Infof("%s", err.Error()) + return nil, false, err + } + if info == nil { + // bad instance + return nil, false, errors.New("info is nil") + } + feature := converter.ToBizPorterFeatureSummary(info.GetFeatureSummary()) + var ins *modeltiphereth.PorterInstance + if s.knownInstances[address] != nil { //nolint:nestif // TODO + // known instance + if s.knownInstances[address].GlobalName != info.GetGlobalName() { + // bad instance, global name changed + return nil, false, errors.New("global name changed") + } + ins = s.knownInstances[address] + ins.ConnectionStatus = modeltiphereth.PorterConnectionStatusConnected + if ins.Status == modeltiphereth.PorterInstanceStatusActive { + // enable & check ownership + if err2 := s.enablePorterInstance(ctx, ins); err2 != nil { + logger.Errorf("%s", err2.Error()) + ins.ConnectionStatus = modeltiphereth.PorterConnectionStatusActivationFailed + // bad instance, can't enable + return ins, false, err2 + } + ins.ConnectionStatus = modeltiphereth.PorterConnectionStatusActive + } + if reflect.DeepEqual(ins.FeatureSummary, feature) { + // no change, but alive + return ins, false, nil + } + ins.FeatureSummary = feature + } else { + // new instance + ins = &modeltiphereth.PorterInstance{ + ID: 0, + Name: info.GetName(), + Version: info.GetVersion(), + GlobalName: info.GetGlobalName(), + Address: address, + FeatureSummary: feature, + Status: modeltiphereth.PorterInstanceStatusUnspecified, + ConnectionStatus: modeltiphereth.PorterConnectionStatusConnected, + } + } + // new instance or feature changed + return ins, true, nil +} + // EnablePorterInstance enable porter instance, can be called multiple times. func (s *Supervisor) enablePorterInstance(ctx context.Context, instance *modeltiphereth.PorterInstance) error { if instance == nil { return errors.New("instance is nil") } - refreshToken, err := s.auth.GenerateToken( - instance.ID, - 0, - libauth.ClaimsTypeRefreshToken, - libauth.UserTypePorter, - nil, - libtime.Hour, - ) - if err != nil { - return err - } - _, err = s.porter.EnablePorter(client.WithPorterAddress(ctx, instance.Address), &porter.EnablePorterRequest{ + _, err := s.porter.EnablePorter(client.WithPorterAddress(ctx, instance.Address), &porter.EnablePorterRequest{ SephirahId: 0, - RefreshToken: refreshToken, + RefreshToken: "", }) + if err != nil { + var refreshToken string + refreshToken, err = s.auth.GenerateToken( + instance.ID, + 0, + libauth.ClaimsTypeRefreshToken, + libauth.UserTypePorter, + nil, + libtime.Hour, + ) + if err != nil { + return err + } + _, err = s.porter.EnablePorter(client.WithPorterAddress(ctx, instance.Address), &porter.EnablePorterRequest{ + SephirahId: 0, + RefreshToken: refreshToken, + }) + } return err } diff --git a/app/sephirah/pkg/service/wire_gen.go b/app/sephirah/pkg/service/wire_gen.go index 5ccdc49..2f998d1 100644 --- a/app/sephirah/pkg/service/wire_gen.go +++ b/app/sephirah/pkg/service/wire_gen.go @@ -49,45 +49,45 @@ func NewSephirahService(sephirahServer *conf.SephirahServer, database *conf.Data cleanup() return nil, nil, err } - supervisorSupervisor, err := supervisor.NewSupervisor(porter, auth, clientPorter) + netzachRepo := data.NewNetzachRepo(dataData) + searcher := client.NewSearcher(librarianSearcherServiceClient) + topic := biznetzach.NewSystemNotificationTopic(netzachRepo, searcher) + supervisorSupervisor, err := supervisor.NewSupervisor(porter, auth, clientPorter, topic) if err != nil { cleanup() return nil, nil, err } geburaRepo := data.NewGeburaRepo(dataData) - searcher := client.NewSearcher(librarianSearcherServiceClient) angelaBase, err := bizangela.NewAngelaBase(angelaRepo, supervisorSupervisor, geburaRepo, librarianPorterServiceClient, searcher) if err != nil { cleanup() return nil, nil, err } libcacheMap := bizangela.NewAppInfoCache(geburaRepo, store) - topic := bizangela.NewUpdateAppInfoIndexTopic(angelaBase) - libmqTopic := bizangela.NewPullAppInfoTopic(angelaBase, libcacheMap, topic) - topic2 := bizangela.NewPullAccountAppInfoRelationTopic(angelaBase, libmqTopic) - topic3 := bizangela.NewPullAccountTopic(angelaBase, topic2) - netzachRepo := data.NewNetzachRepo(dataData) + libmqTopic := bizangela.NewUpdateAppInfoIndexTopic(angelaBase) + topic2 := bizangela.NewPullAppInfoTopic(angelaBase, libcacheMap, libmqTopic) + topic3 := bizangela.NewPullAccountAppInfoRelationTopic(angelaBase, topic2) + topic4 := bizangela.NewPullAccountTopic(angelaBase, topic3) map2 := bizangela.NewNotifyFlowCache(netzachRepo, store) map3 := bizangela.NewFeedToNotifyFlowCache(netzachRepo, store) map4 := bizangela.NewNotifyTargetCache(netzachRepo, store) - topic4 := bizangela.NewNotifyPushTopic(angelaBase, map4) - topic5 := bizangela.NewNotifyRouterTopic(angelaBase, map2, map3, topic4) - topic6 := bizangela.NewParseFeedItemDigestTopic(angelaBase) - topic7 := bizangela.NewSystemNotificationTopic(angelaBase) - topic8 := bizangela.NewPullFeedTopic(angelaBase, topic5, topic6, topic7) - angela, err := bizangela.NewAngela(mq, topic3, topic2, libmqTopic, topic8, topic5, topic4, topic6, topic, topic7) + topic5 := bizangela.NewNotifyPushTopic(angelaBase, map4) + topic6 := bizangela.NewNotifyRouterTopic(angelaBase, map2, map3, topic5) + topic7 := bizangela.NewParseFeedItemDigestTopic(angelaBase) + topic8 := bizangela.NewPullFeedTopic(angelaBase, topic6, topic7, topic) + angela, err := bizangela.NewAngela(mq, topic4, topic3, topic2, topic8, topic6, topic5, topic7, libmqTopic) if err != nil { cleanup() return nil, nil, err } tipherethRepo := data.NewTipherethRepo(dataData) key := biztiphereth.NewUserCountCache(tipherethRepo, store) - tiphereth, err := biztiphereth.NewTiphereth(settings, tipherethRepo, auth, supervisorSupervisor, searcher, topic3, cron, key) + tiphereth, err := biztiphereth.NewTiphereth(settings, tipherethRepo, auth, supervisorSupervisor, searcher, topic4, cron, key) if err != nil { cleanup() return nil, nil, err } - gebura := bizgebura.NewGebura(geburaRepo, auth, searcher, librarianPorterServiceClient, supervisorSupervisor, topic, libmqTopic, libcacheMap) + gebura := bizgebura.NewGebura(geburaRepo, auth, searcher, librarianPorterServiceClient, supervisorSupervisor, libmqTopic, topic2, libcacheMap) binahRepo, err := data.NewBinahRepo(s3) if err != nil { cleanup() @@ -97,12 +97,16 @@ func NewSephirahService(sephirahServer *conf.SephirahServer, database *conf.Data binah := bizbinah.NewBinah(binahRepo, controlBlock, auth, librarianSearcherServiceClient) yesodRepo := data.NewYesodRepo(dataData) map5 := bizyesod.NewFeedOwnerCache(yesodRepo, store) - yesod, err := bizyesod.NewYesod(yesodRepo, supervisorSupervisor, cron, searcher, topic8, topic7, map5) + yesod, err := bizyesod.NewYesod(yesodRepo, supervisorSupervisor, cron, searcher, topic8, topic, map5) + if err != nil { + cleanup() + return nil, nil, err + } + netzach, err := biznetzach.NewNetzach(netzachRepo, supervisorSupervisor, searcher, mq, map3, map2, map4, topic) if err != nil { cleanup() return nil, nil, err } - netzach := biznetzach.NewNetzach(netzachRepo, supervisorSupervisor, searcher, map3, map2, map4) chesedRepo := data.NewChesedRepo(dataData) map6 := bizchesed.NewImageCache(store) chesed, err := bizchesed.NewChesed(chesedRepo, binahRepo, cron, librarianPorterServiceClient, searcher, librarianMinerServiceClient, controlBlock, map6)