Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/v0.0.1 alpha.15 #15

Merged
merged 2 commits into from
Sep 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

### [0.0.1-alpha.15](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.14...v0.0.1-alpha.15) (2024-09-10)


### Bug Fixes

* write stream in tree ([b10d6a2](https://github.com/DIG-Network/dig-chia-sdk/commit/b10d6a2489fc66ee8c8c51546b0521f39aee3c24))

### [0.0.1-alpha.14](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.10...v0.0.1-alpha.14) (2024-09-10)


Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@dignetwork/dig-sdk",
"version": "0.0.1-alpha.14",
"version": "0.0.1-alpha.15",
"description": "",
"type": "commonjs",
"main": "./dist/index.js",
Expand Down
86 changes: 61 additions & 25 deletions src/DataIntegrityTree/DataIntegrityTree.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,46 +188,52 @@ class DataIntegrityTree {
if (!isHexString(key)) {
throw new Error(`key must be a valid hex string: ${key}`);
}

const uncompressedHash = crypto.createHash("sha256");
const gzip = zlib.createGzip();

let sha256: string;
const tempDir = path.join(this.storeDir, "tmp");
if (!fs.existsSync(tempDir)) {
fs.mkdirSync(tempDir, { recursive: true });
}
const tempFilePath = path.join(tempDir, `${crypto.randomUUID()}.gz`);

return new Promise((resolve, reject) => {
const tempWriteStream = fs.createWriteStream(tempFilePath);


// Update the hash with the original data
readStream.on("data", (chunk) => {
uncompressedHash.update(chunk);
});


// Pipe the read stream through gzip into the temporary write stream
readStream.pipe(gzip).pipe(tempWriteStream);

tempWriteStream.on("finish", async () => {
sha256 = uncompressedHash.digest("hex");

const finalWriteStream = this._createWriteStream(sha256);
const finalPath = finalWriteStream.path as string;

// Ensure the directory exists before copying the file
const finalDir = path.dirname(finalPath);
if (!fs.existsSync(finalDir)) {
fs.mkdirSync(finalDir, { recursive: true });
}

let finalWriteStream: fs.WriteStream | undefined;
try {
sha256 = uncompressedHash.digest("hex");

finalWriteStream = this._createWriteStream(sha256);
const finalPath = finalWriteStream.path as string;

// Ensure the directory exists
const finalDir = path.dirname(finalPath);
if (!fs.existsSync(finalDir)) {
fs.mkdirSync(finalDir, { recursive: true });
}

// Copy the temporary gzipped file to the final destination
await this._streamFile(tempFilePath, finalPath);
await unlink(tempFilePath);

await unlink(tempFilePath); // Clean up the temporary file
const combinedHash = crypto
.createHash("sha256")
.update(`${key}/${sha256}`)
.digest("hex");


// Check if the key already exists with the same hash
if (
Array.from(this.files.values()).some(
(file) => file.hash === combinedHash
Expand All @@ -236,28 +242,58 @@ class DataIntegrityTree {
console.log(`No changes detected for key: ${key}`);
return resolve();
}


// Delete existing key if present
if (this.files.has(key)) {
this.deleteKey(key);
}


// Insert the new key with the hash
console.log(`Inserted key: ${key}`);
this.files.set(key, {
hash: combinedHash,
sha256: sha256,
});

this._rebuildTree();
resolve();
} catch (err) {
// On error, cleanup the temporary file and reject
await unlink(tempFilePath).catch(() => {});
reject(err);
} finally {
// Always close the final write stream if it exists
if (finalWriteStream) {
finalWriteStream.end();
}
}
});

tempWriteStream.on("error", (err) => {

tempWriteStream.on("error", async (err) => {
// Close streams and clean up in case of error
tempWriteStream.destroy();
gzip.destroy();
readStream.destroy();

await unlink(tempFilePath).catch(() => {}); // Clean up the temp file
reject(err);
});

readStream.on("error", (err) => {

readStream.on("error", async (err) => {
// Close streams and clean up in case of error
tempWriteStream.destroy();
gzip.destroy();
readStream.destroy();

await unlink(tempFilePath).catch(() => {}); // Clean up the temp file
reject(err);
});

gzip.on("error", (err) => {
// Handle errors in the gzip stream
tempWriteStream.destroy();
gzip.destroy();
readStream.destroy();
reject(err);
});
});
Expand Down
29 changes: 19 additions & 10 deletions src/utils/directoryUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ import ignore from "ignore";
import { DataIntegrityTree } from "../DataIntegrityTree";

// Custom concurrency handler
const limitConcurrency = async (concurrencyLimit: number, tasks: (() => Promise<void>)[]) => {
const limitConcurrency = async (
concurrencyLimit: number,
tasks: (() => Promise<void>)[]
) => {
const results = [];
const executing: Promise<void>[] = [];

Expand Down Expand Up @@ -61,16 +64,22 @@ export const addDirectory = async (
tasks.push(() => addDirectory(datalayer, filePath, baseDir));
} else {
// Add a task for each file to be processed
tasks.push(() =>
new Promise<void>((resolve, reject) => {
const stream = fs.createReadStream(filePath);
datalayer
.upsertKey(stream, Buffer.from(relativePath).toString("hex"))
.then(resolve)
.catch(reject);
})
tasks.push(
() =>
new Promise<void>((resolve, reject) => {
const stream = fs.createReadStream(filePath);
datalayer
.upsertKey(stream, Buffer.from(relativePath).toString("hex"))
.then(async () => {
await new Promise<void>((resolve) => setTimeout(resolve, 100));
resolve();
})
.catch(reject);
})
);
}

await new Promise<void>((resolve) => setTimeout(resolve, 100));
}

// Run tasks with limited concurrency (set the concurrency limit as needed)
Expand Down Expand Up @@ -99,4 +108,4 @@ export const calculateFolderSize = (folderPath: string): bigint => {
}

return totalSize;
};
};
Loading