From 0d88b3fd6cf508a96b7dfbb11e2c4235f75a4603 Mon Sep 17 00:00:00 2001 From: Sandeep Bhardwaj Date: Thu, 9 Jan 2025 14:47:43 +0530 Subject: [PATCH] use goroutines to run jobs --- ext/store/maxcompute/sheet_sync.go | 26 +++++++++++--- internal/lib/pool/basic.go | 54 ++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 5 deletions(-) create mode 100644 internal/lib/pool/basic.go diff --git a/ext/store/maxcompute/sheet_sync.go b/ext/store/maxcompute/sheet_sync.go index fa00392af0..e966fd5ad6 100644 --- a/ext/store/maxcompute/sheet_sync.go +++ b/ext/store/maxcompute/sheet_sync.go @@ -13,6 +13,7 @@ import ( bucket "github.com/goto/optimus/ext/bucket/oss" "github.com/goto/optimus/ext/sheets/gsheet" "github.com/goto/optimus/internal/errors" + "github.com/goto/optimus/internal/lib/pool" ) const ( @@ -35,18 +36,33 @@ func (s *SyncerService) SyncBatch(ctx context.Context, resources []*resource.Res return nil, err } + var jobs []func() pool.JobResult[string] + for _, r := range resources { + r := r + f1 := func() pool.JobResult[string] { + err := processResource(ctx, sheets, ossClient, r) + if err != nil { + return pool.JobResult[string]{Err: err} + } else { + return pool.JobResult[string]{Output: r.FullName()} + } + } + jobs = append(jobs, f1) + } + + resultsChan := pool.RunWithWorkers(0, jobs) + var successNames []string mu := errors.NewMultiError("error in batch sync") - for _, r := range resources { - err := processResource(ctx, sheets, ossClient, r) - if err != nil { + for result := range resultsChan { + if result.Err != nil { mu.Append(err) } else { - successNames = append(successNames, r.FullName()) + successNames = append(successNames, result.Output) } } - return successNames, nil + return successNames, mu.ToErr() } func (s *SyncerService) Sync(ctx context.Context, res *resource.Resource) error { diff --git a/internal/lib/pool/basic.go b/internal/lib/pool/basic.go new file mode 100644 index 0000000000..4f9b659cf0 --- /dev/null +++ b/internal/lib/pool/basic.go @@ -0,0 +1,54 @@ +package pool + +import ( + "fmt" + "sync" + "time" +) + +type JobResult[T any] struct { + Output T + Err error +} + +const defaultWorkers = 5 + +type Job[T any] func() JobResult[T] + +func RunWithWorkers[T any](workerCount int, fn []func() JobResult[T]) <-chan JobResult[T] { + start := time.Now() + + numberOfWorkers := defaultWorkers + if workerCount > 0 { + numberOfWorkers = workerCount + } + + var wg sync.WaitGroup + wg.Add(numberOfWorkers) + jobChannel := make(chan Job[T]) + jobResultChannel := make(chan JobResult[T], len(fn)) + + // Start the workers + for i := 0; i < numberOfWorkers; i++ { + go worker(i, &wg, jobChannel, jobResultChannel) + } + + // Send jobs to worker + for _, job := range fn { + jobChannel <- job + } + + close(jobChannel) + wg.Wait() + close(jobResultChannel) + + fmt.Printf("Took %s", time.Since(start)) // nolint + return jobResultChannel +} + +func worker[T any](_ int, wg *sync.WaitGroup, jobChannel <-chan Job[T], resultChannel chan JobResult[T]) { + defer wg.Done() + for job := range jobChannel { + resultChannel <- job() + } +}