From 45b6bfa1a00cab2e83cd7a73269ec28a3fc6ecc8 Mon Sep 17 00:00:00 2001 From: Ben Reinhart Date: Tue, 22 Oct 2024 12:40:13 -0700 Subject: [PATCH] Make WebSocket subscriptions receive ack to be considered subscribed (#409) --- packages/api/server/channels/app.mts | 3 + packages/api/server/ws-client.mts | 1 + packages/web/src/clients/websocket/channel.ts | 65 ++++++++++++++++++- packages/web/src/components/apps/use-app.tsx | 19 +++--- 4 files changed, 75 insertions(+), 13 deletions(-) diff --git a/packages/api/server/channels/app.mts b/packages/api/server/channels/app.mts index a1075a81..b2cd6465 100644 --- a/packages/api/server/channels/app.mts +++ b/packages/api/server/channels/app.mts @@ -138,6 +138,9 @@ async function previewStop( return; } + // Killing the process should result in its onExit handler being called. + // The onExit handler will remove the process from the processMetadata map + // and send the `preview:status` event with a value of 'stopped' result.process.kill('SIGTERM'); } diff --git a/packages/api/server/ws-client.mts b/packages/api/server/ws-client.mts index 3f1f14ac..048da6ca 100644 --- a/packages/api/server/ws-client.mts +++ b/packages/api/server/ws-client.mts @@ -213,6 +213,7 @@ export default class WebSocketServer { if (event === 'subscribe') { conn.subscriptions.push(topic); + conn.reply(topic, 'subscribed', { id: payload.id }); channel.onJoinCallback(topic, conn.socket); return; } diff --git a/packages/web/src/clients/websocket/channel.ts b/packages/web/src/clients/websocket/channel.ts index 10cb92ad..2a504155 100644 --- a/packages/web/src/clients/websocket/channel.ts +++ b/packages/web/src/clients/websocket/channel.ts @@ -2,6 +2,7 @@ import z from 'zod'; import WebSocketClient from '@/clients/websocket/client'; +import { randomid } from '@srcbook/shared'; export default class Channel< I extends Record>, @@ -14,19 +15,45 @@ export default class Channel< outgoing: O; }; + private subscribed: boolean; + + private subscriptionId: string | null; + + private readonly queue: Array< + { + [K in keyof O & string]: { + event: K; + payload: z.TypeOf; + }; + }[keyof O & string] + >; + private readonly client: WebSocketClient; private readonly callbacks: Record) => void>>; - private readonly receive: (event: string, payload: Record) => void; + private readonly receive: (event: K, payload: z.TypeOf) => void; constructor(client: WebSocketClient, topic: string, events: { incoming: I; outgoing: O }) { this.topic = topic; + this.queue = []; this.client = client; this.events = events; this.callbacks = {}; + this.subscribed = false; + this.subscriptionId = null; + + this.receive = (event: K, payload: z.TypeOf) => { + if (event === 'subscribed') { + this.receiveSubscribedEvent(payload); + return; + } + + // Ignore events until we are subscribed. + if (!this.subscribed) { + return; + } - this.receive = (event: string, payload: Record) => { const schema = this.events.incoming[event]; if (schema === undefined) { @@ -48,11 +75,18 @@ export default class Channel< } subscribe() { - this.client.push(this.topic, 'subscribe', {}); + if (this.subscribed) { + return; + } + + this.subscriptionId = randomid(); + this.client.push(this.topic, 'subscribe', { id: this.subscriptionId }); this.client.on(this.topic, this.receive); } unsubscribe() { + this.subscribed = false; + this.subscriptionId = null; this.client.push(this.topic, 'unsubscribe', {}); this.client.off(this.topic, this.receive); } @@ -73,6 +107,12 @@ export default class Channel< } push(event: K, payload: z.TypeOf): void { + // Queue outgoing events until we are subscribed. + if (!this.subscribed) { + this.queue.push({ event, payload }); + return; + } + const schema = this.events.outgoing[event]; if (schema === undefined) { @@ -81,4 +121,23 @@ export default class Channel< this.client.push(this.topic, event, schema.parse(payload)); } + + private receiveSubscribedEvent(payload: { id: string }) { + // This shouldn't normally happen, but could if multiple channels for the + // same topic were used or if duplicate events sent (cough cough, useEffect). + if (this.subscriptionId !== payload.id) { + return; + } + + this.subscribed = true; + this.subscriptionId = null; + this.flush(); + } + + private flush() { + while (this.queue.length > 0) { + const message = this.queue.shift()!; + this.push(message.event, message.payload); + } + } } diff --git a/packages/web/src/components/apps/use-app.tsx b/packages/web/src/components/apps/use-app.tsx index e89a9f42..a7f2f420 100644 --- a/packages/web/src/components/apps/use-app.tsx +++ b/packages/web/src/components/apps/use-app.tsx @@ -21,19 +21,18 @@ export function AppProvider({ app: initialApp, children }: ProviderPropsType) { const channelRef = useRef(AppChannel.create(app.id)); - // This is only meant to be run one time, when the component mounts. useEffect(() => { - channelRef.current.subscribe(); - return () => channelRef.current.unsubscribe(); - }, []); - - useEffect(() => { - if (app.id === channelRef.current.appId) { - return; + // If the app ID has changed, create a new channel for the new app. + if (channelRef.current.appId !== app.id) { + channelRef.current.unsubscribe(); + channelRef.current = AppChannel.create(app.id); } - channelRef.current.unsubscribe(); - channelRef.current = AppChannel.create(app.id); + // Subscribe to the channel + channelRef.current.subscribe(); + + // Unsubscribe when the component is unmounted + return () => channelRef.current.unsubscribe(); }, [app.id]); async function updateApp(attrs: { name: string }) {