Skip to content

Commit

Permalink
fix!: remove default stream close event (#81)
Browse files Browse the repository at this point in the history
* fix!: remove default stream close event

BREAKING CHANGE: there is no "end" event when stream is closing in case you depended on it

The current implementation sends its own event when an async iterable ends, this is bad as the implemented SSE  protocol doesn't expect that message, it's an application level thing. It should be up to the application to send such a message or just close the SSE stream.

Remove that event (Possibly a breaking change if anyone relied on it).

For example, that caused an error in our project from our handler receving that message and crashing cause it's an unknown message type whose body is not even JSON as our application expects.

Contributed on behalf of [Swimm](https://swimm.io/)

* fix tests

Signed-off-by: Marin Petrunic <[email protected]>

* update ci nodejs version

Signed-off-by: Marin Petrunic <[email protected]>

---------

Signed-off-by: Marin Petrunic <[email protected]>
Co-authored-by: Marin Petrunic <[email protected]>
  • Loading branch information
edenhermelin and mpetrunic committed Mar 20, 2024
1 parent 3495dbc commit 311c920
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 38 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
node-version: [16, 18]
node-version: [18, 20]
steps:
- uses: actions/checkout@v3
- name: Use Node.js ${{ matrix.node-version }}
Expand Down
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
{
"name": "fastify-sse-v2",
"version": "3.1.2",
"packageManager": "[email protected]",
"description": "Fastify plugin for sending server side events.",
"main": "lib/index.js",
"types": "lib/index.d.ts",
Expand Down
1 change: 0 additions & 1 deletion src/sse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ export async function* transformAsyncIterable(
for await (const message of source) {
yield serializeSSEEvent(message);
}
yield serializeSSEEvent({ event: "end", data: "Stream closed" });
}

export function serializeSSEEvent(chunk: EventMessage): string {
Expand Down
68 changes: 32 additions & 36 deletions test/fastify4/fastify4.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import {expect} from "chai";
import {FastifyInstance, EventMessage, RouteHandler} from "fastify";
import {getEventSource, getFastifyServer, getBaseUrl} from "./utils";
import pushable, {Pushable} from "it-pushable";
import { get } from "http";
import { expect } from "chai";
import { FastifyInstance, EventMessage, RouteHandler } from "fastify";
import EventSource from "eventsource";
import pushable, { Pushable } from "it-pushable";
import sinon from "sinon";
import {get} from "http";
import { getEventSource, getFastifyServer, getBaseUrl } from "./utils";

describe("Fastify - Test SSE plugin", function () {

let server: FastifyInstance;
let source: Pushable<EventMessage>;

Expand All @@ -17,7 +17,7 @@ describe("Fastify - Test SSE plugin", function () {

afterEach(async function () {
source.end();
if(server) {
if (server) {
await server.close();
}
});
Expand All @@ -33,12 +33,12 @@ describe("Fastify - Test SSE plugin", function () {

it("should set plugin headers", function (done) {
try {
get(getBaseUrl(server), {timeout: 100}, (res) => {
get(getBaseUrl(server), { timeout: 100 }, (res) => {
expect(res.headers["x-test-header2"]).to.be.deep.equal("test2");
res.destroy();
done();
});
} catch(e) {
} catch (e) {
done(e);
}
});
Expand All @@ -65,85 +65,81 @@ describe("Fastify - Test SSE plugin", function () {
eventsource.close();
throw "shouldn't be called";
});
eventsource.addEventListener("end", function (e: Event) {
expect(e.type).to.be.equal("end");
// @ts-ignore
expect(e.data).to.be.equal("Stream closed");
eventsource.close();
done();
eventsource.addEventListener("error", function (e) {
if (e.data == undefined) {
// Connection closed by server
eventsource.close();
done();
} else {
throw "shouldn't happen";
}
});
});

it("should send single event", function (done) {
const eventsource = getEventSource(server);
source.push({data: "Something", id: "1", event: "message"});
eventsource.onmessage = (evt => {
source.push({ data: "Something", id: "1", event: "message" });
eventsource.onmessage = (evt) => {
expect(evt.data).equal("Something");
expect(evt.type).equal("message");
expect(evt.lastEventId).equal("1");
eventsource.close();
done();
});

};
});

it("should send multiple events without async iterable", function (done) {
const handler: RouteHandler = async (req, resp): Promise<void> => {
for await( const event of source) {
for await (const event of source) {
resp.sse(event);
return resp;
}

};
getFastifyServer(handler).then((server2) => {
const eventsource = getEventSource(server2);
source.push({id: "1", event: "message", data: "Something"});
eventsource.onmessage = (evt => {
source.push({ id: "1", event: "message", data: "Something" });
eventsource.onmessage = (evt) => {
expect(evt.data).equal("Something");
expect(evt.type).equal("message");
expect(evt.lastEventId).equal("1");
eventsource.close();
server2.close();
done();
});
};
});

});

it("should send event after headers has been sent by user", function (done) {
const handler: RouteHandler = async (req, resp): Promise<void> => {
resp.header("Content-Type", "text/event-stream");
resp.raw.flushHeaders();
resp.sse({id: "1", event: "message", data: "Something"});
resp.sse({ id: "1", event: "message", data: "Something" });
return resp;
};
getFastifyServer(handler).then((server2) => {
const eventsource = getEventSource(server2);
eventsource.onmessage = (evt => {
eventsource.onmessage = (evt) => {
expect(evt.data).equal("Something");
expect(evt.type).equal("message");
expect(evt.lastEventId).equal("1");
eventsource.close();
server2.close();
done();
});
};
});

});

it("should send multiple events", function (done) {
const eventsource = getEventSource(server);
source.push({id: "1", event: "message", data: "Something"});
source.push({id: "2", event: "message", data: "Something"});
source.push({ id: "1", event: "message", data: "Something" });
source.push({ id: "2", event: "message", data: "Something" });
source.end();
const spy = sinon.spy();
eventsource.onmessage = (() => spy());
eventsource.onerror = (() => {
eventsource.onmessage = () => spy();
eventsource.onerror = () => {
expect(spy.callCount).to.be.equal(2);
eventsource.close();
done();
});

};
});

});

0 comments on commit 311c920

Please sign in to comment.