From 45fdc224ddd52946e56c3cb84e473aa2bce12f7c Mon Sep 17 00:00:00 2001 From: Tommy TIAN Date: Fri, 2 Sep 2022 14:17:33 +0800 Subject: [PATCH 1/2] Add async methods and NewPool invalid number handling. Signed-off-by: txaty --- pool.go | 52 ++++++++++++++++++++++++++++++++++++++-------------- pool_test.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 14 deletions(-) diff --git a/pool.go b/pool.go index 04de7cf..dddc426 100644 --- a/pool.go +++ b/pool.go @@ -1,56 +1,80 @@ package gool +import ( + "runtime" +) + // Pool implements a simple goroutine pool type Pool[A, R any] struct { numWorkers int - jobChan chan Task[A, R] + taskChan chan Task[A, R] } // NewPool creates a new goroutine pool with the given number of workers and job queue capacity. +// If numWorkers is less than 1, it will be set to the number of CPUs. +// If cap (task queue capacity) is less than 1, it will be set to twice the number of workers. func NewPool[A, R any](numWorkers, cap int) *Pool[A, R] { + if numWorkers <= 0 { + numWorkers = runtime.NumCPU() + } + if cap <= 0 { + cap = 2 * numWorkers + } p := &Pool[A, R]{ numWorkers: numWorkers, - jobChan: make(chan Task[A, R], cap), + taskChan: make(chan Task[A, R], cap), } for i := 0; i < numWorkers; i++ { - newWorker(p.jobChan) + newWorker(p.taskChan) } return p } // Submit submits a task and waits for the result func (p *Pool[A, R]) Submit(handler func(A) R, args A) R { - result := make(chan R) - p.jobChan <- Task[A, R]{ + result := p.AsyncSubmit(handler, args) + return <-result +} + +// AsyncSubmit submits a task and returns the channel to wait for the result +func (p *Pool[A, R]) AsyncSubmit(handler func(A) R, args A) chan R { + resChan := make(chan R) + p.taskChan <- Task[A, R]{ handler: handler, args: args, - result: result, + result: resChan, } - return <-result + return resChan } // Map submits a batch of tasks and waits for the results func (p *Pool[A, R]) Map(handler func(A) R, args []A) []R { + resultChanList := p.AsyncMap(handler, args) + results := make([]R, len(args)) + for i := 0; i < len(args); i++ { + results[i] = <-resultChanList[i] + } + return results +} + +// AsyncMap submits a batch of tasks and returns the channel to wait for the results +func (p *Pool[A, R]) AsyncMap(handler func(A) R, args []A) []chan R { resultChanList := make([]chan R, len(args)) for i := 0; i < len(args); i++ { resultChanList[i] = make(chan R) - p.jobChan <- Task[A, R]{ + p.taskChan <- Task[A, R]{ handler: handler, args: args[i], result: resultChanList[i], } } - results := make([]R, len(args)) - for i := 0; i < len(args); i++ { - results[i] = <-resultChanList[i] - } - return results + return resultChanList } // Close closes the pool and waits for all the workers to stop func (p *Pool[A, R]) Close() { for i := 0; i < p.numWorkers; i++ { - p.jobChan <- Task[A, R]{ + p.taskChan <- Task[A, R]{ stop: true, } } diff --git a/pool_test.go b/pool_test.go index fdf3d3c..557e388 100644 --- a/pool_test.go +++ b/pool_test.go @@ -62,6 +62,34 @@ func TestPool_Submit(t *testing.T) { } } +func TestPool_AsyncSubmit(t *testing.T) { + type args struct { + handler func(interface{}) interface{} + args interface{} + } + tests := []struct { + name string + args args + }{ + { + name: "test", + args: args{ + handler: func(arg interface{}) interface{} { + for k := 0; k < 100; k++ { + _ = k + } + return nil + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := NewPool[interface{}, interface{}](10, 100) + p.AsyncSubmit(tt.args.handler, tt.args.args) + }) + } +} func TestPool_Map(t *testing.T) { type args struct { handler func(interface{}) interface{} From cffe764f3fc71dbba08f03f17e60e7a523a599fe Mon Sep 17 00:00:00 2001 From: Tommy TIAN Date: Fri, 2 Sep 2022 14:25:26 +0800 Subject: [PATCH 2/2] Update README. Signed-off-by: txaty --- README.md | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.md b/README.md index cd86645..05b8caa 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,12 @@ # Gool A generic goroutine pool just like Python ThreadPoolExecutor. + +Gool provides the following methods: + +- ```Submit```: Submit a task and return the result (if any). +- ```AsyncSubmit```: Submit a task and return a future of the result (if any), the future is actually the result + channel. +- ```Map```: Submit a bundle of tasks and return the results in order (if any). +- ```AsyncMap```: Submit a bundle of tasks and return the futures of the results (if any), the futures are the result + channels.