Skip to content

Commit

Permalink
feat: add peer ranker to sync stores
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelTaylor3D committed Oct 6, 2024
1 parent 694b588 commit a63aa91
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 12 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
],
"dependencies": {
"@dignetwork/datalayer-driver": "^0.1.28",
"@dignetwork/dig-sdk": "^0.0.1-alpha.142",
"@dignetwork/dig-sdk": "^0.0.1-alpha.147",
"async-mutex": "^0.5.0",
"busboy": "^1.6.0",
"express": "^4.19.2",
Expand Down
32 changes: 31 additions & 1 deletion src/controllers/storeController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -197,4 +197,34 @@ export const getUserIpAddresses = (req: Request, res: Response): void => {
error: "An error occurred while retrieving the IP addresses.",
});
}
};
};

/**
* Pings the peer to measure latency.
* Responds with a simple success message for the PeerRanker to calculate latency.
*/
export const pingPeer = (req: Request, res: Response): void => {
res.status(200).send('pong');
};

/**
* Handles the upload of random data to measure bandwidth.
* Discards the incoming data and returns a success message once all data is received.
*/
export const uploadTest = (req: Request, res: Response): void => {
// Listen to the data event to consume the incoming data
req.on('data', (chunk) => {
// Discard the chunk of data (we are not saving it)
});

// When the upload is finished, respond with a success message
req.on('end', () => {
res.status(200).send('Upload complete');
});

// Handle potential errors during upload
req.on('error', (err) => {
console.error('Error during upload:', err);
res.status(500).send('Upload failed');
});
};
8 changes: 7 additions & 1 deletion src/routes/storeRoutes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import {
subscribeToStore,
unsubscribeToStore,
syncStoreFromRequestor,
getUserIpAddresses
getUserIpAddresses,
pingPeer,
uploadTest
} from "../controllers/storeController";
import { verifyAuthorization } from "../middleware/verifyAuthorization";

Expand Down Expand Up @@ -51,6 +53,10 @@ router.post("/mnemonic", express.json(), verifyAuthorization, setMnemonic);
router.post("/update", express.json(), syncStoreFromRequestor);
router.post("/peer", express.json(), getUserIpAddresses);

// Diagnostics routes
router.get("/diagnostics/ping", pingPeer);
router.post("/diagnostics/bandwidth", uploadTest);

// Head request to check if a store exists
router.head("/:storeId", verifyMnemonic, headStore);

Expand Down
17 changes: 12 additions & 5 deletions src/tasks/sync_stores.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@ import {
NconfManager,
ServerCoin,
DigPeer,
withTimeout
withTimeout,
PeerRanker
} from "@dignetwork/dig-sdk";
import { Mutex } from "async-mutex";
import { getStorageLocation } from "../utils/storage";
Expand Down Expand Up @@ -53,7 +54,7 @@ const processPeer = async (peerIp: string, storeId: string, rootHash: string, ch
*/
const cleanCheckedPeersMap = (currentRootHash: string): void => {
for (const [rootHash, _] of checkedPeersMap.entries()) {
if (rootHash !== currentRootHash) {f
if (rootHash !== currentRootHash) {
checkedPeersMap.delete(rootHash);
console.log(`Removed outdated rootHash ${rootHash} from checkedPeersMap.`);
}
Expand Down Expand Up @@ -94,15 +95,21 @@ const handleSyncedStore = async (storeId: string, serverCoin: ServerCoin): Promi
// Pass the checkedPeers as the blocklist to getActiveEpochPeers
const blocklist = Array.from(checkedPeers);
const peerIps: string[] = await serverCoin.getActiveEpochPeers(blocklist);
console.log(`Active epoch peers for store ${storeId}:`, peerIps);


if (peerIps.length === 0) {
console.log(`No new peers to process for rootHash ${currentRootHash} in store ${storeId}.`);
return;
}

// Process peers one at a time sequentially
for (const peerIp of peerIps) {
console.log(`Ranking peers based on latency and bandwidth...`);
const peerRanker = new PeerRanker(peerIps);
const rankedPeers = await peerRanker.rankPeers();

console.log(`Active epoch peers for store ${storeId}:`, rankedPeers);

// Process peers one at a time sequentially, starting with the best-ranked peers
for (const { ip: peerIp } of rankedPeers) {
if (checkedPeers.has(peerIp)) {
console.log(`Peer ${peerIp} has already been checked for rootHash ${currentRootHash}. Skipping.`);
continue;
Expand Down

0 comments on commit a63aa91

Please sign in to comment.