Skip to content

Commit

Permalink
refactor: start connection on rabbitmq
Browse files Browse the repository at this point in the history
  • Loading branch information
Matheus Paice committed Apr 18, 2024
1 parent d1e1d10 commit d30518d
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 74 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "vtru-studio-websocket",
"version": "1.1.35",
"version": "1.1.36",
"description": "",
"main": "index.js",
"scripts": {
Expand Down
16 changes: 1 addition & 15 deletions src/controllers/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,21 +6,7 @@ import { getConnection } from '../services';
const logger = debug('controllers');

export const controllersStart = async () => {
const rabbitmqStatus = await getConnection();
if (!rabbitmqStatus.isConnected || !rabbitmqStatus.connection) {
console.log('RabbitMQ connection failed, retrying in 10 seconds...');
setTimeout(controllersStart, 10000);
return;
}

rabbitmqStatus.connection.on('close', () => {
console.log('RabbitMQ connection closed, restarting in 10 seconds...');
setTimeout(controllersStart, 10000);
});

rabbitmqStatus.connection.on('error', (error) => {
console.error('Error occurred in RabbitMQ connection:', error);
});
await getConnection();

await preSignedURL.start();
await notify.start();
Expand Down
30 changes: 22 additions & 8 deletions src/controllers/notify/index.ts
Original file line number Diff line number Diff line change
@@ -1,32 +1,46 @@
import debug from 'debug';
import { nanoid } from 'nanoid';
import { RABBITMQ_EXCHANGE_CREATORS } from '../../constants';
import { getChannel } from '../../services/rabbitmq';
import { captureException } from '../../services/sentry';
import { io } from '../../services';
import { NotifyEnvelope } from './types';

const logger = debug('controllers:notify');

const uniqueId = nanoid();

export const start = async () => {
const channel = await getChannel();

channel?.on('close', () => {
if (!channel) {
logger('Channel not available');
process.exit(1);
}
channel.on('close', () => {
logger('Channel closed');
process.exit(1);
});
channel.on('error', (error) => {
logger('Error occurred in channel:', error);
process.exit(1);
})
});

logger('Channel controller notify started');

const notificationQueue = `${RABBITMQ_EXCHANGE_CREATORS}.notifications.${uniqueId}`;

channel?.assertExchange(RABBITMQ_EXCHANGE_CREATORS, 'topic', {
channel.assertExchange(RABBITMQ_EXCHANGE_CREATORS, 'topic', {
durable: true,
});
channel?.assertQueue(notificationQueue, { durable: false });
channel?.bindQueue(
channel.assertQueue(notificationQueue, { durable: false });
channel.bindQueue(
notificationQueue,
RABBITMQ_EXCHANGE_CREATORS,
'userNotification'
);

channel?.consume(notificationQueue, async (message) => {
channel.consume(notificationQueue, async (message) => {
if (!message) return;

try {
Expand All @@ -47,11 +61,11 @@ export const start = async () => {
}
});

channel?.ack(message);
channel.ack(message);
return;
} catch (parsingError) {
captureException(parsingError);
}
channel?.nack(message);
channel.nack(message);
});
};
28 changes: 21 additions & 7 deletions src/controllers/preSignedURL/index.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,45 @@
import debug from 'debug';
import { nanoid } from 'nanoid';
import type { AssetEnvelope } from '../types';
import { RABBITMQ_EXCHANGE_CREATORS } from '../../constants';
import { getChannel } from '../../services/rabbitmq';
import { captureException } from '../../services/sentry';
import { io } from '../../services';

const logger = debug('controllers:preSignedURL');

const uniqueId = nanoid();

// TODO: implement dead-letter queue
export const start = async () => {
const channel = await getChannel();

channel?.on('close', () => {
if (!channel) {
logger('Channel not available');
process.exit(1);
}
channel.on('close', () => {
logger('Channel closed');
process.exit(1);
});
channel.on('error', (error) => {
logger('Error occurred in channel:', error);
process.exit(1);
});

logger('Channel controller preSignedURL started');

const logQueue = `${RABBITMQ_EXCHANGE_CREATORS}.assets.${uniqueId}`;

channel?.assertExchange(RABBITMQ_EXCHANGE_CREATORS, 'topic', {
channel.assertExchange(RABBITMQ_EXCHANGE_CREATORS, 'topic', {
durable: true,
});

channel?.assertQueue(logQueue, { durable: false });
channel.assertQueue(logQueue, { durable: false });

channel?.bindQueue(logQueue, RABBITMQ_EXCHANGE_CREATORS, 'preSignedURL');
channel.bindQueue(logQueue, RABBITMQ_EXCHANGE_CREATORS, 'preSignedURL');

channel?.consume(logQueue, async (message) => {
channel.consume(logQueue, async (message) => {
console.log('message received:', message);
if (!message) return;

Expand Down Expand Up @@ -57,11 +71,11 @@ export const start = async () => {
}
});

channel?.ack(message);
channel.ack(message);
return;
} catch (parsingError) {
captureException(parsingError);
}
channel?.nack(message);
channel.nack(message);
});
};
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,6 @@ dayjs.extend(utc);
dayjs.extend(timezone);

const logger = debug('core:');
debug.enable('core:*,services:*,controllers:*');

logger('Starting websocket server');
75 changes: 34 additions & 41 deletions src/services/rabbitmq/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,63 +7,56 @@ const logger = debug('services:rabbitmq');

const status: {
connection: Connection | null;
isConnected: boolean;
} = {
connection: null,
isConnected: false,
};

export const getChannel = async () => {
try {
if (!status.connection) {
logger('RabbitMQ connection not established');
process.exit(1);
}

return status.connection.createChannel();
} catch (error) {
logger('Error creating channel: %O', error);
captureException(error, { tags: { scope: 'rabbitmq' } });
return null;
}
};

export const disconnect = async () => {
if (status.connection) {
const oldConnection = status.connection;
status.connection = null;
await oldConnection.close();

try {
await oldConnection.close();
} catch (error) {
// ignore
}
}
};

export const getConnection = async () => {
try {
if (!status.connection) {
status.connection = await rabbitmq.connect(RABBITMQ_URL);
status.connection.on('close', () => {
status.isConnected = false;
status.connection = null;
console.error('RabbitMQ connection closed');
process.exit(1);
});
status.connection.on('error', (error) => {
status.isConnected = false;
status.connection = null;
console.error('Error occurred in RabbitMQ connection:', error);
});
status.isConnected = true;
}
status.connection = await rabbitmq.connect(RABBITMQ_URL);
logger(`RabbitMQ connected: ${RABBITMQ_URL}`);

status.connection.on('close', () => {
logger('RabbitMQ connection closed');
process.exit(1);
});

status.connection.on('error', (error) => {
logger('Error occurred in RabbitMQ connection:', error);
process.exit(1);
});
} catch (err) {
status.isConnected = false;
status.connection = null;
captureException(err, { tags: { scope: 'rabbitmq' } });
logger('Error connecting to rabbitmq: %O', err);
captureException(err, { tags: { scope: 'rabbitmq' } });
process.exit(1);
}
return status;
};

export const getChannel = async () => {
try {
const { connection } = await getConnection();

if (connection) {
return connection.createChannel();
}

// if connection is null
process.exit(1);
} catch (error) {
captureException(error, { tags: { scope: 'rabbitmq' } });

process.exit(1);
}
return null;
};

export { Channel };

0 comments on commit d30518d

Please sign in to comment.