Skip to content
This repository has been archived by the owner on Apr 8, 2024. It is now read-only.

Commit

Permalink
refactor(server): use repos in datacatalogv2
Browse files Browse the repository at this point in the history
  • Loading branch information
rot1024 committed Mar 14, 2024
1 parent 6086440 commit 6b4c761
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 57 deletions.
25 changes: 0 additions & 25 deletions server/datacatalog/datacatalogv2/datacatalogv2adapter/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,12 @@ package datacatalogv2adapter
import (
"context"
"fmt"
"time"

"github.com/eukarya-inc/reearth-plateauview/server/datacatalog/datacatalogv2"
"github.com/eukarya-inc/reearth-plateauview/server/datacatalog/plateauapi"
"golang.org/x/exp/slices"
)

const minCacheDuration = 30 * time.Second

func New(cmsbase, project string) (*plateauapi.RepoWrapper, error) {
fetcher, err := datacatalogv2.NewFetcher(cmsbase)
if err != nil {
return nil, err
}
return From(fetcher, project), nil
}

func From(fetcher datacatalogv2.Fetchable, project string) *plateauapi.RepoWrapper {
r := plateauapi.NewRepoWrapper(nil, func(ctx context.Context, repo *plateauapi.Repo) error {
r, err := fetchAndCreateCache(ctx, project, fetcher, datacatalogv2.FetcherDoOptions{})
if err != nil {
return err
}
*repo = r
return nil
})
r.SetName(fmt.Sprintf("%s(v2)", project))
r.SetMinCacheDuration(minCacheDuration)
return r
}

