From 5a6cd6a10fdc43abfa7c8be7adbe97b0ccccc2e2 Mon Sep 17 00:00:00 2001 From: Kevin Aleman Date: Tue, 7 Jan 2025 15:18:12 -0600 Subject: [PATCH 1/2] disconnect context --- .../omnichannel-services/src/QueueWorker.ts | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) diff --git a/ee/packages/omnichannel-services/src/QueueWorker.ts b/ee/packages/omnichannel-services/src/QueueWorker.ts index 5908512404a2..5950bbd128ea 100644 --- a/ee/packages/omnichannel-services/src/QueueWorker.ts +++ b/ee/packages/omnichannel-services/src/QueueWorker.ts @@ -11,14 +11,12 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { protected retryCount = 5; // Default delay is 5 seconds - protected retryDelay = 5000; + protected retryDelay = Number(process.env.RETRY_DELAY) || 5000; protected queue: MessageQueue; private logger: Logger; - private queueStarted = false; - constructor( private readonly db: Db, loggerClass: typeof Logger, @@ -28,7 +26,7 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { // eslint-disable-next-line new-cap this.logger = new loggerClass('QueueWorker'); this.queue = new MessageQueue(); - this.queue.pollingInterval = 5000; + this.queue.pollingInterval = Number(process.env.POLLING_INTERVAL) || 5000; } isServiceNotFoundMessage(message: string): boolean { @@ -46,6 +44,7 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { try { await this.createIndexes(); + this.registerWorkers(); } catch (e) { this.logger.fatal(e, 'Fatal error occurred when registering workers'); process.exit(1); @@ -55,7 +54,7 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { async createIndexes(): Promise { this.logger.info('Creating indexes for queue worker'); - // Library doesnt create indexes by itself, for some reason + // Library doesn't create indexes by itself, for some reason // This should create the indexes we need and improve queue perf on reading await this.db.collection(this.queue.collectionName).createIndex({ type: 1 }); await this.db.collection(this.queue.collectionName).createIndex({ rejectedTime: 1 }, { sparse: true }); @@ -105,8 +104,6 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { this.logger.info('Registering workers of type "workComplete"'); this.queue.registerWorker('workComplete', this.workerCallback.bind(this)); - - this.queueStarted = true; } private matchServiceCall(service: string): boolean { @@ -123,10 +120,6 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { // This is a "generic" job that allows you to call any service async queueWork>(queue: Actions, to: string, data: T): Promise { this.logger.info(`Queueing work for ${to}`); - if (!this.queueStarted) { - this.registerWorkers(); - } - if (!this.matchServiceCall(to)) { // We don't want to queue calls to invalid service names throw new Error(`Invalid service name ${to}`); @@ -150,8 +143,4 @@ export class QueueWorker extends ServiceClass implements IQueueWorkerService { ]) .toArray(); } - - async isQueueStarted(): Promise { - return this.queueStarted; - } } From 47ca64e0b45aedcb7d9363f4933f52e0897454b5 Mon Sep 17 00:00:00 2001 From: Kevin Aleman Date: Tue, 7 Jan 2025 15:22:40 -0600 Subject: [PATCH 2/2] Create fifty-apricots-clean.md --- .changeset/fifty-apricots-clean.md | 9 +++++++++ 1 file changed, 9 insertions(+) create mode 100644 .changeset/fifty-apricots-clean.md diff --git a/.changeset/fifty-apricots-clean.md b/.changeset/fifty-apricots-clean.md new file mode 100644 index 000000000000..e8f867f1ffff --- /dev/null +++ b/.changeset/fifty-apricots-clean.md @@ -0,0 +1,9 @@ +--- +"@rocket.chat/omnichannel-services": patch +--- + +Fixes a behavior when running microservices that caused queue worker to process just the first 60 seconds of request. + +This was due to a mistakenly bound context. Queue Worker was changed to start doing work only after it received the first request. + +However, with the introduction of ASL and actual context on calls, the worker registration was absorbing the context of the call that created them, causing service calls happening inside the callbacks to fail because of a timeout.