diff --git a/lib/core/queue.js b/lib/core/queue.js new file mode 100644 index 0000000000..2aeb20a869 --- /dev/null +++ b/lib/core/queue.js @@ -0,0 +1,112 @@ +// Simplified implementation of concurrent-queue +// https://www.npmjs.com/package/concurrent-queue + +export function createQueue() { + let concurrency = Number.POSITIVE_INFINITY; + let processor; + + const tasks = []; + let runningCount = 0; + + const enqueuedCallbacks = []; + const processingStartedCallbacks = []; + const processingEndedCallbacks = []; + const drainedCallbacks = []; + + /** + * We will trigger drained callbacks when: + * tasks.length === 0 and runningCount === 0 + */ + function checkDrained() { + if (tasks.length === 0 && runningCount === 0) { + for (const cb of drainedCallbacks) { + cb(); + } + } + } + + /** + * Attempt to start processing more tasks if we have + * capacity (runningCount < concurrency). + */ + function tryProcessNext() { + while (tasks.length > 0 && runningCount < concurrency) { + const item = tasks.shift(); + runningCount++; + + for (const cb of processingStartedCallbacks) { + cb({ item }); + } + + const promise = Promise.resolve(processor(item)); + + promise + .then(() => { + // Fire processingEnded callbacks + for (const cb of processingEndedCallbacks) { + cb({ item, err: undefined }); + } + }) + .catch(error => { + // Fire processingEnded callbacks with an error + for (const cb of processingEndedCallbacks) { + cb({ item, err: error }); + } + }) + .finally(() => { + runningCount--; + checkDrained(); + tryProcessNext(); + }); + } + } + + const queue = function enqueue(item) { + for (const cb of enqueuedCallbacks) { + cb({ item }); + } + tasks.push(item); + + tryProcessNext(); + }; + + queue.limit = options => { + if (options && typeof options.concurrency === 'number') { + concurrency = options.concurrency; + } + return queue; + }; + + queue.process = fn => { + processor = fn; + return queue; + }; + + queue.enqueued = callback => { + enqueuedCallbacks.push(callback); + return queue; + }; + + queue.processingStarted = callback => { + processingStartedCallbacks.push(callback); + return queue; + }; + + queue.processingEnded = callback => { + processingEndedCallbacks.push(callback); + return queue; + }; + + queue.drained = callback => { + drainedCallbacks.push(callback); + return queue; + }; + + Object.defineProperty(queue, 'isDrained', { + get() { + return tasks.length === 0 && runningCount === 0; + } + }); + + return queue; +} diff --git a/lib/core/queueHandler.js b/lib/core/queueHandler.js index 73d15fd755..4be14c57fe 100644 --- a/lib/core/queueHandler.js +++ b/lib/core/queueHandler.js @@ -1,14 +1,11 @@ -/* eslint no-console:0 */ - -import cq from 'concurrent-queue'; import { getLogger } from '@sitespeed.io/log'; - import { messageMaker } from '../support/messageMaker.js'; import { registerQueueTime, registerProcessingTime, generateStatistics } from './queueStatistics.js'; +import { createQueue } from './queue.js'; const make = messageMaker('queueHandler').make; const log = getLogger('sitespeedio.queuehandler'); @@ -103,36 +100,32 @@ export class QueueHandler { this.queues = plugins .filter(plugin => plugin.processMessage) .map(plugin => { - const concurrency = plugin.concurrency || Number.POSITIVE_INFINITY; - const queue = cq().limit({ concurrency }); + const concurrency = plugin.concurrency ?? Number.POSITIVE_INFINITY; + // Create a queue with that concurrency + const queue = createQueue().limit({ concurrency }); queue.plugin = plugin; - const messageWaitingStart = {}, - messageProcessingStart = {}; + const messageWaitingStart = {}; + const messageProcessingStart = {}; queue.enqueued(object => { - const message = object.item; + const { item: message } = object; messageWaitingStart[message.uuid] = process.hrtime(); }); queue.processingStarted(object => { - const message = object.item; - + const { item: message } = object; const waitingDuration = process.hrtime( - messageWaitingStart[message.uuid] - ), - waitingNanos = waitingDuration[0] * 1e9 + waitingDuration[1]; - + messageWaitingStart[message.uuid] + ); + const waitingNanos = waitingDuration[0] * 1e9 + waitingDuration[1]; registerQueueTime(message, queue.plugin, waitingNanos); - messageProcessingStart[message.uuid] = process.hrtime(); }); - // FIXME handle rejections (i.e. failures while processing messages) properly queue.processingEnded(object => { - const message = object.item; - const error = object.err; + const { item: message, err: error } = object; if (error) { let rejectionMessage = 'Rejected ' + @@ -140,9 +133,9 @@ export class QueueHandler { ' for plugin: ' + plugin.getName(); - if (message && message.url) - rejectionMessage += ', url: ' + message.url; - + if (message?.url) { + rejectionMessage += `, url: ${message.url}`; + } if (error.stack) { log.error(error.stack); } @@ -154,7 +147,6 @@ export class QueueHandler { ); const processingNanos = processingDuration[0] * 1e9 + processingDuration[1]; - registerProcessingTime(message, queue.plugin, processingNanos); }); @@ -221,9 +213,9 @@ export class QueueHandler { } async startProcessingQueues() { - for (let item of this.queues) { - const queue = item.queue, - plugin = item.plugin; + for (const item of this.queues) { + const { queue, plugin } = item; + // For each queue, set up the processor that handles messages queue.process(message => Promise.resolve(plugin.processMessage(message, this)) ); diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index b57edead32..6a1f4ac15c 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -20,7 +20,6 @@ "axe-core": "4.10.2", "browsertime": "24.0.0-alpha.1", "coach-core": "8.1.1", - "concurrent-queue": "7.0.2", "dayjs": "1.11.11", "fast-crc32c": "2.0.0", "fast-stats": "0.0.7", @@ -2761,14 +2760,6 @@ "node": ">=0.4.0" } }, - "node_modules/afterward": { - "version": "2.0.0", - "resolved": "https://registry.npmjs.org/afterward/-/afterward-2.0.0.tgz", - "integrity": "sha1-lmp1MdL9tBv/Z7Tqg6vZ68vvXrI=", - "dependencies": { - "define-error": "~1.0.0" - } - }, "node_modules/agent-base": { "version": "6.0.2", "resolved": "https://registry.npmjs.org/agent-base/-/agent-base-6.0.2.tgz", @@ -3480,14 +3471,6 @@ } ] }, - "node_modules/capture-stack-trace": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/capture-stack-trace/-/capture-stack-trace-1.0.0.tgz", - "integrity": "sha1-Sm+gc5nCa7pH8LJJa00PtAjFVQ0=", - "engines": { - "node": ">=0.10.0" - } - }, "node_modules/catharsis": { "version": "0.9.0", "resolved": "https://registry.npmjs.org/catharsis/-/catharsis-0.9.0.tgz", @@ -3941,36 +3924,6 @@ "node": ">=10" } }, - "node_modules/concurrent-queue": { - "version": "7.0.2", - "resolved": "https://registry.npmjs.org/concurrent-queue/-/concurrent-queue-7.0.2.tgz", - "integrity": "sha1-PPzPqLnOCMoj2FMZ9cNuwb+VQPo=", - "dependencies": { - "afterward": "~2.0.0", - "define-error": "~1.0.0", - "eventuate": "~4.0.0", - "object-assign": "~4.0.1", - "on-error": "~2.1.0", - "once": "~1.3.2", - "promise-polyfill": "~2.1.0" - } - }, - "node_modules/concurrent-queue/node_modules/object-assign": { - "version": "4.0.1", - "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-4.0.1.tgz", - "integrity": "sha1-mVBEVsNZi1ytT8WcJuipuxB/4L0=", - "engines": { - "node": ">=0.10.0" - } - }, - "node_modules/concurrent-queue/node_modules/once": { - "version": "1.3.3", - "resolved": "https://registry.npmjs.org/once/-/once-1.3.3.tgz", - "integrity": "sha1-suJhVXzkwxTsgwTz+oJmPkKXyiA=", - "dependencies": { - "wrappy": "1" - } - }, "node_modules/console-control-strings": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/console-control-strings/-/console-control-strings-1.1.0.tgz", @@ -4235,14 +4188,6 @@ "url": "https://github.com/sponsors/ljharb" } }, - "node_modules/define-error": { - "version": "1.0.0", - "resolved": "https://registry.npmjs.org/define-error/-/define-error-1.0.0.tgz", - "integrity": "sha1-X7SKRd1fZfiPgrDJoiPAGTN50/4=", - "dependencies": { - "capture-stack-trace": "~1.0.0" - } - }, "node_modules/define-properties": { "version": "1.2.1", "resolved": "https://registry.npmjs.org/define-properties/-/define-properties-1.2.1.tgz", @@ -5009,24 +4954,6 @@ "node": ">=6" } }, - "node_modules/eventuate": { - "version": "4.0.0", - "resolved": "https://registry.npmjs.org/eventuate/-/eventuate-4.0.0.tgz", - "integrity": "sha1-TiaQVTFv/0EJB4FB+SZgDHS1caA=", - "dependencies": { - "define-error": "~1.0.0", - "object-assign": "~3.0.0", - "shallow-copy": "0.0.1" - } - }, - "node_modules/eventuate/node_modules/object-assign": { - "version": "3.0.0", - "resolved": "https://registry.npmjs.org/object-assign/-/object-assign-3.0.0.tgz", - "integrity": "sha1-m+3VygiXlJvKR+f/QIBi1Un1h/I=", - "engines": { - "node": ">=0.10.0" - } - }, "node_modules/execa": { "version": "9.5.2", "resolved": "https://registry.npmjs.org/execa/-/execa-9.5.2.tgz", @@ -7692,11 +7619,6 @@ "url": "https://github.com/sponsors/ljharb" } }, - "node_modules/on-error": { - "version": "2.1.0", - "resolved": "https://registry.npmjs.org/on-error/-/on-error-2.1.0.tgz", - "integrity": "sha1-usVuS0mATEkWcVDa0nWbQmERfc4=" - }, "node_modules/once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -8206,11 +8128,6 @@ "asap": "~2.0.3" } }, - "node_modules/promise-polyfill": { - "version": "2.1.4", - "resolved": "https://registry.npmjs.org/promise-polyfill/-/promise-polyfill-2.1.4.tgz", - "integrity": "sha1-cxkiNTLCasPlVefpvMDPdiAbUa0=" - }, "node_modules/proxy-from-env": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/proxy-from-env/-/proxy-from-env-1.1.0.tgz", @@ -9172,11 +9089,6 @@ "resolved": "https://registry.npmjs.org/setimmediate/-/setimmediate-1.0.5.tgz", "integrity": "sha512-MATJdZp8sLqDl/68LfQmbP8zKPLQNV6BIZoIgrscFDQ+RsvK/BxeDQOgyxKKoh0y/8h3BqVFnCqQ/gd+reiIXA==" }, - "node_modules/shallow-copy": { - "version": "0.0.1", - "resolved": "https://registry.npmjs.org/shallow-copy/-/shallow-copy-0.0.1.tgz", - "integrity": "sha1-QV9CcC1z2BAzApLMXuhurhoRoXA=" - }, "node_modules/shebang-command": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/shebang-command/-/shebang-command-2.0.0.tgz", diff --git a/package.json b/package.json index 29394a735e..e6a943ba79 100644 --- a/package.json +++ b/package.json @@ -91,7 +91,6 @@ "axe-core": "4.10.2", "browsertime": "24.0.0-alpha.1", "coach-core": "8.1.1", - "concurrent-queue": "7.0.2", "dayjs": "1.11.11", "fast-crc32c": "2.0.0", "fast-stats": "0.0.7",