Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release/v0.0.1 alpha.160 #144

Merged
merged 2 commits into from
Oct 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,13 @@

All notable changes to this project will be documented in this file. See [standard-version](https://github.com/conventional-changelog/standard-version) for commit guidelines.

### [0.0.1-alpha.160](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.159...v0.0.1-alpha.160) (2024-10-07)


### Features

* fullnode peer improvements ([ba02ee2](https://github.com/DIG-Network/dig-chia-sdk/commit/ba02ee216d95e8295e4138035d823cf9f4cd5480))

### [0.0.1-alpha.159](https://github.com/DIG-Network/dig-chia-sdk/compare/v0.0.1-alpha.158...v0.0.1-alpha.159) (2024-10-07)


Expand Down
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@dignetwork/dig-sdk",
"version": "0.0.1-alpha.159",
"version": "0.0.1-alpha.160",
"description": "",
"type": "commonjs",
"main": "./dist/index.js",
Expand Down
4 changes: 1 addition & 3 deletions src/DigNetwork/PropagationServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,11 +105,9 @@ export class PropagationServer {

try {
const response = await axios.get(url, config);
console.log(green(`✔ Successfully pinged peer: ${this.ipAddress}`));

return response.data;
} catch (error: any) {
console.error(red(`✖ Failed to ping peer: ${this.ipAddress}`));
console.error(red(error.message));
throw error;
}
Expand Down Expand Up @@ -164,7 +162,7 @@ export class PropagationServer {

return response.data;
} catch (error: any) {
console.error(red(`✖ Failed to ping peer: ${this.ipAddress}`));
console.error(red(`✖ Failed to ping peer: ${this.ipAddress}`), error.message);
console.error(red(error.message));
throw error;
}
Expand Down
99 changes: 43 additions & 56 deletions src/blockchain/FullNodePeer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
// FullNodePeer.ts

import path from "path";
import os from "os";
import fs from "fs";
Expand All @@ -12,7 +10,6 @@ import { Environment } from "../utils/Environment";
import NodeCache from "node-cache";
import Bottleneck from "bottleneck";

// Constants
const FULLNODE_PORT = 8444;
const LOCALHOST = "127.0.0.1";
const CHIA_NODES_HOST = "chia-nodes";
Expand All @@ -24,9 +21,8 @@ const DNS_HOSTS = [
];
const CONNECTION_TIMEOUT = 2000; // in milliseconds
const CACHE_DURATION = 30000; // in milliseconds
const COOLDOWN_DURATION = 60000; // in milliseconds
const COOLDOWN_DURATION = 600000; // 10 minutes in milliseconds
const MAX_PEERS_TO_FETCH = 5; // Maximum number of peers to fetch from DNS
const MAX_RETRIES = 3; // Maximum number of retry attempts
const MAX_REQUESTS_PER_MINUTE = 100; // Per-peer rate limit

/**
Expand All @@ -45,12 +41,12 @@ export class FullNodePeer {
// Singleton instance
private static instance: FullNodePeer | null = null;

// Cached peer with timestamp
private static cachedPeer: { peer: Peer; timestamp: number } | null = null;

// Cooldown cache to exclude faulty peers temporarily
private static cooldownCache = new NodeCache({ stdTTL: COOLDOWN_DURATION / 1000 });

// Failed DNS hosts cooldown cache
private static failedDNSCache = new NodeCache({ stdTTL: COOLDOWN_DURATION / 1000 });

// Peer reliability weights
private static peerWeights: Map<string, number> = new Map();

Expand All @@ -66,6 +62,9 @@ export class FullNodePeer {
// Map to store rate limiters per peer IP
private static peerLimiters: Map<string, Bottleneck> = new Map();

// Round-robin index
private static roundRobinIndex: number = 0;

// Private constructor for singleton pattern
private constructor(private peer: Peer) {}

Expand All @@ -91,7 +90,7 @@ export class FullNodePeer {
this.peer = bestPeer;
FullNodePeer.instance = this; // Assign the initialized instance
} catch (error: any) {
console.error(`Initialization failed: ${error.message}`);
console.error(`Fullnode Initialization failed: ${error.message}`);
throw error;
}
}
Expand All @@ -103,6 +102,8 @@ export class FullNodePeer {
*/
public static async connect(): Promise<Peer> {
const instance = FullNodePeer.getInstance();
// Remove cached peer to ensure a new connection each time
FullNodePeer.cachedPeer = null;
await instance.initialize();
return instance.peer;
}
Expand Down Expand Up @@ -216,6 +217,12 @@ export class FullNodePeer {
// Fetch peers from DNS introducers
const fetchedPeers: string[] = [];
for (const DNS_HOST of DNS_HOSTS) {
// Check if DNS_HOST is in failedDNSCache
if (FullNodePeer.failedDNSCache.has(DNS_HOST)) {
console.warn(`Skipping DNS host ${DNS_HOST} due to recent failure.`);
continue;
}

try {
const ips = await resolve4(DNS_HOST);
if (ips && ips.length > 0) {
Expand All @@ -238,6 +245,8 @@ export class FullNodePeer {
}
} catch (error: any) {
console.error(`Failed to resolve IPs from ${DNS_HOST}: ${error.message}`);
// Add DNS_HOST to failedDNSCache for cooldown
FullNodePeer.failedDNSCache.set(DNS_HOST, true);
}
}

Expand Down Expand Up @@ -286,32 +295,22 @@ export class FullNodePeer {
}

/**
* Selects a peer based on weighted random selection.
* Prioritized peers have higher weights.
* Selects the next peer based on round-robin selection.
* @returns {string} The selected peer IP.
*/
private static selectPeerByWeight(): string {
const peers = Array.from(FullNodePeer.peerWeights.entries())
.filter(([ip, _]) => !FullNodePeer.cooldownCache.has(ip))
.map(([ip, weight]) => ({ ip, weight }));

const totalWeight = peers.reduce((sum, peer) => sum + peer.weight, 0);
if (totalWeight === 0) {
throw new Error("All peers are in cooldown.");
}
private static selectPeerRoundRobin(): string {
const peers = Array.from(FullNodePeer.peerWeights.keys()).filter(
(ip) => !FullNodePeer.cooldownCache.has(ip)
);

const random = Math.random() * totalWeight;
let cumulative = 0;

for (const peer of peers) {
cumulative += peer.weight;
if (random < cumulative) {
return peer.ip;
}
if (peers.length === 0) {
throw new Error("All fullnode peers are in cooldown.");
}

// Fallback
return peers[peers.length - 1].ip;
// Round-robin selection
const selectedPeer = peers[FullNodePeer.roundRobinIndex % peers.length];
FullNodePeer.roundRobinIndex += 1;
return selectedPeer;
}

/**
Expand All @@ -332,32 +331,26 @@ export class FullNodePeer {
}

/**
* Connects to the best available peer based on weighted selection and reliability.
* Connects to the best available peer based on round-robin selection and reliability.
* @returns {Promise<Peer>} The connected Peer instance.
*/
private async getBestPeer(): Promise<Peer> {
const now = Date.now();

// Refresh cachedPeer if expired
if (
FullNodePeer.cachedPeer &&
now - FullNodePeer.cachedPeer.timestamp < CACHE_DURATION
) {
return FullNodePeer.cachedPeer.peer;
}
// Removed cachedPeer logic to ensure a new connection each time

// Fetch peer IPs
const peerIPs = await FullNodePeer.getPeerIPs();

// Setup peer weights with prioritization
FullNodePeer.setupPeers(peerIPs);

// Weighted random selection
// Round-robin selection
let selectedPeerIP: string;
try {
selectedPeerIP = FullNodePeer.selectPeerByWeight();
selectedPeerIP = FullNodePeer.selectPeerRoundRobin();
} catch (error: any) {
throw new Error(`Failed to select a peer: ${error.message}`);
throw new Error(`Failed to select a fullnode peer: ${error.message}`);
}

// Attempt to create a peer connection
Expand All @@ -376,7 +369,7 @@ export class FullNodePeer {
peer = await Peer.new(`${selectedPeerIP}:${FULLNODE_PORT}`, false, tls);
} catch (error: any) {
console.error(
`Failed to create peer for IP ${selectedPeerIP}: ${error.message}`
`Failed to create fullnode peer for IP ${selectedPeerIP}: ${error.message}`
);
// Add to cooldown
FullNodePeer.cooldownCache.set(selectedPeerIP, true);
Expand All @@ -387,7 +380,7 @@ export class FullNodePeer {
} else {
FullNodePeer.peerWeights.delete(selectedPeerIP);
}
throw new Error(`Unable to connect to peer ${selectedPeerIP}`);
throw new Error(`Unable to connect to fullnode peer ${selectedPeerIP}`);
}

// Create a Bottleneck limiter for this peer
Expand All @@ -407,9 +400,6 @@ export class FullNodePeer {
FullNodePeer.peerLimiters.set(selectedPeerIP, limiter);
const proxiedPeer = this.createPeerProxy(peer, selectedPeerIP);

// Cache the peer
FullNodePeer.cachedPeer = { peer: peer, timestamp: now };

console.log(`Using Fullnode Peer: ${selectedPeerIP}`);

return proxiedPeer;
Expand All @@ -427,9 +417,8 @@ export class FullNodePeer {
// Start the timeout to forget the peer after 1 minute
const timeoutPromise = new Promise<null>((_, reject) => {
timeoutId = setTimeout(() => {
FullNodePeer.cachedPeer = null;
reject(
new Error("Operation timed out. Reconnecting to a new peer.")
new Error("Operation timed out. Reconnecting to a new fullnode peer.")
);
}, 60000); // 1 minute
});
Expand All @@ -453,9 +442,7 @@ export class FullNodePeer {
error.message.includes("WebSocket") ||
error.message.includes("Operation timed out")
) {
FullNodePeer.cachedPeer = null;

this.handlePeerDisconnection(peerIP);
FullNodePeer.handlePeerDisconnection(peerIP);
const newPeer = await this.getBestPeer();
return (newPeer as any)[prop](...args);
}
Expand All @@ -472,7 +459,7 @@ export class FullNodePeer {
* Handles peer disconnection by marking it in cooldown and updating internal states.
* @param {string} peerIP - The IP address of the disconnected peer.
*/
public handlePeerDisconnection(peerIP: string): void {
public static handlePeerDisconnection(peerIP: string): void {
// Add the faulty peer to the cooldown cache
FullNodePeer.cooldownCache.set(peerIP, true);

Expand All @@ -490,7 +477,7 @@ export class FullNodePeer {
// Remove the limiter
FullNodePeer.peerLimiters.delete(peerIP);

console.warn(`Peer ${peerIP} has been marked as disconnected and is in cooldown.`);
console.warn(`Fullnode Peer ${peerIP} has been marked as disconnected and is in cooldown.`);
}

/**
Expand Down Expand Up @@ -521,21 +508,21 @@ export class FullNodePeer {
try {
peer = await FullNodePeer.connect();
} catch (error: any) {
spinner.error({ text: "Failed to connect to a peer." });
spinner.error({ text: "Failed to connect to a fullnode peer." });
console.error(`waitForConfirmation connection error: ${error.message}`);
throw error;
}

// Extract peer IP to access the corresponding limiter
const peerIP = FullNodePeer.extractPeerIP(peer);
if (!peerIP) {
spinner.error({ text: "Failed to extract peer IP." });
spinner.error({ text: "Failed to extract fullnode peer IP." });
throw new Error("Failed to extract peer IP.");
}

const limiter = FullNodePeer.peerLimiters.get(peerIP);
if (!limiter) {
spinner.error({ text: "No rate limiter found for the peer." });
spinner.error({ text: "No rate limiter found for the fullnode peer." });
throw new Error("No rate limiter found for the peer.");
}

Expand Down
Loading