package wasps

import (
"context"
"log"
"math/rand"
"sync"
"time"
)

// Pool is a struct that manages a collection of workers, each with its own
// goroutine. The Pool can initialize, expand, shrink, and close the workers,
// as well as process submitted tasks asynchronously.
type Pool struct {
capacity int
taskChan chan *Job
ctor func() Worker
workerChan chan Worker
stop chan struct{ size int }
stat chan *Stats
m sync.RWMutex
// task queue and work queue
taskQueueSize int
taskQ []*Job
workerQ []Worker
}

// Capacity returns the current capacity of the pool.
func (p *Pool) Capacity() int {
return p.getCap()
}

func (p *Pool) getCap() int {
p.m.RLock()
c := p.capacity
p.m.RUnlock()
return c
}

func (p *Pool) setCap(cap int) {
p.m.Lock()
p.capacity = cap
p.m.Unlock()
}

// New creates a new Pool that starts with the given number of workers. You must
// provide a constructor function that creates new Worker values; whenever the
// capacity of the pool grows, the constructor is called to create each new Worker.
// capacity - how many workers are created for this pool (its initial size).
// ctor - constructor function that creates new Worker values.
func New(capacity int, ctor func() Worker) *Pool {
p := &Pool{
capacity: capacity,
ctor: ctor,
workerChan: make(chan Worker, capacity),
taskChan: make(chan *Job, 1),
stop: make(chan struct{ size int }, 1),
stat: make(chan *Stats),
taskQueueSize: defaultPoolTaskQueueSize,
}
go p.startSchedule()
for i := 0; i < capacity; i++ {
p.createWork()
}
return p
}
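
// A minimal usage sketch, assuming a caller-provided Worker implementation
// created by a hypothetical newMyWorker constructor:
//
//	pool := wasps.New(4, func() wasps.Worker { return newMyWorker() })
//	defer pool.Release()
//	pool.Submit("some payload")
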
// NewCallback creates a new Pool of workers where each worker casts the Job payload
// to a func() and runs it, or returns ErrNotFunc if the cast fails.
func NewCallback(capacity int) *Pool {
return New(capacity, func() Worker {
return &callbackWorker{
job: make(chan *Job),
close: make(chan struct{}),
}
})
}
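
// A callback-pool sketch: the submitted payload is expected to be a func(),
// otherwise the worker reports ErrNotFunc.
//
//	pool := wasps.NewCallback(8)
//	pool.Submit(func() { fmt.Println("task executed") })
//	pool.Release()
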
// payLoad is the task payload carried by a Job.
type payLoad interface{}

// Do will submit a task with a DefaultWorkerPayLoad to the task queue of the goroutine pool.
func (p *Pool) Do(in DefaultWorkerPayLoad, opts ...TaskOption) {
	p.SubmitWithContext(context.TODO(), in, opts...)
}

// Submit will submit a task to the task queue of the goroutine pool.
func (p *Pool) Submit(in payLoad, opts ...TaskOption) {
	p.SubmitWithContext(context.TODO(), in, opts...)
}

// SubmitWithContext will submit a task to the task queue of the goroutine pool, accompanied by a context.
// If the context is canceled before the task is executed, the task will not be executed.
func (p *Pool) SubmitWithContext(ctx context.Context, in payLoad, opts ...TaskOption) {
p.submit(ctx, in, opts...)
}

func (p *Pool) submit(ctx context.Context, in payLoad, opts ...TaskOption) {
tskOpt := defaultTaskOption()
for _, opt := range opts {
opt.apply(tskOpt)
}
	// check the task queue to avoid OOM
	for stats := p.Stats(); stats.WaitingTask >= p.taskQueueSize; stats = p.Stats() {
		log.Printf("pool has too many pending tasks, waiting for the scheduler")
		time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
	}
p.taskChan <- &Job{
Ctx: ctx,
PayLoad: in,
Args: tskOpt.Args,
RecoverFn: tskOpt.RecoverFn}
}
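
// A submission sketch with a context; doWork stands for any hypothetical task body.
// The task is not executed if ctx is canceled before a worker picks it up.
//
//	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
//	defer cancel()
//	pool.SubmitWithContext(ctx, func() { doWork() })
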
// Stats contains information about the running pool.
type Stats struct {
	Cap         int // goroutine pool capacity
	IdleWorker  int // number of worker goroutines in the idle state
	WaitingTask int // number of tasks waiting to be processed
}

// Stats returns information during the running of the goroutine pool.
func (p *Pool) Stats() *Stats {
if p.isClosed() {
return &Stats{}
}
p.stat <- &Stats{}
return <-p.stat
}
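
// A monitoring sketch: Stats can be polled to observe pool load.
//
//	s := pool.Stats()
//	log.Printf("cap=%d idle=%d waiting=%d", s.Cap, s.IdleWorker, s.WaitingTask)
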
// SetCapacity changes the capacity of the pool, i.e. the total number of workers in the Pool.
// This can be called by any goroutine at any time unless the Pool has been stopped, in which
// case a panic will occur.
func (p *Pool) SetCapacity(size int) {
if size == p.getCap() {
return
}
if size == 0 {
p.Release()
} else if size > p.getCap() {
for i := size - p.getCap(); i > 0; i-- {
p.createWork()
}
} else {
		// ask the scheduler to stop as many idle workers as the capacity shrinks by
		p.stop <- struct{ size int }{size: p.getCap() - size}
<-p.stop
}
p.setCap(size)
}
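
// A resizing sketch: grow the pool under load and shrink it again when idle;
// SetCapacity(0) is equivalent to calling Release.
//
//	pool.SetCapacity(16) // add workers
//	pool.SetCapacity(4)  // ask the scheduler to stop idle workers
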
// Release will terminate all workers and close the channel of this Pool.
func (p *Pool) Release() {
	if p.isClosed() {
		return
	}
p.stop <- struct{ size int }{size: -1}
<-p.stop
p.setCap(0)
close(p.stop)
}

// isClosed reports whether the goroutine pool has been stopped.
func (p *Pool) isClosed() bool {
select {
case <-p.stop:
return true
default:
}
return false
}

// startSchedule schedules the worker queue and the task queue.
func (p *Pool) startSchedule() {
for {
var activeChan chan *Job
var activeTask *Job
		// When both the task queue and the worker queue have entries, take the head
		// of each so the select below can hand the task to the worker. Otherwise
		// activeChan stays nil and that case blocks, effectively disabling it.
if len(p.taskQ) > 0 && len(p.workerQ) > 0 {
activeChan = p.workerQ[0].JobChan()
activeTask = p.taskQ[0]
}
select {
// append task
case r := <-p.taskChan:
p.taskQ = append(p.taskQ, r)
// append idle worker
case w := <-p.workerChan:
p.workerQ = append(p.workerQ, w)
		// dispatch the task to the idle worker
case activeChan <- activeTask:
p.taskQ = p.taskQ[1:]
p.workerQ = p.workerQ[1:]
// export pool status
case <-p.stat:
p.stat <- &Stats{
Cap: p.getCap(),
IdleWorker: len(p.workerQ),
WaitingTask: len(p.taskQ),
}
		// stop s.size idle workers, or all of them when s.size is -1
case s := <-p.stop:
var start int
var idleWorkerNum = len(p.workerQ)
if s.size == -1 {
start = 0
} else if s.size == 0 {
start = idleWorkerNum
} else {
start = idleWorkerNum - s.size
}
for ; start < idleWorkerNum; start++ {
worker := p.workerQ[0]
worker.StopChan() <- struct{}{}
<-worker.StopChan()
p.workerQ = p.workerQ[1:]
}
p.stop <- struct{ size int }{}
if s.size == -1 {
				// stopping all workers means the goroutine pool is being released, so stop scheduling
return
}
}
}
}

// workerReady notifies the pool that the worker is idle.
func (p *Pool) workerReady(w Worker) {
p.workerChan <- w
}

// createWork creates a new Worker using the constructor func and starts its goroutine.
func (p *Pool) createWork() {
worker := p.ctor()
go func() {
for {
p.workerReady(worker)
select {
case job := <-worker.JobChan():
func() {
					// this deferred function recovers a panic raised while running the job
					// and forwards it to the job's RecoverFn, if one was provided
defer func() {
if r := recover(); r != nil {
if job.RecoverFn != nil {
job.RecoverFn(r)
}
}
}()
worker.Do(job)
}()
case <-worker.StopChan():
worker.StopChan() <- struct{}{}
return
}
}
}()
}