diff --git a/packages/apps-engine/deno-runtime/lib/messenger.ts b/packages/apps-engine/deno-runtime/lib/messenger.ts index 5881d408c01c..b133e46a43b2 100644 --- a/packages/apps-engine/deno-runtime/lib/messenger.ts +++ b/packages/apps-engine/deno-runtime/lib/messenger.ts @@ -1,4 +1,4 @@ -import { writeAll } from "https://deno.land/std@0.216.0/io/write_all.ts"; +import { writeAll } from 'https://deno.land/std@0.216.0/io/write_all.ts'; import * as jsonrpc from 'jsonrpc-lite'; @@ -33,26 +33,52 @@ const COMMAND_PONG = '_zPONG'; export const RPCResponseObserver = new EventTarget(); +class ProcessingLock { + private isProcessing = false; + + lock() { + const granted = this.isProcessing === false; + + if (!granted) { + return { + granted, + [Symbol.dispose]: () => {}, + }; + } + + this.isProcessing = true; + + return { + granted, + [Symbol.dispose]: () => { + this.isProcessing = false; + }, + }; + } + [Symbol.dispose]() { + this.isProcessing = false; + } +} + export const Queue = new (class Queue { private queue: Uint8Array[] = []; - private isProcessing = false; + private isProcessingLock = new ProcessingLock(); private async processQueue() { - if (this.isProcessing) { + using processing = this.isProcessingLock.lock(); + + if (!processing.granted) { return; } - this.isProcessing = true; - while (this.queue.length) { - const message = this.queue.shift(); + const [message] = this.queue; if (message) { await Transport.send(message); } + this.queue.shift(); } - - this.isProcessing = false; } public enqueue(message: jsonrpc.JsonRpc | typeof COMMAND_PONG) { @@ -63,7 +89,7 @@ export const Queue = new (class Queue { public getCurrentSize() { return this.queue.length; } -}); +})(); export const Transport = new (class Transporter { private selectedTransport: Transporter['stdoutTransport'] | Transporter['noopTransport'];