From d047ccdc5c8a465ec9b29ef130fc25c07e807975 Mon Sep 17 00:00:00 2001 From: MuZhou233 Date: Wed, 7 Aug 2024 19:12:39 +0100 Subject: [PATCH] fix: test new logic --- app/sephirah/cmd/sephirah/wire_gen.go | 2 +- app/sephirah/internal/data/tiphereth.go | 43 ++++++--- .../internal/model/converter/biz_to_pb.go | 1 + .../internal/model/converter/generated.go | 45 +++------ .../service/librariansephirahservice.go | 57 +++++------- app/sephirah/internal/service/tiphereth.go | 2 +- app/sephirah/internal/supervisor/summary.go | 93 ++++++------------- .../internal/supervisor/supervisor.go | 7 +- app/sephirah/pkg/service/wire_gen.go | 2 +- internal/lib/libcache/key.go | 2 +- 10 files changed, 104 insertions(+), 150 deletions(-) diff --git a/app/sephirah/cmd/sephirah/wire_gen.go b/app/sephirah/cmd/sephirah/wire_gen.go index a374145..5f6a77e 100644 --- a/app/sephirah/cmd/sephirah/wire_gen.go +++ b/app/sephirah/cmd/sephirah/wire_gen.go @@ -164,7 +164,7 @@ func wireApp(sephirahServer *conf.SephirahServer, database *conf.Database, s3 *c return nil, nil, err } v := server.NewAuthMiddleware(libauthAuth) - librarianSephirahServiceServer := service.NewLibrarianSephirahServiceService(angela, tiphereth, gebura, binah, yesod, netzach, chesed, supervisorSupervisor, settings, libauthAuth, v, sephirahServer, store) + librarianSephirahServiceServer := service.NewLibrarianSephirahServiceService(angela, tiphereth, gebura, binah, yesod, netzach, chesed, supervisorSupervisor, settings, libauthAuth, v, sephirahServer) grpcServer, err := server.NewGRPCServer(sephirahServer, libauthAuth, librarianSephirahServiceServer, settings, builtInObserver) if err != nil { cleanup2() diff --git a/app/sephirah/internal/data/tiphereth.go b/app/sephirah/internal/data/tiphereth.go index 7c887e6..43be1e9 100644 --- a/app/sephirah/internal/data/tiphereth.go +++ b/app/sephirah/internal/data/tiphereth.go @@ -362,9 +362,11 @@ func (t tipherethRepo) UpsertPorters(ctx context.Context, il []*modelsupervisor. SetBuildVersion(instance.BinarySummary.BuildVersion). SetBuildDate(instance.BinarySummary.BuildDate). SetGlobalName(instance.GlobalName). + SetRegion(instance.Region). SetAddress(instance.Address). SetStatus(converter.ToEntPorterInstanceStatus(instance.Status)). - SetFeatureSummary(instance.FeatureSummary) + SetFeatureSummary(instance.FeatureSummary). + SetContextJSONSchema(instance.ContextJSONSchema) } return t.data.db.PorterInstance. CreateBulk(instances...). @@ -430,6 +432,7 @@ func (t tipherethRepo) CreatePorterContext( context *modelsupervisor.PorterContext, ) error { return t.data.db.PorterContext.Create(). + SetID(context.ID). SetOwnerID(userID). SetGlobalName(context.GlobalName). SetRegion(context.Region). @@ -482,9 +485,10 @@ func (t tipherethRepo) ListPorterGroups( ctx context.Context, status []modeltiphereth.UserStatus, ) ([]*modelsupervisor.PorterGroup, error) { - var pi []*ent.PorterInstance - var pg []*modelsupervisor.PorterGroup - pgm := make(map[string]*modelsupervisor.PorterGroup) + var res []struct { + ent.PorterInstance + Min model.InternalID + } q := t.data.db.PorterInstance.Query() if len(status) > 0 { q.Where(porterinstance.StatusIn(converter.ToEntPorterInstanceStatusList(status)...)) @@ -492,27 +496,38 @@ func (t tipherethRepo) ListPorterGroups( err := q.GroupBy( porterinstance.FieldGlobalName, porterinstance.FieldRegion, - ).Scan(ctx, &pi) + ). + Aggregate(ent.Min(porterinstance.FieldID)). + Scan(ctx, &res) + if err != nil { + return nil, err + } + var ids []model.InternalID + for _, p := range res { + ids = append(ids, p.Min) + } + pi, err := t.data.db.PorterInstance.Query().Where( + porterinstance.IDIn(ids...), + ).All(ctx) if err != nil { return nil, err } + var pg []*modelsupervisor.PorterGroup + pgm := make(map[string]*modelsupervisor.PorterGroup) for _, p := range pi { + if len(p.ContextJSONSchema) == 0 { + continue + } if pgm[p.GlobalName] == nil { pgm[p.GlobalName] = &modelsupervisor.PorterGroup{ - BinarySummary: &modelsupervisor.PorterBinarySummary{ - Name: p.Name, - Version: p.Version, - Description: p.Description, - SourceCodeAddress: p.SourceCodeAddress, - BuildVersion: p.BuildVersion, - BuildDate: p.BuildDate, - }, + BinarySummary: converter.ToBizPorter(p).BinarySummary, GlobalName: p.GlobalName, Regions: []string{p.Region}, ContextJSONSchema: p.ContextJSONSchema, } + } else { + pgm[p.GlobalName].Regions = append(pgm[p.GlobalName].Regions, p.Region) } - pgm[p.GlobalName].Regions = append(pgm[p.GlobalName].Regions, p.Region) } for _, v := range pgm { pg = append(pg, v) diff --git a/app/sephirah/internal/model/converter/biz_to_pb.go b/app/sephirah/internal/model/converter/biz_to_pb.go index 6dcf4cb..ec1a83e 100644 --- a/app/sephirah/internal/model/converter/biz_to_pb.go +++ b/app/sephirah/internal/model/converter/biz_to_pb.go @@ -33,6 +33,7 @@ type toPBConverter interface { //nolint:unused // used by generator ToPBInternalIDList([]model.InternalID) []*librarian.InternalID ToPBServerFeatureSummary(*modelsupervisor.ServerFeatureSummary) *pb.ServerFeatureSummary ToPBFeatureFlag(*modelsupervisor.FeatureFlag) *librarian.FeatureFlag + ToPBFeatureFlagList([]*modelsupervisor.FeatureFlag) []*librarian.FeatureFlag ToPBFeatureRequest(*modelsupervisor.FeatureRequest) *librarian.FeatureRequest // goverter:map ID DeviceId diff --git a/app/sephirah/internal/model/converter/generated.go b/app/sephirah/internal/model/converter/generated.go index b6738ef..f7b9112 100755 --- a/app/sephirah/internal/model/converter/generated.go +++ b/app/sephirah/internal/model/converter/generated.go @@ -1040,6 +1040,16 @@ func ToPBFeatureFlag(source *modelsupervisor.FeatureFlag) *v1.FeatureFlag { } return pV1FeatureFlag } +func ToPBFeatureFlagList(source []*modelsupervisor.FeatureFlag) []*v1.FeatureFlag { + var pV1FeatureFlagList []*v1.FeatureFlag + if source != nil { + pV1FeatureFlagList = make([]*v1.FeatureFlag, len(source)) + for i := 0; i < len(source); i++ { + pV1FeatureFlagList[i] = ToPBFeatureFlag(source[i]) + } + } + return pV1FeatureFlagList +} func ToPBFeatureRequest(source *modelsupervisor.FeatureRequest) *v1.FeatureRequest { var pV1FeatureRequest *v1.FeatureRequest if source != nil { @@ -1518,36 +1528,11 @@ func ToPBServerFeatureSummary(source *modelsupervisor.ServerFeatureSummary) *v11 var pV1ServerFeatureSummary *v11.ServerFeatureSummary if source != nil { var v1ServerFeatureSummary v11.ServerFeatureSummary - if (*source).AccountPlatforms != nil { - v1ServerFeatureSummary.AccountPlatforms = make([]*v1.FeatureFlag, len((*source).AccountPlatforms)) - for i := 0; i < len((*source).AccountPlatforms); i++ { - v1ServerFeatureSummary.AccountPlatforms[i] = ToPBFeatureFlag((*source).AccountPlatforms[i]) - } - } - if (*source).AppInfoSources != nil { - v1ServerFeatureSummary.AppInfoSources = make([]*v1.FeatureFlag, len((*source).AppInfoSources)) - for j := 0; j < len((*source).AppInfoSources); j++ { - v1ServerFeatureSummary.AppInfoSources[j] = ToPBFeatureFlag((*source).AppInfoSources[j]) - } - } - if (*source).FeedSources != nil { - v1ServerFeatureSummary.FeedSources = make([]*v1.FeatureFlag, len((*source).FeedSources)) - for k := 0; k < len((*source).FeedSources); k++ { - v1ServerFeatureSummary.FeedSources[k] = ToPBFeatureFlag((*source).FeedSources[k]) - } - } - if (*source).NotifyDestinations != nil { - v1ServerFeatureSummary.NotifyDestinations = make([]*v1.FeatureFlag, len((*source).NotifyDestinations)) - for l := 0; l < len((*source).NotifyDestinations); l++ { - v1ServerFeatureSummary.NotifyDestinations[l] = ToPBFeatureFlag((*source).NotifyDestinations[l]) - } - } - if (*source).FeedItemActions != nil { - v1ServerFeatureSummary.FeedItemActions = make([]*v1.FeatureFlag, len((*source).FeedItemActions)) - for m := 0; m < len((*source).FeedItemActions); m++ { - v1ServerFeatureSummary.FeedItemActions[m] = ToPBFeatureFlag((*source).FeedItemActions[m]) - } - } + v1ServerFeatureSummary.AccountPlatforms = ToPBFeatureFlagList((*source).AccountPlatforms) + v1ServerFeatureSummary.AppInfoSources = ToPBFeatureFlagList((*source).AppInfoSources) + v1ServerFeatureSummary.FeedSources = ToPBFeatureFlagList((*source).FeedSources) + v1ServerFeatureSummary.NotifyDestinations = ToPBFeatureFlagList((*source).NotifyDestinations) + v1ServerFeatureSummary.FeedItemActions = ToPBFeatureFlagList((*source).FeedItemActions) pV1ServerFeatureSummary = &v1ServerFeatureSummary } return pV1ServerFeatureSummary diff --git a/app/sephirah/internal/service/librariansephirahservice.go b/app/sephirah/internal/service/librariansephirahservice.go index fc82001..78b3140 100644 --- a/app/sephirah/internal/service/librariansephirahservice.go +++ b/app/sephirah/internal/service/librariansephirahservice.go @@ -16,7 +16,6 @@ import ( "github.com/tuihub/librarian/internal/conf" "github.com/tuihub/librarian/internal/lib/libapp" "github.com/tuihub/librarian/internal/lib/libauth" - "github.com/tuihub/librarian/internal/lib/libcache" pb "github.com/tuihub/protos/pkg/librarian/sephirah/v1" "google.golang.org/protobuf/types/known/timestamppb" @@ -35,7 +34,7 @@ type LibrarianSephirahServiceService struct { app *libapp.Settings auth *libauth.Auth authFunc func(context.Context) (context.Context, error) - info *libcache.Key[pb.GetServerInformationResponse] + info *pb.ServerInstanceSummary } func NewLibrarianSephirahServiceService( @@ -51,7 +50,6 @@ func NewLibrarianSephirahServiceService( auth *libauth.Auth, authFunc func(context.Context) (context.Context, error), config *conf.SephirahServer, - store libcache.Store, ) pb.LibrarianSephirahServiceServer { t.CreateConfiguredAdmin() if config == nil { @@ -74,47 +72,34 @@ func NewLibrarianSephirahServiceService( authFunc: authFunc, info: nil, } - res.info = newServerInfromationCache(res, store, &pb.ServerInstanceSummary{ + res.info = &pb.ServerInstanceSummary{ Name: config.GetInfo().GetName(), Description: config.GetInfo().GetDescription(), WebsiteUrl: config.GetInfo().GetWebsiteUrl(), LogoUrl: config.GetInfo().GetLogoUrl(), BackgroundUrl: config.GetInfo().GetBackgroundUrl(), - }) + } return res } -func newServerInfromationCache( - s *LibrarianSephirahServiceService, - store libcache.Store, - serverSummary *pb.ServerInstanceSummary, -) *libcache.Key[pb.GetServerInformationResponse] { - return libcache.NewKey[pb.GetServerInformationResponse]( - store, - "GetServerInformationResponse", - func(ctx context.Context) (*pb.GetServerInformationResponse, error) { - featureSummary := s.s.GetFeatureSummary() - featureSummary.FeedItemActions = append(featureSummary.FeedItemActions, s.y.GetBuiltInFeedActions()...) - return &pb.GetServerInformationResponse{ - ServerBinarySummary: &pb.ServerBinarySummary{ - SourceCodeAddress: s.app.SourceCodeAddress, - BuildVersion: s.app.Version, - BuildDate: s.app.BuildDate, - }, - ProtocolSummary: &pb.ServerProtocolSummary{ - Version: s.app.ProtoVersion, - }, - CurrentTime: timestamppb.New(time.Now()), - FeatureSummary: converter.ToPBServerFeatureSummary(featureSummary), - ServerInstanceSummary: serverSummary, - StatusReport: nil, - }, nil - }, - libcache.WithExpiration(time.Minute), - ) -} - func (s *LibrarianSephirahServiceService) GetServerInformation(ctx context.Context, _ *pb.GetServerInformationRequest) (*pb.GetServerInformationResponse, error) { - return s.info.Get(ctx) + featureSummary := converter.ToPBServerFeatureSummary(s.s.GetFeatureSummary()) + featureSummary.FeedItemActions = append(featureSummary.FeedItemActions, + converter.ToPBFeatureFlagList(s.y.GetBuiltInFeedActions())..., + ) + return &pb.GetServerInformationResponse{ + ServerBinarySummary: &pb.ServerBinarySummary{ + SourceCodeAddress: s.app.SourceCodeAddress, + BuildVersion: s.app.Version, + BuildDate: s.app.BuildDate, + }, + ProtocolSummary: &pb.ServerProtocolSummary{ + Version: s.app.ProtoVersion, + }, + CurrentTime: timestamppb.New(time.Now()), + FeatureSummary: featureSummary, + ServerInstanceSummary: s.info, + StatusReport: nil, + }, nil } diff --git a/app/sephirah/internal/service/tiphereth.go b/app/sephirah/internal/service/tiphereth.go index 7e6d498..b9b497a 100644 --- a/app/sephirah/internal/service/tiphereth.go +++ b/app/sephirah/internal/service/tiphereth.go @@ -268,9 +268,9 @@ func (s *LibrarianSephirahServiceService) ListPorters(ctx context.Context, req * res[i] = s.s.GetInstanceController(ctx, porters[i].Address) if res[i] == nil { res[i] = new(modelsupervisor.PorterInstanceController) - res[i].PorterInstance = *porters[i] res[i].ConnectionStatus = modelsupervisor.PorterConnectionStatusDisconnected } + res[i].PorterInstance = *porters[i] } return &pb.ListPortersResponse{ Paging: &librarian.PagingResponse{TotalSize: total}, diff --git a/app/sephirah/internal/supervisor/summary.go b/app/sephirah/internal/supervisor/summary.go index 1c27b30..81be597 100644 --- a/app/sephirah/internal/supervisor/summary.go +++ b/app/sephirah/internal/supervisor/summary.go @@ -4,7 +4,6 @@ import ( "context" "github.com/tuihub/librarian/app/sephirah/internal/model/modelsupervisor" - "github.com/tuihub/librarian/app/sephirah/internal/model/modeltiphereth" "github.com/tuihub/librarian/internal/lib/libtype" ) @@ -13,95 +12,59 @@ func (s *Supervisor) GetFeatureSummary() *modelsupervisor.ServerFeatureSummary { defer s.featureSummaryRWMu.RUnlock() featureSummary := new(modelsupervisor.ServerFeatureSummary) if s.featureSummary != nil { - _ = libtype.DeepCopyStruct(s.featureSummary, &featureSummary) + _ = libtype.DeepCopyStruct(s.featureSummary, featureSummary) } - return s.featureSummary + return featureSummary } -func (s *Supervisor) updateFeatureSummary(ctx context.Context) { +func (s *Supervisor) updateFeatureSummary(_ context.Context) { + s.featureSummaryRWMu.Lock() + defer s.featureSummaryRWMu.Unlock() + var instances []*modelsupervisor.PorterInstance s.instanceController.Range(func(key string, controller modelsupervisor.PorterInstanceController) bool { - ins, err := s.instanceCache.Get(ctx, key) - if err == nil && ins != nil && ins.Status == modeltiphereth.UserStatusActive { - instances = append(instances, ins) + if controller.ConnectionStatus == modelsupervisor.PorterConnectionStatusActive { + instances = append(instances, &controller.PorterInstance) } return true }) featureSummary, featureSummaryMap := summarize(instances) - - s.featureSummaryRWMu.Lock() - defer s.featureSummaryRWMu.Unlock() s.featureSummary = featureSummary s.featureSummaryMap = featureSummaryMap } -func summarize( //nolint:gocognit // how? +func summarize( instances []*modelsupervisor.PorterInstance, ) (*modelsupervisor.ServerFeatureSummary, *modelsupervisor.ServerFeatureSummaryMap) { res := new(modelsupervisor.ServerFeatureSummary) resMap := modelsupervisor.NewServerFeatureSummaryMap() - supportedAccountPlatforms := make(map[string]bool) - supportedAppSources := make(map[string]bool) - supportedFeedSources := make(map[string]bool) - supportedNotifyDestinations := make(map[string]bool) for _, ins := range instances { if ins == nil { continue } - for _, account := range ins.FeatureSummary.AccountPlatforms { - a, _ := resMap.AccountPlatforms.Load(ins.Address) - if a == nil { - a = []string{} - } - a = append(a, account.ID) - resMap.AccountPlatforms.Store(ins.Address, a) - if supportedAccountPlatforms[account.ID] { - continue - } - res.AccountPlatforms = append(res.AccountPlatforms, account) - supportedAccountPlatforms[account.ID] = true - } - for _, appSource := range ins.FeatureSummary.AppInfoSources { - a, _ := resMap.AppInfoSources.Load(ins.Address) - if a == nil { - a = []string{} - } - a = append(a, appSource.ID) - resMap.AppInfoSources.Store(ins.Address, a) - if supportedAppSources[appSource.ID] { - continue - } - res.AppInfoSources = append(res.AppInfoSources, appSource) - supportedAppSources[appSource.ID] = true - } - for _, feedSource := range ins.FeatureSummary.FeedSources { - a, _ := resMap.FeedSources.Load(ins.Address) - if a == nil { - a = []string{} - } - a = append(a, feedSource.ID) - resMap.FeedSources.Store(ins.Address, a) - if supportedFeedSources[feedSource.ID] { - continue - } - res.FeedSources = append(res.FeedSources, feedSource) - supportedFeedSources[feedSource.ID] = true - } - for _, notifyDestination := range ins.FeatureSummary.NotifyDestinations { - a, _ := resMap.NotifyDestinations.Load(ins.Address) - if a == nil { - a = []string{} - } - a = append(a, notifyDestination.ID) - resMap.NotifyDestinations.Store(ins.Address, a) - if supportedNotifyDestinations[notifyDestination.ID] { - continue + do := func(flags []*modelsupervisor.FeatureFlag, resMap *libtype.SyncMap[string, []string], res []*modelsupervisor.FeatureFlag) { + markMap := make(map[string]bool) + for _, flag := range flags { + a, _ := resMap.Load(flag.ID) + if a == nil { + a = []string{} + } + a = append(a, ins.Address) + resMap.Store(flag.ID, a) + if markMap[flag.ID] { + continue + } + res = append(res, flag) + markMap[flag.ID] = true } - res.NotifyDestinations = append(res.NotifyDestinations, notifyDestination) - supportedNotifyDestinations[notifyDestination.ID] = true } + do(ins.FeatureSummary.AccountPlatforms, resMap.AccountPlatforms, res.AccountPlatforms) + do(ins.FeatureSummary.AppInfoSources, resMap.AppInfoSources, res.AppInfoSources) + do(ins.FeatureSummary.FeedSources, resMap.FeedSources, res.FeedSources) + do(ins.FeatureSummary.NotifyDestinations, resMap.NotifyDestinations, res.NotifyDestinations) + do(ins.FeatureSummary.FeedItemActions, resMap.FeedItemActions, res.FeedItemActions) } return res, resMap } diff --git a/app/sephirah/internal/supervisor/supervisor.go b/app/sephirah/internal/supervisor/supervisor.go index db56e2d..07c0b6f 100644 --- a/app/sephirah/internal/supervisor/supervisor.go +++ b/app/sephirah/internal/supervisor/supervisor.go @@ -103,6 +103,7 @@ func (s *Supervisor) RefreshAliveInstances( //nolint:gocognit,funlen // TODO } newInstances := make([]*modelsupervisor.PorterInstance, 0, len(discoveredAddresses)) newInstancesMu := sync.Mutex{} + updateFeatureSummary := false hasError := false notification := modelnetzach.NewSystemNotify( modelnetzach.SystemNotificationLevelOngoing, @@ -175,6 +176,7 @@ func (s *Supervisor) RefreshAliveInstances( //nolint:gocognit,funlen // TODO } else { ctl.ConnectionStatusMessage = "" } + lastConnectionStatus := ctl.ConnectionStatus if err != nil { //nolint:nestif // TODO if ctl.LastHeartbeat.Add(defaultHeartbeatTimeout).Before(now) { ctl.ConnectionStatus = modelsupervisor.PorterConnectionStatusDisconnected @@ -193,6 +195,9 @@ func (s *Supervisor) RefreshAliveInstances( //nolint:gocognit,funlen // TODO ctl.ConnectionStatus = modelsupervisor.PorterConnectionStatusActive ctl.LastHeartbeat = now } + if lastConnectionStatus != ctl.ConnectionStatus { + updateFeatureSummary = true + } s.instanceController.Store(ins.Address, ctl) }(ctx, ins) @@ -230,7 +235,7 @@ func (s *Supervisor) RefreshAliveInstances( //nolint:gocognit,funlen // TODO _ = s.systemNotify.PublishFallsLocalCall(ctx, notification) } - if len(newInstances) > 0 { + if updateFeatureSummary { go s.updateFeatureSummary(ctx) } return newInstances, nil diff --git a/app/sephirah/pkg/service/wire_gen.go b/app/sephirah/pkg/service/wire_gen.go index fff6434..f83423c 100644 --- a/app/sephirah/pkg/service/wire_gen.go +++ b/app/sephirah/pkg/service/wire_gen.go @@ -116,7 +116,7 @@ func NewSephirahService(sephirahServer *conf.SephirahServer, database *conf.Data return nil, nil, err } v := server.NewAuthMiddleware(auth) - librarianSephirahServiceServer := service.NewLibrarianSephirahServiceService(angela, tiphereth, gebura, binah, yesod, netzach, chesed, supervisorSupervisor, settings, auth, v, sephirahServer, store) + librarianSephirahServiceServer := service.NewLibrarianSephirahServiceService(angela, tiphereth, gebura, binah, yesod, netzach, chesed, supervisorSupervisor, settings, auth, v, sephirahServer) return librarianSephirahServiceServer, func() { cleanup() }, nil diff --git a/internal/lib/libcache/key.go b/internal/lib/libcache/key.go index 32343e5..b4608d7 100644 --- a/internal/lib/libcache/key.go +++ b/internal/lib/libcache/key.go @@ -112,7 +112,7 @@ func (k *Key[T]) Set(ctx context.Context, value *T, options ...Option) error { if err != nil { return err } - return k.store.Set(ctx, k.keyName, b, options...) + return k.store.Set(ctx, k.keyName, b, append(k.defaultOptions, options...)...) } func (k *Key[T]) Delete(ctx context.Context) error {