Skip to content

Commit

Permalink
FetchRefactor
Browse files Browse the repository at this point in the history
  • Loading branch information
mikecot committed Jan 15, 2025
1 parent 946cc7a commit b169905
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 137 deletions.
2 changes: 1 addition & 1 deletion src/indexer/indexerFillupBlocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ export const FillUpBlocks = async () => {

} catch (error) {
logger.error('fillUp error:', error);
throw error; // Let the BackoffRetry handle the retry
throw error;
}
};

Expand Down
10 changes: 1 addition & 9 deletions src/restRpc/ext/CoinGeko/CoinGekoCache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,7 @@ class CoinGekoCacheClass {

for (let attempt = 1; attempt <= MAX_RETRIES; attempt++) {
try {
const data = await FetchRestData<CoinGeckoRateResponse>(
url,
{},
false,
3, // retries
2, // backoff
1200, // timeout
5000 // delay
);
const data = await FetchRestData<CoinGeckoRateResponse>(url);
return data; // Return the fetched data if successful
} catch (error: unknown) {
const errorMessage = (error instanceof Error) ? error.message : 'Unknown error';
Expand Down
139 changes: 58 additions & 81 deletions src/restRpc/fetch.ts
Original file line number Diff line number Diff line change
@@ -1,98 +1,75 @@
import { logger } from "@jsinfo/utils/logger";
import { BackoffRetry } from "@jsinfo/utils/retry";
import axios, { AxiosRequestConfig } from "axios";

const activeFetches: Record<string, Promise<any>> = {};
const RATE_LIMIT_DELAY = 60000; // 1 minute in milliseconds
const rateDelayCache = new Map<string, number>();

export async function FetchRestData<T>(
const HTTP_RETRY_CODES = {
429: { delay: 60000, message: 'Rate Limited' },
500: { delay: 5000, message: 'Internal Server Error' },
502: { delay: 5000, message: 'Bad Gateway' },
503: { delay: 5000, message: 'Service Unavailable' },
504: { delay: 5000, message: 'Gateway Timeout' }
};

async function doFetch<T>(
url: string,
options: RequestInit = {},
skipBackoff: boolean = false,
retries: number = 8,
factor: number = 2,
minTimeout: number = 1000,
maxTimeout: number = 5000
options: AxiosRequestConfig = {},
maxRetries: number = 8,
retryDelay: number = 500,
timeout: number = 30000
): Promise<T> {
// Check if we need to wait due to previous rate limit
const lastRateLimit = rateDelayCache.get(url);
if (lastRateLimit) {
const timeToWait = lastRateLimit - Date.now();
if (timeToWait > 0) {
logger.info(`Rate limit cooling down for URL: ${url}, waiting ${timeToWait}ms`);
await new Promise(resolve => setTimeout(resolve, timeToWait));
}
rateDelayCache.delete(url);
}

if (url in activeFetches) {
return activeFetches[url] as Promise<T>;
}
let attempt = 0;

const fetchFunc = async () => {
while (attempt < maxRetries) {
try {
const maxRetries = 3; // Define the maximum number of retries
let attempt = 0; // Initialize the attempt counter

while (attempt < maxRetries) {
try {
const lastRateLimit = rateDelayCache.get(url);
if (lastRateLimit) {
const timeToWait = lastRateLimit - Date.now();
if (timeToWait > 0) {
logger.info(`Rate limit cooling down for URL: ${url}, waiting ${timeToWait}ms`);
await new Promise(resolve => setTimeout(resolve, timeToWait));
}
rateDelayCache.delete(url);
}

const response = await fetch(url, options);

// Handle rate limit (429) specifically
if (response.status === 429) {
logger.warn(`Rate limit hit for ${url}, waiting 60 seconds before retry`);
rateDelayCache.set(url, Date.now() + RATE_LIMIT_DELAY);
await new Promise(resolve => setTimeout(resolve, RATE_LIMIT_DELAY));
attempt++; // Increment the attempt counter
logger.info(`Retrying fetch for ${url} (Attempt ${attempt})...`);
continue; // Retry the fetch
}

// Check for other response errors
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const response = await axios({
url,
timeout,
validateStatus: null,
...options
});

return await response.json() as T; // Return the JSON response if successful
} catch (error) {
logger.error(`Error fetching data from ${url}:`, error);
if (attempt === maxRetries - 1) {
throw error; // Rethrow the error if max retries reached
}
attempt++; // Increment the attempt counter
logger.info(`Retrying fetch for ${url} (Attempt ${attempt})...`);
await new Promise(resolve => setTimeout(resolve, 1000)); // Wait before retrying
if (response.status !== 200) {
const retryConfig = HTTP_RETRY_CODES[response.status];
if (retryConfig) {
logger.warn(`${retryConfig.message} (${response.status}) for ${url}, waiting ${retryConfig.delay / 1000}s before retry`);
await new Promise(resolve => setTimeout(resolve, retryConfig.delay));
attempt++;
logger.info(`Retrying fetch for ${url} (Attempt ${attempt}/${maxRetries})...`);
continue;
}

throw new Error(`HTTP ${response.status} for ${url}\nResponse: ${JSON.stringify(response.data).slice(0, 200)}`);
}
} finally {
delete activeFetches[url];
}
};

// Check for rate limit before making request
const lastRateLimit2 = rateDelayCache.get(url);
if (lastRateLimit2) {
const timeToWait = lastRateLimit2 + RATE_LIMIT_DELAY - Date.now();
if (timeToWait > 0) {
logger.info(`Rate limit cooling down for URL: ${url}, waiting ${timeToWait}ms`);
await new Promise(resolve => setTimeout(resolve, timeToWait));
rateDelayCache.delete(url);
return response.data;
} catch (error) {
if (attempt === maxRetries - 1) throw error;
attempt++;
logger.error(`Error fetching data from ${url}:`, axios.isAxiosError(error) ? error.message : error);
await new Promise(resolve => setTimeout(resolve, retryDelay));
}
}
throw new Error(`Max retries (${maxRetries}) exceeded for ${url}`);
}

export async function FetchRestData<T>(
url: string,
options: AxiosRequestConfig = {},
maxRetries?: number,
retryDelay?: number,
timeout?: number
): Promise<T> {
if (url in activeFetches) {
return activeFetches[url] as Promise<T>;
}

const promise = skipBackoff ?
fetchFunc() :
BackoffRetry(`FetchRestData: ${url}`, fetchFunc, retries, factor, minTimeout, maxTimeout);
const promise = doFetch<T>(url, options, maxRetries, retryDelay, timeout);
activeFetches[url] = promise;
return await promise as T;

try {
return await promise;
} finally {
delete activeFetches[url];
}
}
4 changes: 2 additions & 2 deletions src/restRpc/lavaRpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ const LavaRPCBaseUrl = (() => {
return url;
})();

export async function QueryLavaRPC<T>(path: string, skipBackoff: boolean = false): Promise<T> {
export async function QueryLavaRPC<T>(path: string): Promise<T> {
if (LavaRPCBaseUrl.endsWith('/') && path.startsWith('/')) {
path = path.slice(1);
}
const url = `${LavaRPCBaseUrl}${path}`;
return FetchRestData<T>(url, {}, skipBackoff);
return FetchRestData<T>(url);
}

44 changes: 0 additions & 44 deletions src/utils/retry.ts

This file was deleted.

0 comments on commit b169905

Please sign in to comment.