From 4cb5eaa1ac31ab4930b23204b6d76672f98c4388 Mon Sep 17 00:00:00 2001 From: Paul Paterson Date: Mon, 18 Mar 2024 16:37:35 -0400 Subject: [PATCH] Initial implementation for streams with http2 module (#235) --- src/http-client/node-http2-client.ts | 119 ++++++++++++++++++++++++++- 1 file changed, 116 insertions(+), 3 deletions(-) diff --git a/src/http-client/node-http2-client.ts b/src/http-client/node-http2-client.ts index ebd61bb6..193c2263 100644 --- a/src/http-client/node-http2-client.ts +++ b/src/http-client/node-http2-client.ts @@ -9,8 +9,11 @@ import { HTTPClientOptions, HTTPRequest, HTTPResponse, + HTTPStreamClient, + HTTPStreamRequest, + StreamAdapter, } from "./http-client"; -import { NetworkError } from "../errors"; +import { ServiceError, NetworkError } from "../errors"; // alias http2 types type ClientHttp2Session = any; @@ -22,7 +25,7 @@ type OutgoingHttpHeaders = any; /** * An implementation for {@link HTTPClient} that uses the node http package */ -export class NodeHTTP2Client implements HTTPClient { +export class NodeHTTP2Client implements HTTPClient, HTTPStreamClient { static #clients: Map = new Map(); #http2_session_idle_ms: number; @@ -107,6 +110,11 @@ export class NodeHTTP2Client implements HTTPClient { }); } + /** {@inheritDoc HTTPStreamClient.stream} */ + stream(req: HTTPStreamRequest): StreamAdapter { + return this.#doStream(req); + } + /** {@inheritDoc HTTPClient.close} */ close() { // defend against redundant close calls @@ -170,7 +178,7 @@ export class NodeHTTP2Client implements HTTPClient { // append response data to the data string every time we receive new // data chunks in the response - req.on("data", (chunk: any) => { + req.on("data", (chunk: string) => { responseData += chunk; }); @@ -213,4 +221,109 @@ export class NodeHTTP2Client implements HTTPClient { } }); } + + /** {@inheritDoc HTTPStreamClient.stream} */ + #doStream({ + data: requestData, + headers: requestHeaders, + method, + }: HTTPStreamRequest): StreamAdapter { + let resolveChunk: (chunk: string) => void; + let rejectChunk: (reason: any) => void; + + const setChunkPromise = () => + new Promise((res, rej) => { + resolveChunk = res; + rejectChunk = rej; + }); + + let chunkPromise = setChunkPromise(); + + let req: ClientHttp2Stream; + const onResponse = ( + http2ResponseHeaders: IncomingHttpHeaders & IncomingHttpStatusHeader + ) => { + const status = Number( + http2ResponseHeaders[http2.constants.HTTP2_HEADER_STATUS] + ); + if (!(status >= 200 && status < 400)) { + // Get the error body and then throw an error + let responseData = ""; + + // append response data to the data string every time we receive new + // data chunks in the response + req.on("data", (chunk: string) => { + responseData += chunk; + }); + + // Once the response is finished, resolve the promise + // TODO: The Client contains the information for how to parse an error + // into the appropriate class, so lift this logic out of the HTTPClient. + req.on("end", () => { + rejectChunk(new ServiceError(JSON.parse(responseData), status)); + }); + } else { + let partOfLine = ""; + + // append response data to the data string every time we receive new + // data chunks in the response + req.on("data", (chunk: string) => { + const chunkLines = (partOfLine + chunk).split("\n"); + + // Yield all complete lines + for (let i = 0; i < chunkLines.length - 1; i++) { + resolveChunk(chunkLines[i].trim()); + chunkPromise = setChunkPromise(); + } + + // Store the partial line + partOfLine = chunkLines[chunkLines.length - 1]; + }); + + // Once the response is finished, resolve the promise + req.on("end", () => { + resolveChunk(partOfLine); + }); + } + }; + + // eslint-disable-next-line @typescript-eslint/no-this-alias + const self = this; + + async function* reader(): AsyncGenerator { + const httpRequestHeaders: OutgoingHttpHeaders = { + ...requestHeaders, + [http2.constants.HTTP2_HEADER_PATH]: "/stream/1", + [http2.constants.HTTP2_HEADER_METHOD]: method, + }; + + const session = self.#connect(); + req = session + .request(httpRequestHeaders) + .setEncoding("utf8") + .on("error", (error: any) => { + rejectChunk(error); + }) + .on("response", onResponse); + + const body = JSON.stringify(requestData); + + req.write(body, "utf8"); + + req.end(); + + while (true) { + yield await chunkPromise; + } + } + + return { + read: reader(), + close: () => { + if (req) { + req.close(); + } + }, + }; + } }