diff --git a/package-lock.json b/package-lock.json index fb5cb97..c1ec3a1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,7 +10,7 @@ "license": "ISC", "dependencies": { "@dignetwork/datalayer-driver": "^0.1.28", - "@dignetwork/dig-sdk": "^0.0.1-alpha.142", + "@dignetwork/dig-sdk": "^0.0.1-alpha.147", "async-mutex": "^0.5.0", "busboy": "^1.6.0", "express": "^4.19.2", @@ -252,9 +252,9 @@ } }, "node_modules/@dignetwork/dig-sdk": { - "version": "0.0.1-alpha.142", - "resolved": "https://registry.npmjs.org/@dignetwork/dig-sdk/-/dig-sdk-0.0.1-alpha.142.tgz", - "integrity": "sha512-m9IU6OLRK1cphvR9RECY59hek77pKNquU++kCR0sH9OcSR/FA1PyQqnIfBQjq9Dcgiy/hf7ATbc7HTe5hJfgng==", + "version": "0.0.1-alpha.147", + "resolved": "https://registry.npmjs.org/@dignetwork/dig-sdk/-/dig-sdk-0.0.1-alpha.147.tgz", + "integrity": "sha512-BpH3WXcXqPn011YkQvM94IH314NuwiNvKFr+Nd7+geXbHoMBXoSvoMW6B5ZZT4oLrZJioqqdpy4DL9SOK18OIg==", "dependencies": { "@dignetwork/datalayer-driver": "^0.1.29", "@dignetwork/dig-sdk": "^0.0.1-alpha.124", diff --git a/package.json b/package.json index c93c83c..6c09fe1 100644 --- a/package.json +++ b/package.json @@ -26,7 +26,7 @@ ], "dependencies": { "@dignetwork/datalayer-driver": "^0.1.28", - "@dignetwork/dig-sdk": "^0.0.1-alpha.142", + "@dignetwork/dig-sdk": "^0.0.1-alpha.147", "async-mutex": "^0.5.0", "busboy": "^1.6.0", "express": "^4.19.2", diff --git a/src/controllers/storeController.ts b/src/controllers/storeController.ts index c82371d..6ff9bfe 100644 --- a/src/controllers/storeController.ts +++ b/src/controllers/storeController.ts @@ -197,4 +197,34 @@ export const getUserIpAddresses = (req: Request, res: Response): void => { error: "An error occurred while retrieving the IP addresses.", }); } -}; \ No newline at end of file +}; + +/** + * Pings the peer to measure latency. + * Responds with a simple success message for the PeerRanker to calculate latency. + */ +export const pingPeer = (req: Request, res: Response): void => { + res.status(200).send('pong'); +}; + +/** + * Handles the upload of random data to measure bandwidth. + * Discards the incoming data and returns a success message once all data is received. + */ +export const uploadTest = (req: Request, res: Response): void => { + // Listen to the data event to consume the incoming data + req.on('data', (chunk) => { + // Discard the chunk of data (we are not saving it) + }); + + // When the upload is finished, respond with a success message + req.on('end', () => { + res.status(200).send('Upload complete'); + }); + + // Handle potential errors during upload + req.on('error', (err) => { + console.error('Error during upload:', err); + res.status(500).send('Upload failed'); + }); +}; diff --git a/src/routes/storeRoutes.ts b/src/routes/storeRoutes.ts index 760602f..766d5cb 100644 --- a/src/routes/storeRoutes.ts +++ b/src/routes/storeRoutes.ts @@ -17,7 +17,9 @@ import { subscribeToStore, unsubscribeToStore, syncStoreFromRequestor, - getUserIpAddresses + getUserIpAddresses, + pingPeer, + uploadTest } from "../controllers/storeController"; import { verifyAuthorization } from "../middleware/verifyAuthorization"; @@ -51,6 +53,10 @@ router.post("/mnemonic", express.json(), verifyAuthorization, setMnemonic); router.post("/update", express.json(), syncStoreFromRequestor); router.post("/peer", express.json(), getUserIpAddresses); +// Diagnostics routes +router.get("/diagnostics/ping", pingPeer); +router.post("/diagnostics/bandwidth", uploadTest); + // Head request to check if a store exists router.head("/:storeId", verifyMnemonic, headStore); diff --git a/src/tasks/sync_stores.ts b/src/tasks/sync_stores.ts index 5db9203..802e881 100644 --- a/src/tasks/sync_stores.ts +++ b/src/tasks/sync_stores.ts @@ -8,7 +8,8 @@ import { NconfManager, ServerCoin, DigPeer, - withTimeout + withTimeout, + PeerRanker } from "@dignetwork/dig-sdk"; import { Mutex } from "async-mutex"; import { getStorageLocation } from "../utils/storage"; @@ -53,7 +54,7 @@ const processPeer = async (peerIp: string, storeId: string, rootHash: string, ch */ const cleanCheckedPeersMap = (currentRootHash: string): void => { for (const [rootHash, _] of checkedPeersMap.entries()) { - if (rootHash !== currentRootHash) {f + if (rootHash !== currentRootHash) { checkedPeersMap.delete(rootHash); console.log(`Removed outdated rootHash ${rootHash} from checkedPeersMap.`); } @@ -94,15 +95,21 @@ const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promi // Pass the checkedPeers as the blocklist to getActiveEpochPeers const blocklist = Array.from(checkedPeers); const peerIps: string[] = await serverCoin.getActiveEpochPeers(blocklist); - console.log(`Active epoch peers for store ${storeId}:`, peerIps); + if (peerIps.length === 0) { console.log(`No new peers to process for rootHash ${currentRootHash} in store ${storeId}.`); return; } - // Process peers one at a time sequentially - for (const peerIp of peerIps) { + console.log(`Ranking peers based on latency and bandwidth...`); + const peerRanker = new PeerRanker(peerIps); + const rankedPeers = await peerRanker.rankPeers(); + + console.log(`Active epoch peers for store ${storeId}:`, rankedPeers); + + // Process peers one at a time sequentially, starting with the best-ranked peers + for (const { ip: peerIp } of rankedPeers) { if (checkedPeers.has(peerIp)) { console.log(`Peer ${peerIp} has already been checked for rootHash ${currentRootHash}. Skipping.`); continue;