Skip to content

Commit

Permalink
fix: process files in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
mcarvin8 committed Jan 21, 2025
1 parent 6f8fbdd commit a4f0932
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 42 deletions.
22 changes: 14 additions & 8 deletions src/service/deleteReassembledXML.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
"use strict";

import { stat, readdir, rm } from "node:fs/promises";
import { readdir, rm } from "node:fs/promises";
import { join } from "node:path/posix";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";
import { withConcurrencyLimit } from "./withConcurrencyLimit";

export async function deleteReassembledXML(
disassembledPath: string,
): Promise<void> {
const files = await readdir(disassembledPath);
const tasks: (() => Promise<void>)[] = [];
const files = await readdir(disassembledPath, { withFileTypes: true });
const concurrencyLimit = getConcurrencyThreshold();

for (const file of files) {
const filePath = join(disassembledPath, file);
const fileStat = await stat(filePath);
if (fileStat.isFile() && filePath.endsWith(".xml")) {
await rm(filePath);
} else if (fileStat.isDirectory()) {
await deleteReassembledXML(filePath);
const subFilePath = join(disassembledPath, file.name);

if (file.isFile() && subFilePath.endsWith(".xml")) {
tasks.push(() => rm(subFilePath));
} else if (file.isDirectory()) {
tasks.push(() => deleteReassembledXML(subFilePath));
}
}
await withConcurrencyLimit(tasks, concurrencyLimit);
}
10 changes: 10 additions & 0 deletions src/service/getConcurrencyThreshold.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
"use strict";

import { availableParallelism } from "node:os";

/**
 * Picks how many tasks callers should run at the same time.
 *
 * @returns The value reported by `os.availableParallelism()`, capped at 6.
 *   When the runtime does not provide `availableParallelism`, the cap itself
 *   (6) is returned.
 */
export function getConcurrencyThreshold(): number {
  const ceiling = 6;

  // The import is falsy when the runtime's `node:os` lacks this function.
  if (availableParallelism) {
    return Math.min(availableParallelism(), ceiling);
  }

  return ceiling;
}
20 changes: 13 additions & 7 deletions src/service/json2xmlReassembler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { logger } from "@src/index";
import { reassembleHandler } from "@src/service/reassembleHandler";
import { transform2XML } from "@src/service/transform2XML";
import { deleteReassembledXML } from "@src/service/deleteReassembledXML";
import { withConcurrencyLimit } from "./withConcurrencyLimit";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";

export class JsonToXmlReassembler {
async reassemble(xmlAttributes: {
Expand Down Expand Up @@ -34,15 +36,19 @@ export class JsonToXmlReassembler {
}

async processFile(filePath: string): Promise<void> {
const files = await readdir(filePath);
const tasks: (() => Promise<void>)[] = [];
const files = await readdir(filePath, { withFileTypes: true });
const concurrencyLimit = getConcurrencyThreshold();

for (const file of files) {
const subFilePath = join(filePath, file);
const subFileStat = await stat(subFilePath);
if (subFileStat.isFile() && subFilePath.endsWith(".json")) {
await transform2XML(subFilePath);
} else if (subFileStat.isDirectory()) {
await this.processFile(subFilePath);
const subFilePath = join(filePath, file.name);

if (file.isFile() && subFilePath.endsWith(".json")) {
tasks.push(() => transform2XML(subFilePath));
} else if (file.isDirectory()) {
tasks.push(() => this.processFile(subFilePath));
}
}
await withConcurrencyLimit(tasks, concurrencyLimit);
}
}
30 changes: 18 additions & 12 deletions src/service/transform2JSON.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
"use strict";

import { readdir, rm, stat, writeFile } from "node:fs/promises";
import { readdir, rm, writeFile } from "node:fs/promises";
import { join } from "node:path/posix";
import { parseXML } from "xml-disassembler";

import { logger } from "@src/index";
import { withConcurrencyLimit } from "./withConcurrencyLimit";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";

export async function transform2JSON(xmlPath: string): Promise<void> {
const subFiles = await readdir(xmlPath);
for (const subFile of subFiles) {
const subFilePath = join(xmlPath, subFile);
if ((await stat(subFilePath)).isDirectory()) {
await transform2JSON(subFilePath);
} else if (
(await stat(subFilePath)).isFile() &&
subFilePath.endsWith(".xml")
) {
await writeJSON(subFilePath);
await rm(subFilePath);
const tasks: (() => Promise<void>)[] = [];
const files = await readdir(xmlPath, { withFileTypes: true });
const concurrencyLimit = getConcurrencyThreshold();
const foldersToRemote = [];

for (const subFile of files) {
const subFilePath = join(xmlPath, subFile.name);
if (subFile.isDirectory()) {
tasks.push(() => transform2JSON(subFilePath));
} else if (subFile.isFile() && subFilePath.endsWith(".xml")) {
tasks.push(() => writeJSON(subFilePath));
foldersToRemote.push(subFilePath);
}
}
await withConcurrencyLimit(tasks, concurrencyLimit);
const deleteTasks = foldersToRemote.map((filePath) => () => rm(filePath));
await withConcurrencyLimit(deleteTasks, concurrencyLimit);
}

async function writeJSON(xmlPath: string): Promise<void> {
Expand Down
22 changes: 22 additions & 0 deletions src/service/withConcurrencyLimit.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
 * Runs a list of promise-returning task factories, keeping the number of
 * unfinished tasks below or at `limit` while new ones are being started.
 *
 * Tasks are started in array order. Once `limit` tasks are in flight, the
 * next task is not started until one of the running tasks settles.
 *
 * @param tasks - Factories that start the work when called.
 * @param limit - Maximum number of tasks allowed in flight before the loop
 *   waits. A value of `Infinity` never waits.
 * @returns The fulfilled values, in the same order as `tasks`.
 * @throws Rejects with the first failure observed. After a failure no further
 *   tasks are started; tasks already in flight are left to finish on their own.
 */
export async function withConcurrencyLimit<T>(
  tasks: (() => Promise<T>)[],
  limit: number,
): Promise<T[]> {
  const results: Promise<T>[] = [];
  const executing = new Set<Promise<T>>();

  for (const task of tasks) {
    let started: Promise<T>;
    try {
      started = task();
    } catch (error) {
      // A factory that throws synchronously must not escape the loop directly:
      // earlier in-flight promises would be left without a rejection handler.
      // Record the failure as a rejected result and stop launching tasks, so
      // Promise.all below both reports the error and observes every promise.
      results.push(Promise.reject(error));
      break;
    }

    // Only fulfilled tasks leave the in-flight set. A rejected task stays in
    // it so the next race rejects immediately and no further work is started.
    const tracked: Promise<T> = started.then((result) => {
      executing.delete(tracked);
      return result;
    });
    results.push(tracked);
    executing.add(tracked);

    if (executing.size >= limit) {
      // Wait for the first in-flight task to settle before starting another.
      await Promise.race(executing);
    }
  }

  return Promise.all(results);
}
40 changes: 25 additions & 15 deletions src/service/xml2jsonDisassembler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { resolve, join, basename, dirname, extname } from "node:path/posix";
import { logger } from "@src/index";
import { disassembleHandler } from "@src/service/disassembleHandler";
import { transform2JSON } from "@src/service/transform2JSON";
import { withConcurrencyLimit } from "./withConcurrencyLimit";
import { getConcurrencyThreshold } from "./getConcurrencyThreshold";

export class XmlToJsonDisassembler {
async disassemble(xmlAttributes: {
Expand All @@ -23,36 +25,44 @@ export class XmlToJsonDisassembler {
postPurge = false,
ignorePath = ".xmldisassemblerignore",
} = xmlAttributes;
const concurrencyLimit = getConcurrencyThreshold();
const tasks = [];
const fileStat = await stat(filePath);

if (fileStat.isFile()) {
const resolvedPath = resolve(filePath);
if (!resolvedPath.endsWith(".xml")) {
logger.error(`The file path is not an XML file: ${resolvedPath}`);
logger.error(`The file path is not an XML file: ${resolvedPath}`);
return;
}
await this.processFile({
filePath: resolvedPath,
uniqueIdElements,
prePurge,
postPurge,
ignorePath,
});
tasks.push(() =>
this.processFile({
filePath: resolvedPath,
uniqueIdElements,
prePurge,
postPurge,
ignorePath,
}),
);
} else if (fileStat.isDirectory()) {
const subFiles = await readdir(filePath);
for (const subFile of subFiles) {
const subFilePath = join(filePath, subFile);
if (subFilePath.endsWith(".xml")) {
await this.processFile({
filePath: subFilePath,
uniqueIdElements,
prePurge,
postPurge,
ignorePath,
});
tasks.push(() =>
this.processFile({
filePath: subFilePath,
uniqueIdElements,
prePurge,
postPurge,
ignorePath,
}),
);
}
}
}

await withConcurrencyLimit(tasks, concurrencyLimit);
}

async processFile(xmlAttributes: {
Expand Down
28 changes: 28 additions & 0 deletions test/main.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ describe("main function", () => {

afterEach(async () => {
jest.restoreAllMocks();
jest.resetModules();
});

afterAll(async () => {
Expand Down Expand Up @@ -120,6 +121,33 @@ describe("main function", () => {
await rm(fakeFile);
expect(logger.error).toHaveBeenCalled();
});
it("should return the minimum of available parallelism and 6", () => {
jest.mock("node:os", () => ({
availableParallelism: jest.fn(() => 4), // Mock availableParallelism to return 4
}));
const {
getConcurrencyThreshold,
} = require("../src/service/getConcurrencyThreshold");
expect(getConcurrencyThreshold()).toBe(4);
});
it("should return 6 if availableParallelism returns a higher value", () => {
jest.mock("node:os", () => ({
availableParallelism: jest.fn(() => 10), // Mock availableParallelism to return 10
}));
const {
getConcurrencyThreshold,
} = require("../src/service/getConcurrencyThreshold");
expect(getConcurrencyThreshold()).toBe(6);
});
it("should return 6 if availableParallelism is undefined", () => {
jest.mock("node:os", () => ({
availableParallelism: undefined, // Simulate unavailable function
}));
const {
getConcurrencyThreshold,
} = require("../src/service/getConcurrencyThreshold");
expect(getConcurrencyThreshold()).toBe(6);
});
// This should always be the final test
it("should compare the files created in the mock directory against the baselines to confirm no changes.", async () => {
await compareDirectories(baselineDir, mockDir);
Expand Down

0 comments on commit a4f0932

Please sign in to comment.