-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathworker_pool.go
89 lines (68 loc) · 2.3 KB
/
worker_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
package fangtooth
import (
"os"
"os/signal"
"github.com/gocraft/work"
"github.com/gomodule/redigo/redis"
)
// JobHandlerMap is a map of job name and its handler function.
type JobHandlerMap map[string]interface{}
// PeriodicMap is a map of cron tab value and job name to run in.
type PeriodicMap map[string]string
// Configurator is a function that accept worker pool as parameter and change
// its configurable attributes.
type Configurator func(*WorkerPool)
// WorkerPool is responsible for fetching jobs out from redis pool and
// dispatching them to workers.
type WorkerPool struct {
ConcurrentProcess uint
namespace string
redisPool *redis.Pool
pool *work.WorkerPool
}
// Middleware will add given middleware to process pipeline.
func (p *WorkerPool) Middleware(m ...interface{}) *WorkerPool {
for _, middleware := range m {
p.pool.Middleware(middleware)
}
return p
}
// Periodic will periodically enqueue given jobs based on it cron tabs.
func (p *WorkerPool) Periodic(jobMap PeriodicMap) *WorkerPool {
for jobName, cronTab := range jobMap {
p.pool.PeriodicallyEnqueue(cronTab, jobName)
}
return p
}
// Listen will listens for incoming job with specified job name and handle it
// with its handler function.
func (p *WorkerPool) Listen(jobMap JobHandlerMap) *WorkerPool {
for jobName, handler := range jobMap {
p.pool.Job(jobName, handler)
}
return p
}
// Run starts worker pool and starts processing jobs.
func (p *WorkerPool) Run() {
p.pool.Start()
// Wait for a signal to quit:
signalChan := make(chan os.Signal, 1)
signal.Notify(signalChan, os.Interrupt, os.Kill)
<-signalChan
p.pool.Stop()
}
// NewWorkerPool construct new instance worker pool.
func NewWorkerPool(worker WorkerInterface, namespace string, redisPool *redis.Pool, configurators ...Configurator) *WorkerPool {
wp := &WorkerPool{namespace: namespace, redisPool: redisPool}
// run configurators to set custom value for model attributes
for _, configure := range configurators {
configure(wp)
}
// check for concurrent process number setting: set it to default if not exist.
if wp.ConcurrentProcess == 0 {
wp.ConcurrentProcess = 1
}
wp.pool = work.NewWorkerPool(worker.Self(), wp.ConcurrentProcess, wp.namespace, wp.redisPool)
wp.Middleware(worker.Log, worker.CaptureError)
return wp
}