Skip to content

Commit

Permalink
feat: export agent pool stats (#481)
Browse files Browse the repository at this point in the history
fengmk2 authored Dec 21, 2023
1 parent 5bb3a62 commit 5f9be29
Showing 5 changed files with 108 additions and 33 deletions.
41 changes: 39 additions & 2 deletions src/HttpClient.ts
Original file line number Diff line number Diff line change
@@ -22,7 +22,9 @@ import {
Dispatcher,
Agent,
getGlobalDispatcher,
Pool,
} from 'undici';
import undiciSymbols from 'undici/lib/core/symbols.js';
import { FormData as FormDataNode } from 'formdata-node';
import { FormDataEncoder } from 'form-data-encoder';
import createUserAgent from 'default-user-agent';
@@ -136,7 +138,7 @@ function defaultIsRetry(response: HttpClientResponse) {
return response.status >= 500;
}

type RequestContext = {
export type RequestContext = {
retries: number;
socketErrorRetries: number;
requestStartTime?: number;
@@ -157,6 +159,20 @@ export type ResponseDiagnosticsMessage = {
error?: Error;
};

export interface PoolStat {
/** Number of open socket connections in this pool. */
connected: number;
/** Number of open socket connections in this pool that do not have an active request. */
free: number;
/** Number of pending requests across all clients in this pool. */
pending: number;
/** Number of queued requests across all clients in this pool. */
queued: number;
/** Number of currently active requests across all clients in this pool. */
running: number;
/** Number of active, pending, or queued requests across all clients in this pool. */
size: number;
}

export class HttpClient extends EventEmitter {
#defaultArgs?: RequestOptions;
@@ -187,11 +203,32 @@ export class HttpClient extends EventEmitter {
this.#dispatcher = dispatcher;
}

getDispatcherPoolStats() {
const agent = this.getDispatcher();
// origin => Pool Instance
const clients: Map<string, WeakRef<Pool>> = agent[undiciSymbols.kClients];
const poolStatsMap: Record<string, PoolStat> = {};
for (const [ key, ref ] of clients) {
const pool = ref.deref();
const stats = pool?.stats;
if (!stats) continue;
poolStatsMap[key] = {
connected: stats.connected,
free: stats.free,
pending: stats.pending,
queued: stats.queued,
running: stats.running,
size: stats.size,
} satisfies PoolStat;
}
return poolStatsMap;
}

async request<T = any>(url: RequestURL, options?: RequestOptions) {
return await this.#requestInternal<T>(url, options);
}

// alias to request, keep compatible with urlib@2 HttpClient.curl
// alias to request, keep compatible with urllib@2 HttpClient.curl
async curl<T = any>(url: RequestURL, options?: RequestOptions) {
return await this.request<T>(url, options);
}
24 changes: 14 additions & 10 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -2,25 +2,29 @@ import LRU from 'ylru';
import { HttpClient, HEADER_USER_AGENT } from './HttpClient.js';
import { RequestOptions, RequestURL } from './Request.js';

let httpclient: HttpClient;
const domainSocketHttpclients = new LRU(50);
let httpClient: HttpClient;
const domainSocketHttpClients = new LRU(50);

export function getDefaultHttpClient(): HttpClient {
if (!httpClient) {
httpClient = new HttpClient();
}
return httpClient;
}

export async function request<T = any>(url: RequestURL, options?: RequestOptions) {
if (options?.socketPath) {
let domainSocketHttpclient = domainSocketHttpclients.get<HttpClient>(options.socketPath);
let domainSocketHttpclient = domainSocketHttpClients.get<HttpClient>(options.socketPath);
if (!domainSocketHttpclient) {
domainSocketHttpclient = new HttpClient({
connect: { socketPath: options.socketPath },
});
domainSocketHttpclients.set(options.socketPath, domainSocketHttpclient);
domainSocketHttpClients.set(options.socketPath, domainSocketHttpclient);
}
return await domainSocketHttpclient.request<T>(url, options);
}

if (!httpclient) {
httpclient = new HttpClient({});
}
return await httpclient.request<T>(url, options);
return await getDefaultHttpClient().request<T>(url, options);
}

// export curl method is keep compatible with urllib.curl()
@@ -36,12 +40,12 @@ export {
MockAgent, ProxyAgent, Agent, Dispatcher,
setGlobalDispatcher, getGlobalDispatcher,
} from 'undici';
// HttpClient2 is keep compatible with urlib@2 HttpClient2
// HttpClient2 is keep compatible with urllib@2 HttpClient2
export {
HttpClient, HttpClient as HttpClient2, HEADER_USER_AGENT as USER_AGENT,
RequestDiagnosticsMessage, ResponseDiagnosticsMessage,
} from './HttpClient.js';
// RequestOptions2 is keep compatible with urlib@2 RequestOptions2
// RequestOptions2 is keep compatible with urllib@2 RequestOptions2
export {
RequestOptions, RequestOptions as RequestOptions2, RequestURL, HttpMethod,
FixJSONCtlCharsHandler, FixJSONCtlChars,
3 changes: 2 additions & 1 deletion test/esm/index.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { strict as assert } from 'assert';
import * as urllibStar from 'urllib';
import urllib from 'urllib';
import { request, HttpClient, USER_AGENT } from 'urllib';
import { request, HttpClient, USER_AGENT, getDefaultHttpClient } from 'urllib';

console.log(urllibStar);
console.log(urllibStar.request, urllibStar.HttpClient);
console.log(urllibStar.request, urllibStar.HttpClient);
console.log(urllibStar.USER_AGENT, urllib.USER_AGENT, USER_AGENT);
console.log(request, HttpClient);
console.log('stats %o', getDefaultHttpClient().getDispatcherPoolStats());

assert(urllibStar);
assert.equal(typeof urllibStar.request, 'function');
15 changes: 14 additions & 1 deletion test/index.test.ts
Original file line number Diff line number Diff line change
@@ -2,7 +2,7 @@ import { strict as assert } from 'node:assert';
import { parse as urlparse } from 'node:url';
import { readFileSync } from 'node:fs';
import { describe, it, beforeAll, afterAll, afterEach, beforeEach } from 'vitest';
import urllib, { HttpClient } from '../src';
import urllib, { HttpClient, getDefaultHttpClient } from '../src';
import { MockAgent, setGlobalDispatcher, getGlobalDispatcher } from '../src';
import { startServer } from './fixtures/server';
import { readableToBytes } from './utils';
@@ -20,6 +20,19 @@ describe('index.test.ts', () => {
await close();
});

describe('getDefaultHttpClient()', () => {
it('should work', async () => {
const response = await getDefaultHttpClient().request(`${_url}html`);
assert.equal(response.status, 200);
assert.equal(response.headers['content-type'], 'text/html');
assert(response.headers.date);
assert.equal(response.url, `${_url}html`);
assert(!response.redirected);
assert.equal(getDefaultHttpClient(), getDefaultHttpClient());
console.log('stats %o', getDefaultHttpClient().getDispatcherPoolStats());
});
});

describe('urllib.request()', () => {
it('should work', async () => {
const response = await urllib.request(`${_url}html`);
58 changes: 39 additions & 19 deletions test/keep-alive-header.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { strict as assert } from 'node:assert';
import { describe, it, beforeAll, afterAll } from 'vitest';
import urllib from '../src';
import { HttpClient } from '../src';
import { startServer } from './fixtures/server';
import { sleep } from './utils';

describe('keep-alive-header.test.ts', () => {
// should shorter than server keepalive timeout
// https://zhuanlan.zhihu.com/p/34147188
const keepAliveTimeout = 2000;
const httpClient = new HttpClient();
let close: any;
let _url: string;
beforeAll(async () => {
@@ -25,97 +26,116 @@ describe('keep-alive-header.test.ts', () => {
const max = process.env.TEST_KEEPALIVE_COUNT ? parseInt(process.env.TEST_KEEPALIVE_COUNT) : 3;
let otherSideClosed = 0;
let readECONNRESET = 0;
const origin = _url.substring(0, _url.length - 1);
while (count < max) {
count++;
try {
let response = await urllib.request(_url);
const task = httpClient.request(_url);
console.log('after request stats: %o', httpClient.getDispatcherPoolStats());
assert.equal(httpClient.getDispatcherPoolStats()[origin].pending, 1);
assert.equal(httpClient.getDispatcherPoolStats()[origin].size, 1);
let response = await task;
console.log('after response stats: %o', httpClient.getDispatcherPoolStats());
assert.equal(httpClient.getDispatcherPoolStats()[origin].pending, 0);
assert.equal(httpClient.getDispatcherPoolStats()[origin].connected, 1);
// console.log(response.res.socket);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
assert(parseInt(response.headers['x-requests-persocket'] as string) > 1);
await sleep(keepAliveTimeout / 2);
response = await urllib.request(_url);
response = await httpClient.request(_url);
// console.log(response.res.socket);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
response = await urllib.request(_url);
response = await httpClient.request(_url);
assert.equal(response.status, 200);
// console.log(response.headers);
assert.equal(response.headers.connection, 'keep-alive');
assert.equal(response.headers['keep-alive'], 'timeout=2');
assert(parseInt(response.headers['x-requests-persocket'] as string) > 1);
console.log('before sleep stats: %o', httpClient.getDispatcherPoolStats());
// { connected: 2, free: 1, pending: 0, queued: 0, running: 0, size: 0 }
assert.equal(httpClient.getDispatcherPoolStats()[origin].connected, 2);
assert.equal(httpClient.getDispatcherPoolStats()[origin].free, 1);
await sleep(keepAliveTimeout);
console.log('after sleep stats: %o', httpClient.getDispatcherPoolStats());
// { connected: 0, free: 0, pending: 0, queued: 0, running: 0, size: 0 }
// { connected: 1, free: 1, pending: 0, queued: 0, running: 0, size: 0 }
// { connected: 2, free: 2, pending: 0, queued: 0, running: 0, size: 0 }
assert(httpClient.getDispatcherPoolStats()[origin].connected <= 2);
assert(httpClient.getDispatcherPoolStats()[origin].free <= 2);
assert.equal(httpClient.getDispatcherPoolStats()[origin].size, 0);
} catch (err) {
if (err.message === 'other side closed') {
console.log(err);

0 comments on commit 5f9be29

Please sign in to comment.