From f34b5ece9349ce8bc7444f6b1bf71277ce9df635 Mon Sep 17 00:00:00 2001
From: Sandeep Bhardwaj
Date: Thu, 9 Jan 2025 19:07:23 +0530
Subject: [PATCH] Batch sync (#323)

* fix: improve throughput for sync external

* use goroutines to run jobs

* fix lint issue

* try to fix lint issue

* use named returns

* suppress lint error

* fix: report each job's own error in SyncBatch and return early on empty input
---
 core/resource/service/resource_service.go |  15 +--
 ext/store/maxcompute/maxcompute.go        |   2 +-
 ext/store/maxcompute/sheet_sync.go        | 152 ++++++++++++++--------
 internal/lib/pool/basic.go                |  54 ++++++++
 server/optimus.go                         |   3 +-
 5 files changed, 158 insertions(+), 68 deletions(-)
 create mode 100644 internal/lib/pool/basic.go

diff --git a/core/resource/service/resource_service.go b/core/resource/service/resource_service.go
index 33932993cb..f99530cbcb 100644
--- a/core/resource/service/resource_service.go
+++ b/core/resource/service/resource_service.go
@@ -38,7 +38,7 @@ type ResourceRepository interface {
 }
 
 type Syncer interface {
-	Sync(ctx context.Context, res *resource.Resource) error
+	SyncBatch(ctx context.Context, resources []*resource.Resource) ([]string, error)
 }
 
 type ResourceManager interface {
@@ -394,18 +394,7 @@ func (rs ResourceService) SyncExternalTables(ctx context.Context, projectName te
 		return nil, errors.InvalidArgument(resource.EntityResource, "no resources found for filter")
 	}
 
-	var successRes []string
-	multiError := errors.NewMultiError("error in external table sync")
-	for _, res := range resources {
-		syncErr := rs.syncer.Sync(ctx, res)
-		if syncErr != nil {
-			multiError.Append(syncErr)
-		} else {
-			successRes = append(successRes, res.FullName())
-		}
-	}
-
-	return successRes, multiError.ToErr()
+	return rs.syncer.SyncBatch(ctx, resources)
 }
 
 func (rs ResourceService) SyncResources(ctx context.Context, tnnt tenant.Tenant, store resource.Store, names []string) (*resource.SyncResponse, error) { // nolint:gocritic
diff --git a/ext/store/maxcompute/maxcompute.go b/ext/store/maxcompute/maxcompute.go
index 0f036dd133..bb1794f51b 100644
--- a/ext/store/maxcompute/maxcompute.go
+++ b/ext/store/maxcompute/maxcompute.go
@@ -82,7 +82,7 @@ func (m MaxCompute) Create(ctx context.Context, res *resource.Resource) error {
 		return handle.Create(res)
 
 	case KindExternalTable:
-		syncer := NewSyncer(m.secretProvider, m.tenantGetter)
+		syncer := NewSyncer(m.secretProvider)
 		err = syncer.Sync(ctx, res)
 		if err != nil {
 			return err
diff --git a/ext/store/maxcompute/sheet_sync.go b/ext/store/maxcompute/sheet_sync.go
index 4e5f60f534..cb7ab1e988 100644
--- a/ext/store/maxcompute/sheet_sync.go
+++ b/ext/store/maxcompute/sheet_sync.go
@@ -2,7 +2,6 @@ package maxcompute
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"strings"
 	"time"
@@ -13,22 +12,64 @@ import (
 	"github.com/goto/optimus/core/tenant"
 	bucket "github.com/goto/optimus/ext/bucket/oss"
 	"github.com/goto/optimus/ext/sheets/gsheet"
+	"github.com/goto/optimus/internal/errors"
+	"github.com/goto/optimus/internal/lib/pool"
 )
 
 const (
 	GsheetCredsKey = "GOOGLE_SHEETS_ACCOUNT"
 	OSSCredsKey    = "OSS_CREDS"
-	ExtLocation    = ""
 	putTimeOut     = time.Second * 10
 )
 
 type SyncerService struct {
 	secretProvider SecretProvider
-	tenantGetter   TenantDetailsGetter
 }
 
-func NewSyncer(secretProvider SecretProvider, tenantProvider TenantDetailsGetter) *SyncerService {
-	return &SyncerService{secretProvider: secretProvider, tenantGetter: tenantProvider}
+func NewSyncer(secretProvider SecretProvider) *SyncerService {
+	return &SyncerService{secretProvider: secretProvider}
+}
+
+// SyncBatch processes every resource concurrently through a worker pool and
+// returns the full names of the resources that were processed without error,
+// together with a combined error describing the ones that failed.
+// The sheet and OSS clients are built once, from the first resource's tenant.
+func (s *SyncerService) SyncBatch(ctx context.Context, resources []*resource.Resource) ([]string, error) {
+	if len(resources) == 0 {
+		return nil, nil
+	}
+
+	sheets, ossClient, err := s.getClients(ctx, resources[0].Tenant())
+	if err != nil {
+		return nil, err
+	}
+
+	var jobs []func() pool.JobResult[string]
+	for _, r := range resources {
+		r := r
+		f1 := func() pool.JobResult[string] {
+			err := processResource(ctx, sheets, ossClient, r)
+			if err != nil {
+				return pool.JobResult[string]{Err: err}
+			}
+			return pool.JobResult[string]{Output: r.FullName()}
+		}
+		jobs = append(jobs, f1)
+	}
+
+	resultsChan := pool.RunWithWorkers(0, jobs)
+
+	var successNames []string
+	mu := errors.NewMultiError("error in batch sync")
+	for result := range resultsChan {
+		if result.Err != nil {
+			mu.Append(result.Err) // the job's own error; the outer err is always nil here
+		} else {
+			successNames = append(successNames, result.Output)
+		}
+	}
+
+	return successNames, mu.ToErr()
 }
 
 func (s *SyncerService) Sync(ctx context.Context, res *resource.Resource) error {
@@ -43,95 +84,100 @@ func (s *SyncerService) Sync(ctx context.Context, res *resource.Resource) error
 	}
 
 	if len(et.Source.SourceURIs) == 0 {
-		return errors.New("source URI is empty for Google Sheet")
+		return errors.InvalidArgument(EntityExternalTable, "source URI is empty for Google Sheet")
 	}
 	uri := et.Source.SourceURIs[0]
 
-	// Get sheet content
-	content, err := s.getGsheet(ctx, res.Tenant(), uri, et.Source.Range)
+	sheets, ossClient, err := s.getClients(ctx, res.Tenant())
 	if err != nil {
 		return err
 	}
 
-	bucketName, err := s.getBucketName(ctx, res, et)
+	content, err := sheets.GetAsCSV(uri, et.Source.Range)
 	if err != nil {
 		return err
 	}
-	objectKey, err := s.getObjectKey(ctx, res, et)
+
+	bucketName, objectKey, err := getBucketNameAndPath(et.Source.Location, res.FullName())
 	if err != nil {
 		return err
 	}
 
-	return s.writeContentToLocation(ctx, res.Tenant(), bucketName, objectKey, content)
+	return writeToBucket(ctx, ossClient, bucketName, objectKey, content)
 }
 
-func (s *SyncerService) getGsheet(ctx context.Context, tnnt tenant.Tenant, sheetURI, sheetRange string) (string, error) {
+func (s *SyncerService) getClients(ctx context.Context, tnnt tenant.Tenant) (*gsheet.GSheets, *oss.Client, error) {
 	secret, err := s.secretProvider.GetSecret(ctx, tnnt, GsheetCredsKey)
 	if err != nil {
-		return "", err
+		return nil, nil, err
 	}
 
-	sheets, err := gsheet.NewGSheets(ctx, secret.Value())
+	sheetClient, err := gsheet.NewGSheets(ctx, secret.Value())
 	if err != nil {
-		return "", err
+		return nil, nil, err
 	}
-	return sheets.GetAsCSV(sheetURI, sheetRange)
-}
 
-func (s *SyncerService) getBucketName(ctx context.Context, res *resource.Resource, et *ExternalTable) (string, error) {
-	location, err := s.getLocation(ctx, res, et)
+	creds, err := s.secretProvider.GetSecret(ctx, tnnt, OSSCredsKey)
 	if err != nil {
-		return "", err
+		return nil, nil, err
 	}
-	parts := strings.Split(location, "/")
-	if len(parts) > 3 { // nolint:mnd
-		bucketName := parts[3]
-		return bucketName, nil
+
+	ossClient, err := bucket.NewOssClient(creds.Value())
+	if err != nil {
+		return nil, nil, err
 	}
-	return "", errors.New("unable to get bucketName from Location")
+
+	return sheetClient, ossClient, nil
 }
 
-func (s *SyncerService) getObjectKey(ctx context.Context, res *resource.Resource, et *ExternalTable) (string, error) {
-	location, err := s.getLocation(ctx, res, et)
+func processResource(ctx context.Context, sheetSrv *gsheet.GSheets, ossClient *oss.Client, res *resource.Resource) error {
+	et, err := ConvertSpecTo[ExternalTable](res)
 	if err != nil {
-		return "", err
+		return err
 	}
-	parts := strings.Split(location, "/")
-	if len(parts) > 4 { // nolint:mnd
-		path := strings.Join(parts[4:], "/")
-		return fmt.Sprintf("%s%s/file.csv", path, res.FullName()), nil
+
+	if !strings.EqualFold(et.Source.SourceType, GoogleSheet) {
+		return nil
 	}
-	return "", errors.New("unable to get object path from location")
-}
 
-func (s *SyncerService) getLocation(ctx context.Context, res *resource.Resource, et *ExternalTable) (string, error) {
-	location := et.Source.Location
-	if location == "" {
-		details, err := s.tenantGetter.GetDetails(ctx, res.Tenant())
-		if err != nil {
-			return "", err
-		}
-		loc, err := details.GetConfig(ExtLocation)
-		if err != nil {
-			return "", err
-		}
-		location = loc
+	if len(et.Source.SourceURIs) == 0 {
+		return errors.InvalidArgument(EntityExternalTable, "source URI is empty for Google Sheet")
 	}
-	return location, nil
-}
+	uri := et.Source.SourceURIs[0]
 
-func (s *SyncerService) writeContentToLocation(ctx context.Context, tnnt tenant.Tenant, bucketName, objectKey, content string) error {
-	// Setup oss bucket writer
-	creds, err := s.secretProvider.GetSecret(ctx, tnnt, OSSCredsKey)
+	content, err := sheetSrv.GetAsCSV(uri, et.Source.Range)
 	if err != nil {
 		return err
 	}
-	ossClient, err := bucket.NewOssClient(creds.Value())
+
+	bucketName, objectKey, err := getBucketNameAndPath(et.Source.Location, res.FullName())
 	if err != nil {
 		return err
 	}
 
-	_, err = ossClient.PutObject(ctx, &oss.PutObjectRequest{
+	return writeToBucket(ctx, ossClient, bucketName, objectKey, content)
+}
+
+func getBucketNameAndPath(loc string, fullName string) (bucketName string, path string, err error) { // nolint
+	if loc == "" {
+		err = errors.InvalidArgument(EntityExternalTable, "location for the external table is empty")
+		return
+	}
+
+	parts := strings.Split(loc, "/")
+	if len(parts) < 4 { // nolint:mnd
+		err = errors.InvalidArgument(EntityExternalTable, "unable to parse url "+loc)
+		return
+	}
+
+	bucketName = parts[3]
+	components := strings.Join(parts[4:], "/")
+	path = fmt.Sprintf("%s%s/file.csv", components, fullName)
+	return
+}
+
+func writeToBucket(ctx context.Context, client *oss.Client, bucketName, objectKey, content string) error {
+	_, err := client.PutObject(ctx, &oss.PutObjectRequest{
 		Bucket:      &bucketName,
 		Key:         &objectKey,
 		ContentType: oss.Ptr("text/csv"),
diff --git a/internal/lib/pool/basic.go b/internal/lib/pool/basic.go
new file mode 100644
index 0000000000..4f9b659cf0
--- /dev/null
+++ b/internal/lib/pool/basic.go
@@ -0,0 +1,54 @@
+package pool
+
+import (
+	"fmt"
+	"sync"
+	"time"
+)
+
+type JobResult[T any] struct {
+	Output T
+	Err    error
+}
+
+const defaultWorkers = 5
+
+type Job[T any] func() JobResult[T]
+
+func RunWithWorkers[T any](workerCount int, fn []func() JobResult[T]) <-chan JobResult[T] {
+	start := time.Now()
+
+	numberOfWorkers := defaultWorkers
+	if workerCount > 0 {
+		numberOfWorkers = workerCount
+	}
+
+	var wg sync.WaitGroup
+	wg.Add(numberOfWorkers)
+	jobChannel := make(chan Job[T])
+	jobResultChannel := make(chan JobResult[T], len(fn))
+
+	// Start the workers
+	for i := 0; i < numberOfWorkers; i++ {
+		go worker(i, &wg, jobChannel, jobResultChannel)
+	}
+
+	// Send jobs to worker
+	for _, job := range fn {
+		jobChannel <- job
+	}
+
+	close(jobChannel)
+	wg.Wait()
+	close(jobResultChannel)
+
+	fmt.Printf("Took %s", time.Since(start)) // nolint
+	return jobResultChannel
+}
+
+func worker[T any](_ int, wg *sync.WaitGroup, jobChannel <-chan Job[T], resultChannel chan JobResult[T]) {
+	defer wg.Done()
+	for job := range jobChannel {
+		resultChannel <- job()
+	}
+}
diff --git a/server/optimus.go b/server/optimus.go
index d5b336e47f..3da774774d 100644
--- a/server/optimus.go
+++ b/server/optimus.go
@@ -358,13 +358,14 @@ func (s *OptimusServer) setupHandlers() error {
 		s.logger, jobProviderRepo, jobRunRepo, replayRepository, operatorRunRepository,
 		newScheduler, newPriorityResolver, jobInputCompiler, s.eventHandler, tProjectService,
 	)
-	syncer := mcStore.NewSyncer(tenantService, tenantService)
 
 	// Plugin
 	upstreamIdentifierFactory, _ := upstreamidentifier.NewUpstreamIdentifierFactory(s.logger)
 	evaluatorFactory, _ := evaluator.NewEvaluatorFactory(s.logger)
 	pluginService, _ := plugin.NewPluginService(s.logger, s.pluginRepo, upstreamIdentifierFactory, evaluatorFactory)
 
+	syncer := mcStore.NewSyncer(tenantService)
+
 	// Resource Bounded Context - requirements
 	resourceRepository := resource.NewRepository(s.dbPool)
 	backupRepository := resource.NewBackupRepository(s.dbPool)