Skip to content

Commit

Permalink
feat(): add highWaterMark option, with 16kb default (#91)
Browse files Browse the repository at this point in the history
* feat(): add highWaterMark option, with 16kb default

* fix types

Signed-off-by: Marin Petrunic <[email protected]>

---------

Signed-off-by: Marin Petrunic <[email protected]>
Co-authored-by: Marin Petrunic <[email protected]>
  • Loading branch information
lucasboleite and mpetrunic authored Dec 2, 2024
1 parent 83d6926 commit fd0382d
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 5 deletions.
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,21 @@ server.register(FastifySSEPlugin, {
})
```

##### Change default highWaterMark

```javascript
import { FastifySSEPlugin } from "fastify-sse-v2";

const server = fastify();

server.register(FastifySSEPlugin) // highWaterMark defaults to 16384 bytes (16kb)

server.register(FastifySSEPlugin, {
highWaterMark: 1024 // override default setting of 16384 (16kb) with 1024 (1kb)
})
```

##### Note

- You can set parameter `retryDelay` to `false` to disable the default behavior of sending retry, or set parameter `retryDelay` to `milliseconds` override the default 3000 retry interval .
- You can set parameter `highWaterMark` to define the buffer size (in bytes) that determines when the buffer is full and a 'flush' should be performed. Default is 16kb. (![Learn more](https://nodejs.org/en/learn/modules/backpressuring-in-streams#how-does-backpressure-resolve-these-issues))
12 changes: 7 additions & 5 deletions src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ export const plugin: FastifyPluginAsync<SsePluginOptions> = async function (
serializeSSEEvent({ retry: options.retryDelay || 3000 })
);
}
handleAsyncIterable(this, this.sseContext.source);
handleAsyncIterable(this, this.sseContext.source, options);
}
if (isAsyncIterable(source)) {
return handleAsyncIterable(this, source);
return handleAsyncIterable(this, source, options);
} else {
if (!this.sseContext?.source) {
this.sseContext = { source: pushable<EventMessage>() };
handleAsyncIterable(this, this.sseContext.source);
handleAsyncIterable(this, this.sseContext.source, options);
}
this.sseContext.source.push(source);
return;
Expand All @@ -49,7 +49,9 @@ export const plugin: FastifyPluginAsync<SsePluginOptions> = async function (

function handleAsyncIterable(
reply: FastifyReply,
source: AsyncIterable<EventMessage>
source: AsyncIterable<EventMessage>,
options: SsePluginOptions
): void {
toStream(transformAsyncIterable(source)).pipe(reply.raw);
const highWaterMark = options.highWaterMark || 16384;
toStream(transformAsyncIterable(source), { highWaterMark }).pipe(reply.raw);
}
6 changes: 6 additions & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,10 @@ export interface SsePluginOptions {
* Update client reconnect interval (how long will client wait before trying to reconnect).
*/
retryDelay?: false | number;

/**
* Set the high-water mark for the event stream (byte size that determines when the buffer is full and a 'flush' should be performed).
* Default is 16kb.
*/
highWaterMark?: number;
}

0 comments on commit fd0382d

Please sign in to comment.