Skip to content

Commit

Permalink
Initial implementation for streams with http2 module (#235)
Browse files Browse the repository at this point in the history
  • Loading branch information
ptpaterson authored Mar 18, 2024
1 parent c6fecd9 commit 4cb5eaa
Showing 1 changed file with 116 additions and 3 deletions.
119 changes: 116 additions & 3 deletions src/http-client/node-http2-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@ import {
HTTPClientOptions,
HTTPRequest,
HTTPResponse,
HTTPStreamClient,
HTTPStreamRequest,
StreamAdapter,
} from "./http-client";
import { NetworkError } from "../errors";
import { ServiceError, NetworkError } from "../errors";

// alias http2 types
type ClientHttp2Session = any;
Expand All @@ -22,7 +25,7 @@ type OutgoingHttpHeaders = any;
/**
* An implementation for {@link HTTPClient} that uses the node http package
*/
export class NodeHTTP2Client implements HTTPClient {
export class NodeHTTP2Client implements HTTPClient, HTTPStreamClient {
static #clients: Map<string, NodeHTTP2Client> = new Map();

#http2_session_idle_ms: number;
Expand Down Expand Up @@ -107,6 +110,11 @@ export class NodeHTTP2Client implements HTTPClient {
});
}

/** {@inheritDoc HTTPStreamClient.stream} */
stream(req: HTTPStreamRequest): StreamAdapter {
return this.#doStream(req);
}

/** {@inheritDoc HTTPClient.close} */
close() {
// defend against redundant close calls
Expand Down Expand Up @@ -170,7 +178,7 @@ export class NodeHTTP2Client implements HTTPClient {

// append response data to the data string every time we receive new
// data chunks in the response
req.on("data", (chunk: any) => {
req.on("data", (chunk: string) => {
responseData += chunk;
});

Expand Down Expand Up @@ -213,4 +221,109 @@ export class NodeHTTP2Client implements HTTPClient {
}
});
}

/** {@inheritDoc HTTPStreamClient.stream} */
#doStream({
data: requestData,
headers: requestHeaders,
method,
}: HTTPStreamRequest): StreamAdapter {
let resolveChunk: (chunk: string) => void;
let rejectChunk: (reason: any) => void;

const setChunkPromise = () =>
new Promise<string>((res, rej) => {
resolveChunk = res;
rejectChunk = rej;
});

let chunkPromise = setChunkPromise();

let req: ClientHttp2Stream;
const onResponse = (
http2ResponseHeaders: IncomingHttpHeaders & IncomingHttpStatusHeader
) => {
const status = Number(
http2ResponseHeaders[http2.constants.HTTP2_HEADER_STATUS]
);
if (!(status >= 200 && status < 400)) {
// Get the error body and then throw an error
let responseData = "";

// append response data to the data string every time we receive new
// data chunks in the response
req.on("data", (chunk: string) => {
responseData += chunk;
});

// Once the response is finished, resolve the promise
// TODO: The Client contains the information for how to parse an error
// into the appropriate class, so lift this logic out of the HTTPClient.
req.on("end", () => {
rejectChunk(new ServiceError(JSON.parse(responseData), status));
});
} else {
let partOfLine = "";

// append response data to the data string every time we receive new
// data chunks in the response
req.on("data", (chunk: string) => {
const chunkLines = (partOfLine + chunk).split("\n");

// Yield all complete lines
for (let i = 0; i < chunkLines.length - 1; i++) {
resolveChunk(chunkLines[i].trim());
chunkPromise = setChunkPromise();
}

// Store the partial line
partOfLine = chunkLines[chunkLines.length - 1];
});

// Once the response is finished, resolve the promise
req.on("end", () => {
resolveChunk(partOfLine);
});
}
};

// eslint-disable-next-line @typescript-eslint/no-this-alias
const self = this;

async function* reader(): AsyncGenerator<string> {
const httpRequestHeaders: OutgoingHttpHeaders = {
...requestHeaders,
[http2.constants.HTTP2_HEADER_PATH]: "/stream/1",
[http2.constants.HTTP2_HEADER_METHOD]: method,
};

const session = self.#connect();
req = session
.request(httpRequestHeaders)
.setEncoding("utf8")
.on("error", (error: any) => {
rejectChunk(error);
})
.on("response", onResponse);

const body = JSON.stringify(requestData);

req.write(body, "utf8");

req.end();

while (true) {
yield await chunkPromise;
}
}

return {
read: reader(),
close: () => {
if (req) {
req.close();
}
},
};
}
}

0 comments on commit 4cb5eaa

Please sign in to comment.