From 9163d6ccc1b41709232849ba075c8d09fa826378 Mon Sep 17 00:00:00 2001 From: Trock Date: Mon, 8 Aug 2022 18:56:35 +0800 Subject: [PATCH 1/4] refactor(pool): rough modify --- control/pool/dependency.go | 92 +++++++++++++++++++++++----- control/pool/example_test.go | 2 +- control/pool/factory.go | 50 +++++++++++++++ control/pool/pool.go | 115 +++++++++++++++++++++-------------- control/pool/pool_test.go | 59 +++++++++++------- 5 files changed, 233 insertions(+), 85 deletions(-) create mode 100644 control/pool/factory.go diff --git a/control/pool/dependency.go b/control/pool/dependency.go index ae8ab805..f6d5538a 100644 --- a/control/pool/dependency.go +++ b/control/pool/dependency.go @@ -1,29 +1,89 @@ package pool import ( + "fmt" + "sync" + "time" + + "github.com/DoNewsCode/core/config" + "github.com/DoNewsCode/core/contract" + "github.com/DoNewsCode/core/contract/lifecycle" "github.com/DoNewsCode/core/di" ) -// Providers provide a *pool.Pool to the core. -func Providers(options ...ProviderOptionFunc) di.Deps { - return di.Deps{func() *Pool { - return NewPool(options...) - }} +func Providers() di.Deps { + return di.Deps{ + provideDefaultPool, + providePoolFactory(), + di.Bind(new(*Factory), new(Maker)), + } +} + +func provideDefaultPool(maker Maker) (*Pool, error) { + return maker.Make("default") } -// ProviderOptionFunc is the functional option to Providers. -type ProviderOptionFunc func(pool *Pool) +// factoryIn is the injection parameter for provideDatabaseOut. +type factoryIn struct { + di.In -// WithConcurrency sets the maximum concurrency for the pool. -func WithConcurrency(concurrency int) ProviderOptionFunc { - return func(pool *Pool) { - pool.concurrency = concurrency - } + Conf contract.ConfigUnmarshaler + OnReloadEvent lifecycle.ConfigReload `optional:"true"` +} + +type poolConfig struct { + Cap int32 `yaml:"cap" json:"cap"` + Concurrency int32 `yaml:"concurrency" json:"concurrency"` + Timeout config.Duration `yaml:"timeout" json:"timeout"` +} + +// out +type out struct { + di.Out + + Factory *Factory } -// WithCounter sets the counter for the pool. 
-func WithCounter(counter *Counter) ProviderOptionFunc { - return func(pool *Pool) { - pool.counter = counter +func providePoolFactory() func(p factoryIn) (out, func(), error) { + var wg = &sync.WaitGroup{} + return func(factoryIn factoryIn) (out, func(), error) { + factory := di.NewFactory[*Pool](func(name string) (pair di.Pair[*Pool], err error) { + var ( + conf poolConfig + ) + if err := factoryIn.Conf.Unmarshal(fmt.Sprintf("pool.%s", name), &conf); err != nil { + if name != "default" { + return pair, fmt.Errorf("pool configuration %s not valid: %w", name, err) + } + } + pool := &Pool{ + cap: 10, + concurrency: 1000, + ch: make(chan job), + wg: wg, + timeout: 10 * time.Minute, + } + if conf.Cap > 0 { + pool.cap = conf.Cap + } + if conf.Concurrency > 0 { + pool.concurrency = conf.Concurrency + } + if !conf.Timeout.IsZero() { + pool.timeout = conf.Timeout.Duration + } + + return di.Pair[*Pool]{ + Conn: pool, + Closer: nil, + }, err + }) + + return out{ + Factory: &Factory{ + factory: factory, + wg: wg, + }, + }, factory.Close, nil } } diff --git a/control/pool/example_test.go b/control/pool/example_test.go index ed63e887..b140b93d 100644 --- a/control/pool/example_test.go +++ b/control/pool/example_test.go @@ -18,7 +18,7 @@ func Example() { core.WithInline("http.addr", ":9777"), core.WithInline("log.level", "none"), ) - c.Provide(pool.Providers(pool.WithConcurrency(1))) + c.Provide(pool.Providers()) c.Invoke(func(p *pool.Pool, dispatcher lifecycle.HTTPServerStart) { dispatcher.On(func(ctx context.Context, payload lifecycle.HTTPServerStartPayload) error { diff --git a/control/pool/factory.go b/control/pool/factory.go new file mode 100644 index 00000000..5d42588b --- /dev/null +++ b/control/pool/factory.go @@ -0,0 +1,50 @@ +package pool + +import ( + "context" + "sync" + + "github.com/DoNewsCode/core/di" + "github.com/oklog/run" +) + +// Maker models Factory +type Maker interface { + Make(name string) (*Pool, error) +} + +type Factory struct { + wg *sync.WaitGroup + + factory *di.Factory[*Pool] +} + +func (f *Factory) Make(name string) (*Pool, error) { + return f.factory.Make(name) +} + +// ProvideRunGroup implements core.RunProvider +func (f *Factory) ProvideRunGroup(group *run.Group) { + ctx, cancel := context.WithCancel(context.Background()) + + group.Add(func() error { + f.run(ctx) + return nil + }, func(err error) { + cancel() + }) +} + +// Module implements di.Modular +func (f *Factory) Module() interface{} { + return f +} + +func (f *Factory) run(ctx context.Context) { + f.wg.Add(1) + go func() { + defer f.wg.Done() + <-ctx.Done() + }() + f.wg.Wait() +} diff --git a/control/pool/pool.go b/control/pool/pool.go index 90ac5b6a..0c47811b 100644 --- a/control/pool/pool.go +++ b/control/pool/pool.go @@ -14,7 +14,7 @@ // Let's go through all the disadvantages. First, the backpressure is lost. // There is no way to limit the maximum goroutine the handler can create. clients // can easily flood the server. Secondly, the graceful shutdown process is -// ruined. The http server can shutdown itself without losing any request, but +// ruined. The http server can shut down itself without losing any request, but // the async jobs created with "go" are not protected by the server. You will // lose all unfinished jobs once the server shuts down and program exits. 
lastly, // the async job may want to access the original request context, maybe for @@ -45,24 +45,12 @@ package pool import ( "context" "sync" + "sync/atomic" + "time" "github.com/DoNewsCode/core/ctxmeta" - - "github.com/oklog/run" ) -// NewPool returned func(contract.Dispatcher) *Pool -func NewPool(options ...ProviderOptionFunc) *Pool { - pool := Pool{ - ch: make(chan job), - concurrency: 10, - } - for _, f := range options { - f(&pool) - } - return &pool -} - type job struct { fn func() } @@ -72,24 +60,15 @@ type job struct { // goroutine directly. type Pool struct { ch chan job - concurrency int counter *Counter -} - -// ProvideRunGroup implements core.RunProvider -func (p *Pool) ProvideRunGroup(group *run.Group) { - ctx, cancel := context.WithCancel(context.Background()) + jobCount int32 + workerCount int32 - group.Add(func() error { - return p.Run(ctx) - }, func(err error) { - cancel() - }) -} + wg *sync.WaitGroup -// Module implements di.Modular -func (p *Pool) Module() interface{} { - return p + cap int32 + concurrency int32 + timeout time.Duration } // Go dispatchers a job to the async worker pool. requestContext is the context @@ -98,37 +77,83 @@ func (p *Pool) Module() interface{} { // nothing to do with the request. If the pool has reached max concurrency, the job will // be executed in the current goroutine. In other word, the job will be executed synchronously. func (p *Pool) Go(requestContext context.Context, function func(asyncContext context.Context)) { + p.incJobCount() + p.loadWorker() j := job{ fn: func() { function(ctxmeta.WithoutCancel(requestContext)) }, } - select { - case p.ch <- j: - default: - p.counter.IncSyncJob() - j.fn() + p.ch <- j +} + +func (p *Pool) WorkerCount() int32 { + return atomic.LoadInt32(&p.workerCount) +} + +func (p *Pool) incJobCount() { + atomic.AddInt32(&p.jobCount, 1) +} + +func (p *Pool) decJobCount() { + atomic.AddInt32(&p.jobCount, -1) +} + +func (p *Pool) incWorkerCount() { + atomic.AddInt32(&p.workerCount, 1) +} + +func (p *Pool) decWorkerCount() { + atomic.AddInt32(&p.workerCount, -1) +} + +func (p *Pool) needIncWorker() int32 { + // at least one worker keepalive + if p.WorkerCount() == 0 { + return 1 } + + if concurrency, jobCount := atomic.LoadInt32(&p.concurrency), atomic.LoadInt32(&p.jobCount); (concurrency == 0 || p.WorkerCount() < concurrency) && jobCount >= p.cap { + // calculate the number of workers to be added + return jobCount/p.cap - p.WorkerCount() + } + return 0 } -// Run starts the async worker pool and block until it finishes. 
-func (p *Pool) Run(ctx context.Context) error { - var wg sync.WaitGroup - for i := 0; i < p.concurrency; i++ { - wg.Add(1) +func (p *Pool) loadWorker() { + v := p.needIncWorker() + if v == 0 { + return + } + + for i := 0; i < int(v); i++ { + p.wg.Add(1) + p.incWorkerCount() + go func() { - defer wg.Done() + timer := time.NewTimer(p.timeout) + defer func() { + p.decWorkerCount() + timer.Stop() + p.wg.Done() + }() for { select { case j := <-p.ch: p.counter.IncAsyncJob() + timer.Reset(p.timeout) j.fn() - case <-ctx.Done(): - return + p.decJobCount() + case <-timer.C: + if p.WorkerCount() > 1 && atomic.LoadInt32(&p.jobCount)/p.cap-p.WorkerCount() < 0 { + return + } + timer.Reset(p.timeout) } } + }() } - wg.Wait() - return nil + + return } diff --git a/control/pool/pool_test.go b/control/pool/pool_test.go index 8499052b..eb72022f 100644 --- a/control/pool/pool_test.go +++ b/control/pool/pool_test.go @@ -5,55 +5,77 @@ import ( "testing" "time" - "github.com/oklog/run" + "github.com/DoNewsCode/core/config" ) func TestPool_Go(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) - p := NewPool(WithConcurrency(1)) - go p.Run(ctx) - time.Sleep(time.Millisecond) + + f, _, _ := providePoolFactory()(factoryIn{ + Conf: config.MapAdapter{}, + }) + go f.Factory.run(ctx) + + p, _ := f.Factory.Make("default") p.Go(context.Background(), func(asyncContext context.Context) { cancel() }) } -func TestPool_FallbackToSyncMode(t *testing.T) { +func TestPool_CapLimit(t *testing.T) { t.Parallel() - ctx, cancel := context.WithTimeout(context.Background(), time.Second) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() - p := NewPool(WithConcurrency(1)) - go p.Run(ctx) - time.Sleep(time.Millisecond) + f, _, _ := providePoolFactory()(factoryIn{Conf: config.MapAdapter{ + "pool": map[string]any{ + "default": map[string]any{ + "cap": 1, + "timeout": "1s", + }, + }, + }}) + go f.Factory.run(ctx) + + p, _ := f.Factory.Make("default") ts := time.Now() var executed = make(chan struct{}) - // saturate the pool + // job1 p.Go(ctx, func(asyncContext context.Context) { time.Sleep(time.Second) }) - // fallback to sync mode + if p.WorkerCount() != 1 { + t.Fatal("worker count should be 1") + } + // job2 p.Go(ctx, func(asyncContext context.Context) { close(executed) }) + if p.WorkerCount() != 2 { + t.Fatal("worker count should be 2") + } <-executed // job channel not be blocked, so the interval should be less than 1 second if time.Since(ts) >= time.Second { t.Fatal("timeout: sync mode should be used") } + time.Sleep(time.Second) + if p.WorkerCount() != 1 { + t.Fatal("worker should be recycle") + } } func TestPool_contextValue(t *testing.T) { t.Parallel() ctx, cancel := context.WithCancel(context.Background()) defer cancel() + f, _, _ := providePoolFactory()(factoryIn{Conf: config.MapAdapter{}}) + go f.Factory.run(ctx) - p := NewPool(WithConcurrency(1)) - go p.Run(ctx) - time.Sleep(time.Millisecond) + p, _ := f.Factory.Make("default") key := struct{}{} requestContext := context.WithValue(context.Background(), key, "foo") @@ -69,12 +91,3 @@ func TestPool_contextValue(t *testing.T) { cancel() }) } - -func TestPool_ProvideRunGroup(t *testing.T) { - t.Parallel() - p := NewPool(WithConcurrency(1)) - var group run.Group - group.Add(func() error { return nil }, func(err error) {}) - p.ProvideRunGroup(&group) - group.Run() -} From 23c5c4fc5813103bf3ebf25c266c4952f9d3a8f2 Mon Sep 17 00:00:00 2001 From: Trock Date: Tue, 9 Aug 2022 20:42:05 +0800 Subject: [PATCH 2/4] refactor(pool): worker --- 
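Note on the worker-scaling heuristic introduced in patch 1 (needIncWorker/loadWorker) and carried into the shared worker in this patch: the pool keeps at least one worker alive; once the number of queued jobs reaches the configured cap, it grows toward roughly jobCount/cap workers, never exceeding concurrency (where 0 means unbounded), and idle workers exit after the timeout. A minimal sketch of that rule, with an illustrative helper name that is not part of the exported API:

package main

import "fmt"

// workersToAdd is a simplified sketch of the scaling rule in
// needIncWorker/loadWorker: always keep one worker alive; once queued jobs
// reach capPerWorker, grow toward roughly jobCount/capPerWorker workers,
// bounded by concurrency (0 = no upper bound).
func workersToAdd(workerCount, jobCount, capPerWorker, concurrency int32) int32 {
	if workerCount == 0 {
		return 1
	}
	if (concurrency == 0 || workerCount < concurrency) && jobCount >= capPerWorker {
		if n := jobCount/capPerWorker - workerCount; n > 0 {
			return n
		}
	}
	return 0
}

func main() {
	// 35 queued jobs, cap 10 per worker, 1 worker running -> start 2 more workers.
	fmt.Println(workersToAdd(1, 35, 10, 1000))
}

For example, with 35 queued jobs, a cap of 10 and one running worker, two additional workers are started.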
control/pool/dependency.go | 61 ++++++++++---------- control/pool/factory.go | 40 +------------- control/pool/pool.go | 95 ++++---------------------------- control/pool/pool_test.go | 73 ++++++------------------ control/pool/worker.go | 110 +++++++++++++++++++++++++++++++++++++ 5 files changed, 170 insertions(+), 209 deletions(-) create mode 100644 control/pool/worker.go diff --git a/control/pool/dependency.go b/control/pool/dependency.go index f6d5538a..5f0534f4 100644 --- a/control/pool/dependency.go +++ b/control/pool/dependency.go @@ -1,8 +1,7 @@ package pool import ( - "fmt" - "sync" + "context" "time" "github.com/DoNewsCode/core/config" @@ -45,32 +44,19 @@ type out struct { } func providePoolFactory() func(p factoryIn) (out, func(), error) { - var wg = &sync.WaitGroup{} + ctx, cancel := context.WithCancel(context.Background()) + var worker = &worker{ + ch: make(chan job), + incWorkerChan: make(chan int32), + cap: 10, + timeout: 10 * time.Minute, + } return func(factoryIn factoryIn) (out, func(), error) { factory := di.NewFactory[*Pool](func(name string) (pair di.Pair[*Pool], err error) { - var ( - conf poolConfig - ) - if err := factoryIn.Conf.Unmarshal(fmt.Sprintf("pool.%s", name), &conf); err != nil { - if name != "default" { - return pair, fmt.Errorf("pool configuration %s not valid: %w", name, err) - } - } + pool := &Pool{ - cap: 10, - concurrency: 1000, - ch: make(chan job), - wg: wg, - timeout: 10 * time.Minute, - } - if conf.Cap > 0 { - pool.cap = conf.Cap - } - if conf.Concurrency > 0 { - pool.concurrency = conf.Concurrency - } - if !conf.Timeout.IsZero() { - pool.timeout = conf.Timeout.Duration + ch: worker.ch, + incJobCountFunc: worker.incJobCount, } return di.Pair[*Pool]{ @@ -78,12 +64,25 @@ func providePoolFactory() func(p factoryIn) (out, func(), error) { Closer: nil, }, err }) - + var ( + conf poolConfig + ) + _ = factoryIn.Conf.Unmarshal("pool", &conf) + if conf.Cap > 0 { + worker.cap = conf.Cap + } + if conf.Concurrency > 0 { + worker.concurrency = conf.Concurrency + } + if !conf.Timeout.IsZero() { + worker.timeout = conf.Timeout.Duration + } + worker.run(ctx) return out{ - Factory: &Factory{ - factory: factory, - wg: wg, - }, - }, factory.Close, nil + Factory: factory, + }, func() { + cancel() + factory.Close() + }, nil } } diff --git a/control/pool/factory.go b/control/pool/factory.go index 5d42588b..d63e33b7 100644 --- a/control/pool/factory.go +++ b/control/pool/factory.go @@ -1,11 +1,7 @@ package pool import ( - "context" - "sync" - "github.com/DoNewsCode/core/di" - "github.com/oklog/run" ) // Maker models Factory @@ -13,38 +9,4 @@ type Maker interface { Make(name string) (*Pool, error) } -type Factory struct { - wg *sync.WaitGroup - - factory *di.Factory[*Pool] -} - -func (f *Factory) Make(name string) (*Pool, error) { - return f.factory.Make(name) -} - -// ProvideRunGroup implements core.RunProvider -func (f *Factory) ProvideRunGroup(group *run.Group) { - ctx, cancel := context.WithCancel(context.Background()) - - group.Add(func() error { - f.run(ctx) - return nil - }, func(err error) { - cancel() - }) -} - -// Module implements di.Modular -func (f *Factory) Module() interface{} { - return f -} - -func (f *Factory) run(ctx context.Context) { - f.wg.Add(1) - go func() { - defer f.wg.Done() - <-ctx.Done() - }() - f.wg.Wait() -} +type Factory = di.Factory[*Pool] diff --git a/control/pool/pool.go b/control/pool/pool.go index 0c47811b..f2c79889 100644 --- a/control/pool/pool.go +++ b/control/pool/pool.go @@ -44,9 +44,6 @@ package pool import ( "context" - "sync" - 
"sync/atomic" - "time" "github.com/DoNewsCode/core/ctxmeta" ) @@ -59,16 +56,9 @@ type job struct { // web servers. See the package documentation about its advantage over creating a // goroutine directly. type Pool struct { - ch chan job - counter *Counter - jobCount int32 - workerCount int32 - - wg *sync.WaitGroup - - cap int32 - concurrency int32 - timeout time.Duration + counter *Counter + ch chan job + incJobCountFunc func() } // Go dispatchers a job to the async worker pool. requestContext is the context @@ -77,83 +67,20 @@ type Pool struct { // nothing to do with the request. If the pool has reached max concurrency, the job will // be executed in the current goroutine. In other word, the job will be executed synchronously. func (p *Pool) Go(requestContext context.Context, function func(asyncContext context.Context)) { - p.incJobCount() - p.loadWorker() j := job{ fn: func() { function(ctxmeta.WithoutCancel(requestContext)) }, } - p.ch <- j -} - -func (p *Pool) WorkerCount() int32 { - return atomic.LoadInt32(&p.workerCount) -} - -func (p *Pool) incJobCount() { - atomic.AddInt32(&p.jobCount, 1) -} - -func (p *Pool) decJobCount() { - atomic.AddInt32(&p.jobCount, -1) -} - -func (p *Pool) incWorkerCount() { - atomic.AddInt32(&p.workerCount, 1) -} - -func (p *Pool) decWorkerCount() { - atomic.AddInt32(&p.workerCount, -1) -} - -func (p *Pool) needIncWorker() int32 { - // at least one worker keepalive - if p.WorkerCount() == 0 { - return 1 - } - - if concurrency, jobCount := atomic.LoadInt32(&p.concurrency), atomic.LoadInt32(&p.jobCount); (concurrency == 0 || p.WorkerCount() < concurrency) && jobCount >= p.cap { - // calculate the number of workers to be added - return jobCount/p.cap - p.WorkerCount() + if p.incJobCountFunc != nil { + p.incJobCountFunc() } - return 0 -} -func (p *Pool) loadWorker() { - v := p.needIncWorker() - if v == 0 { - return + select { + case p.ch <- j: + p.counter.IncAsyncJob() + default: + p.counter.IncSyncJob() + j.fn() } - - for i := 0; i < int(v); i++ { - p.wg.Add(1) - p.incWorkerCount() - - go func() { - timer := time.NewTimer(p.timeout) - defer func() { - p.decWorkerCount() - timer.Stop() - p.wg.Done() - }() - for { - select { - case j := <-p.ch: - p.counter.IncAsyncJob() - timer.Reset(p.timeout) - j.fn() - p.decJobCount() - case <-timer.C: - if p.WorkerCount() > 1 && atomic.LoadInt32(&p.jobCount)/p.cap-p.WorkerCount() < 0 { - return - } - timer.Reset(p.timeout) - } - } - - }() - } - - return } diff --git a/control/pool/pool_test.go b/control/pool/pool_test.go index eb72022f..f4aa6159 100644 --- a/control/pool/pool_test.go +++ b/control/pool/pool_test.go @@ -2,6 +2,7 @@ package pool import ( "context" + "fmt" "testing" "time" @@ -10,76 +11,29 @@ import ( func TestPool_Go(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - f, _, _ := providePoolFactory()(factoryIn{ + f, cancel, _ := providePoolFactory()(factoryIn{ Conf: config.MapAdapter{}, }) - go f.Factory.run(ctx) - + time.Sleep(time.Millisecond) p, _ := f.Factory.Make("default") p.Go(context.Background(), func(asyncContext context.Context) { - cancel() + fmt.Println("123") }) -} - -func TestPool_CapLimit(t *testing.T) { - t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - f, _, _ := providePoolFactory()(factoryIn{Conf: config.MapAdapter{ - "pool": map[string]any{ - "default": map[string]any{ - "cap": 1, - "timeout": "1s", - }, - }, - }}) - go f.Factory.run(ctx) - - p, _ := f.Factory.Make("default") - - ts := time.Now() - var executed = 
make(chan struct{}) - - // job1 - p.Go(ctx, func(asyncContext context.Context) { - time.Sleep(time.Second) - }) - if p.WorkerCount() != 1 { - t.Fatal("worker count should be 1") - } - // job2 - p.Go(ctx, func(asyncContext context.Context) { - close(executed) - }) - if p.WorkerCount() != 2 { - t.Fatal("worker count should be 2") - } - <-executed - // job channel not be blocked, so the interval should be less than 1 second - if time.Since(ts) >= time.Second { - t.Fatal("timeout: sync mode should be used") - } - time.Sleep(time.Second) - if p.WorkerCount() != 1 { - t.Fatal("worker should be recycle") - } + cancel() } func TestPool_contextValue(t *testing.T) { t.Parallel() - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - f, _, _ := providePoolFactory()(factoryIn{Conf: config.MapAdapter{}}) - go f.Factory.run(ctx) + f, cancel, _ := providePoolFactory()(factoryIn{Conf: config.MapAdapter{}}) + time.Sleep(time.Millisecond) p, _ := f.Factory.Make("default") key := struct{}{} requestContext := context.WithValue(context.Background(), key, "foo") - + execute := make(chan struct{}) p.Go(requestContext, func(asyncContext context.Context) { if _, ok := asyncContext.Deadline(); ok { t.Fatalf("asyncContext shouldn't have deadline set") @@ -88,6 +42,15 @@ func TestPool_contextValue(t *testing.T) { if value != "foo" { t.Fatalf("want foo, got %s", value) } - cancel() + execute <- struct{}{} + }) + <-execute + cancel() +} + +func TestPool_Nil_Valid(t *testing.T) { + var p Pool + p.Go(context.Background(), func(asyncContext context.Context) { + }) } diff --git a/control/pool/worker.go b/control/pool/worker.go new file mode 100644 index 00000000..fc348671 --- /dev/null +++ b/control/pool/worker.go @@ -0,0 +1,110 @@ +package pool + +import ( + "context" + "sync" + "sync/atomic" + "time" +) + +type worker struct { + ch chan job + incWorkerChan chan int32 + jobCount int32 + workerCount int32 + + wg sync.WaitGroup + + cap int32 + concurrency int32 + timeout time.Duration +} + +func (w *worker) WorkerCount() int32 { + return atomic.LoadInt32(&w.workerCount) +} + +func (w *worker) incJobCount() { + atomic.AddInt32(&w.jobCount, 1) +} + +func (w *worker) decJobCount() { + atomic.AddInt32(&w.jobCount, -1) +} + +func (w *worker) incWorkerCount() { + atomic.AddInt32(&w.workerCount, 1) +} + +func (w *worker) decWorkerCount() { + atomic.AddInt32(&w.workerCount, -1) +} + +func (w *worker) run(ctx context.Context) { + go w.incWorker(ctx) + go w.runWorker(ctx) +} + +func (w *worker) incWorker(ctx context.Context) { + w.incWorkerChan <- 1 + timer := time.NewTimer(10 * time.Second) + go func() { + for { + select { + case <-timer.C: + timer.Reset(10 * time.Second) + if w.WorkerCount() == 0 { + w.incWorkerChan <- 1 + } + + if concurrency, jobCount := atomic.LoadInt32(&w.concurrency), atomic.LoadInt32(&w.jobCount); (concurrency == 0 || w.WorkerCount() < concurrency) && jobCount >= w.cap { + // calculate the number of workers to be added + w.incWorkerChan <- jobCount/w.cap - w.WorkerCount() + } + case <-ctx.Done(): + timer.Stop() + } + } + }() +} + +func (w *worker) runWorker(ctx context.Context) { + for { + select { + case v := <-w.incWorkerChan: + for i := 0; i < int(v); i++ { + w.wg.Add(1) + w.incWorkerCount() + + go func() { + timer := time.NewTimer(w.timeout) + defer func() { + w.decWorkerCount() + timer.Stop() + w.wg.Done() + }() + for { + select { + case j := <-w.ch: + timer.Reset(w.timeout) + j.fn() + w.decJobCount() + case <-timer.C: + if w.WorkerCount() > 1 && 
atomic.LoadInt32(&w.jobCount)/w.cap-w.WorkerCount() < 0 { + return + } + timer.Reset(w.timeout) + case <-ctx.Done(): + return + } + } + + }() + } + case <-ctx.Done(): + w.wg.Wait() + return + } + } + +} From 4bddcbdf4c703796db76ca5b73abc1ccba221e0e Mon Sep 17 00:00:00 2001 From: Trock Date: Wed, 10 Aug 2022 09:17:07 +0800 Subject: [PATCH 3/4] fix: counter with name --- control/pool/dependency.go | 4 ++++ control/pool/pool_test.go | 2 -- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/control/pool/dependency.go b/control/pool/dependency.go index 5f0534f4..4bf10b2f 100644 --- a/control/pool/dependency.go +++ b/control/pool/dependency.go @@ -28,6 +28,7 @@ type factoryIn struct { Conf contract.ConfigUnmarshaler OnReloadEvent lifecycle.ConfigReload `optional:"true"` + Counter *Counter `optional:"true"` } type poolConfig struct { @@ -58,6 +59,9 @@ func providePoolFactory() func(p factoryIn) (out, func(), error) { ch: worker.ch, incJobCountFunc: worker.incJobCount, } + if factoryIn.Counter != nil { + pool.counter = factoryIn.Counter.PoolName(name) + } return di.Pair[*Pool]{ Conn: pool, diff --git a/control/pool/pool_test.go b/control/pool/pool_test.go index f4aa6159..9edbc36e 100644 --- a/control/pool/pool_test.go +++ b/control/pool/pool_test.go @@ -2,7 +2,6 @@ package pool import ( "context" - "fmt" "testing" "time" @@ -18,7 +17,6 @@ func TestPool_Go(t *testing.T) { time.Sleep(time.Millisecond) p, _ := f.Factory.Make("default") p.Go(context.Background(), func(asyncContext context.Context) { - fmt.Println("123") }) cancel() From 71b49520f7382e0e3a7e76f42952c59e98b7a703 Mon Sep 17 00:00:00 2001 From: Trock Date: Wed, 10 Aug 2022 09:46:39 +0800 Subject: [PATCH 4/4] docs: add comment --- control/pool/dependency.go | 16 +++++++++++----- control/pool/factory.go | 1 + 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/control/pool/dependency.go b/control/pool/dependency.go index 4bf10b2f..3ad5c761 100644 --- a/control/pool/dependency.go +++ b/control/pool/dependency.go @@ -32,9 +32,15 @@ type factoryIn struct { } type poolConfig struct { - Cap int32 `yaml:"cap" json:"cap"` - Concurrency int32 `yaml:"concurrency" json:"concurrency"` - Timeout config.Duration `yaml:"timeout" json:"timeout"` + // Cap is the maximum number of tasks. If it exceeds the maximum number, new workers will be added automatically. + // Default is 10. + Cap int32 `yaml:"cap" json:"cap"` + // Concurrency limits the maximum number of workers. + // Default is 1000. + Concurrency int32 `yaml:"concurrency" json:"concurrency"` + // IdleTimeout idle workers will be recycled after this duration. + // Default is 10 minutes. + IdleTimeout config.Duration `yaml:"idle_timeout" json:"idle_timeout"` } // out @@ -78,8 +84,8 @@ func providePoolFactory() func(p factoryIn) (out, func(), error) { if conf.Concurrency > 0 { worker.concurrency = conf.Concurrency } - if !conf.Timeout.IsZero() { - worker.timeout = conf.Timeout.Duration + if !conf.IdleTimeout.IsZero() { + worker.timeout = conf.IdleTimeout.Duration } worker.run(ctx) return out{ diff --git a/control/pool/factory.go b/control/pool/factory.go index d63e33b7..e065e693 100644 --- a/control/pool/factory.go +++ b/control/pool/factory.go @@ -9,4 +9,5 @@ type Maker interface { Make(name string) (*Pool, error) } +// Factory is the *di.Factory that creates *Pool. type Factory = di.Factory[*Pool]
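After this series, a consumer provides the factory and either injects the default *Pool or asks the Maker for a named one. A hedged usage sketch based on the example test and the config keys visible in this series; the import path and config value handling are assumptions, not verified against the final API:

package main

import (
	"context"

	"github.com/DoNewsCode/core"
	"github.com/DoNewsCode/core/control/pool"
)

func main() {
	// Assumed config keys per this series; value types here are illustrative.
	c := core.Default(
		core.WithInline("pool.cap", "10"),
		core.WithInline("pool.concurrency", "1000"),
		core.WithInline("pool.idle_timeout", "10m"),
	)
	c.Provide(pool.Providers())
	c.Invoke(func(maker pool.Maker) {
		p, _ := maker.Make("default")
		p.Go(context.Background(), func(asyncContext context.Context) {
			// async work detached from the request's cancellation
		})
	})
}

The per-pool options WithConcurrency/WithCounter removed in patch 1 are gone for good: cap, concurrency and idle_timeout now come from configuration, and the counter is injected optionally through DI (patch 3).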