Skip to content

Commit

Permalink
feat: more advanced fullnode throttle logic
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelTaylor3D committed Oct 5, 2024
1 parent ae0acf0 commit 5cd3f5a
Showing 1 changed file with 144 additions and 92 deletions.
236 changes: 144 additions & 92 deletions src/blockchain/FullNodePeer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,17 @@ const CACHE_DURATION = 30000; // in milliseconds
const COOLDOWN_DURATION = 300000; // 5 minutes in milliseconds
const MAX_PEERS_TO_FETCH = 5; // Maximum number of peers to fetch from DNS
const MAX_RETRIES = 3; // Maximum number of retry attempts
const MAX_REQUESTS_PER_MINUTE = 100; // Throttle limit
const MAX_REQUESTS_PER_MINUTE = 100; // Per-peer rate limit

/**
* Represents a peer with its reliability weight and address.
* Represents a peer with its reliability weight, address, and rate limiter.
*/
interface PeerInfo {
peer: Peer;
weight: number;
address: string;
isConnected: boolean; // Indicates if the peer is currently connected
}

/**
* Represents a queued method call.
*/
interface QueuedCall {
execute: () => Promise<any>;
resolve: (value: any) => void;
reject: (reason?: any) => void;
limiter: Bottleneck; // Rate limiter for the peer
}

