Skip to content

Commit

Permalink
feat: delete queues when app exit
Browse files Browse the repository at this point in the history
  • Loading branch information
Matheus Paice committed Apr 19, 2024
1 parent d30518d commit 9406ead
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 20 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "vtru-studio-websocket",
"version": "1.1.36",
"version": "1.1.37",
"description": "",
"main": "index.js",
"scripts": {
Expand Down
25 changes: 14 additions & 11 deletions src/controllers/notify/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import debug from 'debug';
import { nanoid } from 'nanoid';
import { RABBITMQ_EXCHANGE_CREATORS } from '../../constants';
import { getChannel } from '../../services/rabbitmq';
import { disconnect, getChannel } from '../../services/rabbitmq';
import { captureException } from '../../services/sentry';
import { io } from '../../services';
import { NotifyEnvelope } from './types';
Expand All @@ -28,19 +28,14 @@ export const start = async () => {

logger('Channel controller notify started');

const notificationQueue = `${RABBITMQ_EXCHANGE_CREATORS}.notifications.${uniqueId}`;

const logQueue = `${RABBITMQ_EXCHANGE_CREATORS}.notifications.${uniqueId}`;
logger('logQueue', logQueue);
channel.assertExchange(RABBITMQ_EXCHANGE_CREATORS, 'topic', {
durable: true,
});
channel.assertQueue(notificationQueue, { durable: false });
channel.bindQueue(
notificationQueue,
RABBITMQ_EXCHANGE_CREATORS,
'userNotification'
);

channel.consume(notificationQueue, async (message) => {
channel.assertQueue(logQueue, { durable: false });
channel.bindQueue(logQueue, RABBITMQ_EXCHANGE_CREATORS, 'userNotification');
channel.consume(logQueue, async (message) => {
if (!message) return;

try {
Expand Down Expand Up @@ -68,4 +63,12 @@ export const start = async () => {
}
channel.nack(message);
});

process.once('SIGINT', async () => {
logger(`Deleting queue ${logQueue}`);
await channel.deleteQueue(logQueue);

// disconnect from RabbitMQ
await disconnect();
});
};
15 changes: 10 additions & 5 deletions src/controllers/preSignedURL/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import debug from 'debug';
import { nanoid } from 'nanoid';
import type { AssetEnvelope } from '../types';
import { RABBITMQ_EXCHANGE_CREATORS } from '../../constants';
import { getChannel } from '../../services/rabbitmq';
import { disconnect, getChannel } from '../../services/rabbitmq';
import { captureException } from '../../services/sentry';
import { io } from '../../services';

Expand Down Expand Up @@ -30,15 +30,12 @@ export const start = async () => {
logger('Channel controller preSignedURL started');

const logQueue = `${RABBITMQ_EXCHANGE_CREATORS}.assets.${uniqueId}`;

logger('logQueue', logQueue);
channel.assertExchange(RABBITMQ_EXCHANGE_CREATORS, 'topic', {
durable: true,
});

channel.assertQueue(logQueue, { durable: false });

channel.bindQueue(logQueue, RABBITMQ_EXCHANGE_CREATORS, 'preSignedURL');

channel.consume(logQueue, async (message) => {
console.log('message received:', message);
if (!message) return;
Expand Down Expand Up @@ -78,4 +75,12 @@ export const start = async () => {
}
channel.nack(message);
});

process.once('SIGINT', async () => {
logger(`Deleting queue ${logQueue}`);
await channel.deleteQueue(logQueue);

// disconnect from RabbitMQ
await disconnect();
});
};
8 changes: 7 additions & 1 deletion src/services/rabbitmq/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,16 @@ export const disconnect = async () => {

try {
await oldConnection.close();
return; // exit function
} catch (error) {
// ignore
logger('Error closing RabbitMQ connection: %O', error);
captureException(error, { tags: { scope: 'rabbitmq' } });
process.exit(1);
}
}

logger('RabbitMQ connection not established');
process.exit(1);
};

export const getConnection = async () => {
Expand Down

0 comments on commit 9406ead

Please sign in to comment.