Skip to content

Commit

Permalink
fix: ensure order of stream pipes
Browse files Browse the repository at this point in the history
  • Loading branch information
invakid404 committed Nov 21, 2024
1 parent dead9d9 commit dead9c8
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 7 deletions.
11 changes: 7 additions & 4 deletions src/generator/context.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { AsyncLocalStorage } from "node:async_hooks";
import { Writable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { ResourceTypes } from "../windmill/resourceTypes.js";
import { InMemoryDuplex } from "../utils/inMemoryDuplex.js";

Expand All @@ -16,9 +17,9 @@ export const run = async <T,>(
allResourceTypes: ResourceTypes,
cb: () => T,
) => {
const write = (content: string) =>
const write = (content: string, stream = output) =>
new Promise<void>((resolve, reject) =>
output.write(content + "\n", (err) => {
stream.write(content + "\n", (err) => {
if (err != null) {
return void reject(err);
}
Expand Down Expand Up @@ -48,10 +49,12 @@ export const run = async <T,>(

// NOTE: in order to avoid the output being dependent on the write order,
// deferred writes are sorted before written to the output
buffer.write(deferredWrites.sort().join("\n"));
buffer.pipe(output);
await write(deferredWrites.sort().join("\n"), buffer);
await pipeline(buffer, output, { end: false });
}

await new Promise<void>((resolve) => output.end(() => resolve()));

return result;
};

Expand Down
7 changes: 4 additions & 3 deletions src/generator/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Writable } from "node:stream";
import { pipeline } from "node:stream/promises";
import { writePreamble } from "./preamble.js";
import { run } from "./context.js";
import { listResourceTypes } from "../windmill/resourceTypes.js";
Expand All @@ -19,8 +20,8 @@ export const generate = async (output: Writable) => {
),
);

results.forEach(({ buffer }) => {
buffer.pipe(output);
});
for (const { buffer } of results) {
await pipeline(buffer, output, { end: false });
}
});
};

0 comments on commit dead9c8

Please sign in to comment.