Skip to content

Commit

Permalink
Batch sync (#323)
Browse files Browse the repository at this point in the history
* fix: improve throughput for sync external

* use goroutines to run jobs

* fix lint issue

* try to fix lint issue

* use named returns

* suppress lint error
  • Loading branch information
sbchaos authored Jan 9, 2025
1 parent 7acd4a4 commit f34b5ec
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 68 deletions.
15 changes: 2 additions & 13 deletions core/resource/service/resource_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ResourceRepository interface {
}

type Syncer interface {
Sync(ctx context.Context, res *resource.Resource) error
SyncBatch(ctx context.Context, resources []*resource.Resource) ([]string, error)
}

type ResourceManager interface {
Expand Down Expand Up @@ -394,18 +394,7 @@ func (rs ResourceService) SyncExternalTables(ctx context.Context, projectName te
return nil, errors.InvalidArgument(resource.EntityResource, "no resources found for filter")
}

var successRes []string
multiError := errors.NewMultiError("error in external table sync")
for _, res := range resources {
syncErr := rs.syncer.Sync(ctx, res)
if syncErr != nil {
multiError.Append(syncErr)
} else {
successRes = append(successRes, res.FullName())
}
}

return successRes, multiError.ToErr()
return rs.syncer.SyncBatch(ctx, resources)
}

func (rs ResourceService) SyncResources(ctx context.Context, tnnt tenant.Tenant, store resource.Store, names []string) (*resource.SyncResponse, error) { // nolint:gocritic
Expand Down
2 changes: 1 addition & 1 deletion ext/store/maxcompute/maxcompute.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (m MaxCompute) Create(ctx context.Context, res *resource.Resource) error {
return handle.Create(res)

case KindExternalTable:
syncer := NewSyncer(m.secretProvider, m.tenantGetter)
syncer := NewSyncer(m.secretProvider)
err = syncer.Sync(ctx, res)
if err != nil {
return err
Expand Down
144 changes: 91 additions & 53 deletions ext/store/maxcompute/sheet_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package maxcompute

import (
"context"
"errors"
"fmt"
"strings"
"time"
Expand All @@ -13,22 +12,56 @@ import (
"github.com/goto/optimus/core/tenant"
bucket "github.com/goto/optimus/ext/bucket/oss"
"github.com/goto/optimus/ext/sheets/gsheet"
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/internal/lib/pool"
)

const (
GsheetCredsKey = "GOOGLE_SHEETS_ACCOUNT"
OSSCredsKey = "OSS_CREDS"
ExtLocation = ""
putTimeOut = time.Second * 10
)

type SyncerService struct {
secretProvider SecretProvider
tenantGetter TenantDetailsGetter
}

func NewSyncer(secretProvider SecretProvider, tenantProvider TenantDetailsGetter) *SyncerService {
return &SyncerService{secretProvider: secretProvider, tenantGetter: tenantProvider}
func NewSyncer(secretProvider SecretProvider) *SyncerService {
return &SyncerService{secretProvider: secretProvider}
}

// SyncBatch fetches the sheet content for each external-table resource and
// writes it to the corresponding OSS bucket, running the per-resource jobs
// concurrently on a worker pool. It returns the full names of the resources
// that synced successfully, together with a multi-error aggregating the
// individual failures.
//
// Clients are built once from resources[0], so all resources are assumed to
// belong to the same tenant — TODO confirm callers never mix tenants.
func (s *SyncerService) SyncBatch(ctx context.Context, resources []*resource.Resource) ([]string, error) {
	// Guard: the client setup below dereferences resources[0].
	if len(resources) == 0 {
		return nil, nil
	}

	sheets, ossClient, err := s.getClients(ctx, resources[0].Tenant())
	if err != nil {
		return nil, err
	}

	jobs := make([]func() pool.JobResult[string], 0, len(resources))
	for _, r := range resources {
		r := r // capture loop variable for the closure (pre-Go 1.22 semantics)
		jobs = append(jobs, func() pool.JobResult[string] {
			if syncErr := processResource(ctx, sheets, ossClient, r); syncErr != nil {
				return pool.JobResult[string]{Err: syncErr}
			}
			return pool.JobResult[string]{Output: r.FullName()}
		})
	}

	resultsChan := pool.RunWithWorkers(0, jobs)

	var successNames []string
	me := errors.NewMultiError("error in batch sync")
	for result := range resultsChan {
		if result.Err != nil {
			// BUG FIX: previously this appended the outer `err` (always nil
			// here), silently discarding every job failure; append the job's
			// own error instead.
			me.Append(result.Err)
		} else {
			successNames = append(successNames, result.Output)
		}
	}

	return successNames, me.ToErr()
}

func (s *SyncerService) Sync(ctx context.Context, res *resource.Resource) error {
Expand All @@ -43,95 +76,100 @@ func (s *SyncerService) Sync(ctx context.Context, res *resource.Resource) error
}

if len(et.Source.SourceURIs) == 0 {
return errors.New("source URI is empty for Google Sheet")
return errors.InvalidArgument(EntityExternalTable, "source URI is empty for Google Sheet")
}
uri := et.Source.SourceURIs[0]

// Get sheet content
content, err := s.getGsheet(ctx, res.Tenant(), uri, et.Source.Range)
sheets, ossClient, err := s.getClients(ctx, res.Tenant())
if err != nil {
return err
}

bucketName, err := s.getBucketName(ctx, res, et)
content, err := sheets.GetAsCSV(uri, et.Source.Range)
if err != nil {
return err
}
objectKey, err := s.getObjectKey(ctx, res, et)

bucketName, objectKey, err := getBucketNameAndPath(et.Source.Location, res.FullName())
if err != nil {
return err
}

return s.writeContentToLocation(ctx, res.Tenant(), bucketName, objectKey, content)
return writeToBucket(ctx, ossClient, bucketName, objectKey, content)
}

func (s *SyncerService) getGsheet(ctx context.Context, tnnt tenant.Tenant, sheetURI, sheetRange string) (string, error) {
func (s *SyncerService) getClients(ctx context.Context, tnnt tenant.Tenant) (*gsheet.GSheets, *oss.Client, error) {
secret, err := s.secretProvider.GetSecret(ctx, tnnt, GsheetCredsKey)
if err != nil {
return "", err
return nil, nil, err
}

sheets, err := gsheet.NewGSheets(ctx, secret.Value())
sheetClient, err := gsheet.NewGSheets(ctx, secret.Value())
if err != nil {
return "", err
return nil, nil, err
}
return sheets.GetAsCSV(sheetURI, sheetRange)
}

func (s *SyncerService) getBucketName(ctx context.Context, res *resource.Resource, et *ExternalTable) (string, error) {
location, err := s.getLocation(ctx, res, et)
creds, err := s.secretProvider.GetSecret(ctx, tnnt, OSSCredsKey)
if err != nil {
return "", err
return nil, nil, err
}
parts := strings.Split(location, "/")
if len(parts) > 3 { // nolint:mnd
bucketName := parts[3]
return bucketName, nil

ossClient, err := bucket.NewOssClient(creds.Value())
if err != nil {
return nil, nil, err
}
return "", errors.New("unable to get bucketName from Location")

return sheetClient, ossClient, nil
}

func (s *SyncerService) getObjectKey(ctx context.Context, res *resource.Resource, et *ExternalTable) (string, error) {
location, err := s.getLocation(ctx, res, et)
func processResource(ctx context.Context, sheetSrv *gsheet.GSheets, ossClient *oss.Client, res *resource.Resource) error {
et, err := ConvertSpecTo[ExternalTable](res)
if err != nil {
return "", err
return err
}
parts := strings.Split(location, "/")
if len(parts) > 4 { // nolint:mnd
path := strings.Join(parts[4:], "/")
return fmt.Sprintf("%s%s/file.csv", path, res.FullName()), nil

if !strings.EqualFold(et.Source.SourceType, GoogleSheet) {
return nil
}
return "", errors.New("unable to get object path from location")
}

func (s *SyncerService) getLocation(ctx context.Context, res *resource.Resource, et *ExternalTable) (string, error) {
location := et.Source.Location
if location == "" {
details, err := s.tenantGetter.GetDetails(ctx, res.Tenant())
if err != nil {
return "", err
}
loc, err := details.GetConfig(ExtLocation)
if err != nil {
return "", err
}
location = loc
if len(et.Source.SourceURIs) == 0 {
return errors.InvalidArgument(EntityExternalTable, "source URI is empty for Google Sheet")
}
return location, nil
}
uri := et.Source.SourceURIs[0]

func (s *SyncerService) writeContentToLocation(ctx context.Context, tnnt tenant.Tenant, bucketName, objectKey, content string) error {
// Setup oss bucket writer
creds, err := s.secretProvider.GetSecret(ctx, tnnt, OSSCredsKey)
content, err := sheetSrv.GetAsCSV(uri, et.Source.Range)
if err != nil {
return err
}
ossClient, err := bucket.NewOssClient(creds.Value())

bucketName, objectKey, err := getBucketNameAndPath(et.Source.Location, res.FullName())
if err != nil {
return err
}

_, err = ossClient.PutObject(ctx, &oss.PutObjectRequest{
return writeToBucket(ctx, ossClient, bucketName, objectKey, content)
}

// getBucketNameAndPath derives the OSS bucket name and object key from an
// external table location URL of the form oss://<endpoint>/<bucket>/<dir>/
// plus the resource full name. The object key is "<dir>/<fullName>/file.csv".
// NOTE(review): the key is built by plain concatenation, so a separating "/"
// only appears when the location ends with a trailing slash — presumably
// guaranteed upstream; verify against callers.
func getBucketNameAndPath(loc string, fullName string) (bucketName string, path string, err error) { // nolint
	if loc == "" {
		return "", "", errors.InvalidArgument(EntityExternalTable, "location for the external table is empty")
	}

	segments := strings.Split(loc, "/")
	if len(segments) < 4 { // nolint:mnd
		return "", "", errors.InvalidArgument(EntityExternalTable, "unable to parse url "+loc)
	}

	prefix := strings.Join(segments[4:], "/")
	return segments[3], fmt.Sprintf("%s%s/file.csv", prefix, fullName), nil
}

func writeToBucket(ctx context.Context, client *oss.Client, bucketName, objectKey, content string) error {
_, err := client.PutObject(ctx, &oss.PutObjectRequest{
Bucket: &bucketName,
Key: &objectKey,
ContentType: oss.Ptr("text/csv"),
Expand Down
54 changes: 54 additions & 0 deletions internal/lib/pool/basic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package pool

import (
"fmt"
"sync"
"time"
)

type JobResult[T any] struct {
Output T
Err error
}

const defaultWorkers = 5

type Job[T any] func() JobResult[T]

func RunWithWorkers[T any](workerCount int, fn []func() JobResult[T]) <-chan JobResult[T] {
start := time.Now()

numberOfWorkers := defaultWorkers
if workerCount > 0 {
numberOfWorkers = workerCount
}

var wg sync.WaitGroup
wg.Add(numberOfWorkers)
jobChannel := make(chan Job[T])
jobResultChannel := make(chan JobResult[T], len(fn))

// Start the workers
for i := 0; i < numberOfWorkers; i++ {
go worker(i, &wg, jobChannel, jobResultChannel)
}

// Send jobs to worker
for _, job := range fn {
jobChannel <- job
}

close(jobChannel)
wg.Wait()
close(jobResultChannel)

fmt.Printf("Took %s", time.Since(start)) // nolint
return jobResultChannel
}

func worker[T any](_ int, wg *sync.WaitGroup, jobChannel <-chan Job[T], resultChannel chan JobResult[T]) {
defer wg.Done()
for job := range jobChannel {
resultChannel <- job()
}
}
3 changes: 2 additions & 1 deletion server/optimus.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,13 +358,14 @@ func (s *OptimusServer) setupHandlers() error {
s.logger, jobProviderRepo, jobRunRepo, replayRepository, operatorRunRepository,
newScheduler, newPriorityResolver, jobInputCompiler, s.eventHandler, tProjectService,
)
syncer := mcStore.NewSyncer(tenantService, tenantService)

// Plugin
upstreamIdentifierFactory, _ := upstreamidentifier.NewUpstreamIdentifierFactory(s.logger)
evaluatorFactory, _ := evaluator.NewEvaluatorFactory(s.logger)
pluginService, _ := plugin.NewPluginService(s.logger, s.pluginRepo, upstreamIdentifierFactory, evaluatorFactory)

syncer := mcStore.NewSyncer(tenantService)

// Resource Bounded Context - requirements
resourceRepository := resource.NewRepository(s.dbPool)
backupRepository := resource.NewBackupRepository(s.dbPool)
Expand Down

0 comments on commit f34b5ec

Please sign in to comment.