Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/v0.0.1 alpha.147 #132

Merged
merged 4 commits into from
Oct 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

### [0.0.1-alpha.147](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.146...v0.0.1-alpha.147) (2024-10-06)

### [0.0.1-alpha.146](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.145...v0.0.1-alpha.146) (2024-10-06)

### [0.0.1-alpha.145](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.144...v0.0.1-alpha.145) (2024-10-06)

### [0.0.1-alpha.144](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.143...v0.0.1-alpha.144) (2024-10-06)
Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@dignetwork/dig-sdk",
"version": "0.0.1-alpha.145",
"version": "0.0.1-alpha.147",
"description": "",
"type": "commonjs",
"main": "./dist/index.js",
Expand Down
109 changes: 49 additions & 60 deletions src/utils/PeerRanker.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
// src/PeerRanker.ts

import axios, { AxiosRequestConfig } from 'axios';
import fs from 'fs';
import https from 'https';
import { getOrCreateSSLCerts } from './ssl';
import { asyncPool } from './promiseUtils';

/**
* Interface representing the metrics of a peer.
Expand All @@ -14,23 +13,13 @@ export interface PeerMetrics {
bandwidth: number; // in bytes per second (upload speed)
}

/**
* Configuration options for the PeerRanker.
*/
interface PeerRankerOptions {
pingPath?: string; // Optional: Path for latency ping (e.g., '/ping')
timeout?: number; // Timeout for requests in milliseconds
uploadTestSize?: number; // Size of the data to upload in bytes
}

