-
Notifications
You must be signed in to change notification settings - Fork 19
/
Copy pathrabbitmq.go
328 lines (267 loc) · 8.42 KB
/
rabbitmq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
package rabbitmq
import (
"errors"
"fmt"
"os"
"os/signal"
"strings"
"syscall"
"github.com/koding/logging"
"github.com/streadway/amqp"
)
type Config struct {
Host string
Port int
Username string
Password string
Vhost string
}
func New(c *Config, log logging.Logger) *RabbitMQ {
return &RabbitMQ{
config: c,
log: log,
}
}
type RabbitMQ struct {
// The connection between client and the server
conn *amqp.Connection
// config stores the current koding configuration based on the given profile
config *Config
// logger interface
log logging.Logger
}
type Exchange struct {
// Exchange name
Name string
// Exchange type
Type string
// Durable exchanges will survive server restarts
Durable bool
// Will remain declared when there are no remaining bindings.
AutoDelete bool
// Exchanges declared as `internal` do not accept accept publishings.Internal
// exchanges are useful for when you wish to implement inter-exchange topologies
// that should not be exposed to users of the broker.
Internal bool
// When noWait is true, declare without waiting for a confirmation from the server.
NoWait bool
// amqp.Table of arguments that are specific to the server's implementation of
// the exchange can be sent for exchange types that require extra parameters.
Args amqp.Table
}
type Queue struct {
// The queue name may be empty, in which the server will generate a unique name
// which will be returned in the Name field of Queue struct.
Name string
// Check Exchange comments for durable
Durable bool
// Check Exchange comments for autodelete
AutoDelete bool
// Exclusive queues are only accessible by the connection that declares them and
// will be deleted when the connection closes. Channels on other connections
// will receive an error when attempting declare, bind, consume, purge or delete a
// queue with the same name.
Exclusive bool
// When noWait is true, the queue will assume to be declared on the server. A
// channel exception will arrive if the conditions are met for existing queues
// or attempting to modify an existing queue from a different connection.
NoWait bool
// Check Exchange comments for Args
Args amqp.Table
}
type ConsumerOptions struct {
// The consumer is identified by a string that is unique and scoped for all
// consumers on this channel.
Tag string
// When autoAck (also known as noAck) is true, the server will acknowledge
// deliveries to this consumer prior to writing the delivery to the network. When
// autoAck is true, the consumer should not call Delivery.Ack
AutoAck bool // autoAck
// Check Queue struct documentation
Exclusive bool // exclusive
// When noLocal is true, the server will not deliver publishing sent from the same
// connection to this consumer. (Do not use Publish and Consume from same channel)
NoLocal bool // noLocal
// Check Queue struct documentation
NoWait bool // noWait
// Check Exchange comments for Args
Args amqp.Table // arguments
}
type BindingOptions struct {
// Publishings messages to given Queue with matching -RoutingKey-
// Every Queue has a default binding to Default Exchange with their Qeueu name
// So you can send messages to a queue over default exchange
RoutingKey string
// Do not wait for a consumer
NoWait bool
// App specific data
Args amqp.Table
}
// Returns RMQ connection
func (r *RabbitMQ) Conn() *amqp.Connection {
return r.conn
}
// Dial dials the RMQ server
func (r *RabbitMQ) Dial() error {
// if config is nil do not continue
if r.config == nil {
return errors.New("config is nil")
}
conf := amqp.URI{
Scheme: "amqp",
Host: r.config.Host,
Port: r.config.Port,
Username: r.config.Username,
Password: r.config.Password,
Vhost: r.config.Vhost,
}.String()
var err error
// Connects opens an AMQP connection from the credentials in the URL.
r.conn, err = amqp.Dial(conf)
if err != nil {
return err
}
r.handleErrors(r.conn)
return nil
}
// Connect opens a connection to RabbitMq. This function is idempotent
//
// TODO this should not return RabbitMQ struct - cihangir,arslan config changes
func (r *RabbitMQ) Connect() (*RabbitMQ, error) {
// if we alredy connected do not re-connect
if r.conn != nil {
return r, nil
}
// r.Dial sets the conn variable
if err := r.Dial(); err != nil {
return nil, err
}
return r, nil
}
// Session is holding the current Exchange, Queue,
// Binding Consuming and Publishing settings for enclosed
// rabbitmq connection
type Session struct {
// Exchange declaration settings
Exchange Exchange
// Queue declaration settings
Queue Queue
// Binding options for current exchange to queue binding
BindingOptions BindingOptions
// Consumer options for a queue or exchange
ConsumerOptions ConsumerOptions
// Publishing options for a queue or exchange
PublishingOptions PublishingOptions
}
// NotifyClose registers a listener for close events either initiated by an error
// accompaning a connection.close method or by a normal shutdown.
// On normal shutdowns, the chan will be closed.
// To reconnect after a transport or protocol error, we should register a listener here and
// re-connect to server
// Reconnection is -not- working by now
func (r *RabbitMQ) handleErrors(conn *amqp.Connection) {
go func() {
for amqpErr := range conn.NotifyClose(make(chan *amqp.Error)) {
// if the computer sleeps then wakes longer than a heartbeat interval,
// the connection will be closed by the client.
// https://github.com/streadway/amqp/issues/82
r.log.Fatal(amqpErr.Error())
if strings.Contains(amqpErr.Error(), "NOT_FOUND") {
// do not continue
}
// CRITICAL Exception (320) Reason: "CONNECTION_FORCED - broker forced connection closure with reason 'shutdown'"
// CRITICAL Exception (501) Reason: "read tcp 127.0.0.1:5672: i/o timeout"
// CRITICAL Exception (503) Reason: "COMMAND_INVALID - unimplemented method"
if amqpErr.Code == 501 {
// reconnect
}
if amqpErr.Code == 320 {
// fmt.Println("tryin to reconnect")
// c.reconnect()
}
}
}()
go func() {
for b := range conn.NotifyBlocked(make(chan amqp.Blocking)) {
if b.Active {
r.log.Info("TCP blocked: %q", b.Reason)
} else {
r.log.Info("TCP unblocked")
}
}
}()
}
// reconnect re-connects to rabbitmq after a disconnection
func (c *Consumer) reconnect() {
err := c.Shutdown()
if err != nil {
panic(err)
}
err = c.connect()
if err != nil {
panic(err)
}
c.Consume(c.handler)
}
// Shutdown closes the RabbitMQ connection
func (r *RabbitMQ) Shutdown() error {
return shutdown(r.conn)
}
// RegisterSignalHandler watchs for interrupt signals
// and gracefully closes connection
func (r *RabbitMQ) RegisterSignalHandler() {
registerSignalHandler(r)
}
// Closer interface is for handling reconnection logic in a sane way
// Every reconnection supported struct should implement those methods
// in order to work properly
type Closer interface {
RegisterSignalHandler()
Shutdown() error
}
// shutdown is a general closer function for handling close gracefully
// Mostly here for both consumers and producers
// After a reconnection scenerio we are gonna call shutdown before connection
func shutdown(conn *amqp.Connection) error {
if err := conn.Close(); err != nil {
if amqpError, isAmqpError := err.(*amqp.Error); isAmqpError && amqpError.Code != 504 {
return fmt.Errorf("AMQP connection close error: %s", err)
}
}
return nil
}
// shutdownChannel is a general closer function for channels
func shutdownChannel(channel *amqp.Channel, tag string) error {
// This waits for a server acknowledgment which means the sockets will have
// flushed all outbound publishings prior to returning. It's important to
// block on Close to not lose any publishings.
if err := channel.Cancel(tag, true); err != nil {
if amqpError, isAmqpError := err.(*amqp.Error); isAmqpError && amqpError.Code != 504 {
return fmt.Errorf("AMQP connection close error: %s", err)
}
}
if err := channel.Close(); err != nil {
return err
}
return nil
}
// registerSignalHandler helper function for stopping consumer or producer from
// operating further
// Watchs for SIGINT, SIGTERM, SIGQUIT, SIGSTOP and closes connection
func registerSignalHandler(c Closer) {
go func() {
signals := make(chan os.Signal, 1)
signal.Notify(signals)
for {
signal := <-signals
switch signal {
case syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGSTOP:
err := c.Shutdown()
if err != nil {
panic(err)
}
os.Exit(1)
}
}
}()
}