diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c04c71..05d21b0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines. +### [0.0.1-alpha.147](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.146...v0.0.1-alpha.147) (2024-10-06) + +### [0.0.1-alpha.146](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.145...v0.0.1-alpha.146) (2024-10-06) + ### [0.0.1-alpha.145](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.144...v0.0.1-alpha.145) (2024-10-06) ### [0.0.1-alpha.144](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.143...v0.0.1-alpha.144) (2024-10-06) diff --git a/package-lock.json b/package-lock.json index 5e2d1eb..75dea4d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.145", + "version": "0.0.1-alpha.147", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.145", + "version": "0.0.1-alpha.147", "license": "ISC", "dependencies": { "@dignetwork/datalayer-driver": "^0.1.29", diff --git a/package.json b/package.json index 9af7800..339aa1d 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@dignetwork/dig-sdk", - "version": "0.0.1-alpha.145", + "version": "0.0.1-alpha.147", "description": "", "type": "commonjs", "main": "./dist/index.js", diff --git a/src/utils/PeerRanker.ts b/src/utils/PeerRanker.ts index f2cfe97..4c0ae01 100644 --- a/src/utils/PeerRanker.ts +++ b/src/utils/PeerRanker.ts @@ -1,9 +1,8 @@ -// src/PeerRanker.ts - import axios, { AxiosRequestConfig } from 'axios'; import fs from 'fs'; import https from 'https'; import { getOrCreateSSLCerts } from './ssl'; +import { asyncPool } from './promiseUtils'; /** * Interface representing the metrics of a peer. @@ -14,15 +13,6 @@ export interface PeerMetrics { bandwidth: number; // in bytes per second (upload speed) } -/** - * Configuration options for the PeerRanker. - */ -interface PeerRankerOptions { - pingPath?: string; // Optional: Path for latency ping (e.g., '/ping') - timeout?: number; // Timeout for requests in milliseconds - uploadTestSize?: number; // Size of the data to upload in bytes -} - /** * Utility class to rank peers based on latency and upload bandwidth using HTTPS with mTLS. */ @@ -30,7 +20,6 @@ export class PeerRanker { private ipAddresses: string[]; private static certPath: string; private static keyPath: string; - private pingPath: string; private timeout: number; private uploadTestSize: number; @@ -41,14 +30,13 @@ export class PeerRanker { /** * Constructs a PeerRanker instance. * @param ipAddresses - Array of IP addresses to rank. - * @param options - Configuration options including paths to client certificates. */ - constructor(ipAddresses: string[], options: PeerRankerOptions) { + constructor(ipAddresses: string[], timeout: number = 5000, uploadTestSize: number = 1024 * 1024) { this.ipAddresses = ipAddresses; - this.pingPath = options.pingPath || '/'; // Default to root path if not provided - this.timeout = options.timeout || 5000; // Default timeout: 5 seconds - this.uploadTestSize = options.uploadTestSize || 1024 * 1024; // Default: 1MB + this.timeout = timeout; // Allow customizable timeout + this.uploadTestSize = uploadTestSize; // Default upload size: 1MB + // Fetch the SSL certificates used for mTLS. const { certPath, keyPath } = getOrCreateSSLCerts(); PeerRanker.certPath = certPath; PeerRanker.keyPath = keyPath; @@ -58,41 +46,38 @@ export class PeerRanker { * Measures the latency of a given IP address using an HTTPS request. * Tries HEAD first, then falls back to GET if HEAD is not supported. * @param ip - The IP address of the peer. - * @returns Promise resolving to the latency in milliseconds. + * @returns Promise resolving to the latency in milliseconds or rejecting if the peer fails. */ private async measureLatency(ip: string): Promise { - const path = this.pingPath; - const url = `https://${ip}${path}`; + const url = `https://${ip}:4159/diagnostics/ping`; - // Configuration for HEAD request const configHead: AxiosRequestConfig = { url: url, method: 'HEAD', httpsAgent: new https.Agent({ cert: fs.readFileSync(PeerRanker.certPath), key: fs.readFileSync(PeerRanker.keyPath), - rejectUnauthorized: false, // Set to true in production + rejectUnauthorized: false, }), timeout: this.timeout, - validateStatus: (status) => status < 500, // Resolve only if status is less than 500 + validateStatus: (status) => status < 500, }; const startTime = Date.now(); try { const response = await axios(configHead); - if (response.status === 405) { // Method Not Allowed - // Fallback to GET with Range header to minimize data transfer + if (response.status === 405) { const configGet: AxiosRequestConfig = { url: url, method: 'GET', httpsAgent: new https.Agent({ cert: fs.readFileSync(PeerRanker.certPath), key: fs.readFileSync(PeerRanker.keyPath), - rejectUnauthorized: false, // Set to true in production + rejectUnauthorized: false, }), timeout: this.timeout, headers: { - 'Range': 'bytes=0-0', // Request only the first byte + 'Range': 'bytes=0-0', }, validateStatus: (status) => status < 500, }; @@ -102,20 +87,18 @@ export class PeerRanker { return latency; } catch (error: any) { console.error(`Latency measurement failed for IP ${ip}:`, error.message); - return Infinity; // Indicate unreachable or unresponsive peer + throw new Error(`Latency measurement failed for IP ${ip}`); } } /** * Measures the upload bandwidth of a given IP address by sending random data. * @param ip - The IP address of the peer. - * @returns Promise resolving to the upload bandwidth in bytes per second. + * @returns Promise resolving to the upload bandwidth in bytes per second or rejecting if the peer fails. */ private async measureBandwidth(ip: string): Promise { - const url = `https://${ip}/upload`; // Assume /upload as the endpoint for upload testing - - // Generate random data - const randomData = Buffer.alloc(this.uploadTestSize, 'a'); // 1MB of 'a's + const url = `https://${ip}:4159/diagnostics/bandwidth`; + const randomData = Buffer.alloc(this.uploadTestSize, 'a'); const config: AxiosRequestConfig = { url: url, @@ -128,44 +111,52 @@ export class PeerRanker { httpsAgent: new https.Agent({ cert: fs.readFileSync(PeerRanker.certPath), key: fs.readFileSync(PeerRanker.keyPath), - rejectUnauthorized: false, // Set to true in production + rejectUnauthorized: false, }), timeout: this.timeout, maxContentLength: Infinity, maxBodyLength: Infinity, }; - return new Promise((resolve) => { - const startTime = Date.now(); - - axios(config) - .then(() => { - const timeElapsed = (Date.now() - startTime) / 1000; // seconds - const bandwidth = this.uploadTestSize / timeElapsed; // bytes per second - resolve(bandwidth); - }) - .catch((error: any) => { - console.error(`Bandwidth measurement failed for IP ${ip}:`, error.message); - resolve(0); // Indicate failure in measuring bandwidth - }); - }); + const startTime = Date.now(); + + try { + await axios(config); + const timeElapsed = (Date.now() - startTime) / 1000; + const bandwidth = this.uploadTestSize / timeElapsed; + return bandwidth; + } catch (error: any) { + console.error(`Bandwidth measurement failed for IP ${ip}:`, error.message); + throw new Error(`Bandwidth measurement failed for IP ${ip}`); + } } /** * Ranks the peers based on measured latency and upload bandwidth. + * Unresponsive peers are excluded from the final ranking. + * @param cooldown - Cooldown time in milliseconds between batches. * @returns Promise resolving to an array of PeerMetrics sorted by latency and bandwidth. */ - public async rankPeers(): Promise { - const metricsPromises = this.ipAddresses.map(async (ip) => { - const [latency, bandwidth] = await Promise.all([ - this.measureLatency(ip), - this.measureBandwidth(ip), - ]); - - return { ip, latency, bandwidth }; - }); + public async rankPeers(cooldown: number = 500): Promise { + const limit = 5; // Limit to 5 parallel requests at a time + + const iteratorFn = async (ip: string): Promise => { + try { + const [latency, bandwidth] = await Promise.all([ + this.measureLatency(ip), + this.measureBandwidth(ip), + ]); + return { ip, latency, bandwidth }; + } catch (error) { + // Peer failed, skip it by returning null + return null; + } + }; - const peerMetrics: PeerMetrics[] = await Promise.all(metricsPromises); + // Process all peers with a concurrency limit and cooldown between batches + const peerMetrics: PeerMetrics[] = ( + await asyncPool(limit, this.ipAddresses, iteratorFn, cooldown) + ).filter((metrics: any): metrics is PeerMetrics => metrics !== null); // Use a type guard // Sort by lowest latency first, then by highest bandwidth peerMetrics.sort((a, b) => { @@ -175,9 +166,7 @@ export class PeerRanker { return a.latency - b.latency; // Lower latency is better }); - // Update the internal sorted list this.sortedPeers = peerMetrics; - // Reset the iterator index this.currentIndex = 0; return peerMetrics; diff --git a/src/utils/promiseUtils.ts b/src/utils/promiseUtils.ts index 16c3ef2..4ff17cc 100644 --- a/src/utils/promiseUtils.ts +++ b/src/utils/promiseUtils.ts @@ -1,33 +1,37 @@ /** * Processes items in sequential batches with a concurrency limit. + * Adds a cooldown between batches. * @param {number} limit - The maximum number of concurrent executions per batch. * @param {Array} items - The array of items to process. * @param {(item: T) => Promise} iteratorFn - The async function to apply to each item. + * @param {number} cooldownMs - The cooldown duration between batches in milliseconds. * @returns {Promise>} - A promise that resolves when all items have been processed. */ export async function asyncPool( - limit: number, - items: T[], - iteratorFn: (item: T) => Promise - ): Promise { - const ret: R[] = []; - - for (let i = 0; i < items.length; i += limit) { - const batchItems = items.slice(i, i + limit); - const batchPromises = batchItems.map((item) => iteratorFn(item)); - - // Wait for the current batch to complete before starting the next one - const batchResults = await Promise.all(batchPromises); - ret.push(...batchResults); - - // Optional: add a cooldown between batches - // await new Promise((resolve) => setTimeout(resolve, 500)); + limit: number, + items: T[], + iteratorFn: (item: T) => Promise, + cooldownMs: number = 500 // Default cooldown of 500ms +): Promise { + const ret: R[] = []; + + for (let i = 0; i < items.length; i += limit) { + const batchItems = items.slice(i, i + limit); + const batchPromises = batchItems.map((item) => iteratorFn(item)); + + // Wait for the current batch to complete before starting the next one + const batchResults = await Promise.all(batchPromises); + ret.push(...batchResults); + + // Add a cooldown between batches, except after the last batch + if (i + limit < items.length) { + await new Promise((resolve) => setTimeout(resolve, cooldownMs)); } - - return ret; } + return ret; +} /** * Helper function to add a timeout to a promise. * @param promise The original promise.