Skip to content

Commit

Permalink
use goroutines to run jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
sbchaos committed Jan 9, 2025
1 parent 5516565 commit 0d88b3f
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 5 deletions.
26 changes: 21 additions & 5 deletions ext/store/maxcompute/sheet_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
bucket "github.com/goto/optimus/ext/bucket/oss"
"github.com/goto/optimus/ext/sheets/gsheet"
"github.com/goto/optimus/internal/errors"
"github.com/goto/optimus/internal/lib/pool"
)

const (
Expand All @@ -35,18 +36,33 @@ func (s *SyncerService) SyncBatch(ctx context.Context, resources []*resource.Res
return nil, err
}

var jobs []func() pool.JobResult[string]
for _, r := range resources {
r := r
f1 := func() pool.JobResult[string] {
err := processResource(ctx, sheets, ossClient, r)
if err != nil {
return pool.JobResult[string]{Err: err}
} else {

Check failure on line 46 in ext/store/maxcompute/sheet_sync.go

View workflow job for this annotation

GitHub Actions / lint

indent-error-flow: if block ends with a return statement, so drop this else and outdent its block (revive)
return pool.JobResult[string]{Output: r.FullName()}
}
}
jobs = append(jobs, f1)
}

resultsChan := pool.RunWithWorkers(0, jobs)

var successNames []string
mu := errors.NewMultiError("error in batch sync")
for _, r := range resources {
err := processResource(ctx, sheets, ossClient, r)
if err != nil {
for result := range resultsChan {
if result.Err != nil {
mu.Append(err)
} else {
successNames = append(successNames, r.FullName())
successNames = append(successNames, result.Output)
}
}

return successNames, nil
return successNames, mu.ToErr()
}

func (s *SyncerService) Sync(ctx context.Context, res *resource.Resource) error {
Expand Down
54 changes: 54 additions & 0 deletions internal/lib/pool/basic.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package pool

import (
"fmt"
"sync"
"time"
)

type JobResult[T any] struct {
Output T
Err error
}

const defaultWorkers = 5

type Job[T any] func() JobResult[T]

func RunWithWorkers[T any](workerCount int, fn []func() JobResult[T]) <-chan JobResult[T] {
start := time.Now()

numberOfWorkers := defaultWorkers
if workerCount > 0 {
numberOfWorkers = workerCount
}

var wg sync.WaitGroup
wg.Add(numberOfWorkers)
jobChannel := make(chan Job[T])
jobResultChannel := make(chan JobResult[T], len(fn))

// Start the workers
for i := 0; i < numberOfWorkers; i++ {
go worker(i, &wg, jobChannel, jobResultChannel)
}

// Send jobs to worker
for _, job := range fn {
jobChannel <- job
}

close(jobChannel)
wg.Wait()
close(jobResultChannel)

fmt.Printf("Took %s", time.Since(start)) // nolint
return jobResultChannel
}

func worker[T any](_ int, wg *sync.WaitGroup, jobChannel <-chan Job[T], resultChannel chan JobResult[T]) {
defer wg.Done()
for job := range jobChannel {
resultChannel <- job()
}
}

0 comments on commit 0d88b3f

Please sign in to comment.