-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathnats_mock.go
95 lines (77 loc) · 2.28 KB
/
nats_mock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
/* This Source Code Form is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */
package akira
import (
"time"
"github.com/nats-io/go-nats"
"github.com/nats-io/nuid"
)
// FakeConnector : An in-memory stand-in for a nats connection, intended for
// testing nats handlers. It records messages and registered callbacks in
// plain maps instead of talking to a server.
type FakeConnector struct {
	// Events holds every message recorded by Publish and Request, grouped
	// by the subject it was sent on, in the order it was recorded.
	Events map[string][]*nats.Msg
	// Handlers holds the callback registered for each subject by Subscribe
	// or QueueSubscribe; a later registration replaces an earlier one.
	Handlers map[string]nats.MsgHandler
	// gen produces the unique suffix used for Request reply inboxes.
	gen *nuid.NUID
}
// NewFakeConnector : Builds a FakeConnector with empty event and handler
// maps and a fresh NUID generator, and returns it as a Connector.
func NewFakeConnector() Connector {
	fake := new(FakeConnector)
	fake.Events = map[string][]*nats.Msg{}
	fake.Handlers = map[string]nats.MsgHandler{}
	fake.gen = nuid.New()
	return fake
}
// Reset : Drops every registered handler and every recorded event, leaving
// the connector with two fresh, empty maps.
func (f *FakeConnector) Reset() {
	// The two resets touch different fields, so their order is irrelevant.
	f.ResetHandlers()
	f.ResetEvents()
}
// ResetEvents : Discards all collected events by replacing the Events map
// with a new empty one. Registered handlers are left untouched.
func (f *FakeConnector) ResetEvents() {
	f.Events = map[string][]*nats.Msg{}
}
// ResetHandlers : Discards all registered handlers by replacing the Handlers
// map with a new empty one. Collected events are left untouched.
func (f *FakeConnector) ResetHandlers() {
	f.Handlers = map[string]nats.MsgHandler{}
}
// Close : Stands in for closing a connection. The fake holds no real
// connection, so closing simply clears all handlers and events via Reset.
func (f *FakeConnector) Close() {
	f.Reset()
}
// Request : Simulates a request/reply exchange. The request is recorded
// under subj with a freshly generated "_INBOX." reply subject, then handed
// synchronously to the handler registered for subj, or to the ">" wildcard
// handler when there is no exact match. The first event recorded on the
// reply subject is returned as the response.
//
// nats.ErrTimeout is returned when no handler is available or when the
// handler recorded nothing on the reply subject. The timeout argument is
// not consulted.
func (f *FakeConnector) Request(subj string, data []byte, timeout time.Duration) (*nats.Msg, error) {
	req := &nats.Msg{
		Subject: subj,
		Reply:   "_INBOX." + f.gen.Next(),
		Data:    data,
	}
	f.Events[subj] = append(f.Events[subj], req)

	handler := f.Handlers[subj]
	if handler == nil {
		// No exact subscription: fall back to the catch-all wildcard.
		handler = f.Handlers[">"]
	}
	if handler == nil {
		return nil, nats.ErrTimeout
	}
	handler(req)

	// Read the reply subject after the handler has run, as it receives the
	// same message pointer.
	replies := f.Events[req.Reply]
	if len(replies) == 0 {
		return nil, nats.ErrTimeout
	}
	return replies[0], nil
}
// Publish : Records a message carrying data on subj by appending it to the
// events collected for that subject. No handler is invoked, and the
// returned error is always nil.
func (f *FakeConnector) Publish(subj string, data []byte) error {
	published := &nats.Msg{
		Subject: subj,
		Data:    data,
	}
	f.Events[subj] = append(f.Events[subj], published)
	return nil
}
// Subscribe : Registers cb as the handler for subj, replacing any handler
// previously stored for that subject. Both the returned subscription and
// the returned error are always nil.
func (f *FakeConnector) Subscribe(subj string, cb nats.MsgHandler) (*nats.Subscription, error) {
	// The fake never creates a real subscription object.
	var sub *nats.Subscription
	f.Handlers[subj] = cb
	return sub, nil
}
// QueueSubscribe : Registers cb as the handler for subj exactly as Subscribe
// does. The queue group name is accepted for interface compatibility but is
// ignored. Both the returned subscription and the returned error are always
// nil.
func (f *FakeConnector) QueueSubscribe(subj string, queue string, cb nats.MsgHandler) (*nats.Subscription, error) {
	return f.Subscribe(subj, cb)
}