/**
Expand Down Expand Up @@ -71,11 +63,11 @@ export class FullNodePeer {
// Cache for fetched peer IPs
private static peerIPCache = new NodeCache({ stdTTL: CACHE_DURATION / 1000 });

// Bottleneck instance for global throttling
private static limiter = new Bottleneck({
maxConcurrent: 1, // Ensures only one request is processed at a time
minTime: 60000 / MAX_REQUESTS_PER_MINUTE, // Calculated delay between requests
});
// List of available peers for round-robin
private static availablePeers: string[] = [];

// Current index for round-robin selection
private static currentPeerIndex: number = 0;

// Private constructor for singleton pattern
private constructor(private peer: Peer) {}
Expand Down Expand Up @@ -302,32 +294,16 @@ export class FullNodePeer {
}

/**
* Selects a peer based on weighted random selection.
* Prioritized peers have higher weights.
* Selects the next peer based on round-robin selection.
* @returns {string} The selected peer IP.
*/
private static selectPeerByWeight(): string {
const peers = Array.from(FullNodePeer.peerWeights.entries())
.filter(([ip, _]) => !FullNodePeer.cooldownCache.has(ip))
.map(([ip, weight]) => ({ ip, weight }));

const totalWeight = peers.reduce((sum, peer) => sum + peer.weight, 0);
if (totalWeight === 0) {
throw new Error("All peers are in cooldown.");
private static getNextPeerIP(): string {
if (FullNodePeer.availablePeers.length === 0) {
throw new Error("No available peers to select.");
}

const random = Math.random() * totalWeight;
let cumulative = 0;

for (const peer of peers) {
cumulative += peer.weight;
if (random < cumulative) {
return peer.ip;
}
}

// Fallback
return peers[peers.length - 1].ip;
const peerIP = FullNodePeer.availablePeers[FullNodePeer.currentPeerIndex];
FullNodePeer.currentPeerIndex = (FullNodePeer.currentPeerIndex + 1) % FullNodePeer.availablePeers.length;
return peerIP;
}

/**
Expand All @@ -348,7 +324,7 @@ export class FullNodePeer {
}

/**
* Connects to the best available peer based on weighted selection and reliability.
* Connects to the best available peer based on round-robin selection and reliability.
* @returns {Promise<Peer>} The connected Peer instance.
*/
private static async getBestPeer(): Promise<Peer> {
Expand All @@ -369,65 +345,119 @@ export class FullNodePeer {
// Setup peer weights with prioritization
FullNodePeer.setupPeers(peerIPs);

// Weighted random selection
let selectedPeerIP: string;
try {
selectedPeerIP = FullNodePeer.selectPeerByWeight();
} catch (error: any) {
throw new Error(`Failed to select a peer: ${error.message}`);
}
// Initialize or update peerInfos and availablePeers
for (const ip of peerIPs) {
if (!FullNodePeer.peerInfos.has(ip)) {
// Create a new Bottleneck limiter for the peer
const limiter = new Bottleneck({
maxConcurrent: 1, // One request at a time per peer
minTime: 60000 / MAX_REQUESTS_PER_MINUTE, // 600 ms between requests for 100 requests/min
});

// Attempt to create a peer connection
const sslFolder = path.resolve(os.homedir(), ".dig", "ssl");
const certFile = path.join(sslFolder, "public_dig.crt");
const keyFile = path.join(sslFolder, "public_dig.key");
// Attempt to create a peer connection
const sslFolder = path.resolve(os.homedir(), ".dig", "ssl");
const certFile = path.join(sslFolder, "public_dig.crt");
const keyFile = path.join(sslFolder, "public_dig.key");

if (!fs.existsSync(sslFolder)) {
fs.mkdirSync(sslFolder, { recursive: true });
}
if (!fs.existsSync(sslFolder)) {
fs.mkdirSync(sslFolder, { recursive: true });
}

const tls = new Tls(certFile, keyFile);
const tls = new Tls(certFile, keyFile);

let peer: Peer;
try {
peer = await Peer.new(`${ip}:${FULLNODE_PORT}`, false, tls);
} catch (error: any) {
console.error(`Failed to create peer for IP ${ip}: ${error.message}`);
// Add to cooldown
FullNodePeer.cooldownCache.set(ip, true);
// Decrease weight or remove peer
const currentWeight = FullNodePeer.peerWeights.get(ip) || 1;
if (currentWeight > 1) {
FullNodePeer.peerWeights.set(ip, currentWeight - 1);
} else {
FullNodePeer.peerWeights.delete(ip);
}
continue; // Skip adding this peer
}

let peer: Peer;
try {
peer = await Peer.new(`${selectedPeerIP}:${FULLNODE_PORT}`, false, tls);
} catch (error: any) {
console.error(
`Failed to create peer for IP ${selectedPeerIP}: ${error.message}`
);
// Add to cooldown
FullNodePeer.cooldownCache.set(selectedPeerIP, true);
// Decrease weight or remove peer
const currentWeight = FullNodePeer.peerWeights.get(selectedPeerIP) || 1;
if (currentWeight > 1) {
FullNodePeer.peerWeights.set(selectedPeerIP, currentWeight - 1);
// Wrap the peer with proxy to handle errors and retries
const proxiedPeer = FullNodePeer.createPeerProxy(peer, ip);

// Store PeerInfo
FullNodePeer.peerInfos.set(ip, {
peer: proxiedPeer,
weight: FullNodePeer.peerWeights.get(ip) || 1,
address: ip,
isConnected: true, // Mark as connected
limiter, // Assign the limiter
});

// Add to availablePeers
FullNodePeer.availablePeers.push(ip);
} else {
FullNodePeer.peerWeights.delete(selectedPeerIP);
const peerInfo = FullNodePeer.peerInfos.get(ip)!;
if (!peerInfo.isConnected) {
// Peer is back from cooldown, re-establish connection
const sslFolder = path.resolve(os.homedir(), ".dig", "ssl");
const certFile = path.join(sslFolder, "public_dig.crt");
const keyFile = path.join(sslFolder, "public_dig.key");

if (!fs.existsSync(sslFolder)) {
fs.mkdirSync(sslFolder, { recursive: true });
}

const tls = new Tls(certFile, keyFile);

let peer: Peer;
try {
peer = await Peer.new(`${ip}:${FULLNODE_PORT}`, false, tls);
} catch (error: any) {
console.error(`Failed to reconnect peer for IP ${ip}: ${error.message}`);
// Re-add to cooldown
FullNodePeer.cooldownCache.set(ip, true);
// Decrease weight or remove peer
const currentWeight = FullNodePeer.peerWeights.get(ip) || 1;
if (currentWeight > 1) {
FullNodePeer.peerWeights.set(ip, currentWeight - 1);
} else {
FullNodePeer.peerWeights.delete(ip);
}
continue; // Skip adding this peer
}

// Wrap the peer with proxy to handle errors and retries
const proxiedPeer = FullNodePeer.createPeerProxy(peer, ip);

// Update PeerInfo
peerInfo.peer = proxiedPeer;
peerInfo.isConnected = true;

// Add back to availablePeers
FullNodePeer.availablePeers.push(ip);
}
}
throw new Error(`Unable to connect to peer ${selectedPeerIP}`);
}

// Wrap the peer with proxy to handle errors and retries
const proxiedPeer = FullNodePeer.createPeerProxy(peer, selectedPeerIP);
if (FullNodePeer.availablePeers.length === 0) {
throw new Error("No available peers to connect.");
}

// Store PeerInfo
FullNodePeer.peerInfos.set(selectedPeerIP, {
peer: proxiedPeer,
weight: FullNodePeer.peerWeights.get(selectedPeerIP) || 1,
address: selectedPeerIP,
isConnected: true, // Mark as connected
});
// Select the next peer in round-robin
const selectedPeerIP = FullNodePeer.getNextPeerIP();
const selectedPeerInfo = FullNodePeer.peerInfos.get(selectedPeerIP)!;

// Cache the peer
FullNodePeer.cachedPeer = { peer: proxiedPeer, timestamp: now };
FullNodePeer.cachedPeer = { peer: selectedPeerInfo.peer, timestamp: now };

console.log(`Using Fullnode Peer: ${selectedPeerIP}`);

return proxiedPeer;
return selectedPeerInfo.peer;
}

/**
* Creates a proxy for the peer to handle errors, implement retries, and enforce throttling.
* Creates a proxy for the peer to handle errors, implement retries, and enforce per-peer throttling.
* @param {Peer} peer - The Peer instance.
* @param {string} peerIP - The IP address of the peer.
* @param {number} [retryCount=0] - The current retry attempt.
Expand Down Expand Up @@ -455,33 +485,43 @@ export class FullNodePeer {

if (typeof originalMethod === "function") {
return (...args: any[]) => {
// Wrap the method call with Bottleneck's scheduling
return FullNodePeer.limiter.schedule(async () => {
const peerInfo = FullNodePeer.peerInfos.get(peerIP);
// Select the next peer in round-robin
let selectedPeerIP: string;
try {
selectedPeerIP = FullNodePeer.getNextPeerIP();
} catch (error: any) {
return Promise.reject(error);
}

const selectedPeerInfo = FullNodePeer.peerInfos.get(selectedPeerIP)!;

// Schedule the method call via the selected peer's limiter
return selectedPeerInfo.limiter.schedule(async () => {
const peerInfo = FullNodePeer.peerInfos.get(selectedPeerIP);
if (!peerInfo || !peerInfo.isConnected) {
throw new Error(`Cannot perform operation: Peer ${peerIP} is disconnected.`);
throw new Error(`Cannot perform operation: Peer ${selectedPeerIP} is disconnected.`);
}

try {
const result = await originalMethod.apply(target, args);
const result = await originalMethod.apply(peerInfo.peer, args);
// On successful operation, increase the weight slightly
const currentWeight = FullNodePeer.peerWeights.get(peerIP) || 1;
FullNodePeer.peerWeights.set(peerIP, currentWeight + 0.1); // Increment weight
const currentWeight = FullNodePeer.peerWeights.get(selectedPeerIP) || 1;
FullNodePeer.peerWeights.set(selectedPeerIP, currentWeight + 0.1); // Increment weight
return result;
} catch (error: any) {
console.error(`Peer ${peerIP} encountered an error: ${error.message}`);
console.error(`Peer ${selectedPeerIP} encountered an error: ${error.message}`);

// Check if the error is related to WebSocket or Operation timed out
if (
error.message.includes("WebSocket") ||
error.message.includes("Operation timed out")
) {
// Handle the disconnection and mark the peer accordingly
FullNodePeer.handlePeerDisconnection(peerIP);
FullNodePeer.handlePeerDisconnection(selectedPeerIP);

// If maximum retries reached, throw the error
if (retryCount >= MAX_RETRIES) {
console.error(`Max retries reached for method ${String(prop)} on peer ${peerIP}.`);
console.error(`Max retries reached for method ${String(prop)} on peer ${selectedPeerIP}.`);
throw error;
}

Expand Down Expand Up @@ -542,13 +582,25 @@ export class FullNodePeer {
FullNodePeer.peerInfos.set(peerIP, peerInfo);
}

// Remove from availablePeers if present
const index = FullNodePeer.availablePeers.indexOf(peerIP);
if (index !== -1) {
FullNodePeer.availablePeers.splice(index, 1);
// Adjust currentPeerIndex if necessary
if (FullNodePeer.currentPeerIndex >= FullNodePeer.availablePeers.length) {
FullNodePeer.currentPeerIndex = 0;
}
}

// If the disconnected peer was the cached peer, invalidate the cache
if (
FullNodePeer.cachedPeer &&
FullNodePeer.extractPeerIP(FullNodePeer.cachedPeer.peer) === peerIP
) {
FullNodePeer.cachedPeer = null;
}

console.warn(`Peer ${peerIP} has been marked as disconnected and is in cooldown.`);
}

/**
Expand Down

0 comments on commit 5cd3f5a

Please sign in to comment.