Skip to content

Commit

Permalink
Make WebSocket subscriptions receive ack to be considered subscribed (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
benjreinhart authored Oct 22, 2024
1 parent 215d87c commit 45b6bfa
Show file tree
Hide file tree
Showing 4 changed files with 75 additions and 13 deletions.
3 changes: 3 additions & 0 deletions packages/api/server/channels/app.mts
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,9 @@ async function previewStop(
return;
}

// Killing the process should result in its onExit handler being called.
// The onExit handler will remove the process from the processMetadata map
// and send the `preview:status` event with a value of 'stopped'
result.process.kill('SIGTERM');
}

Expand Down
1 change: 1 addition & 0 deletions packages/api/server/ws-client.mts
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ export default class WebSocketServer {

if (event === 'subscribe') {
conn.subscriptions.push(topic);
conn.reply(topic, 'subscribed', { id: payload.id });
channel.onJoinCallback(topic, conn.socket);
return;
}
Expand Down
65 changes: 62 additions & 3 deletions packages/web/src/clients/websocket/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import z from 'zod';
import WebSocketClient from '@/clients/websocket/client';
import { randomid } from '@srcbook/shared';

export default class Channel<
I extends Record<string, z.ZodSchema<any>>,
Expand All @@ -14,19 +15,45 @@ export default class Channel<
outgoing: O;
};

private subscribed: boolean;

private subscriptionId: string | null;

private readonly queue: Array<
{
[K in keyof O & string]: {
event: K;
payload: z.TypeOf<O[K]>;
};
}[keyof O & string]
>;

private readonly client: WebSocketClient;

private readonly callbacks: Record<string, Array<(payload: Record<string, any>) => void>>;

private readonly receive: (event: string, payload: Record<string, any>) => void;
private readonly receive: <K extends keyof I & string>(event: K, payload: z.TypeOf<I[K]>) => void;

constructor(client: WebSocketClient, topic: string, events: { incoming: I; outgoing: O }) {
this.topic = topic;
this.queue = [];
this.client = client;
this.events = events;
this.callbacks = {};
this.subscribed = false;
this.subscriptionId = null;

this.receive = <K extends keyof I & string>(event: K, payload: z.TypeOf<I[K]>) => {
if (event === 'subscribed') {
this.receiveSubscribedEvent(payload);
return;
}

// Ignore events until we are subscribed.
if (!this.subscribed) {
return;
}

this.receive = (event: string, payload: Record<string, any>) => {
const schema = this.events.incoming[event];

if (schema === undefined) {
Expand All @@ -48,11 +75,18 @@ export default class Channel<
}

subscribe() {
this.client.push(this.topic, 'subscribe', {});
if (this.subscribed) {
return;
}

this.subscriptionId = randomid();
this.client.push(this.topic, 'subscribe', { id: this.subscriptionId });
this.client.on(this.topic, this.receive);
}

unsubscribe() {
this.subscribed = false;
this.subscriptionId = null;
this.client.push(this.topic, 'unsubscribe', {});
this.client.off(this.topic, this.receive);
}
Expand All @@ -73,6 +107,12 @@ export default class Channel<
}

push<K extends keyof O & string>(event: K, payload: z.TypeOf<O[K]>): void {
// Queue outgoing events until we are subscribed.
if (!this.subscribed) {
this.queue.push({ event, payload });
return;
}

const schema = this.events.outgoing[event];

if (schema === undefined) {
Expand All @@ -81,4 +121,23 @@ export default class Channel<

this.client.push(this.topic, event, schema.parse(payload));
}

private receiveSubscribedEvent(payload: { id: string }) {
// This shouldn't normally happen, but could if multiple channels for the
// same topic were used or if duplicate events sent (cough cough, useEffect).
if (this.subscriptionId !== payload.id) {
return;
}

this.subscribed = true;
this.subscriptionId = null;
this.flush();
}

private flush() {
while (this.queue.length > 0) {
const message = this.queue.shift()!;
this.push(message.event, message.payload);
}
}
}
19 changes: 9 additions & 10 deletions packages/web/src/components/apps/use-app.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,18 @@ export function AppProvider({ app: initialApp, children }: ProviderPropsType) {

const channelRef = useRef(AppChannel.create(app.id));

// This is only meant to be run one time, when the component mounts.
useEffect(() => {
channelRef.current.subscribe();
return () => channelRef.current.unsubscribe();
}, []);

useEffect(() => {
if (app.id === channelRef.current.appId) {
return;
// If the app ID has changed, create a new channel for the new app.
if (channelRef.current.appId !== app.id) {
channelRef.current.unsubscribe();
channelRef.current = AppChannel.create(app.id);
}

channelRef.current.unsubscribe();
channelRef.current = AppChannel.create(app.id);
// Subscribe to the channel
channelRef.current.subscribe();

// Unsubscribe when the component is unmounted
return () => channelRef.current.unsubscribe();
}, [app.id]);

async function updateApp(attrs: { name: string }) {
Expand Down

0 comments on commit 45b6bfa

Please sign in to comment.