Skip to content

Commit

Permalink
Merge pull request #46 from DIG-Network/develop
Browse files Browse the repository at this point in the history
feat: phase out manifest file
  • Loading branch information
MichaelTaylor3D authored Sep 20, 2024
2 parents 9fa4657 + fa89a6c commit 8d71483
Show file tree
Hide file tree
Showing 11 changed files with 1,063 additions and 569 deletions.
856 changes: 844 additions & 12 deletions package-lock.json

Large diffs are not rendered by default.

6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
],
"dependencies": {
"@dignetwork/datalayer-driver": "^0.1.24",
"archiver": "^7.0.1",
"bip39": "^3.1.0",
"chia-bls": "^1.0.2",
"chia-config-loader": "^1.0.1",
Expand All @@ -43,9 +44,11 @@
"nconf": "^0.12.1",
"node-cache": "^5.1.2",
"p-limit": "^6.1.0",
"superagent": "^10.0.0"
"superagent": "^10.0.0",
"unzipper": "^0.12.3"
},
"devDependencies": {
"@types/archiver": "^6.0.2",
"@types/chai": "^4.3.17",
"@types/cli-progress": "^3.11.6",
"@types/crypto-js": "^4.2.2",
Expand All @@ -56,6 +59,7 @@
"@types/nconf": "^0.10.7",
"@types/node": "^22.1.0",
"@types/superagent": "^8.1.8",
"@types/unzipper": "^0.10.10",
"chai": "^5.1.1",
"copyfiles": "^2.4.1",
"mocha": "^10.7.0",
Expand Down
22 changes: 18 additions & 4 deletions src/DigNetwork/ContentServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,11 @@ export class ContentServer {
}

// Method to get the content of a specified key from the peer, with optional challenge query
public async getKey(key: string, rootHash: string, challengeHex?: string): Promise<string> {
public async getKey(
key: string,
rootHash: string,
challengeHex?: string
): Promise<string> {
// Construct the base URL
let url = `https://${this.ipAddress}:${ContentServer.port}/chia.${this.storeId}.${rootHash}/${key}`;

Expand Down Expand Up @@ -83,19 +87,29 @@ export class ContentServer {
}

// Method to check if a specific store exists (HEAD request)
public async headStore(options?: { hasRootHash: string}): Promise<{
public async headStore(options?: { hasRootHash: string }): Promise<{
success: boolean;
headers?: http.IncomingHttpHeaders;
}> {
let url = `https://${this.ipAddress}:${ContentServer.port}/${this.storeId}`;

if (options?.hasRootHash) {
url += `?hasRootHash=${options.hasRootHash}`;
}

return this.head(url);
}

public async hasRootHash(rootHash: string): Promise<boolean> {
const { success, headers } = await this.headStore({
hasRootHash: rootHash,
});
if (success) {
return headers?.["x-has-root-hash"] === "true";
}
return false;
}

public streamKey(key: string): Promise<Readable> {
return new Promise((resolve, reject) => {
const url = `https://${this.ipAddress}:${ContentServer.port}/${this.storeId}/${key}`;
Expand Down Expand Up @@ -285,7 +299,7 @@ export class ContentServer {
const redirectUrl = new URL(response.headers.location, url); // Resolve relative URLs based on the original URL

// Recursively follow the redirect, passing the same query params
// console.log(`Redirecting to: ${redirectUrl.toString()}`);
// console.log(`Redirecting to: ${redirectUrl.toString()}`);
this.fetch(redirectUrl.toString(), maxRedirects - 1)
.then(resolve)
.catch(reject);
Expand Down
198 changes: 2 additions & 196 deletions src/DigNetwork/DigNetwork.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import * as fs from "fs";
import * as path from "path";
import { MultiBar, Presets } from "cli-progress";
import { DigPeer } from "./DigPeer";
import { getDeltaFiles } from "../utils/deltaUtils";
import { getFilePathFromSha256 } from "../utils/hashUtils";
import { DataStore, ServerCoin } from "../blockchain";
import { DIG_FOLDER_PATH } from "../utils/config";
Expand All @@ -26,135 +25,6 @@ export class DigNetwork {
this.peerBlacklist = new Map<string, Set<string>>(); // Initialize empty map for blacklists
}

private async uploadPreflight(
digPeer: DigPeer
): Promise<{ generationIndex: number; lastLocalRootHash: string }> {
// Preflight check is handled internally by PropagationServer if needed
const { lastUploadedHash, generationIndex } =
await digPeer.propagationServer.getUploadDetails();

const rootHistory = await this.dataStore.getLocalRootHistory();

if (!rootHistory || rootHistory.length === 0) {
throw new Error(
"No root hashes found. Please commit your changes first."
);
}

const lastLocalRootHash = rootHistory[rootHistory.length - 1].root_hash;
const localGenerationIndex = rootHistory.length - 1;

// Handle conditions based on the upload details
if (
lastUploadedHash !== lastLocalRootHash &&
generationIndex === localGenerationIndex
) {
throw new Error(
"The repository seems to be corrupted. Please pull the latest changes before pushing."
);
}

if (
lastUploadedHash === lastLocalRootHash &&
generationIndex === localGenerationIndex
) {
throw new Error("No changes detected. Skipping push.");
}

if (
lastUploadedHash !== lastLocalRootHash &&
generationIndex > localGenerationIndex
) {
throw new Error(
"Remote repository is ahead of the local repository. Please pull the latest changes before pushing."
);
}
return { generationIndex, lastLocalRootHash };
}

public async uploadStoreHead(digPeer: DigPeer): Promise<void> {
// First make sure that the remote store is up to date.
const rootHistory = await this.dataStore.getRootHistory();
const localManifestHashes = await this.dataStore.getManifestHashes();
const remoteManifestFile = await digPeer.propagationServer.getStoreData(
"manifest.dat"
);

const remoteManifestHashes = remoteManifestFile.split("\n").filter(Boolean);
const onChainRootHashes = rootHistory.map((root) => root.root_hash);

// Check that remote manifest is one behind on-chain root hashes
if (remoteManifestHashes.length !== onChainRootHashes.length - 1) {
throw new Error(
"Remote manifest should be one behind the on-chain root. Cannot push head."
);
}

// Compare each remote manifest hash with the corresponding on-chain root hash
for (let i = 0; i < remoteManifestHashes.length; i++) {
if (remoteManifestHashes[i] !== onChainRootHashes[i]) {
throw new Error(
`Remote manifest does not match on-chain root at index ${i}. Cannot push head.`
);
}
}

// Get the files for the latest local manifest hash
const filesToUpload = await this.dataStore.getFileSetForRootHash(
localManifestHashes[localManifestHashes.length - 1]
);

if (!filesToUpload.length) {
console.log("No files to upload.");
return;
}

// Upload files to the remote peer with a progress bar
await this.runProgressBar(
filesToUpload.length,
"Store Data",
async (progress) => {
for (const filePath of filesToUpload) {
const relativePath = path
.relative(this.storeDir, filePath)
.replace(/\\/g, "/");
await digPeer.propagationServer.pushFile(filePath, relativePath);
progress.increment();
}
}
);
}

// Uploads the store to a specific peer
public async uploadStore(digPeer: DigPeer): Promise<void> {
const { generationIndex } = await this.uploadPreflight(digPeer);

const filesToUpload = await getDeltaFiles(
this.dataStore.StoreId,
generationIndex,
path.resolve(DIG_FOLDER_PATH, "stores")
);

if (!filesToUpload.length) {
console.log("No files to upload.");
return;
}

await this.runProgressBar(
filesToUpload.length,
"Store Data",
async (progress) => {
for (const filePath of filesToUpload) {
const relativePath = path
.relative(this.storeDir, filePath)
.replace(/\\/g, "/");
await digPeer.propagationServer.pushFile(filePath, relativePath);
progress.increment();
}
}
);
}

public static async subscribeToStore(storeId: string): Promise<void> {
fs.mkdirSync(path.join(DIG_FOLDER_PATH, "stores", storeId), {
recursive: true,
Expand Down Expand Up @@ -232,10 +102,7 @@ export class DigNetwork {
fs.unlinkSync(path.join(DIG_FOLDER_PATH, "stores", storeId + ".json"));
}

// Downloads files from the network based on the manifest
public async downloadFiles(
forceDownload: boolean = false,
renderProgressBar: boolean = true,
skipData: boolean = false
): Promise<void> {
console.log("Starting file download process...");
Expand Down Expand Up @@ -369,8 +236,6 @@ export class DigNetwork {
break;
}

await this.downloadManifestFile(true);

console.log("Syncing store complete.");
} catch (error: any) {
if (selectedPeer) {
Expand Down Expand Up @@ -403,15 +268,6 @@ export class DigNetwork {
);
}

private async downloadManifestFile(forceDownload: boolean): Promise<void> {
const heightFilePath = path.join(this.storeDir, "manifest.dat");
await this.downloadFileFromPeers(
"manifest.dat",
heightFilePath,
forceDownload
);
}

private async downloadFileFromPeers(
dataPath: string,
filePath: string,
Expand All @@ -429,47 +285,14 @@ export class DigNetwork {
try {
if (blacklist.has(digPeer.IpAddress)) continue;

const response = await digPeer.propagationServer.headStore();

if (!response.success) {
continue;
}

// Create directory if it doesn't exist
const directory = path.dirname(tempFilePath);
if (!fs.existsSync(directory)) {
fs.mkdirSync(directory, { recursive: true });
}

// Stream the file data to a temporary file
const fileStream = fs.createWriteStream(tempFilePath);

// Start streaming the data from the peer
const peerStream = await digPeer.propagationServer.streamStoreData(
dataPath
);

// Pipe the peer stream to the temp file
await new Promise<void>((resolve, reject) => {
peerStream.pipe(fileStream);

peerStream.on("end", resolve);
peerStream.on("error", reject);
fileStream.on("error", reject);
});

// Rename the temp file to the final file path after successful download
await rename(tempFilePath, filePath);

if (process.env.DIG_DEBUG === "1") {
console.log(`Downloaded ${dataPath} from ${digPeer.IpAddress}`);
}
await digPeer.downloadData(this.dataStore.StoreId, dataPath);

return; // Exit the method if download succeeds
} catch (error) {
console.warn(
`Failed to download ${dataPath} from ${digPeer.IpAddress}, blacklisting peer and trying next...`
);

blacklist.add(digPeer.IpAddress);

// Clean up the temp file in case of failure
Expand All @@ -478,23 +301,6 @@ export class DigNetwork {
}
}
}

this.peerBlacklist.set(dataPath, blacklist);

if (blacklist.size >= digPeers.length) {
if (process.env.DIG_DEBUG === "1") {
console.warn(
`All peers blacklisted for ${dataPath}. Refreshing peers...`
);
}

digPeers = await this.fetchAvailablePeers();
if (!digPeers.length) {
throw new Error(
`Failed to download ${dataPath}: no peers available.`
);
}
}
}
}

Expand Down
Loading

0 comments on commit 8d71483

Please sign in to comment.