func fetchAndCreateCache(ctx context.Context, project string, fetcher datacatalogv2.Fetchable, opts datacatalogv2.FetcherDoOptions) (*plateauapi.InMemoryRepo, error) {
r, err := fetcher.Do(ctx, project, opts)
if err != nil {
Expand Down
68 changes: 68 additions & 0 deletions server/datacatalog/datacatalogv2/datacatalogv2adapter/repos.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package datacatalogv2adapter

import (
"context"
"fmt"
"time"

"github.com/eukarya-inc/reearth-plateauview/server/datacatalog/datacatalogv2"
"github.com/eukarya-inc/reearth-plateauview/server/datacatalog/plateauapi"
"github.com/reearth/reearthx/log"
"github.com/reearth/reearthx/util"
)

type Repos struct {
fetchers *util.SyncMap[string, datacatalogv2.Fetchable]
*plateauapi.Repos
}

func NewRepos() *Repos {
r := &Repos{
fetchers: util.NewSyncMap[string, datacatalogv2.Fetchable](),
}
r.Repos = plateauapi.NewRepos(r.update)
return r
}

func (r *Repos) Prepare(ctx context.Context, project string, f datacatalogv2.Fetchable) error {
if _, ok := r.fetchers.Load(project); ok {
return nil
}

r.setCMS(project, f)
_, err := r.Update(ctx, project)
return err
}

func (r *Repos) update(ctx context.Context, project string) (*plateauapi.ReposUpdateResult, error) {
fetcher, ok := r.fetchers.Load(project)
if !ok {
return nil, fmt.Errorf("fetcher is not initialized for %s", project)
}

updated := r.UpdatedAt(project)
var updatedStr string
if !updated.IsZero() {
updatedStr = updated.Format(time.RFC3339)
}
log.Debugfc(ctx, "datacatalogv2: updating repo %s: last_update=%s", project, updatedStr)

repo, err := fetchAndCreateCache(ctx, project, fetcher, datacatalogv2.FetcherDoOptions{})
if err != nil {
return nil, err
}

adminRepo := repo.Clone()
adminRepo.SetAdmin(true)

log.Debugfc(ctx, "datacatalogv2: updated repo %s", project)

return &plateauapi.ReposUpdateResult{
Repo: repo,
AdminRepo: adminRepo,
}, nil
}

func (r *Repos) setCMS(project string, f datacatalogv2.Fetchable) {
r.fetchers.Store(project, f)
}
4 changes: 3 additions & 1 deletion server/datacatalog/datacatalogv3/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func (r *Repos) update(ctx context.Context, project string) (*plateauapi.ReposUp
if !updated.IsZero() {
updatedStr = updated.Format(time.RFC3339)
}
log.Infofc(ctx, "datacatalogv3: updating repo %s: last_update=%s", project, updatedStr)
log.Debugfc(ctx, "datacatalogv3: updating repo %s: last_update=%s", project, updatedStr)

data, err := cms.GetAll(ctx, project)
if err != nil {
Expand All @@ -64,6 +64,8 @@ func (r *Repos) update(ctx context.Context, project string) (*plateauapi.ReposUp
adminRepo.SetAdmin(true)
adminRepo.SetIncludedStages(stagesForAdmin...)

log.Debugfc(ctx, "datacatalogv3: updated repo %s", project)

return &plateauapi.ReposUpdateResult{
Repo: repo,
AdminRepo: adminRepo,
Expand Down
9 changes: 9 additions & 0 deletions server/datacatalog/plateauapi/inmemory.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ func (c *InMemoryRepo) Name() string {
return fmt.Sprintf("inmemory(%s)", c.ctx.Name)
}

func (c *InMemoryRepo) Clone() *InMemoryRepo {
return &InMemoryRepo{
ctx: c.ctx,
areasForDataTypes: c.areasForDataTypes,
admin: c.admin,
includedStages: slices.Clone(c.includedStages),
}
}

func (c *InMemoryRepo) SetContext(ctx *InMemoryRepoContext) {
c.ctx = ctx
c.areasForDataTypes = areasForDatasetTypes(ctx.Datasets.All())
Expand Down
47 changes: 16 additions & 31 deletions server/datacatalog/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,22 @@ import (
"context"
"fmt"
"net/http"
"sort"
"strings"
"time"

"github.com/eukarya-inc/reearth-plateauview/server/datacatalog/datacatalogv2"
"github.com/eukarya-inc/reearth-plateauview/server/datacatalog/datacatalogv2/datacatalogv2adapter"
"github.com/eukarya-inc/reearth-plateauview/server/datacatalog/datacatalogv3"
"github.com/eukarya-inc/reearth-plateauview/server/datacatalog/plateauapi"
"github.com/eukarya-inc/reearth-plateauview/server/plateaucms"
"github.com/labstack/echo/v4"
"github.com/reearth/reearthx/log"
"github.com/samber/lo"
"golang.org/x/sync/errgroup"
)

type reposHandler struct {
reposv3 *datacatalogv3.Repos
reposv2 map[string]*plateauapi.RepoWrapper
reposv2 *datacatalogv2adapter.Repos
pcms *plateaucms.CMS
gqlComplexityLimit int
cacheUpdateKey string
Expand All @@ -38,14 +37,15 @@ func newReposHandler(conf Config) (*reposHandler, error) {
}

reposv3 := datacatalogv3.NewRepos()
reposv2 := datacatalogv2adapter.NewRepos()

if conf.GraphqlMaxComplexity <= 0 {
conf.GraphqlMaxComplexity = gqlComplexityLimit
}

return &reposHandler{
reposv3: reposv3,
reposv2: map[string]*plateauapi.RepoWrapper{},
reposv2: reposv2,
pcms: pcms,
gqlComplexityLimit: conf.GraphqlMaxComplexity,
cacheUpdateKey: conf.CacheUpdateKey,
Expand Down Expand Up @@ -130,12 +130,10 @@ func (h *reposHandler) UpdateCache(ctx context.Context) error {
})
}

v2prj := lo.Keys(h.reposv2)
sort.Strings(v2prj)
for _, prj := range v2prj {
prj := prj
for _, p := range h.reposv2.Projects() {
p := p
g.Go(func() error {
return h.updateV2(ctx, prj)
return h.updateV2(ctx, p)
})
}

Expand Down Expand Up @@ -227,7 +225,7 @@ func (h *reposHandler) getRepo(admin bool, md plateaucms.Metadata) (repo plateau
}

if isV2(md) {
repo = h.reposv2[md.DataCatalogProjectAlias]
repo = h.reposv2.Repo(md.DataCatalogProjectAlias, admin)
} else if isV3(md) {
repo = h.reposv3.Repo(md.DataCatalogProjectAlias, admin)
}
Expand Down Expand Up @@ -261,17 +259,16 @@ func (h *reposHandler) prepareV2(ctx context.Context, md plateaucms.Metadata) er
return nil
}

if _, ok := h.reposv2[md.DataCatalogProjectAlias]; ok {
return nil
fetcher, err := datacatalogv2.NewFetcher(md.CMSBaseURL)
if err != nil {
return fmt.Errorf("datacatalogv2: failed to create fetcher %s: %w", md.DataCatalogProjectAlias, err)
}

r, err := datacatalogv2adapter.New(md.CMSBaseURL, md.DataCatalogProjectAlias)
if err != nil {
return fmt.Errorf("datacatalogv3: failed to create repo(v2) %s: %w", md.DataCatalogProjectAlias, err)
if err := h.reposv2.Prepare(ctx, md.DataCatalogProjectAlias, fetcher); err != nil {
return fmt.Errorf("datacatalogv2: failed to prepare repo for %s: %w", md.DataCatalogProjectAlias, err)
}

h.reposv2[md.DataCatalogProjectAlias] = r
return h.updateV2(ctx, md.DataCatalogProjectAlias)
return nil
}

func (h *reposHandler) prepareV3(ctx context.Context, md plateaucms.Metadata) error {
Expand All @@ -292,21 +289,9 @@ func (h *reposHandler) prepareV3(ctx context.Context, md plateaucms.Metadata) er
}

func (h *reposHandler) updateV2(ctx context.Context, prj string) error {
r := h.reposv2[prj]
if r == nil {
return nil
if _, err := h.reposv2.Update(ctx, prj); err != nil {
return fmt.Errorf("datacatalogv2: failed to update repo %s: %w", prj, err)
}

log.Infofc(ctx, "datacatalogv3: updating repo(v2) %s", prj)

if updated, err := r.Update(ctx); err != nil {
return fmt.Errorf("datacatalogv3: failed to update repo(v2) %s: %w", prj, err)
} else if !updated {
log.Infofc(ctx, "datacatalogv3: skip updating repo(v2) %s", prj)
return nil
}

log.Infofc(ctx, "datacatalogv3: updated repo(v2) %s", prj)
return nil
}

Expand Down

0 comments on commit 6b4c761

Please sign in to comment.