diff --git a/src/auditLog/feathersElasticSearch.js b/src/auditLog/feathersElasticSearch.js index a486893c..ab5ab3a1 100644 --- a/src/auditLog/feathersElasticSearch.js +++ b/src/auditLog/feathersElasticSearch.js @@ -67,6 +67,7 @@ const configureAuditLog = app => { setAuditLogToFeathersService({ app, serviceName: 'donations' }); setAuditLogToFeathersService({ app, serviceName: 'pledgeAdmins' }); setAuditLogToFeathersService({ app, serviceName: 'events' }); + setAuditLogToFeathersService({ app, serviceName: 'emails' }); }; module.exports = { diff --git a/src/blockchain/lib/eventHandlerQueue.js b/src/blockchain/lib/eventHandlerQueue.js index 03aad0e4..139b88a6 100644 --- a/src/blockchain/lib/eventHandlerQueue.js +++ b/src/blockchain/lib/eventHandlerQueue.js @@ -10,6 +10,7 @@ const { EventStatus } = require('../../models/events.model'); const handleEventQueue = new Queue('eventHandler', { redis: config.get('redis') }); const pendingEventQueue = new Queue('NewEventQueue', { redis: config.get('redis') }); +const TWO_MINUTES = 1000 * 60 * 2; setInterval(async () => { const eventHandlerQueueCount = await handleEventQueue.count(); @@ -18,7 +19,7 @@ setInterval(async () => { eventHandlerQueueCount, NewEventQueueCount, }); -}, 1000 * 60 * 2); +}, TWO_MINUTES); const removeEvent = async (app, event) => { const { id, transactionHash } = event; @@ -155,9 +156,17 @@ const initEventHandlerQueue = app => { handleEventQueue.process(1, async (job, done) => { const { event } = job.data; + const callDoneTimeout = setTimeout(() => { + logger.error('The event handler didnt respond, call done() to prevent stocking queue'); + done(); + }, TWO_MINUTES); + try { + const remainingEventsInQueue = await handleEventQueue.count(); const handler = handlers[event.event]; + logger.info('Handling Event: ', { + remainingEventsInQueue, event: event.event, transactionHash: event.transactionHash, status: event.status, @@ -165,6 +174,18 @@ const initEventHandlerQueue = app => { logIndex: event.logIndex, _id: event._id, }); + const eventInDb = await eventService.get(event._id); + if (eventInDb.status === EventStatus.PROCESSED) { + logger.info('Event is already processed, so dont need to handle again', { + event: event.event, + _id: event._id, + transactionHash: event.transactionHash, + transactionIndex: event.transactionIndex, + }); + clearTimeout(callDoneTimeout); + done(); + return; + } if (typeof handler === 'function') { await handler(event); } else { @@ -181,6 +202,7 @@ const initEventHandlerQueue = app => { processingError: e.toString(), }); } finally { + clearTimeout(callDoneTimeout); done(); } }); diff --git a/src/services/serviceCommons.js b/src/services/serviceCommons.js index f9dc2da0..0b5f29d6 100644 --- a/src/services/serviceCommons.js +++ b/src/services/serviceCommons.js @@ -1,5 +1,5 @@ const defaultFeatherMongooseOptions = { - multi: ['patch'], + multi: ['patch', 'remove'], whitelist: [ '$exists', '$and',