Skip to content
This repository has been archived by the owner on Dec 31, 2024. It is now read-only.

Commit

Permalink
Remove jobs from queue if it doesnt remove after two minutes (prevent…
Browse files Browse the repository at this point in the history
… blocking queue)

related to #594
  • Loading branch information
mohammadranjbarz committed Aug 25, 2021
1 parent 73ab4fb commit 43e0ec8
Showing 1 changed file with 11 additions and 1 deletion.
12 changes: 11 additions & 1 deletion src/blockchain/lib/eventHandlerQueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const { EventStatus } = require('../../models/events.model');

const handleEventQueue = new Queue('eventHandler', { redis: config.get('redis') });
const pendingEventQueue = new Queue('NewEventQueue', { redis: config.get('redis') });
const TWO_MINUTES = 1000 * 60 * 2;

setInterval(async () => {
const eventHandlerQueueCount = await handleEventQueue.count();
Expand All @@ -18,7 +19,7 @@ setInterval(async () => {
eventHandlerQueueCount,
NewEventQueueCount,
});
}, 1000 * 60 * 2);
}, TWO_MINUTES);

const removeEvent = async (app, event) => {
const { id, transactionHash } = event;
Expand Down Expand Up @@ -155,9 +156,17 @@ const initEventHandlerQueue = app => {

handleEventQueue.process(1, async (job, done) => {
const { event } = job.data;
const callDoneTimeout = setTimeout(() => {
logger.error('The event handler didnt respond, call done() to prevent stocking queue');
done();
}, TWO_MINUTES);

try {
const remainingEventsInQueue = await handleEventQueue.count();
const handler = handlers[event.event];

logger.info('Handling Event: ', {
remainingEventsInQueue,
event: event.event,
transactionHash: event.transactionHash,
status: event.status,
Expand All @@ -181,6 +190,7 @@ const initEventHandlerQueue = app => {
processingError: e.toString(),
});
} finally {
clearTimeout(callDoneTimeout);
done();
}
});
Expand Down

0 comments on commit 43e0ec8

Please sign in to comment.