forked from FunNode/npm-rabbitmq
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathindex.js
134 lines (117 loc) · 3.83 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
/* eslint-disable brace-style, camelcase, semi */
/* global R5 */
require('dotenv').config();
module.exports = Rabbitmq;
if (!global.R5) {
global.R5 = {
out: new (require(`./Output.js`))('rabbitmq')
};
}
let config = {
channel: process.env.NODE_ENV,
host: process.env.RABBITMQ_HOST,
user: process.env.RABBITMQ_USER,
pass: process.env.RABBITMQ_PASS
};
// Constructors
function Rabbitmq (configuration, type, callback) {
this.config = configuration;
this.connect(type, 5, callback);
}
// Public Methods
Rabbitmq.prototype.connect = function (type, retry_after_time = 5, callback = function () { }) {
let _this = this;
_this.amqp = require('amqplib/callback_api');
_this.amqp.connect(`amqp://${config.user}:${config.pass}@${config.host}/${config.channel}`, function (err, conn) {
if (err) { throw new Error(err.message); }
_this.conn = conn;
_this.conn.createChannel(function (err, ch) {
if (err) { throw new Error(err.message); }
retry_after_time = 5;
if (type === 'send') {
_this.ch = ch;
_this.ch.assertExchange(_this.config.exchange_name, 'topic', { durable: false });
R5.out.log(`Connected to RabbitMQ (queue: ${_this.config.queue_name})`);
return callback();
}
else if (type === 'receive') {
ch.assertExchange(_this.config.exchange_name, 'topic', { durable: false });
ch.assertQueue(_this.config.queue_name, {}, function (err, q) {
if (err) { throw new Error(err.message); }
R5.out.log(`[*] Waiting for messages from ${_this.config.message_type}. To exit press CTRL+C`);
ch.bindQueue(q.queue, _this.config.exchange_name, _this.config.message_type);
ch.consume(q.queue, function (msg) {
let obj = parse_json(msg.content.toString());
if (obj === null) {
ch.ack(msg);
return;
}
callback(obj);
ch.ack(msg);
}, { noAck: false });
});
}
else {
throw new Error(`Provided type '${type}' is invalid`);
}
});
_this.conn.on('close', function () {
R5.out.err(`[AMQP] reconnecting on close in ${retry_after_time} seconds`);
retry_after_time = retry_after_time + 5;
setTimeout(function () {
_this.connect(type, retry_after_time);
}, (retry_after_time - 5) * 1000);
});
});
}
Rabbitmq.prototype.queue_message = function (message_object, pause_time = 0, callback = function () {}) {
let message_string = get_message_from_obj(message_object);
setTimeout(function (_this) {
_this.ch.publish(_this.config.exchange_name, _this.config.queue_name, Buffer.from(message_string, 'utf8'));
R5.out.log(`SEND ${message_object.category}:${message_object.type}`);
return callback();
}, pause_time, this);
};
// Private Methods
function get_message_from_obj (obj) {
return JSON.stringify(obj);
}
function parse_json (str) {
let obj = null;
let counter = 0;
let max = 4;
let initial_string = str;
let initial_error;
while (counter < max && !obj) {
try {
obj = JSON.parse(str);
}
catch (e) {
if (counter === 0) { initial_error = e; }
counter++;
R5.out.log(`${e} Attempt #${counter} out of ${max}`);
switch (counter) {
case 1:
break;
case 2:
let re = /"/g;
str = str.replace(re, '\\"');
break;
case 3:
re = /'/g;
str = str.replace(re, '"');
break;
default:
let msg = {
text: `Error using JSON.parse on: <br>
<pre>${initial_string}</pre> <br>
This message was sent from rabbitmq.js. Error Message: <br>
<pre>${initial_error.stack}</pre>`
};
R5.out.err(`${msg.text}`);
counter = max;
}
}
}
return obj;
}