diff --git a/src/services/providers/provider-sources/load-balance-provider.ts b/src/services/providers/provider-sources/load-balance-provider.ts index 259dc3a1..642ddf41 100644 --- a/src/services/providers/provider-sources/load-balance-provider.ts +++ b/src/services/providers/provider-sources/load-balance-provider.ts @@ -3,6 +3,7 @@ import { IProviderSource } from '../types'; import { chainsUnion } from '@chains'; import { createTransport, EIP1193RequestFn, Transport } from 'viem'; import ms from 'ms'; +import { timeoutPromise } from '@shared/timeouts'; export type LoadBalanceProviderSourceConfig = { minSuccessRate?: number; @@ -10,6 +11,10 @@ export type LoadBalanceProviderSourceConfig = { maxAttempts?: number; maxConcurrent?: number; samplesTtl?: TimeString; + timeout?: { + default?: TimeString; + byMethod?: Record; + }; }; export class LoadBalanceProviderSource implements IProviderSource { constructor(private readonly sources: IProviderSource[], private readonly config: LoadBalanceProviderSourceConfig | undefined) { @@ -37,74 +42,84 @@ const DEFAULT_CONFIG = { } satisfies LoadBalanceProviderSourceConfig; function loadBalance(transports_: readonly Transport[], config: LoadBalanceProviderSourceConfig = {}): Transport { - const { minSuccessRate, minSamples, maxAttempts, maxConcurrent, samplesTtl } = { ...DEFAULT_CONFIG, ...config }; + const { minSuccessRate, minSamples, maxAttempts, maxConcurrent, samplesTtl, timeout } = { ...DEFAULT_CONFIG, ...config }; - return ({ chain, timeout, ...rest }) => { + return ({ chain, timeout: transportTimeout, ...rest }) => { + const defaultTimeout = timeout?.default ? ms(timeout.default) : transportTimeout; + const timeoutsByMethod = timeout?.byMethod + ? Object.fromEntries(Object.entries(timeout.byMethod).map(([method, timeout]) => [method, ms(timeout)])) + : {}; const transports = transports_ - .map((t) => t({ chain, timeout, ...rest })) + .map((t) => t({ chain, timeout: defaultTimeout, ...rest })) .map((transport) => new TransportInstance(transport, { samplesTtl })); - return createTransport({ - key: 'load-balance', - name: 'Load Balancing', - type: 'load-balance', - async request({ method, ...params }): Promise { - const availableTransports: Record = Object.fromEntries( - transports.map((transport, index) => [`${index}`, transport]) - ); - const errors: any[] = []; - let attempts = 0; - - while (!maxAttempts || attempts < maxAttempts) { - const filteredTransports = Object.entries(availableTransports) - .map(([id, transport]) => ({ transport, id, metrics: transport.metrics(method) })) - .filter(({ metrics }) => metrics.samples.length < minSamples || calculateSuccessRate(metrics) > minSuccessRate); - - if (filteredTransports.length === 0) { - break; // No transports available - } - let toExecute: { transport: TransportInstance; id: string }[]; - const transportsWithSamples = filteredTransports.filter(({ metrics }) => metrics.samples.length > 0); - const transportsWithoutSamples = filteredTransports.filter(({ metrics }) => metrics.samples.length === 0); - - if (transportsWithSamples.length > 0) { - // If there are some transports with samples, then find the best among them - const bestTransport = transportsWithSamples.reduce((best, current) => - calculateScore(current.metrics) > calculateScore(best.metrics) ? current : best - ); - // We will execute the best transport together with all transports that have no samples. We do this because we don't know if those transports are good or bad - // and we will take this opportunity to gather some samples from them - toExecute = [bestTransport, ...transportsWithoutSamples]; - } else { - // If there are no transports with samples, then we will execute all transports. We will return one that succeeds first and add some samples for the others at the same time - toExecute = transportsWithoutSamples; - } + const request: EIP1193RequestFn = async ({ method, ...params }: { method: string }): Promise => { + const availableTransports: Record = Object.fromEntries( + transports.map((transport, index) => [`${index}`, transport]) + ); + const errors: any[] = []; + let attempts = 0; - if (maxAttempts || maxConcurrent > 0) { - // If we have a limit on the number of attempts, we will execute only the number of transports that we can afford - const attemptsLeft = maxAttempts ? maxAttempts - attempts : Infinity; - toExecute = toExecute.slice(0, Math.min(attemptsLeft, maxConcurrent)); - } + while (!maxAttempts || attempts < maxAttempts) { + const filteredTransports = Object.entries(availableTransports) + .map(([id, transport]) => ({ transport, id, metrics: transport.metrics(method) })) + .filter(({ metrics }) => metrics.samples.length < minSamples || calculateSuccessRate(metrics) > minSuccessRate); - try { - return await Promise.any(toExecute.map(({ transport }) => transport.request({ method, ...params }))); - } catch (error: any) { - // Consider all transports used as attempts - attempts += toExecute.length; - - // Remove executed transports from the list of available transports - toExecute.forEach(({ id }) => delete availableTransports[id]); - - // Remember error - if (error instanceof AggregateError) { - errors.push(...error.errors); - } else { - errors.push(error); - } + if (filteredTransports.length === 0) { + break; // No transports available + } + + let toExecute: { transport: TransportInstance; id: string }[]; + const transportsWithSamples = filteredTransports.filter(({ metrics }) => metrics.samples.length > 0); + const transportsWithoutSamples = filteredTransports.filter(({ metrics }) => metrics.samples.length === 0); + + if (transportsWithSamples.length > 0) { + // If there are some transports with samples, then find the best among them + const bestTransport = transportsWithSamples.reduce((best, current) => + calculateScore(current.metrics) > calculateScore(best.metrics) ? current : best + ); + // We will execute the best transport together with all transports that have no samples. We do this because we don't know if those transports are good or bad + // and we will take this opportunity to gather some samples from them + toExecute = [bestTransport, ...transportsWithoutSamples]; + } else { + // If there are no transports with samples, then we will execute all transports. We will return one that succeeds first and add some samples for the others at the same time + toExecute = transportsWithoutSamples; + } + + if (maxAttempts || maxConcurrent > 0) { + // If we have a limit on the number of attempts, we will execute only the number of transports that we can afford + const attemptsLeft = maxAttempts ? maxAttempts - attempts : Infinity; + toExecute = toExecute.slice(0, Math.min(attemptsLeft, maxConcurrent)); + } + + try { + return await Promise.any(toExecute.map(({ transport }) => transport.request({ method, ...params }))); + } catch (error: any) { + // Consider all transports used as attempts + attempts += toExecute.length; + + // Remove executed transports from the list of available transports + toExecute.forEach(({ id }) => delete availableTransports[id]); + + // Remember error + if (error instanceof AggregateError) { + errors.push(...error.errors); + } else { + errors.push(error); } } + } - throw errors.length > 0 ? new AggregateError(errors) : new Error('Failed to find a transport to execute the request'); // No transports available + throw errors.length > 0 ? new AggregateError(errors) : new Error('Failed to find a transport to execute the request'); // No transports available + }; + + return createTransport({ + key: 'load-balance', + name: 'Load Balancing', + type: 'load-balance', + async request({ method, ...params }): Promise { + const timeout: number | undefined = timeoutsByMethod[method] ?? defaultTimeout; + return timeoutPromise(request({ method, ...params }), timeout); }, }); }; diff --git a/src/shared/timeouts.ts b/src/shared/timeouts.ts index 8ab6e1a5..10255b1b 100644 --- a/src/shared/timeouts.ts +++ b/src/shared/timeouts.ts @@ -2,23 +2,24 @@ import { TimeString } from '@types'; import ms from 'ms'; export class TimeoutError extends Error { - constructor(description: string, timeout: TimeString) { - super(`${description} timeouted at ${timeout}`); + constructor(description: string, timeout: TimeString | number) { + super(`${description} timeouted at ${typeof timeout === 'number' ? `${timeout}ms` : timeout}`); } } export function timeoutPromise( promise: Promise, - timeout: TimeString | undefined, - options?: { reduceBy?: TimeString; description?: string; onTimeout?: (timeout: TimeString) => void } + timeout: TimeString | number | undefined, + options?: { reduceBy?: TimeString; description?: string; onTimeout?: (timeout: TimeString | number) => void } ) { if (!timeout) return promise; const realTimeout = options?.reduceBy ? reduceTimeout(timeout, options.reduceBy) : timeout; + const timeoutMs = typeof realTimeout === 'number' ? realTimeout : ms(realTimeout); return new Promise((resolve, reject) => { const timer = setTimeout(() => { options?.onTimeout?.(realTimeout); reject(new TimeoutError(options?.description ?? 'Promise', timeout)); - }, ms(realTimeout)); + }, timeoutMs); promise .then(resolve) .catch(reject) @@ -26,9 +27,9 @@ export function timeoutPromise( }); } -export function reduceTimeout(timeout: T, reduceBy: TimeString): T { +export function reduceTimeout(timeout: T, reduceBy: TimeString): T { if (!timeout) return undefined as T; - const millisTimeout = ms(timeout); + const millisTimeout = typeof timeout === 'number' ? timeout : ms(timeout); const millisToTakeOut = ms(reduceBy); return millisTimeout > millisToTakeOut ? ((millisTimeout - millisToTakeOut).toString() as T)