From 400890bf060e2c2993201aa5bafcca1579d6ab9d Mon Sep 17 00:00:00 2001 From: MuZhou233 Date: Wed, 28 Aug 2024 15:34:34 +0100 Subject: [PATCH] feat: impl porter context control --- app/sephirah/cmd/sephirah/wire_gen.go | 29 ++-- .../internal/biz/biztiphereth/tiphereth.go | 20 +++ .../data/internal/converter/ent_to_biz.go | 2 - app/sephirah/internal/data/tiphereth.go | 11 ++ .../internal/model/converter/biz_to_pb.go | 5 +- .../internal/model/converter/generated.go | 20 ++- .../model/modelsupervisor/modelsupervisor.go | 24 ++-- app/sephirah/internal/service/tiphereth.go | 11 +- .../internal/supervisor/supervisor.go | 131 +++++++++++++----- app/sephirah/internal/supervisor/task.go | 108 +++++++++++++++ app/sephirah/pkg/service/wire_gen.go | 29 ++-- internal/lib/libtype/slice.go | 32 +++++ 12 files changed, 333 insertions(+), 89 deletions(-) create mode 100644 app/sephirah/internal/supervisor/task.go create mode 100644 internal/lib/libtype/slice.go diff --git a/app/sephirah/cmd/sephirah/wire_gen.go b/app/sephirah/cmd/sephirah/wire_gen.go index 5f6a77e..bf896d2 100644 --- a/app/sephirah/cmd/sephirah/wire_gen.go +++ b/app/sephirah/cmd/sephirah/wire_gen.go @@ -82,7 +82,8 @@ func wireApp(sephirahServer *conf.SephirahServer, database *conf.Database, s3 *c return nil, nil, err } libcacheMap := biztiphereth.NewPorterInstanceCache(tipherethRepo, store) - supervisorSupervisor, err := supervisor.NewSupervisor(porter, libauthAuth, clientPorter, topic, libcacheMap) + map2 := biztiphereth.NewPorterContextCache(tipherethRepo, store) + supervisorSupervisor, err := supervisor.NewSupervisor(porter, libauthAuth, clientPorter, topic, libcacheMap, map2) if err != nil { cleanup2() cleanup() @@ -95,16 +96,16 @@ func wireApp(sephirahServer *conf.SephirahServer, database *conf.Database, s3 *c cleanup() return nil, nil, err } - map2 := bizangela.NewAppInfoCache(geburaRepo, store) + map3 := bizangela.NewAppInfoCache(geburaRepo, store) libmqTopic := bizangela.NewUpdateAppInfoIndexTopic(angelaBase) - topic2 := bizangela.NewPullAppInfoTopic(angelaBase, map2, libmqTopic) + topic2 := bizangela.NewPullAppInfoTopic(angelaBase, map3, libmqTopic) topic3 := bizangela.NewPullAccountAppInfoRelationTopic(angelaBase, topic2) topic4 := bizangela.NewPullAccountTopic(angelaBase, topic3) - map3 := bizangela.NewNotifyFlowCache(netzachRepo, store) - map4 := bizangela.NewFeedToNotifyFlowCache(netzachRepo, store) - map5 := bizangela.NewNotifyTargetCache(netzachRepo, store) - topic5 := bizangela.NewNotifyPushTopic(angelaBase, map5) - topic6 := bizangela.NewNotifyRouterTopic(angelaBase, map3, map4, topic5) + map4 := bizangela.NewNotifyFlowCache(netzachRepo, store) + map5 := bizangela.NewFeedToNotifyFlowCache(netzachRepo, store) + map6 := bizangela.NewNotifyTargetCache(netzachRepo, store) + topic5 := bizangela.NewNotifyPushTopic(angelaBase, map6) + topic6 := bizangela.NewNotifyRouterTopic(angelaBase, map4, map5, topic5) topic7 := bizangela.NewFeedItemPostprocessTopic(angelaBase, topic6, topic) topic8 := bizangela.NewPullFeedTopic(angelaBase, topic7, topic) angela, err := bizangela.NewAngela(libmqMQ, topic4, topic3, topic2, topic8, topic6, topic5, topic7, libmqTopic) @@ -126,7 +127,7 @@ func wireApp(sephirahServer *conf.SephirahServer, database *conf.Database, s3 *c cleanup() return nil, nil, err } - gebura := bizgebura.NewGebura(geburaRepo, libauthAuth, searcher, librarianPorterServiceClient, supervisorSupervisor, libmqTopic, topic2, map2) + gebura := bizgebura.NewGebura(geburaRepo, libauthAuth, searcher, librarianPorterServiceClient, supervisorSupervisor, libmqTopic, topic2, map3) binahRepo, err := data.NewBinahRepo(s3) if err != nil { cleanup2() @@ -136,14 +137,14 @@ func wireApp(sephirahServer *conf.SephirahServer, database *conf.Database, s3 *c controlBlock := bizbinah.NewControlBlock(libauthAuth) binah := bizbinah.NewBinah(binahRepo, controlBlock, libauthAuth, librarianSearcherServiceClient) yesodRepo := data.NewYesodRepo(dataData) - map6 := bizyesod.NewFeedOwnerCache(yesodRepo, store) - yesod, err := bizyesod.NewYesod(yesodRepo, supervisorSupervisor, cron, searcher, topic8, topic, map6) + map7 := bizyesod.NewFeedOwnerCache(yesodRepo, store) + yesod, err := bizyesod.NewYesod(yesodRepo, supervisorSupervisor, cron, searcher, topic8, topic, map7) if err != nil { cleanup2() cleanup() return nil, nil, err } - netzach, err := biznetzach.NewNetzach(netzachRepo, supervisorSupervisor, searcher, libmqMQ, map4, map3, map5, topic) + netzach, err := biznetzach.NewNetzach(netzachRepo, supervisorSupervisor, searcher, libmqMQ, map5, map4, map6, topic) if err != nil { cleanup2() cleanup() @@ -156,8 +157,8 @@ func wireApp(sephirahServer *conf.SephirahServer, database *conf.Database, s3 *c cleanup() return nil, nil, err } - map7 := bizchesed.NewImageCache(store) - chesed, err := bizchesed.NewChesed(chesedRepo, binahRepo, cron, librarianPorterServiceClient, searcher, librarianMinerServiceClient, controlBlock, map7) + map8 := bizchesed.NewImageCache(store) + chesed, err := bizchesed.NewChesed(chesedRepo, binahRepo, cron, librarianPorterServiceClient, searcher, librarianMinerServiceClient, controlBlock, map8) if err != nil { cleanup2() cleanup() diff --git a/app/sephirah/internal/biz/biztiphereth/tiphereth.go b/app/sephirah/internal/biz/biztiphereth/tiphereth.go index e311399..2b4bfa5 100644 --- a/app/sephirah/internal/biz/biztiphereth/tiphereth.go +++ b/app/sephirah/internal/biz/biztiphereth/tiphereth.go @@ -2,6 +2,7 @@ package biztiphereth import ( "context" + "strconv" "github.com/tuihub/librarian/app/sephirah/internal/client" "github.com/tuihub/librarian/app/sephirah/internal/model/modelsupervisor" @@ -23,6 +24,7 @@ var ProviderSet = wire.NewSet( NewTiphereth, NewUserCountCache, NewPorterInstanceCache, + NewPorterContextCache, ) type TipherethRepo interface { @@ -45,6 +47,7 @@ type TipherethRepo interface { CreatePorterContext(context.Context, model.InternalID, *modelsupervisor.PorterContext) error ListPorterContexts(context.Context, model.InternalID, model.Paging) ([]*modelsupervisor.PorterContext, int64, error) UpdatePorterContext(context.Context, model.InternalID, *modelsupervisor.PorterContext) error + FetchPorterContext(context.Context, model.InternalID) (*modelsupervisor.PorterContext, error) CreateDevice(context.Context, *modeltiphereth.DeviceInfo) error ListUserSessions(context.Context, model.InternalID) ([]*modeltiphereth.UserSession, error) DeleteUserSession(context.Context, model.InternalID, model.InternalID) error @@ -185,3 +188,20 @@ func NewPorterInstanceCache( libcache.WithExpiration(libtime.SevenDays), ) } + +func NewPorterContextCache( + t TipherethRepo, + store libcache.Store, +) *libcache.Map[model.InternalID, modelsupervisor.PorterContext] { + return libcache.NewMap[model.InternalID, modelsupervisor.PorterContext]( + store, + "PorterContextCache", + func(k model.InternalID) string { + return strconv.FormatInt(int64(k), 10) + }, + func(ctx context.Context, k model.InternalID) (*modelsupervisor.PorterContext, error) { + return t.FetchPorterContext(ctx, k) + }, + libcache.WithExpiration(libtime.SevenDays), + ) +} diff --git a/app/sephirah/internal/data/internal/converter/ent_to_biz.go b/app/sephirah/internal/data/internal/converter/ent_to_biz.go index 40d51b4..de3ae0b 100644 --- a/app/sephirah/internal/data/internal/converter/ent_to_biz.go +++ b/app/sephirah/internal/data/internal/converter/ent_to_biz.go @@ -76,8 +76,6 @@ type toBizConverter interface { //nolint:unused // used by generator // goverter:enum:map StatusBlocked UserStatusBlocked ToBizPorterStatus(porterinstance.Status) modeltiphereth.UserStatus - // goverter:ignore HandleStatus - // goverter:ignore HandleStatusMessage ToBizPorterContext(*ent.PorterContext) *modelsupervisor.PorterContext ToBizPorterContextList([]*ent.PorterContext) []*modelsupervisor.PorterContext // goverter:enum:unknown PorterContextStatusUnspecified diff --git a/app/sephirah/internal/data/tiphereth.go b/app/sephirah/internal/data/tiphereth.go index 43be1e9..5af88bb 100644 --- a/app/sephirah/internal/data/tiphereth.go +++ b/app/sephirah/internal/data/tiphereth.go @@ -534,3 +534,14 @@ func (t tipherethRepo) ListPorterGroups( } return pg, nil } + +func (t tipherethRepo) FetchPorterContext( + ctx context.Context, + id model.InternalID, +) (*modelsupervisor.PorterContext, error) { + res, err := t.data.db.PorterContext.Get(ctx, id) + if err != nil { + return nil, err + } + return converter.ToBizPorterContext(res), nil +} diff --git a/app/sephirah/internal/model/converter/biz_to_pb.go b/app/sephirah/internal/model/converter/biz_to_pb.go index ec1a83e..1bbe69d 100644 --- a/app/sephirah/internal/model/converter/biz_to_pb.go +++ b/app/sephirah/internal/model/converter/biz_to_pb.go @@ -86,8 +86,9 @@ type toPBConverter interface { //nolint:unused // used by generator // goverter:enum:map PorterConnectionStatusDowngraded PorterConnectionStatus_PORTER_CONNECTION_STATUS_DOWNGRADED ToPBPorterConnectionStatus(modelsupervisor.PorterConnectionStatus) pb.PorterConnectionStatus - ToPBPorterContext(*modelsupervisor.PorterContext) *pb.PorterContext - ToPBPorterContextList([]*modelsupervisor.PorterContext) []*pb.PorterContext + // goverter:autoMap PorterContext + ToPBPorterContext(*modelsupervisor.PorterContextController) *pb.PorterContext + ToPBPorterContextList([]*modelsupervisor.PorterContextController) []*pb.PorterContext // goverter:enum:unknown PorterContextStatus_PORTER_CONTEXT_STATUS_UNSPECIFIED // goverter:enum:map PorterContextStatusUnspecified PorterContextStatus_PORTER_CONTEXT_STATUS_UNSPECIFIED // goverter:enum:map PorterContextStatusActive PorterContextStatus_PORTER_CONTEXT_STATUS_ACTIVE diff --git a/app/sephirah/internal/model/converter/generated.go b/app/sephirah/internal/model/converter/generated.go index f7b9112..a9c109a 100755 --- a/app/sephirah/internal/model/converter/generated.go +++ b/app/sephirah/internal/model/converter/generated.go @@ -520,8 +520,6 @@ func ToBizPorterContext(source *v11.PorterContext) *modelsupervisor.PorterContex modelsupervisorPorterContext.Name = (*source).Name modelsupervisorPorterContext.Description = (*source).Description modelsupervisorPorterContext.Status = ToBizPorterContextStatus((*source).Status) - modelsupervisorPorterContext.HandleStatus = ToBizPorterContextHandleStatus((*source).HandleStatus) - modelsupervisorPorterContext.HandleStatusMessage = (*source).HandleStatusMessage pModelsupervisorPorterContext = &modelsupervisorPorterContext } return pModelsupervisorPorterContext @@ -1427,17 +1425,17 @@ func ToPBPorterConnectionStatus(source modelsupervisor.PorterConnectionStatus) v } return v1PorterConnectionStatus } -func ToPBPorterContext(source *modelsupervisor.PorterContext) *v11.PorterContext { +func ToPBPorterContext(source *modelsupervisor.PorterContextController) *v11.PorterContext { var pV1PorterContext *v11.PorterContext if source != nil { var v1PorterContext v11.PorterContext - v1PorterContext.Id = ToPBInternalID((*source).ID) - v1PorterContext.GlobalName = (*source).GlobalName - v1PorterContext.Region = (*source).Region - v1PorterContext.ContextJson = (*source).ContextJSON - v1PorterContext.Name = (*source).Name - v1PorterContext.Description = (*source).Description - v1PorterContext.Status = ToPBPorterContextStatus((*source).Status) + v1PorterContext.Id = ToPBInternalID((*source).PorterContext.ID) + v1PorterContext.GlobalName = (*source).PorterContext.GlobalName + v1PorterContext.Region = (*source).PorterContext.Region + v1PorterContext.ContextJson = (*source).PorterContext.ContextJSON + v1PorterContext.Name = (*source).PorterContext.Name + v1PorterContext.Description = (*source).PorterContext.Description + v1PorterContext.Status = ToPBPorterContextStatus((*source).PorterContext.Status) v1PorterContext.HandleStatus = ToPBPorterContextHandleStatus((*source).HandleStatus) v1PorterContext.HandleStatusMessage = (*source).HandleStatusMessage pV1PorterContext = &v1PorterContext @@ -1462,7 +1460,7 @@ func ToPBPorterContextHandleStatus(source modelsupervisor.PorterContextHandleSta } return v1PorterContextHandleStatus } -func ToPBPorterContextList(source []*modelsupervisor.PorterContext) []*v11.PorterContext { +func ToPBPorterContextList(source []*modelsupervisor.PorterContextController) []*v11.PorterContext { var pV1PorterContextList []*v11.PorterContext if source != nil { pV1PorterContextList = make([]*v11.PorterContext, len(source)) diff --git a/app/sephirah/internal/model/modelsupervisor/modelsupervisor.go b/app/sephirah/internal/model/modelsupervisor/modelsupervisor.go index febf79c..7d3d452 100644 --- a/app/sephirah/internal/model/modelsupervisor/modelsupervisor.go +++ b/app/sephirah/internal/model/modelsupervisor/modelsupervisor.go @@ -13,6 +13,14 @@ type PorterInstanceController struct { ConnectionStatus PorterConnectionStatus ConnectionStatusMessage string LastHeartbeat time.Time + LastEnabledContext []model.InternalID +} + +type PorterContextController struct { + PorterContext + HandleStatus PorterContextHandleStatus + HandleStatusMessage string + HandlerAddress string } type PorterInstance struct { @@ -82,15 +90,13 @@ const ( ) type PorterContext struct { - ID model.InternalID - GlobalName string - Region string - ContextJSON string - Name string - Description string - Status PorterContextStatus - HandleStatus PorterContextHandleStatus - HandleStatusMessage string + ID model.InternalID + GlobalName string + Region string + ContextJSON string + Name string + Description string + Status PorterContextStatus } type PorterContextStatus int diff --git a/app/sephirah/internal/service/tiphereth.go b/app/sephirah/internal/service/tiphereth.go index b9b497a..a402045 100644 --- a/app/sephirah/internal/service/tiphereth.go +++ b/app/sephirah/internal/service/tiphereth.go @@ -324,9 +324,18 @@ func (s *LibrarianSephirahServiceService) ListPorterContexts( if err != nil { return nil, err } + res := make([]*modelsupervisor.PorterContextController, len(contexts)) + for i := range res { + res[i] = s.s.GetContextController(ctx, contexts[i].ID) + if res[i] == nil { + res[i] = new(modelsupervisor.PorterContextController) + res[i].HandleStatus = modelsupervisor.PorterContextHandleStatusBlocked + } + res[i].PorterContext = *contexts[i] + } return &pb.ListPorterContextsResponse{ Paging: &librarian.PagingResponse{TotalSize: total}, - Contexts: converter.ToPBPorterContextList(contexts), + Contexts: converter.ToPBPorterContextList(res), }, nil } diff --git a/app/sephirah/internal/supervisor/supervisor.go b/app/sephirah/internal/supervisor/supervisor.go index 07c0b6f..1b45ea4 100644 --- a/app/sephirah/internal/supervisor/supervisor.go +++ b/app/sephirah/internal/supervisor/supervisor.go @@ -19,6 +19,7 @@ import ( "github.com/tuihub/librarian/internal/lib/libtime" "github.com/tuihub/librarian/internal/lib/libtype" "github.com/tuihub/librarian/internal/lib/logger" + "github.com/tuihub/librarian/internal/model" porter "github.com/tuihub/protos/pkg/librarian/porter/v1" "github.com/google/uuid" @@ -39,12 +40,17 @@ type Supervisor struct { auth *libauth.Auth systemNotify *libmq.Topic[modelnetzach.SystemNotify] - refreshMu sync.Mutex - trustedAddresses []string - instanceController *libtype.SyncMap[string, modelsupervisor.PorterInstanceController] - instanceCache *libcache.Map[string, modelsupervisor.PorterInstance] + refreshMu sync.Mutex + trustedAddresses []string + instanceController *libtype.SyncMap[string, modelsupervisor.PorterInstanceController] + instanceContextController *libtype.SyncMap[model.InternalID, modelsupervisor.PorterContextController] + instanceCache *libcache.Map[string, modelsupervisor.PorterInstance] + instanceContextCache *libcache.Map[model.InternalID, modelsupervisor.PorterContext] + enableContextTopic *libmq.Topic[modelsupervisor.PorterContext] - featureSummary *modelsupervisor.ServerFeatureSummary + // featureSummary contains unique enabled features + featureSummary *modelsupervisor.ServerFeatureSummary + // featureSummaryMap is the map from enabled feature ID to instance addresses featureSummaryMap *modelsupervisor.ServerFeatureSummaryMap featureSummaryRWMu sync.RWMutex } @@ -55,23 +61,31 @@ func NewSupervisor( porter *client.Porter, systemNotify *libmq.Topic[modelnetzach.SystemNotify], instanceCache *libcache.Map[string, modelsupervisor.PorterInstance], + instanceContextCache *libcache.Map[model.InternalID, modelsupervisor.PorterContext], ) (*Supervisor, error) { if c == nil { c = new(conf.Porter) } - return &Supervisor{ - UUID: int64(uuid.New().ID()), - porter: porter, - auth: auth, - instanceController: libtype.NewSyncMap[string, modelsupervisor.PorterInstanceController](), - instanceCache: instanceCache, - refreshMu: sync.Mutex{}, + res := Supervisor{ + UUID: int64(uuid.New().ID()), + porter: porter, + auth: auth, + systemNotify: systemNotify, + + refreshMu: sync.Mutex{}, + trustedAddresses: c.GetTrusted(), + instanceController: libtype.NewSyncMap[string, modelsupervisor.PorterInstanceController](), + instanceContextController: libtype.NewSyncMap[model.InternalID, modelsupervisor.PorterContextController](), + instanceCache: instanceCache, + instanceContextCache: instanceContextCache, + enableContextTopic: nil, + featureSummary: new(modelsupervisor.ServerFeatureSummary), featureSummaryMap: modelsupervisor.NewServerFeatureSummaryMap(), featureSummaryRWMu: sync.RWMutex{}, - trustedAddresses: c.GetTrusted(), - systemNotify: systemNotify, - }, nil + } + res.enableContextTopic = newEnablePorterContextTopic(&res) + return &res, nil } func (s *Supervisor) GetHeartbeatInterval() time.Duration { @@ -88,6 +102,16 @@ func (s *Supervisor) GetInstanceController( return nil } +func (s *Supervisor) GetContextController( + ctx context.Context, + id model.InternalID, +) *modelsupervisor.PorterContextController { + if c, ok := s.instanceContextController.Load(id); ok { + return &c + } + return nil +} + func (s *Supervisor) RefreshAliveInstances( //nolint:gocognit,funlen // TODO ctx context.Context, ) ([]*modelsupervisor.PorterInstance, error) { @@ -146,6 +170,7 @@ func (s *Supervisor) RefreshAliveInstances( //nolint:gocognit,funlen // TODO ConnectionStatus: modelsupervisor.PorterConnectionStatusConnected, ConnectionStatusMessage: "", LastHeartbeat: time.Now(), + LastEnabledContext: nil, }) }(ctx, address) } @@ -170,30 +195,17 @@ func (s *Supervisor) RefreshAliveInstances( //nolint:gocognit,funlen // TODO // no return } - now := time.Now() - if resp != nil { + if err != nil { + ctl.ConnectionStatusMessage += fmt.Sprintf("Error: %s", err.Error()) + } else if resp != nil { ctl.ConnectionStatusMessage = resp.GetStatusMessage() - } else { - ctl.ConnectionStatusMessage = "" } lastConnectionStatus := ctl.ConnectionStatus - if err != nil { //nolint:nestif // TODO - if ctl.LastHeartbeat.Add(defaultHeartbeatTimeout).Before(now) { - ctl.ConnectionStatus = modelsupervisor.PorterConnectionStatusDisconnected - } else if ctl.LastHeartbeat.Add(defaultHeartbeatDowngrade).Before(now) { - ctl.ConnectionStatus = modelsupervisor.PorterConnectionStatusDowngraded - } else if ctl.ConnectionStatus == modelsupervisor.PorterConnectionStatusActive { - ctl.ConnectionStatus = modelsupervisor.PorterConnectionStatusActive - } else { - ctl.ConnectionStatus = modelsupervisor.PorterConnectionStatusActivationFailed - } - if ctl.ConnectionStatusMessage != "" { - ctl.ConnectionStatusMessage += "\n" - } - ctl.ConnectionStatusMessage += fmt.Sprintf("Error: %s", err.Error()) - } else { - ctl.ConnectionStatus = modelsupervisor.PorterConnectionStatusActive - ctl.LastHeartbeat = now + err = s.updateControllers(ctx, &ctl, resp) + if err != nil { + logger.Errorf("%s", err.Error()) + hasError = true + notifyCh <- fmt.Sprintf("Error on %s: %s", ins.Address, err.Error()) } if lastConnectionStatus != ctl.ConnectionStatus { updateFeatureSummary = true @@ -310,3 +322,50 @@ func (s *Supervisor) enablePorterInstance( } return resp, err } + +func (s *Supervisor) updateControllers( + ctx context.Context, + ctl *modelsupervisor.PorterInstanceController, + resp *porter.EnablePorterResponse, +) error { + now := time.Now() + if resp == nil { //nolint:nestif // TODO + if ctl.LastHeartbeat.Add(defaultHeartbeatTimeout).Before(now) { + ctl.ConnectionStatus = modelsupervisor.PorterConnectionStatusDisconnected + } else if ctl.LastHeartbeat.Add(defaultHeartbeatDowngrade).Before(now) { + ctl.ConnectionStatus = modelsupervisor.PorterConnectionStatusDowngraded + } else if ctl.ConnectionStatus == modelsupervisor.PorterConnectionStatusActive { + ctl.ConnectionStatus = modelsupervisor.PorterConnectionStatusActive + } else { + ctl.ConnectionStatus = modelsupervisor.PorterConnectionStatusActivationFailed + } + } else { + ctl.ConnectionStatus = modelsupervisor.PorterConnectionStatusActive + ctl.LastHeartbeat = now + } + if resp == nil || resp.GetEnablesSummary() == nil { + return nil + } + if ctl.ConnectionStatus == modelsupervisor.PorterConnectionStatusActive { + onlyLast, _ := libtype.DiffSlices( + ctl.LastEnabledContext, + converter.ToBizInternalIDList(resp.GetEnablesSummary().GetContextIds()), + ) + ctl.LastEnabledContext = converter.ToBizInternalIDList(resp.GetEnablesSummary().GetContextIds()) + for _, id := range onlyLast { + ic, ok := s.instanceContextController.Load(id) + if !ok { + continue + } + ic.HandleStatus = modelsupervisor.PorterContextHandleStatusQueueing + ic.HandleStatusMessage = "" + ic.HandlerAddress = "" + s.instanceContextController.Store(id, ic) + err := s.QueuePorterContext(ctx, ic.PorterContext) + if err != nil { + return err + } + } + } + return nil +} diff --git a/app/sephirah/internal/supervisor/task.go b/app/sephirah/internal/supervisor/task.go new file mode 100644 index 0000000..cf32729 --- /dev/null +++ b/app/sephirah/internal/supervisor/task.go @@ -0,0 +1,108 @@ +package supervisor + +import ( + "context" + "fmt" + + "github.com/tuihub/librarian/app/sephirah/internal/client" + "github.com/tuihub/librarian/app/sephirah/internal/model/converter" + "github.com/tuihub/librarian/app/sephirah/internal/model/modelsupervisor" + "github.com/tuihub/librarian/internal/lib/libmq" + "github.com/tuihub/librarian/internal/lib/libtype" + porter "github.com/tuihub/protos/pkg/librarian/porter/v1" +) + +func (s *Supervisor) QueuePorterContext( + ctx context.Context, + porterContext modelsupervisor.PorterContext, +) error { + return s.enableContextTopic.Publish(ctx, porterContext) +} + +func newEnablePorterContextTopic( + s *Supervisor, +) *libmq.Topic[modelsupervisor.PorterContext] { + return libmq.NewTopic[modelsupervisor.PorterContext]( + "EnablePorterContext", + func(ctx context.Context, c *modelsupervisor.PorterContext) error { + ic, ok := s.instanceContextController.Load(c.ID) + if ok && ic.HandleStatus == modelsupervisor.PorterContextHandleStatusActive { + return nil + } + if !ok { + ic = modelsupervisor.PorterContextController{ + PorterContext: *c, + HandleStatus: modelsupervisor.PorterContextHandleStatusQueueing, + HandleStatusMessage: "", + HandlerAddress: "", + } + } + var available []string + s.instanceController.Range(func(key string, controller modelsupervisor.PorterInstanceController) bool { + if controller.ConnectionStatus == modelsupervisor.PorterConnectionStatusActive && + controller.GlobalName == c.GlobalName && + controller.Region == c.Region { + available = append(available, controller.PorterInstance.Address) + } + return true + }) + if len(available) == 0 { + ic.HandleStatus = modelsupervisor.PorterContextHandleStatusBlocked + ic.HandleStatusMessage = "no available instance" + s.instanceContextController.Store(c.ID, ic) + return nil + } + libtype.ShuffleSlice(available) + var err error + ic.HandleStatus = modelsupervisor.PorterContextHandleStatusBlocked + ic.HandleStatusMessage = "" + for _, address := range available { + _, err = s.porter.EnableContext( + client.WithPorterAddress(ctx, []string{address}), + &porter.EnableContextRequest{ + ContextId: converter.ToPBInternalID(c.ID), + ContextJson: c.ContextJSON, + }, + ) + if err == nil { + ic.HandleStatus = modelsupervisor.PorterContextHandleStatusActive + ic.HandlerAddress = address + break + } + } + if err != nil { + ic.HandleStatusMessage = err.Error() + } + s.instanceContextController.Store(c.ID, ic) + return err + }, + ) +} + +func (s *Supervisor) DisablePorterContext( + ctx context.Context, + porterContext *modelsupervisor.PorterContext, +) error { + ic, ok := s.instanceContextController.Load(porterContext.ID) + if !ok || len(ic.HandlerAddress) == 0 { + return nil + } + _, ok = s.instanceController.Load(ic.HandlerAddress) + if !ok { + return fmt.Errorf("instance not found: %s", ic.HandlerAddress) + } + _, err := s.porter.DisableContext( + client.WithPorterAddress(ctx, []string{ic.HandlerAddress}), + &porter.DisableContextRequest{ + ContextId: converter.ToPBInternalID(porterContext.ID), + }, + ) + if err != nil { + return err + } + ic.HandleStatus = modelsupervisor.PorterContextHandleStatusQueueing + ic.HandleStatusMessage = "" + ic.HandlerAddress = "" + s.instanceContextController.Store(porterContext.ID, ic) + return nil +} diff --git a/app/sephirah/pkg/service/wire_gen.go b/app/sephirah/pkg/service/wire_gen.go index f83423c..c8652f6 100644 --- a/app/sephirah/pkg/service/wire_gen.go +++ b/app/sephirah/pkg/service/wire_gen.go @@ -54,7 +54,8 @@ func NewSephirahService(sephirahServer *conf.SephirahServer, database *conf.Data topic := biznetzach.NewSystemNotificationTopic(netzachRepo, searcher) tipherethRepo := data.NewTipherethRepo(dataData) libcacheMap := biztiphereth.NewPorterInstanceCache(tipherethRepo, store) - supervisorSupervisor, err := supervisor.NewSupervisor(porter, auth, clientPorter, topic, libcacheMap) + map2 := biztiphereth.NewPorterContextCache(tipherethRepo, store) + supervisorSupervisor, err := supervisor.NewSupervisor(porter, auth, clientPorter, topic, libcacheMap, map2) if err != nil { cleanup() return nil, nil, err @@ -65,16 +66,16 @@ func NewSephirahService(sephirahServer *conf.SephirahServer, database *conf.Data cleanup() return nil, nil, err } - map2 := bizangela.NewAppInfoCache(geburaRepo, store) + map3 := bizangela.NewAppInfoCache(geburaRepo, store) libmqTopic := bizangela.NewUpdateAppInfoIndexTopic(angelaBase) - topic2 := bizangela.NewPullAppInfoTopic(angelaBase, map2, libmqTopic) + topic2 := bizangela.NewPullAppInfoTopic(angelaBase, map3, libmqTopic) topic3 := bizangela.NewPullAccountAppInfoRelationTopic(angelaBase, topic2) topic4 := bizangela.NewPullAccountTopic(angelaBase, topic3) - map3 := bizangela.NewNotifyFlowCache(netzachRepo, store) - map4 := bizangela.NewFeedToNotifyFlowCache(netzachRepo, store) - map5 := bizangela.NewNotifyTargetCache(netzachRepo, store) - topic5 := bizangela.NewNotifyPushTopic(angelaBase, map5) - topic6 := bizangela.NewNotifyRouterTopic(angelaBase, map3, map4, topic5) + map4 := bizangela.NewNotifyFlowCache(netzachRepo, store) + map5 := bizangela.NewFeedToNotifyFlowCache(netzachRepo, store) + map6 := bizangela.NewNotifyTargetCache(netzachRepo, store) + topic5 := bizangela.NewNotifyPushTopic(angelaBase, map6) + topic6 := bizangela.NewNotifyRouterTopic(angelaBase, map4, map5, topic5) topic7 := bizangela.NewFeedItemPostprocessTopic(angelaBase, topic6, topic) topic8 := bizangela.NewPullFeedTopic(angelaBase, topic7, topic) angela, err := bizangela.NewAngela(mq, topic4, topic3, topic2, topic8, topic6, topic5, topic7, libmqTopic) @@ -88,7 +89,7 @@ func NewSephirahService(sephirahServer *conf.SephirahServer, database *conf.Data cleanup() return nil, nil, err } - gebura := bizgebura.NewGebura(geburaRepo, auth, searcher, librarianPorterServiceClient, supervisorSupervisor, libmqTopic, topic2, map2) + gebura := bizgebura.NewGebura(geburaRepo, auth, searcher, librarianPorterServiceClient, supervisorSupervisor, libmqTopic, topic2, map3) binahRepo, err := data.NewBinahRepo(s3) if err != nil { cleanup() @@ -97,20 +98,20 @@ func NewSephirahService(sephirahServer *conf.SephirahServer, database *conf.Data controlBlock := bizbinah.NewControlBlock(auth) binah := bizbinah.NewBinah(binahRepo, controlBlock, auth, librarianSearcherServiceClient) yesodRepo := data.NewYesodRepo(dataData) - map6 := bizyesod.NewFeedOwnerCache(yesodRepo, store) - yesod, err := bizyesod.NewYesod(yesodRepo, supervisorSupervisor, cron, searcher, topic8, topic, map6) + map7 := bizyesod.NewFeedOwnerCache(yesodRepo, store) + yesod, err := bizyesod.NewYesod(yesodRepo, supervisorSupervisor, cron, searcher, topic8, topic, map7) if err != nil { cleanup() return nil, nil, err } - netzach, err := biznetzach.NewNetzach(netzachRepo, supervisorSupervisor, searcher, mq, map4, map3, map5, topic) + netzach, err := biznetzach.NewNetzach(netzachRepo, supervisorSupervisor, searcher, mq, map5, map4, map6, topic) if err != nil { cleanup() return nil, nil, err } chesedRepo := data.NewChesedRepo(dataData) - map7 := bizchesed.NewImageCache(store) - chesed, err := bizchesed.NewChesed(chesedRepo, binahRepo, cron, librarianPorterServiceClient, searcher, librarianMinerServiceClient, controlBlock, map7) + map8 := bizchesed.NewImageCache(store) + chesed, err := bizchesed.NewChesed(chesedRepo, binahRepo, cron, librarianPorterServiceClient, searcher, librarianMinerServiceClient, controlBlock, map8) if err != nil { cleanup() return nil, nil, err diff --git a/internal/lib/libtype/slice.go b/internal/lib/libtype/slice.go new file mode 100644 index 0000000..465fbaa --- /dev/null +++ b/internal/lib/libtype/slice.go @@ -0,0 +1,32 @@ +package libtype + +import ( + "math/rand" +) + +func ShuffleSlice[T any](ss []T) { + for i := range ss { + j := rand.Intn(i + 1) //nolint:gosec // not critical + ss[i], ss[j] = ss[j], ss[i] + } +} + +func DiffSlices[T comparable](a, b []T) ([]T, []T) { + onlyA := make([]T, 0, len(a)) + onlyB := make([]T, 0, len(b)) + m := make(map[T]struct{}, len(a)) + for _, v := range a { + m[v] = struct{}{} + } + for _, v := range b { + if _, ok := m[v]; !ok { + onlyB = append(onlyB, v) + } + } + for _, v := range a { + if _, ok := m[v]; !ok { + onlyA = append(onlyA, v) + } + } + return onlyA, onlyB +}