-
Notifications
You must be signed in to change notification settings - Fork 53
/
Copy pathmonitor.go
79 lines (67 loc) · 2.1 KB
/
monitor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package delayqueue
import (
"log"
"github.com/redis/go-redis/v9"
)
// Monitor exposes the running status (message counts) and the event stream of
// a DelayQueue by delegating to a wrapped queue instance.
type Monitor struct {
// inner is the queue that the Monitor methods in this file delegate to.
inner *DelayQueue
}
// NewMonitor0 builds a Monitor for the queue called name on top of any
// RedisCli implementation. opts are passed through unchanged to NewQueue0.
func NewMonitor0(name string, cli RedisCli, opts ...interface{}) *Monitor {
	queue := NewQueue0(name, cli, opts...)
	return &Monitor{inner: queue}
}
// NewMonitor builds a Monitor for the queue called name backed by a
// *redis.Client. The client is wrapped in a redisV9Wrapper and the rest of the
// construction is delegated to NewMonitor0.
func NewMonitor(name string, cli *redis.Client, opts ...interface{}) *Monitor {
	return NewMonitor0(name, &redisV9Wrapper{inner: cli}, opts...)
}
// NewMonitorOnCluster builds a Monitor for the queue called name backed by a
// *redis.ClusterClient. The client is wrapped in a redisClusterWrapper and the
// rest of the construction is delegated to NewMonitor0.
func NewMonitorOnCluster(name string, cli *redis.ClusterClient, opts ...interface{}) *Monitor {
	return NewMonitor0(name, &redisClusterWrapper{inner: cli}, opts...)
}
// WithLogger replaces the logger of the wrapped queue and returns the same
// Monitor so that calls can be chained.
func (m *Monitor) WithLogger(logger *log.Logger) *Monitor {
	queue := m.inner
	queue.logger = logger
	return m
}
// GetPendingCount reports the number of messages whose delivery time has not
// arrived yet, as returned by the wrapped queue.
func (m *Monitor) GetPendingCount() (int64, error) {
	queue := m.inner
	return queue.GetPendingCount()
}
// GetReadyCount reports the number of messages whose delivery time has arrived
// but which have not been delivered yet, as returned by the wrapped queue.
func (m *Monitor) GetReadyCount() (int64, error) {
	queue := m.inner
	return queue.GetReadyCount()
}
// GetProcessingCount reports the number of messages that are currently being
// processed, as returned by the wrapped queue.
func (m *Monitor) GetProcessingCount() (int64, error) {
	queue := m.inner
	return queue.GetProcessingCount()
}
// ListenEvent registers a listener that is called whenever an event occurs in
// this queue, so it can be used to monitor the queue's running status.
//
// It subscribes to the channel named by genReportChannel for this queue and
// starts a goroutine that decodes every received payload and passes the
// resulting event to listener.OnEvent. A payload that cannot be decoded is
// reported through the queue's logger and skipped. The goroutine exits once
// the subscription channel is closed.
//
// returns: close function, error. The close function is the one handed back by
// Subscribe (presumably it ends the subscription and thereby the goroutine;
// confirm against the RedisCli implementation). When subscribing fails the
// close function is nil and the error is returned as is.
func (m *Monitor) ListenEvent(listener EventListener) (func(), error) {
	reportChan := genReportChannel(m.inner.name)
	sub, closer, err := m.inner.redisCli.Subscribe(reportChan)
	if err != nil {
		return nil, err
	}
	go func() {
		for payload := range sub {
			event, err := decodeEvent(payload)
			if err != nil {
				// Log the decode error itself: the event value is
				// meaningless when decoding failed.
				m.inner.logger.Printf("[listen event] %v\n", err)
				continue
			}
			listener.OnEvent(event)
		}
	}()
	return closer, nil
}