/**
* Utility class to rank peers based on latency and upload bandwidth using HTTPS with mTLS.
*/
export class PeerRanker {
private ipAddresses: string[];
private static certPath: string;
private static keyPath: string;
private pingPath: string;
private timeout: number;
private uploadTestSize: number;

Expand All @@ -41,14 +30,13 @@ export class PeerRanker {
/**
* Constructs a PeerRanker instance.
* @param ipAddresses - Array of IP addresses to rank.
* @param options - Configuration options including paths to client certificates.
*/
constructor(ipAddresses: string[], options: PeerRankerOptions) {
constructor(ipAddresses: string[], timeout: number = 5000, uploadTestSize: number = 1024 * 1024) {
this.ipAddresses = ipAddresses;
this.pingPath = options.pingPath || '/'; // Default to root path if not provided
this.timeout = options.timeout || 5000; // Default timeout: 5 seconds
this.uploadTestSize = options.uploadTestSize || 1024 * 1024; // Default: 1MB
this.timeout = timeout; // Allow customizable timeout
this.uploadTestSize = uploadTestSize; // Default upload size: 1MB

// Fetch the SSL certificates used for mTLS.
const { certPath, keyPath } = getOrCreateSSLCerts();
PeerRanker.certPath = certPath;
PeerRanker.keyPath = keyPath;
Expand All @@ -58,41 +46,38 @@ export class PeerRanker {
* Measures the latency of a given IP address using an HTTPS request.
* Tries HEAD first, then falls back to GET if HEAD is not supported.
* @param ip - The IP address of the peer.
* @returns Promise resolving to the latency in milliseconds.
* @returns Promise resolving to the latency in milliseconds or rejecting if the peer fails.
*/
private async measureLatency(ip: string): Promise<number> {
const path = this.pingPath;
const url = `https://${ip}${path}`;
const url = `https://${ip}:4159/diagnostics/ping`;

// Configuration for HEAD request
const configHead: AxiosRequestConfig = {
url: url,
method: 'HEAD',
httpsAgent: new https.Agent({
cert: fs.readFileSync(PeerRanker.certPath),
key: fs.readFileSync(PeerRanker.keyPath),
rejectUnauthorized: false, // Set to true in production
rejectUnauthorized: false,
}),
timeout: this.timeout,
validateStatus: (status) => status < 500, // Resolve only if status is less than 500
validateStatus: (status) => status < 500,
};

const startTime = Date.now();
try {
const response = await axios(configHead);
if (response.status === 405) { // Method Not Allowed
// Fallback to GET with Range header to minimize data transfer
if (response.status === 405) {
const configGet: AxiosRequestConfig = {
url: url,
method: 'GET',
httpsAgent: new https.Agent({
cert: fs.readFileSync(PeerRanker.certPath),
key: fs.readFileSync(PeerRanker.keyPath),
rejectUnauthorized: false, // Set to true in production
rejectUnauthorized: false,
}),
timeout: this.timeout,
headers: {
'Range': 'bytes=0-0', // Request only the first byte
'Range': 'bytes=0-0',
},
validateStatus: (status) => status < 500,
};
Expand All @@ -102,20 +87,18 @@ export class PeerRanker {
return latency;
} catch (error: any) {
console.error(`Latency measurement failed for IP ${ip}:`, error.message);
return Infinity; // Indicate unreachable or unresponsive peer
throw new Error(`Latency measurement failed for IP ${ip}`);
}
}

/**
* Measures the upload bandwidth of a given IP address by sending random data.
* @param ip - The IP address of the peer.
* @returns Promise resolving to the upload bandwidth in bytes per second.
* @returns Promise resolving to the upload bandwidth in bytes per second or rejecting if the peer fails.
*/
private async measureBandwidth(ip: string): Promise<number> {
const url = `https://${ip}/upload`; // Assume /upload as the endpoint for upload testing

// Generate random data
const randomData = Buffer.alloc(this.uploadTestSize, 'a'); // 1MB of 'a's
const url = `https://${ip}:4159/diagnostics/bandwidth`;
const randomData = Buffer.alloc(this.uploadTestSize, 'a');

const config: AxiosRequestConfig = {
url: url,
Expand All @@ -128,44 +111,52 @@ export class PeerRanker {
httpsAgent: new https.Agent({
cert: fs.readFileSync(PeerRanker.certPath),
key: fs.readFileSync(PeerRanker.keyPath),
rejectUnauthorized: false, // Set to true in production
rejectUnauthorized: false,
}),
timeout: this.timeout,
maxContentLength: Infinity,
maxBodyLength: Infinity,
};

return new Promise<number>((resolve) => {
const startTime = Date.now();

axios(config)
.then(() => {
const timeElapsed = (Date.now() - startTime) / 1000; // seconds
const bandwidth = this.uploadTestSize / timeElapsed; // bytes per second
resolve(bandwidth);
})
.catch((error: any) => {
console.error(`Bandwidth measurement failed for IP ${ip}:`, error.message);
resolve(0); // Indicate failure in measuring bandwidth
});
});
const startTime = Date.now();

try {
await axios(config);
const timeElapsed = (Date.now() - startTime) / 1000;
const bandwidth = this.uploadTestSize / timeElapsed;
return bandwidth;
} catch (error: any) {
console.error(`Bandwidth measurement failed for IP ${ip}:`, error.message);
throw new Error(`Bandwidth measurement failed for IP ${ip}`);
}
}

/**
* Ranks the peers based on measured latency and upload bandwidth.
* Unresponsive peers are excluded from the final ranking.
* @param cooldown - Cooldown time in milliseconds between batches.
* @returns Promise resolving to an array of PeerMetrics sorted by latency and bandwidth.
*/
public async rankPeers(): Promise<PeerMetrics[]> {
const metricsPromises = this.ipAddresses.map(async (ip) => {
const [latency, bandwidth] = await Promise.all([
this.measureLatency(ip),
this.measureBandwidth(ip),
]);

return { ip, latency, bandwidth };
});
public async rankPeers(cooldown: number = 500): Promise<PeerMetrics[]> {
const limit = 5; // Limit to 5 parallel requests at a time

const iteratorFn = async (ip: string): Promise<PeerMetrics | null> => {
try {
const [latency, bandwidth] = await Promise.all([
this.measureLatency(ip),
this.measureBandwidth(ip),
]);
return { ip, latency, bandwidth };
} catch (error) {
// Peer failed, skip it by returning null
return null;
}
};

const peerMetrics: PeerMetrics[] = await Promise.all(metricsPromises);
// Process all peers with a concurrency limit and cooldown between batches
const peerMetrics: PeerMetrics[] = (
await asyncPool(limit, this.ipAddresses, iteratorFn, cooldown)
).filter((metrics: any): metrics is PeerMetrics => metrics !== null); // Use a type guard

// Sort by lowest latency first, then by highest bandwidth
peerMetrics.sort((a, b) => {
Expand All @@ -175,9 +166,7 @@ export class PeerRanker {
return a.latency - b.latency; // Lower latency is better
});

// Update the internal sorted list
this.sortedPeers = peerMetrics;
// Reset the iterator index
this.currentIndex = 0;

return peerMetrics;
Expand Down
40 changes: 22 additions & 18 deletions src/utils/promiseUtils.ts
Original file line number Diff line number Diff line change
@@ -1,33 +1,37 @@

/**
* Processes items in sequential batches with a concurrency limit.
* Adds a cooldown between batches.
* @param {number} limit - The maximum number of concurrent executions per batch.
* @param {Array<T>} items - The array of items to process.
* @param {(item: T) => Promise<R>} iteratorFn - The async function to apply to each item.
* @param {number} cooldownMs - The cooldown duration between batches in milliseconds.
* @returns {Promise<Array<R>>} - A promise that resolves when all items have been processed.
*/
export async function asyncPool<T, R>(
limit: number,
items: T[],
iteratorFn: (item: T) => Promise<R>
): Promise<R[]> {
const ret: R[] = [];

for (let i = 0; i < items.length; i += limit) {
const batchItems = items.slice(i, i + limit);
const batchPromises = batchItems.map((item) => iteratorFn(item));

// Wait for the current batch to complete before starting the next one
const batchResults = await Promise.all(batchPromises);
ret.push(...batchResults);

// Optional: add a cooldown between batches
// await new Promise((resolve) => setTimeout(resolve, 500));
limit: number,
items: T[],
iteratorFn: (item: T) => Promise<R>,
cooldownMs: number = 500 // Default cooldown of 500ms
): Promise<R[]> {
const ret: R[] = [];

for (let i = 0; i < items.length; i += limit) {
const batchItems = items.slice(i, i + limit);
const batchPromises = batchItems.map((item) => iteratorFn(item));

// Wait for the current batch to complete before starting the next one
const batchResults = await Promise.all(batchPromises);
ret.push(...batchResults);

// Add a cooldown between batches, except after the last batch
if (i + limit < items.length) {
await new Promise((resolve) => setTimeout(resolve, cooldownMs));
}

return ret;
}

return ret;
}
/**
* Helper function to add a timeout to a promise.
* @param promise The original promise.
Expand Down
Loading