-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdeferrable_event.go
86 lines (74 loc) · 2.51 KB
/
deferrable_event.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
package queue
import (
	crand "crypto/rand"
	"math/rand"
	"time"
)
// DeferrablePersistentJob wraps a Job together with the metadata used when the
// job is persisted: an execution delay, a handling timeout, an attempt limit
// and a unique identifier. Adjust constructs values of this type.
type DeferrablePersistentJob struct {
// Job is the wrapped job; its methods (such as Type, used by Decorate) are
// promoted onto DeferrablePersistentJob.
Job
// after is the delay reported by the Defer method.
after time.Duration
// handleTimeout is copied to PersistedJob.HandleTimeout by Decorate.
handleTimeout time.Duration
// maxAttempts is copied to PersistedJob.MaxAttempts by Decorate.
maxAttempts int
// uniqueId is copied to PersistedJob.UniqueId by Decorate.
uniqueId string
}
// Defer returns the period of time by which the execution of the job should be
// postponed. It is the value stored by the Defer or ScheduleAt options, and is
// zero for a job built by Adjust without either of them.
func (d DeferrablePersistentJob) Defer() time.Duration {
return d.after
}
// Decorate copies this job's meta information onto the given PersistedJob: the
// unique id, the handle timeout, the maximum number of attempts, and the key,
// which is taken from the wrapped job's Type. Per the original documentation it
// is called by the Queue after the Packer has packed the Job.
func (d DeferrablePersistentJob) Decorate(s *PersistedJob) {
	s.UniqueId, s.HandleTimeout, s.MaxAttempts = d.uniqueId, d.handleTimeout, d.maxAttempts
	// The key is resolved last, matching the original assignment order.
	s.Key = d.Type()
}
// PersistOption customizes a DeferrablePersistentJob while Adjust is building
// it. Each option receives a pointer to the job under construction and mutates
// it in place.
type PersistOption func(Job *DeferrablePersistentJob)
// Adjust wraps any Job in a DeferrablePersistentJob so that it can be stored in
// external storage. The wrapper starts with these defaults: a single attempt, a
// one hour handle timeout and a randomly generated unique id. The given options
// are then applied in order, so a later option overrides an earlier one that
// touches the same setting.
func Adjust(job Job, opts ...PersistOption) DeferrablePersistentJob {
	wrapped := DeferrablePersistentJob{
		Job:           job,
		maxAttempts:   1,
		handleTimeout: time.Hour,
		uniqueId:      randomId(),
	}
	for _, apply := range opts {
		apply(&wrapped)
	}
	return wrapped
}
// Defer is a PersistOption that defers the execution of the
// DeferrablePersistentJob for the given period of time.
func Defer(duration time.Duration) PersistOption {
	return func(j *DeferrablePersistentJob) {
		j.after = duration
	}
}
// ScheduleAt is a PersistOption that defers the execution of the
// DeferrablePersistentJob until the given time. The delay is computed as the
// time remaining until t at the moment the option is applied, so a t that is
// already in the past yields a negative delay.
func ScheduleAt(t time.Time) PersistOption {
	return func(j *DeferrablePersistentJob) {
		j.after = time.Until(t)
	}
}
// Timeout is a PersistOption that sets the maximum time the Job may be
// processed before it times out. Note: this timeout is shared among all
// listeners.
func Timeout(timeout time.Duration) PersistOption {
	return func(j *DeferrablePersistentJob) {
		j.handleTimeout = timeout
	}
}
// MaxAttempts is a PersistOption that sets how many times the Job handler may
// be attempted. Adjust uses 1 when this option is not given.
func MaxAttempts(attempts int) PersistOption {
	return func(j *DeferrablePersistentJob) {
		j.maxAttempts = attempts
	}
}
// UniqueId is a PersistOption that hands the generation of the unique id over
// to the caller: the given id replaces the random one chosen by Adjust.
func UniqueId(id string) PersistOption {
	return func(j *DeferrablePersistentJob) {
		j.uniqueId = id
	}
}
// randomId returns a 16 character identifier made of ASCII letters.
//
// The letters are drawn from the operating system's entropy source
// (crypto/rand), so identifiers do not repeat across processes. The global
// math/rand source used previously is seeded deterministically on older
// toolchains unless rand.Seed is called, which makes separate processes emit
// the same sequence of "unique" ids. math/rand is kept only as a fallback for
// the case where the entropy source reports an error.
func randomId() string {
	const (
		letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
		idLen       = 16
		// bound is the largest multiple of len(letterBytes) that fits in a
		// byte. Random bytes at or above it are discarded so that the modulo
		// below selects every letter with equal probability.
		bound = 256 - 256%len(letterBytes)
	)

	id := make([]byte, 0, idLen)
	// Twice the id length: with roughly 81% of bytes accepted, a single read
	// is almost always enough.
	var entropy [2 * idLen]byte
	for len(id) < idLen {
		if _, err := crand.Read(entropy[:]); err != nil {
			// Best effort: complete the id with the previous generator
			// rather than failing the caller.
			for len(id) < idLen {
				id = append(id, letterBytes[rand.Intn(len(letterBytes))])
			}
			break
		}
		for _, b := range entropy {
			if int(b) >= bound {
				continue
			}
			id = append(id, letterBytes[int(b)%len(letterBytes)])
			if len(id) == idLen {
				break
			}
		}
	}
	return string(id)
}