-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathmessagebus.go
193 lines (158 loc) · 6.63 KB
/
messagebus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
package foreman
import (
"github.com/go-foreman/foreman/log"
"github.com/go-foreman/foreman/pubsub/dispatcher"
"github.com/go-foreman/foreman/pubsub/endpoint"
"github.com/go-foreman/foreman/pubsub/message"
"github.com/go-foreman/foreman/pubsub/message/execution"
"github.com/go-foreman/foreman/pubsub/subscriber"
"github.com/go-foreman/foreman/pubsub/transport"
"github.com/go-foreman/foreman/runtime/scheme"
"github.com/pkg/errors"
)
//go:generate mockgen --build_flags=--mod=mod -destination testing/mocks/messagebus.go -package foreman . Component
// Component wraps the setup of a part of your application. NewMessageBus calls
// Init on every component registered through WithComponents, passing the bus
// being constructed; a non-nil error aborts construction of the bus.
type Component interface {
// Init prepares the component using the given MessageBus.
Init(b *MessageBus) error
}
// SubscriberOption configures how NewMessageBus obtains its Subscriber. It is
// called with the options being assembled and with a container holding the
// marshaller and processor that NewMessageBus has prepared.
type SubscriberOption func(subscriberOpts *subscriberOpts, c *subscriberContainer)
// subscriberOpts collects the choices made by a SubscriberOption.
type subscriberOpts struct {
// subscriber is a ready-made Subscriber; when non-nil NewMessageBus uses it as is.
subscriber subscriber.Subscriber
// opts are forwarded to subscriber.NewSubscriber when the default subscriber is built.
opts []subscriber.Opt
// transport backs the default subscriber; it is consulted only when subscriber is nil.
transport transport.Transport
}
// subscriberContainer carries the dependencies NewMessageBus hands to a SubscriberOption.
type subscriberContainer struct {
// msgMarshaller is the marshaller that was passed to NewMessageBus.
msgMarshaller message.Marshaller
// processor is the message processor resolved by NewMessageBus.
processor subscriber.Processor
}
// WithSubscriber returns a SubscriberOption that makes MessageBus use the
// given Subscriber implementation instead of building the default one.
func WithSubscriber(subscriber subscriber.Subscriber) SubscriberOption {
	return func(o *subscriberOpts, _ *subscriberContainer) {
		o.subscriber = subscriber
	}
}
// DefaultSubscriber returns a SubscriberOption that records the transport and
// the subscriber options to be used when the default subscriber is built.
func DefaultSubscriber(transport transport.Transport, opts ...subscriber.Opt) SubscriberOption {
	return func(o *subscriberOpts, _ *subscriberContainer) {
		o.transport, o.opts = transport, opts
	}
}
// SubscriberFactory builds a custom Subscriber. It receives the Processor and
// the message.Marshaller prepared by the bus, which are needed when you
// implement your own Subscriber.
type SubscriberFactory func(processor subscriber.Processor, marshaller message.Marshaller) subscriber.Subscriber
// WithSubscriberFactory returns a SubscriberOption that invokes factory with
// the bus's processor and marshaller and uses the Subscriber it returns.
func WithSubscriberFactory(factory SubscriberFactory) SubscriberOption {
	return func(o *subscriberOpts, deps *subscriberContainer) {
		built := factory(deps.processor, deps.msgMarshaller)
		o.subscriber = built
	}
}
// ConfigOption customizes the internal container NewMessageBus uses to pick
// its router, dispatcher, execution context factory and components.
type ConfigOption func(o *container)
// container holds the pieces NewMessageBus assembles a MessageBus from. Fields
// left nil after the ConfigOptions have run are replaced with defaults there.
type container struct {
// messageExuctionCtxFactory is set by WithMessageExecutionFactory.
// NOTE(review): the name is misspelled ("Exuction"); it is referenced from other
// blocks in this file, so renaming has to be done everywhere at once.
messageExuctionCtxFactory execution.MessageExecutionCtxFactory
// messagesDispatcher is set by WithDispatcher.
messagesDispatcher dispatcher.Dispatcher
// router is set by WithRouter.
router endpoint.Router
// msgMarshaller is the marshaller passed to NewMessageBus.
msgMarshaller message.Marshaller
// processor is created by NewMessageBus when nil; no option visible in this file sets it.
processor subscriber.Processor
// components are accumulated by WithComponents and initialized by NewMessageBus.
components []Component
}
// WithComponents returns a ConfigOption that appends the given components to
// the list MessageBus will initialize. It may be used several times; each use
// adds to the components registered so far.
func WithComponents(components ...Component) ConfigOption {
	return func(cfg *container) {
		cfg.components = append(cfg.components, components...)
	}
}
// WithRouter returns a ConfigOption that replaces the default endpoint.Router
// with the given implementation.
func WithRouter(router endpoint.Router) ConfigOption {
	return func(cfg *container) {
		cfg.router = router
	}
}
// WithDispatcher returns a ConfigOption that replaces the default
// dispatcher.Dispatcher with the given implementation.
func WithDispatcher(dispatcher dispatcher.Dispatcher) ConfigOption {
	return func(cfg *container) {
		cfg.messagesDispatcher = dispatcher
	}
}
// WithMessageExecutionFactory returns a ConfigOption that replaces the default
// execution.MessageExecutionCtxFactory with the given one.
func WithMessageExecutionFactory(factory execution.MessageExecutionCtxFactory) ConfigOption {
	return func(cfg *container) {
		cfg.messageExuctionCtxFactory = factory
	}
}
// MessageBus is the main component: a container that aggregates the
// marshaller, dispatcher, router, scheme registry, subscriber and logger and
// exposes each of them through an accessor method.
type MessageBus struct {
// marshaller is the message.Marshaller passed to NewMessageBus.
marshaller message.Marshaller
// messagesDispatcher is the configured or default dispatcher.
messagesDispatcher dispatcher.Dispatcher
// router is the configured or default endpoint router.
router endpoint.Router
// scheme is the known types registry passed to NewMessageBus.
scheme scheme.KnownTypesRegistry
// subscriber is the custom or default subscriber chosen by the SubscriberOption.
subscriber subscriber.Subscriber
// logger is the logger passed to NewMessageBus.
logger log.Logger
}
// NewMessageBus constructs a MessageBus from the given logger, marshaller and
// scheme registry.
//
// configOpts are applied first (nil entries are skipped); any of dispatcher,
// router, execution context factory and processor still unset afterwards is
// replaced with the package default. subscriberOption then selects the
// Subscriber: one supplied directly or built by a factory wins, otherwise a
// default subscriber is built on the supplied transport. Finally every
// registered Component is initialized with the bus, in registration order.
//
// An error is returned when subscriberOption is nil, when it yields neither a
// subscriber nor a transport, or when a Component fails to initialize. These
// misconfigurations are reported as errors rather than panics because the
// function already has an error result for callers to check.
func NewMessageBus(logger log.Logger, msgMarshaller message.Marshaller, scheme scheme.KnownTypesRegistry, subscriberOption SubscriberOption, configOpts ...ConfigOption) (*MessageBus, error) {
	if subscriberOption == nil {
		return nil, errors.New("subscriber option is nil")
	}

	c := &container{
		msgMarshaller: msgMarshaller,
	}
	for _, config := range configOpts {
		if config == nil {
			continue
		}
		config(c)
	}

	// Fill in defaults for everything the options left untouched. The order
	// matters: the factory needs the router, the processor needs both the
	// factory and the dispatcher.
	if c.messagesDispatcher == nil {
		c.messagesDispatcher = dispatcher.NewDispatcher()
	}
	if c.router == nil {
		c.router = endpoint.NewRouter()
	}
	if c.messageExuctionCtxFactory == nil {
		c.messageExuctionCtxFactory = execution.NewMessageExecutionCtxFactory(c.router, logger)
	}
	if c.processor == nil {
		c.processor = subscriber.NewMessageProcessor(msgMarshaller, c.messageExuctionCtxFactory, c.messagesDispatcher, logger)
	}

	chosen := &subscriberOpts{}
	subscriberOption(chosen, &subscriberContainer{
		msgMarshaller: msgMarshaller,
		processor:     c.processor,
	})

	var sub subscriber.Subscriber
	switch {
	case chosen.subscriber != nil:
		sub = chosen.subscriber
	case chosen.transport != nil:
		sub = subscriber.NewSubscriber(chosen.transport, c.processor, logger, chosen.opts...)
	default:
		return nil, errors.New("subscriber is nil: neither a subscriber nor a transport was provided")
	}

	mBus := &MessageBus{
		marshaller:         msgMarshaller,
		messagesDispatcher: c.messagesDispatcher,
		router:             c.router,
		scheme:             scheme,
		subscriber:         sub,
		logger:             logger,
	}

	// Components see a fully assembled bus; the first failure aborts
	// construction and is returned unchanged so callers can match on it.
	for _, component := range c.components {
		if err := component.Init(mBus); err != nil {
			return nil, err
		}
	}

	return mBus, nil
}
// Dispatcher returns the dispatcher.Dispatcher held by the bus: the one
// supplied through WithDispatcher, or the default created by NewMessageBus.
func (b *MessageBus) Dispatcher() dispatcher.Dispatcher {
return b.messagesDispatcher
}
// Router returns the endpoint.Router held by the bus: the one supplied
// through WithRouter, or the default created by NewMessageBus.
func (b *MessageBus) Router() endpoint.Router {
return b.router
}
// SchemeRegistry returns the scheme.KnownTypesRegistry that was passed to
// NewMessageBus. It should contain all the types of commands and events the
// MessageBus works with.
func (b *MessageBus) SchemeRegistry() scheme.KnownTypesRegistry {
return b.scheme
}
// Subscriber returns the subscriber.Subscriber selected when the bus was
// constructed, which controls the main flow of messages.
func (b *MessageBus) Subscriber() subscriber.Subscriber {
return b.subscriber
}
// Logger returns the log.Logger that was passed to NewMessageBus.
func (b *MessageBus) Logger() log.Logger {
return b.logger
}
// Marshaller returns the message.Marshaller that was passed to NewMessageBus.
func (b *MessageBus) Marshaller() message.Marshaller {
return b.marshaller
}