From e89d6da002cda7d983c35df35659da133436dcc0 Mon Sep 17 00:00:00 2001 From: Amin Yahyaabadi Date: Sun, 5 Jan 2025 23:21:52 -0800 Subject: [PATCH] test: add a soda worker for majordomo --- examples/majordomo/README.md | 30 +++++++++++++++++------------- examples/majordomo/index.ts | 22 +++++++++++++++++----- examples/majordomo/worker.ts | 2 +- examples/queue/index.ts | 8 ++++---- 4 files changed, 39 insertions(+), 23 deletions(-) diff --git a/examples/majordomo/README.md b/examples/majordomo/README.md index f6280f9f..4f0d8be3 100644 --- a/examples/majordomo/README.md +++ b/examples/majordomo/README.md @@ -20,11 +20,12 @@ The example will start a broker and some workers, then do some requests. The output will be similar to this: ``` - starting broker on tcp://127.0.0.1:5555 -starting worker on tcp://127.0.0.1:5555 -starting worker on tcp://127.0.0.1:5555 -starting worker on tcp://127.0.0.1:5555 +starting broker on tcp://127.0.0.1:5555 +starting worker soda on tcp://127.0.0.1:5555 +starting worker tea on tcp://127.0.0.1:5555 +starting worker coffee on tcp://127.0.0.1:5555 +starting worker tea on tcp://127.0.0.1:5555 ---------- Started ----------- requesting 'cola' from 'soda' requesting 'oolong' from 'tea' @@ -41,27 +42,30 @@ registered worker 00800041a9 for 'tea' dispatching 'tea' 00800041ab req -> 00800041a7 dispatching 'tea' 00800041ac req -> 00800041a9 dispatching 'coffee' 00800041af req -> 00800041a8 -dispatching 'tea' 00800041ac <- rep 00800041a9 -dispatching 'tea' 00800041ad req -> 00800041a9 -received 'sencha' from 'tea' -dispatching 'tea' 00800041ad <- rep 00800041a9 -dispatching 'tea' 00800041ae req -> 00800041a9 -received 'earl grey, with milk' from 'tea' dispatching 'coffee' 00800041af <- rep 00800041a8 dispatching 'coffee' 00800041b0 req -> 00800041a8 received 'cappuccino' from 'coffee' +dispatching 'tea' 00800041ab <- rep 00800041a7 +dispatching 'tea' 00800041ad req -> 00800041a7 +received 'oolong' from 'tea' dispatching 'coffee' 00800041b0 <- rep 00800041a8 dispatching 'coffee' 00800041b1 req -> 00800041a8 received 'latte, with soy milk' from 'coffee' dispatching 'coffee' 00800041b1 <- rep 00800041a8 dispatching 'coffee' 00800041b2 req -> 00800041a8 received 'espresso' from 'coffee' +registered worker 00800041b3 for 'soda' +dispatching 'soda' 00800041aa req -> 00800041b3 +dispatching 'soda' 00800041aa <- rep 00800041b3 +received 'cola' from 'soda' +dispatching 'tea' 00800041ac <- rep 00800041a9 +dispatching 'tea' 00800041ae req -> 00800041a9 +received 'sencha' from 'tea' dispatching 'tea' 00800041ae <- rep 00800041a9 received 'jasmine' from 'tea' dispatching 'coffee' 00800041b2 <- rep 00800041a8 received 'irish coffee' from 'coffee' -dispatching 'tea' 00800041ab <- rep 00800041a7 -received 'oolong' from 'tea' -timeout expired waiting for 'soda' +dispatching 'tea' 00800041ad <- rep 00800041a7 +received 'earl grey, with milk' from 'tea' ---------- Stopping ----------- ``` diff --git a/examples/majordomo/index.ts b/examples/majordomo/index.ts index 6735c1ac..fc9c1c73 100644 --- a/examples/majordomo/index.ts +++ b/examples/majordomo/index.ts @@ -9,6 +9,15 @@ async function sleep(msec: number) { }) } +class SodaWorker extends Worker { + service = "soda" + + override async process(...msgs: Buffer[]): Promise { + await sleep(Math.random() * 1000) + return msgs + } +} + class TeaWorker extends Worker { service = "tea" @@ -29,7 +38,12 @@ class CoffeeWorker extends Worker { const broker = new Broker() -const workers = [new TeaWorker(), new CoffeeWorker(), new TeaWorker()] +const workers = [ + new SodaWorker(), + new TeaWorker(), + new CoffeeWorker(), + new TeaWorker(), +] async function request( service: string, @@ -46,12 +60,12 @@ async function request( console.log(`received '${res.join(", ")}' from '${service}'`) return res } catch (err) { - console.log(`timeout expired waiting for '${service}'`) + console.log(`timeout expired waiting for '${service}'`, err) } } async function main() { - const started = Promise.all([ + const _started = Promise.all([ // start the broker broker.start(), // start the workers @@ -81,8 +95,6 @@ async function main() { // stop the workers ...workers.map(worker => worker.stop()), ]) - // await outstanding promises - await started } main().catch(err => { diff --git a/examples/majordomo/worker.ts b/examples/majordomo/worker.ts index 9d5bd818..a4eb6e48 100644 --- a/examples/majordomo/worker.ts +++ b/examples/majordomo/worker.ts @@ -13,7 +13,7 @@ export class Worker { } async start() { - console.log(`starting worker on ${this.address}`) + console.log(`starting worker ${this.service} on ${this.address}`) await this.socket.send([null, Header.Worker, Message.Ready, this.service]) for await (const [_blank1, _header, _type, client, _blank2, ...req] of this diff --git a/examples/queue/index.ts b/examples/queue/index.ts index 06fc9c53..231ecc46 100644 --- a/examples/queue/index.ts +++ b/examples/queue/index.ts @@ -4,8 +4,8 @@ import {Queue} from "./queue" async function main() { const sender = new Dealer() - await sender.bind("tcp://127.0.0.1:5555") - console.log("sender bound to port 5555") + await sender.bind("tcp://127.0.0.1:4444") + console.log("sender bound to port 4444") const queue = new Queue(sender) @@ -16,8 +16,8 @@ async function main() { ]) const receiver = new Dealer() - receiver.connect("tcp://127.0.0.1:5555") - console.log("receiver connected to port 5555") + receiver.connect("tcp://127.0.0.1:4444") + console.log("receiver connected to port 4444") for await (const [msg] of receiver) { if (msg.length === 0) {