Skip to content

Commit

Permalink
feat: add timeout to load balancer provider (#606)
Browse files Browse the repository at this point in the history
  • Loading branch information
0xsambugs authored Nov 7, 2024
1 parent 8d97045 commit 57f9eac
Show file tree
Hide file tree
Showing 2 changed files with 82 additions and 66 deletions.
133 changes: 74 additions & 59 deletions src/services/providers/provider-sources/load-balance-provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,18 @@ import { IProviderSource } from '../types';
import { chainsUnion } from '@chains';
import { createTransport, EIP1193RequestFn, Transport } from 'viem';
import ms from 'ms';
import { timeoutPromise } from '@shared/timeouts';

export type LoadBalanceProviderSourceConfig = {
minSuccessRate?: number;
minSamples?: number;
maxAttempts?: number;
maxConcurrent?: number;
samplesTtl?: TimeString;
timeout?: {
default?: TimeString;
byMethod?: Record<string, TimeString>;
};
};
export class LoadBalanceProviderSource implements IProviderSource {
constructor(private readonly sources: IProviderSource[], private readonly config: LoadBalanceProviderSourceConfig | undefined) {
Expand Down Expand Up @@ -37,74 +42,84 @@ const DEFAULT_CONFIG = {
} satisfies LoadBalanceProviderSourceConfig;

function loadBalance(transports_: readonly Transport[], config: LoadBalanceProviderSourceConfig = {}): Transport {
const { minSuccessRate, minSamples, maxAttempts, maxConcurrent, samplesTtl } = { ...DEFAULT_CONFIG, ...config };
const { minSuccessRate, minSamples, maxAttempts, maxConcurrent, samplesTtl, timeout } = { ...DEFAULT_CONFIG, ...config };

return ({ chain, timeout, ...rest }) => {
return ({ chain, timeout: transportTimeout, ...rest }) => {
const defaultTimeout = timeout?.default ? ms(timeout.default) : transportTimeout;
const timeoutsByMethod = timeout?.byMethod
? Object.fromEntries(Object.entries(timeout.byMethod).map(([method, timeout]) => [method, ms(timeout)]))
: {};
const transports = transports_
.map((t) => t({ chain, timeout, ...rest }))
.map((t) => t({ chain, timeout: defaultTimeout, ...rest }))
.map((transport) => new TransportInstance(transport, { samplesTtl }));
return createTransport({
key: 'load-balance',
name: 'Load Balancing',
type: 'load-balance',
async request({ method, ...params }): Promise<any> {
const availableTransports: Record<string, TransportInstance> = Object.fromEntries(
transports.map((transport, index) => [`${index}`, transport])
);
const errors: any[] = [];
let attempts = 0;

while (!maxAttempts || attempts < maxAttempts) {
const filteredTransports = Object.entries(availableTransports)
.map(([id, transport]) => ({ transport, id, metrics: transport.metrics(method) }))
.filter(({ metrics }) => metrics.samples.length < minSamples || calculateSuccessRate(metrics) > minSuccessRate);

if (filteredTransports.length === 0) {
break; // No transports available
}

let toExecute: { transport: TransportInstance; id: string }[];
const transportsWithSamples = filteredTransports.filter(({ metrics }) => metrics.samples.length > 0);
const transportsWithoutSamples = filteredTransports.filter(({ metrics }) => metrics.samples.length === 0);

if (transportsWithSamples.length > 0) {
// If there are some transports with samples, then find the best among them
const bestTransport = transportsWithSamples.reduce((best, current) =>
calculateScore(current.metrics) > calculateScore(best.metrics) ? current : best
);
// We will execute the best transport together with all transports that have no samples. We do this because we don't know if those transports are good or bad
// and we will take this opportunity to gather some samples from them
toExecute = [bestTransport, ...transportsWithoutSamples];
} else {
// If there are no transports with samples, then we will execute all transports. We will return one that succeeds first and add some samples for the others at the same time
toExecute = transportsWithoutSamples;
}
const request: EIP1193RequestFn = async ({ method, ...params }: { method: string }): Promise<any> => {
const availableTransports: Record<string, TransportInstance> = Object.fromEntries(
transports.map((transport, index) => [`${index}`, transport])
);
const errors: any[] = [];
let attempts = 0;

if (maxAttempts || maxConcurrent > 0) {
// If we have a limit on the number of attempts, we will execute only the number of transports that we can afford
const attemptsLeft = maxAttempts ? maxAttempts - attempts : Infinity;
toExecute = toExecute.slice(0, Math.min(attemptsLeft, maxConcurrent));
}
while (!maxAttempts || attempts < maxAttempts) {
const filteredTransports = Object.entries(availableTransports)
.map(([id, transport]) => ({ transport, id, metrics: transport.metrics(method) }))
.filter(({ metrics }) => metrics.samples.length < minSamples || calculateSuccessRate(metrics) > minSuccessRate);

try {
return await Promise.any(toExecute.map(({ transport }) => transport.request({ method, ...params })));
} catch (error: any) {
// Consider all transports used as attempts
attempts += toExecute.length;

// Remove executed transports from the list of available transports
toExecute.forEach(({ id }) => delete availableTransports[id]);

// Remember error
if (error instanceof AggregateError) {
errors.push(...error.errors);
} else {
errors.push(error);
}
if (filteredTransports.length === 0) {
break; // No transports available
}

let toExecute: { transport: TransportInstance; id: string }[];
const transportsWithSamples = filteredTransports.filter(({ metrics }) => metrics.samples.length > 0);
const transportsWithoutSamples = filteredTransports.filter(({ metrics }) => metrics.samples.length === 0);

if (transportsWithSamples.length > 0) {
// If there are some transports with samples, then find the best among them
const bestTransport = transportsWithSamples.reduce((best, current) =>
calculateScore(current.metrics) > calculateScore(best.metrics) ? current : best
);
// We will execute the best transport together with all transports that have no samples. We do this because we don't know if those transports are good or bad
// and we will take this opportunity to gather some samples from them
toExecute = [bestTransport, ...transportsWithoutSamples];
} else {
// If there are no transports with samples, then we will execute all transports. We will return one that succeeds first and add some samples for the others at the same time
toExecute = transportsWithoutSamples;
}

if (maxAttempts || maxConcurrent > 0) {
// If we have a limit on the number of attempts, we will execute only the number of transports that we can afford
const attemptsLeft = maxAttempts ? maxAttempts - attempts : Infinity;
toExecute = toExecute.slice(0, Math.min(attemptsLeft, maxConcurrent));
}

try {
return await Promise.any(toExecute.map(({ transport }) => transport.request({ method, ...params })));
} catch (error: any) {
// Consider all transports used as attempts
attempts += toExecute.length;

// Remove executed transports from the list of available transports
toExecute.forEach(({ id }) => delete availableTransports[id]);

// Remember error
if (error instanceof AggregateError) {
errors.push(...error.errors);
} else {
errors.push(error);
}
}
}

throw errors.length > 0 ? new AggregateError(errors) : new Error('Failed to find a transport to execute the request'); // No transports available
throw errors.length > 0 ? new AggregateError(errors) : new Error('Failed to find a transport to execute the request'); // No transports available
};

return createTransport({
key: 'load-balance',
name: 'Load Balancing',
type: 'load-balance',
async request({ method, ...params }): Promise<any> {
const timeout: number | undefined = timeoutsByMethod[method] ?? defaultTimeout;
return timeoutPromise(request({ method, ...params }), timeout);
},
});
};
Expand Down
15 changes: 8 additions & 7 deletions src/shared/timeouts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,33 +2,34 @@ import { TimeString } from '@types';
import ms from 'ms';

export class TimeoutError extends Error {
constructor(description: string, timeout: TimeString) {
super(`${description} timeouted at ${timeout}`);
constructor(description: string, timeout: TimeString | number) {
super(`${description} timeouted at ${typeof timeout === 'number' ? `${timeout}ms` : timeout}`);
}
}

export function timeoutPromise<T>(
promise: Promise<T>,
timeout: TimeString | undefined,
options?: { reduceBy?: TimeString; description?: string; onTimeout?: (timeout: TimeString) => void }
timeout: TimeString | number | undefined,
options?: { reduceBy?: TimeString; description?: string; onTimeout?: (timeout: TimeString | number) => void }
) {
if (!timeout) return promise;
const realTimeout = options?.reduceBy ? reduceTimeout(timeout, options.reduceBy) : timeout;
const timeoutMs = typeof realTimeout === 'number' ? realTimeout : ms(realTimeout);
return new Promise<T>((resolve, reject) => {
const timer = setTimeout(() => {
options?.onTimeout?.(realTimeout);
reject(new TimeoutError(options?.description ?? 'Promise', timeout));
}, ms(realTimeout));
}, timeoutMs);
promise
.then(resolve)
.catch(reject)
.finally(() => clearTimeout(timer));
});
}

export function reduceTimeout<T extends TimeString | undefined>(timeout: T, reduceBy: TimeString): T {
export function reduceTimeout<T extends TimeString | number | undefined>(timeout: T, reduceBy: TimeString): T {
if (!timeout) return undefined as T;
const millisTimeout = ms(timeout);
const millisTimeout = typeof timeout === 'number' ? timeout : ms(timeout);
const millisToTakeOut = ms(reduceBy);
return millisTimeout > millisToTakeOut
? ((millisTimeout - millisToTakeOut).toString() as T)
Expand Down

0 comments on commit 57f9eac

Please sign in to comment.