From a4f0932ccef1ccbbfa392e2247730d916e20fd9e Mon Sep 17 00:00:00 2001 From: Matt Carvin <90224411+mcarvin8@users.noreply.github.com> Date: Tue, 21 Jan 2025 12:19:56 -0500 Subject: [PATCH] fix: process files in parallel --- src/service/deleteReassembledXML.ts | 22 ++++++++------ src/service/getConcurrencyThreshold.ts | 10 +++++++ src/service/json2xmlReassembler.ts | 20 ++++++++----- src/service/transform2JSON.ts | 30 +++++++++++-------- src/service/withConcurrencyLimit.ts | 22 ++++++++++++++ src/service/xml2jsonDisassembler.ts | 40 ++++++++++++++++---------- test/main.spec.ts | 28 ++++++++++++++++++ 7 files changed, 130 insertions(+), 42 deletions(-) create mode 100644 src/service/getConcurrencyThreshold.ts create mode 100644 src/service/withConcurrencyLimit.ts diff --git a/src/service/deleteReassembledXML.ts b/src/service/deleteReassembledXML.ts index be1a6f8..7cb7880 100644 --- a/src/service/deleteReassembledXML.ts +++ b/src/service/deleteReassembledXML.ts @@ -1,19 +1,25 @@ "use strict"; -import { stat, readdir, rm } from "node:fs/promises"; +import { readdir, rm } from "node:fs/promises"; import { join } from "node:path/posix"; +import { getConcurrencyThreshold } from "./getConcurrencyThreshold"; +import { withConcurrencyLimit } from "./withConcurrencyLimit"; export async function deleteReassembledXML( disassembledPath: string, ): Promise { - const files = await readdir(disassembledPath); + const tasks: (() => Promise)[] = []; + const files = await readdir(disassembledPath, { withFileTypes: true }); + const concurrencyLimit = getConcurrencyThreshold(); + for (const file of files) { - const filePath = join(disassembledPath, file); - const fileStat = await stat(filePath); - if (fileStat.isFile() && filePath.endsWith(".xml")) { - await rm(filePath); - } else if (fileStat.isDirectory()) { - await deleteReassembledXML(filePath); + const subFilePath = join(disassembledPath, file.name); + + if (file.isFile() && subFilePath.endsWith(".xml")) { + tasks.push(() => rm(subFilePath)); + } else if (file.isDirectory()) { + tasks.push(() => deleteReassembledXML(subFilePath)); } } + await withConcurrencyLimit(tasks, concurrencyLimit); } diff --git a/src/service/getConcurrencyThreshold.ts b/src/service/getConcurrencyThreshold.ts new file mode 100644 index 0000000..0e360f4 --- /dev/null +++ b/src/service/getConcurrencyThreshold.ts @@ -0,0 +1,10 @@ +"use strict"; + +import { availableParallelism } from "node:os"; + +export function getConcurrencyThreshold(): number { + const AVAILABLE_PARALLELISM: number = availableParallelism + ? availableParallelism() + : Infinity; + return Math.min(AVAILABLE_PARALLELISM, 6); +} diff --git a/src/service/json2xmlReassembler.ts b/src/service/json2xmlReassembler.ts index 1d116e6..d8f9339 100644 --- a/src/service/json2xmlReassembler.ts +++ b/src/service/json2xmlReassembler.ts @@ -7,6 +7,8 @@ import { logger } from "@src/index"; import { reassembleHandler } from "@src/service/reassembleHandler"; import { transform2XML } from "@src/service/transform2XML"; import { deleteReassembledXML } from "@src/service/deleteReassembledXML"; +import { withConcurrencyLimit } from "./withConcurrencyLimit"; +import { getConcurrencyThreshold } from "./getConcurrencyThreshold"; export class JsonToXmlReassembler { async reassemble(xmlAttributes: { @@ -34,15 +36,19 @@ export class JsonToXmlReassembler { } async processFile(filePath: string): Promise { - const files = await readdir(filePath); + const tasks: (() => Promise)[] = []; + const files = await readdir(filePath, { withFileTypes: true }); + const concurrencyLimit = getConcurrencyThreshold(); + for (const file of files) { - const subFilePath = join(filePath, file); - const subFileStat = await stat(subFilePath); - if (subFileStat.isFile() && subFilePath.endsWith(".json")) { - await transform2XML(subFilePath); - } else if (subFileStat.isDirectory()) { - await this.processFile(subFilePath); + const subFilePath = join(filePath, file.name); + + if (file.isFile() && subFilePath.endsWith(".json")) { + tasks.push(() => transform2XML(subFilePath)); + } else if (file.isDirectory()) { + tasks.push(() => this.processFile(subFilePath)); } } + await withConcurrencyLimit(tasks, concurrencyLimit); } } diff --git a/src/service/transform2JSON.ts b/src/service/transform2JSON.ts index b7aa4f3..30f05b6 100644 --- a/src/service/transform2JSON.ts +++ b/src/service/transform2JSON.ts @@ -1,25 +1,31 @@ "use strict"; -import { readdir, rm, stat, writeFile } from "node:fs/promises"; +import { readdir, rm, writeFile } from "node:fs/promises"; import { join } from "node:path/posix"; import { parseXML } from "xml-disassembler"; import { logger } from "@src/index"; +import { withConcurrencyLimit } from "./withConcurrencyLimit"; +import { getConcurrencyThreshold } from "./getConcurrencyThreshold"; export async function transform2JSON(xmlPath: string): Promise { - const subFiles = await readdir(xmlPath); - for (const subFile of subFiles) { - const subFilePath = join(xmlPath, subFile); - if ((await stat(subFilePath)).isDirectory()) { - await transform2JSON(subFilePath); - } else if ( - (await stat(subFilePath)).isFile() && - subFilePath.endsWith(".xml") - ) { - await writeJSON(subFilePath); - await rm(subFilePath); + const tasks: (() => Promise)[] = []; + const files = await readdir(xmlPath, { withFileTypes: true }); + const concurrencyLimit = getConcurrencyThreshold(); + const foldersToRemote = []; + + for (const subFile of files) { + const subFilePath = join(xmlPath, subFile.name); + if (subFile.isDirectory()) { + tasks.push(() => transform2JSON(subFilePath)); + } else if (subFile.isFile() && subFilePath.endsWith(".xml")) { + tasks.push(() => writeJSON(subFilePath)); + foldersToRemote.push(subFilePath); } } + await withConcurrencyLimit(tasks, concurrencyLimit); + const deleteTasks = foldersToRemote.map((filePath) => () => rm(filePath)); + await withConcurrencyLimit(deleteTasks, concurrencyLimit); } async function writeJSON(xmlPath: string): Promise { diff --git a/src/service/withConcurrencyLimit.ts b/src/service/withConcurrencyLimit.ts new file mode 100644 index 0000000..53a5ab7 --- /dev/null +++ b/src/service/withConcurrencyLimit.ts @@ -0,0 +1,22 @@ +export async function withConcurrencyLimit( + tasks: (() => Promise)[], + limit: number, +): Promise { + const results: Promise[] = []; + const executing: Promise[] = []; // Change this to Promise[] + + for (const task of tasks) { + const p = task().then((result) => { + executing.splice(executing.indexOf(p), 1); + return result; + }); + results.push(p); + executing.push(p); + + if (executing.length >= limit) { + await Promise.race(executing); // Wait for the first one to complete + } + } + + return Promise.all(results); +} diff --git a/src/service/xml2jsonDisassembler.ts b/src/service/xml2jsonDisassembler.ts index 1446adb..2727e95 100644 --- a/src/service/xml2jsonDisassembler.ts +++ b/src/service/xml2jsonDisassembler.ts @@ -7,6 +7,8 @@ import { resolve, join, basename, dirname, extname } from "node:path/posix"; import { logger } from "@src/index"; import { disassembleHandler } from "@src/service/disassembleHandler"; import { transform2JSON } from "@src/service/transform2JSON"; +import { withConcurrencyLimit } from "./withConcurrencyLimit"; +import { getConcurrencyThreshold } from "./getConcurrencyThreshold"; export class XmlToJsonDisassembler { async disassemble(xmlAttributes: { @@ -23,36 +25,44 @@ export class XmlToJsonDisassembler { postPurge = false, ignorePath = ".xmldisassemblerignore", } = xmlAttributes; + const concurrencyLimit = getConcurrencyThreshold(); + const tasks = []; const fileStat = await stat(filePath); if (fileStat.isFile()) { const resolvedPath = resolve(filePath); if (!resolvedPath.endsWith(".xml")) { - logger.error(`The file path is not an XML file: ${resolvedPath}`); + logger.error(`The file path is not an XML file: ${resolvedPath}`); return; } - await this.processFile({ - filePath: resolvedPath, - uniqueIdElements, - prePurge, - postPurge, - ignorePath, - }); + tasks.push(() => + this.processFile({ + filePath: resolvedPath, + uniqueIdElements, + prePurge, + postPurge, + ignorePath, + }), + ); } else if (fileStat.isDirectory()) { const subFiles = await readdir(filePath); for (const subFile of subFiles) { const subFilePath = join(filePath, subFile); if (subFilePath.endsWith(".xml")) { - await this.processFile({ - filePath: subFilePath, - uniqueIdElements, - prePurge, - postPurge, - ignorePath, - }); + tasks.push(() => + this.processFile({ + filePath: subFilePath, + uniqueIdElements, + prePurge, + postPurge, + ignorePath, + }), + ); } } } + + await withConcurrencyLimit(tasks, concurrencyLimit); } async processFile(xmlAttributes: { diff --git a/test/main.spec.ts b/test/main.spec.ts index ed8cc18..5e0ec28 100644 --- a/test/main.spec.ts +++ b/test/main.spec.ts @@ -30,6 +30,7 @@ describe("main function", () => { afterEach(async () => { jest.restoreAllMocks(); + jest.resetModules(); }); afterAll(async () => { @@ -120,6 +121,33 @@ describe("main function", () => { await rm(fakeFile); expect(logger.error).toHaveBeenCalled(); }); + it("should return the minimum of available parallelism and 6", () => { + jest.mock("node:os", () => ({ + availableParallelism: jest.fn(() => 4), // Mock availableParallelism to return 4 + })); + const { + getConcurrencyThreshold, + } = require("../src/service/getConcurrencyThreshold"); + expect(getConcurrencyThreshold()).toBe(4); + }); + it("should return 6 if availableParallelism returns a higher value", () => { + jest.mock("node:os", () => ({ + availableParallelism: jest.fn(() => 10), // Mock availableParallelism to return 10 + })); + const { + getConcurrencyThreshold, + } = require("../src/service/getConcurrencyThreshold"); + expect(getConcurrencyThreshold()).toBe(6); + }); + it("should return 6 if availableParallelism is undefined", () => { + jest.mock("node:os", () => ({ + availableParallelism: undefined, // Simulate unavailable function + })); + const { + getConcurrencyThreshold, + } = require("../src/service/getConcurrencyThreshold"); + expect(getConcurrencyThreshold()).toBe(6); + }); // This should always be the final test it("should compare the files created in the mock directory against the baselines to confirm no changes.", async () => { await compareDirectories(baselineDir, mockDir);