diff --git a/server/datacatalog/datacatalogv2/datacatalogv2adapter/cache.go b/server/datacatalog/datacatalogv2/datacatalogv2adapter/cache.go index 0f6a88157..08c15e15d 100644 --- a/server/datacatalog/datacatalogv2/datacatalogv2adapter/cache.go +++ b/server/datacatalog/datacatalogv2/datacatalogv2adapter/cache.go @@ -3,37 +3,12 @@ package datacatalogv2adapter import ( "context" "fmt" - "time" "github.com/eukarya-inc/reearth-plateauview/server/datacatalog/datacatalogv2" "github.com/eukarya-inc/reearth-plateauview/server/datacatalog/plateauapi" "golang.org/x/exp/slices" ) -const minCacheDuration = 30 * time.Second - -func New(cmsbase, project string) (*plateauapi.RepoWrapper, error) { - fetcher, err := datacatalogv2.NewFetcher(cmsbase) - if err != nil { - return nil, err - } - return From(fetcher, project), nil -} - -func From(fetcher datacatalogv2.Fetchable, project string) *plateauapi.RepoWrapper { - r := plateauapi.NewRepoWrapper(nil, func(ctx context.Context, repo *plateauapi.Repo) error { - r, err := fetchAndCreateCache(ctx, project, fetcher, datacatalogv2.FetcherDoOptions{}) - if err != nil { - return err - } - *repo = r - return nil - }) - r.SetName(fmt.Sprintf("%s(v2)", project)) - r.SetMinCacheDuration(minCacheDuration) - return r -} - func fetchAndCreateCache(ctx context.Context, project string, fetcher datacatalogv2.Fetchable, opts datacatalogv2.FetcherDoOptions) (*plateauapi.InMemoryRepo, error) { r, err := fetcher.Do(ctx, project, opts) if err != nil { diff --git a/server/datacatalog/datacatalogv2/datacatalogv2adapter/repos.go b/server/datacatalog/datacatalogv2/datacatalogv2adapter/repos.go new file mode 100644 index 000000000..e4b6c94e8 --- /dev/null +++ b/server/datacatalog/datacatalogv2/datacatalogv2adapter/repos.go @@ -0,0 +1,68 @@ +package datacatalogv2adapter + +import ( + "context" + "fmt" + "time" + + "github.com/eukarya-inc/reearth-plateauview/server/datacatalog/datacatalogv2" + "github.com/eukarya-inc/reearth-plateauview/server/datacatalog/plateauapi" + "github.com/reearth/reearthx/log" + "github.com/reearth/reearthx/util" +) + +type Repos struct { + fetchers *util.SyncMap[string, datacatalogv2.Fetchable] + *plateauapi.Repos +} + +func NewRepos() *Repos { + r := &Repos{ + fetchers: util.NewSyncMap[string, datacatalogv2.Fetchable](), + } + r.Repos = plateauapi.NewRepos(r.update) + return r +} + +func (r *Repos) Prepare(ctx context.Context, project string, f datacatalogv2.Fetchable) error { + if _, ok := r.fetchers.Load(project); ok { + return nil + } + + r.setCMS(project, f) + _, err := r.Update(ctx, project) + return err +} + +func (r *Repos) update(ctx context.Context, project string) (*plateauapi.ReposUpdateResult, error) { + fetcher, ok := r.fetchers.Load(project) + if !ok { + return nil, fmt.Errorf("fetcher is not initialized for %s", project) + } + + updated := r.UpdatedAt(project) + var updatedStr string + if !updated.IsZero() { + updatedStr = updated.Format(time.RFC3339) + } + log.Debugfc(ctx, "datacatalogv2: updating repo %s: last_update=%s", project, updatedStr) + + repo, err := fetchAndCreateCache(ctx, project, fetcher, datacatalogv2.FetcherDoOptions{}) + if err != nil { + return nil, err + } + + adminRepo := repo.Clone() + adminRepo.SetAdmin(true) + + log.Debugfc(ctx, "datacatalogv2: updated repo %s", project) + + return &plateauapi.ReposUpdateResult{ + Repo: repo, + AdminRepo: adminRepo, + }, nil +} + +func (r *Repos) setCMS(project string, f datacatalogv2.Fetchable) { + r.fetchers.Store(project, f) +} diff --git a/server/datacatalog/datacatalogv3/repos.go b/server/datacatalog/datacatalogv3/repos.go index 9873c8d05..86dd004ce 100644 --- a/server/datacatalog/datacatalogv3/repos.go +++ b/server/datacatalog/datacatalogv3/repos.go @@ -48,7 +48,7 @@ func (r *Repos) update(ctx context.Context, project string) (*plateauapi.ReposUp if !updated.IsZero() { updatedStr = updated.Format(time.RFC3339) } - log.Infofc(ctx, "datacatalogv3: updating repo %s: last_update=%s", project, updatedStr) + log.Debugfc(ctx, "datacatalogv3: updating repo %s: last_update=%s", project, updatedStr) data, err := cms.GetAll(ctx, project) if err != nil { @@ -64,6 +64,8 @@ func (r *Repos) update(ctx context.Context, project string) (*plateauapi.ReposUp adminRepo.SetAdmin(true) adminRepo.SetIncludedStages(stagesForAdmin...) + log.Debugfc(ctx, "datacatalogv3: updated repo %s", project) + return &plateauapi.ReposUpdateResult{ Repo: repo, AdminRepo: adminRepo, diff --git a/server/datacatalog/plateauapi/inmemory.go b/server/datacatalog/plateauapi/inmemory.go index 9473cd5d0..f73bb69ca 100644 --- a/server/datacatalog/plateauapi/inmemory.go +++ b/server/datacatalog/plateauapi/inmemory.go @@ -43,6 +43,15 @@ func (c *InMemoryRepo) Name() string { return fmt.Sprintf("inmemory(%s)", c.ctx.Name) } +func (c *InMemoryRepo) Clone() *InMemoryRepo { + return &InMemoryRepo{ + ctx: c.ctx, + areasForDataTypes: c.areasForDataTypes, + admin: c.admin, + includedStages: slices.Clone(c.includedStages), + } +} + func (c *InMemoryRepo) SetContext(ctx *InMemoryRepoContext) { c.ctx = ctx c.areasForDataTypes = areasForDatasetTypes(ctx.Datasets.All()) diff --git a/server/datacatalog/repos.go b/server/datacatalog/repos.go index 82df7fa8d..3ca230339 100644 --- a/server/datacatalog/repos.go +++ b/server/datacatalog/repos.go @@ -4,23 +4,22 @@ import ( "context" "fmt" "net/http" - "sort" "strings" "time" + "github.com/eukarya-inc/reearth-plateauview/server/datacatalog/datacatalogv2" "github.com/eukarya-inc/reearth-plateauview/server/datacatalog/datacatalogv2/datacatalogv2adapter" "github.com/eukarya-inc/reearth-plateauview/server/datacatalog/datacatalogv3" "github.com/eukarya-inc/reearth-plateauview/server/datacatalog/plateauapi" "github.com/eukarya-inc/reearth-plateauview/server/plateaucms" "github.com/labstack/echo/v4" "github.com/reearth/reearthx/log" - "github.com/samber/lo" "golang.org/x/sync/errgroup" ) type reposHandler struct { reposv3 *datacatalogv3.Repos - reposv2 map[string]*plateauapi.RepoWrapper + reposv2 *datacatalogv2adapter.Repos pcms *plateaucms.CMS gqlComplexityLimit int cacheUpdateKey string @@ -38,6 +37,7 @@ func newReposHandler(conf Config) (*reposHandler, error) { } reposv3 := datacatalogv3.NewRepos() + reposv2 := datacatalogv2adapter.NewRepos() if conf.GraphqlMaxComplexity <= 0 { conf.GraphqlMaxComplexity = gqlComplexityLimit @@ -45,7 +45,7 @@ func newReposHandler(conf Config) (*reposHandler, error) { return &reposHandler{ reposv3: reposv3, - reposv2: map[string]*plateauapi.RepoWrapper{}, + reposv2: reposv2, pcms: pcms, gqlComplexityLimit: conf.GraphqlMaxComplexity, cacheUpdateKey: conf.CacheUpdateKey, @@ -130,12 +130,10 @@ func (h *reposHandler) UpdateCache(ctx context.Context) error { }) } - v2prj := lo.Keys(h.reposv2) - sort.Strings(v2prj) - for _, prj := range v2prj { - prj := prj + for _, p := range h.reposv2.Projects() { + p := p g.Go(func() error { - return h.updateV2(ctx, prj) + return h.updateV2(ctx, p) }) } @@ -227,7 +225,7 @@ func (h *reposHandler) getRepo(admin bool, md plateaucms.Metadata) (repo plateau } if isV2(md) { - repo = h.reposv2[md.DataCatalogProjectAlias] + repo = h.reposv2.Repo(md.DataCatalogProjectAlias, admin) } else if isV3(md) { repo = h.reposv3.Repo(md.DataCatalogProjectAlias, admin) } @@ -261,17 +259,16 @@ func (h *reposHandler) prepareV2(ctx context.Context, md plateaucms.Metadata) er return nil } - if _, ok := h.reposv2[md.DataCatalogProjectAlias]; ok { - return nil + fetcher, err := datacatalogv2.NewFetcher(md.CMSBaseURL) + if err != nil { + return fmt.Errorf("datacatalogv2: failed to create fetcher %s: %w", md.DataCatalogProjectAlias, err) } - r, err := datacatalogv2adapter.New(md.CMSBaseURL, md.DataCatalogProjectAlias) - if err != nil { - return fmt.Errorf("datacatalogv3: failed to create repo(v2) %s: %w", md.DataCatalogProjectAlias, err) + if err := h.reposv2.Prepare(ctx, md.DataCatalogProjectAlias, fetcher); err != nil { + return fmt.Errorf("datacatalogv2: failed to prepare repo for %s: %w", md.DataCatalogProjectAlias, err) } - h.reposv2[md.DataCatalogProjectAlias] = r - return h.updateV2(ctx, md.DataCatalogProjectAlias) + return nil } func (h *reposHandler) prepareV3(ctx context.Context, md plateaucms.Metadata) error { @@ -292,21 +289,9 @@ func (h *reposHandler) prepareV3(ctx context.Context, md plateaucms.Metadata) er } func (h *reposHandler) updateV2(ctx context.Context, prj string) error { - r := h.reposv2[prj] - if r == nil { - return nil + if _, err := h.reposv2.Update(ctx, prj); err != nil { + return fmt.Errorf("datacatalogv2: failed to update repo %s: %w", prj, err) } - - log.Infofc(ctx, "datacatalogv3: updating repo(v2) %s", prj) - - if updated, err := r.Update(ctx); err != nil { - return fmt.Errorf("datacatalogv3: failed to update repo(v2) %s: %w", prj, err) - } else if !updated { - log.Infofc(ctx, "datacatalogv3: skip updating repo(v2) %s", prj) - return nil - } - - log.Infofc(ctx, "datacatalogv3: updated repo(v2) %s", prj) return nil }