Skip to content

Commit

Permalink
fix: should got undici:client:sendHeaders message on H2 (#553)
Browse files Browse the repository at this point in the history
closes #510

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Enhanced error handling and logging for HTTP requests, improving
traceability of socket-related issues.
- Augmented response diagnostics to include socket information for
better monitoring.

- **Bug Fixes**
- Improved resilience against transient socket issues with refined error
handling and retry logic.

- **Tests**
- Introduced a new test case for tracing socket information with HTTP/2,
ensuring accurate socket reuse tracking.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
  • Loading branch information
fengmk2 authored Dec 17, 2024
1 parent 333f3b8 commit bd19f6d
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 7 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
"mime-types": "^2.1.35",
"qs": "^6.12.1",
"type-fest": "^4.20.1",
"undici": "^7.0.0",
"undici": "^7.1.1",
"ylru": "^2.0.0"
},
"devDependencies": {
Expand Down
4 changes: 2 additions & 2 deletions src/HttpClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -681,8 +681,8 @@ export class HttpClient extends EventEmitter {
res,
};

debug('Request#%d got response, status: %s, headers: %j, timing: %j',
requestId, res.status, res.headers, res.timing);
debug('Request#%d got response, status: %s, headers: %j, timing: %j, socket: %j',
requestId, res.status, res.headers, res.timing, res.socket);

if (args.retry > 0 && requestContext.retries < args.retry) {
const isRetry = args.isRetry ?? defaultIsRetry;
Expand Down
6 changes: 3 additions & 3 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,13 +173,13 @@ export function updateSocketInfo(socketInfo: SocketInfo, internalOpaque: any, er
socketInfo.remotePort = socket.remotePort;
socketInfo.remoteFamily = socket.remoteFamily;
}
if (Array.isArray(socket.autoSelectFamilyAttemptedAddresses)) {
socketInfo.attemptedRemoteAddresses = socket.autoSelectFamilyAttemptedAddresses;
}
socketInfo.bytesRead = socket.bytesRead;
socketInfo.bytesWritten = socket.bytesWritten;
if (socket[symbols.kSocketConnectErrorTime]) {
socketInfo.connectErrorTime = socket[symbols.kSocketConnectErrorTime];
if (Array.isArray(socket.autoSelectFamilyAttemptedAddresses)) {
socketInfo.attemptedRemoteAddresses = socket.autoSelectFamilyAttemptedAddresses;
}
socketInfo.connectProtocol = socket[symbols.kSocketConnectProtocol];
socketInfo.connectHost = socket[symbols.kSocketConnectHost];
socketInfo.connectPort = socket[symbols.kSocketConnectPort];
Expand Down
143 changes: 142 additions & 1 deletion test/diagnostics_channel.test.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import { strict as assert } from 'node:assert';
import diagnosticsChannel from 'node:diagnostics_channel';
import { setTimeout as sleep } from 'node:timers/promises';
import { createSecureServer } from 'node:http2';
import { once } from 'node:events';
import { describe, it, beforeEach, afterEach } from 'vitest';
import urllib from '../src/index.js';
import selfsigned from 'selfsigned';
import urllib, { HttpClient } from '../src/index.js';
import type {
RequestDiagnosticsMessage,
ResponseDiagnosticsMessage,
Expand Down Expand Up @@ -138,6 +141,144 @@ describe('diagnostics_channel.test.ts', () => {
diagnosticsChannel.unsubscribe('undici:request:trailers', onMessage);
});

it('should support trace socket info with H2 by undici:client:sendHeaders and undici:request:trailers', async () => {
const pem = selfsigned.generate();
const server = createSecureServer({
key: pem.private,
cert: pem.cert,
});
server.on('stream', (stream, headers) => {
stream.respond({
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': 'hello',
':status': 200,
});
if (headers[':method'] !== 'HEAD') {
stream.end('hello h2!');
}
});

server.listen(0);
await once(server, 'listening');

const kRequests = Symbol('requests');
let lastRequestOpaque: any;
let kHandler: any;
function onMessage(message: any, name: string | symbol) {
if (name === 'undici:client:connected') {
// console.log('%s %j', name, message.connectParams);
message.socket[kRequests] = 0;
return;
}
const { request, socket } = message;
if (!kHandler) {
const symbols = Object.getOwnPropertySymbols(request);
for (const symbol of symbols) {
if (symbol.description === 'handler') {
kHandler = symbol;
break;
}
}
}
const handler = request[kHandler];
let opaque = handler.opaque || handler.opts?.opaque;
assert(opaque);
opaque = opaque[symbols.kRequestOriginalOpaque];
if (opaque && name === 'undici:client:sendHeaders' && socket) {
socket[kRequests]++;
opaque.tracer.socket = {
localAddress: socket.localAddress,
localPort: socket.localPort,
remoteAddress: socket.remoteAddress,
remotePort: socket.remotePort,
remoteFamily: socket.remoteFamily,
timeout: socket.timeout,
bytesWritten: socket.bytesWritten,
bytesRead: socket.bytesRead,
requests: socket[kRequests],
};
}
// console.log('%s emit, %s %s, opaque: %j', name, request.method, request.origin, opaque);
lastRequestOpaque = opaque;
// console.log(request);
}
diagnosticsChannel.subscribe('undici:client:connected', onMessage);
diagnosticsChannel.subscribe('undici:client:sendHeaders', onMessage);
diagnosticsChannel.subscribe('undici:request:trailers', onMessage);

const httpClient = new HttpClient({
allowH2: true,
connect: {
rejectUnauthorized: false,
},
});

let traceId = `mock-traceid-${Date.now()}`;
_url = `https://localhost:${server.address().port}`;
let response = await httpClient.request(`${_url}?head=true`, {
method: 'HEAD',
opaque: {
tracer: { traceId },
},
});
assert.equal(response.status, 200);
assert(response.url.startsWith(_url));
assert(!response.redirected);
assert.equal(lastRequestOpaque.tracer.traceId, traceId);
assert(lastRequestOpaque.tracer.socket);
assert.equal(lastRequestOpaque.tracer.socket.requests, 1);

// HEAD, GET 请求都走同一个 http2 session socket
await sleep(1);
traceId = `mock-traceid-${Date.now()}`;
response = await httpClient.request(_url, {
method: 'GET',
opaque: {
tracer: { traceId },
},
});
assert.equal(response.status, 200);
assert.equal(lastRequestOpaque.tracer.traceId, traceId);
assert(lastRequestOpaque.tracer.socket);
assert.equal(lastRequestOpaque.tracer.socket.requests, 2);

await sleep(1);
traceId = `mock-traceid-${Date.now()}`;
response = await httpClient.request(_url, {
method: 'GET',
opaque: {
tracer: { traceId },
},
});
assert.equal(response.status, 200);
assert.equal(lastRequestOpaque.tracer.traceId, traceId);
assert(lastRequestOpaque.tracer.socket);
assert.equal(lastRequestOpaque.tracer.socket.requests, 3);

// socket 复用 1000 次
let count = 1000;
while (count-- > 0) {
await sleep(1);
traceId = `mock-traceid-${Date.now()}`;
response = await httpClient.request(`${_url}?count=${count}`, {
method: 'GET',
opaque: {
tracer: { traceId },
},
});
assert.equal(response.status, 200);
assert.equal(lastRequestOpaque.tracer.traceId, traceId);
assert(lastRequestOpaque.tracer.socket);
assert.equal(lastRequestOpaque.tracer.socket.requests, 3 + 1000 - count);
}
assert.equal(lastRequestOpaque.tracer.socket.requests, 1003);

diagnosticsChannel.unsubscribe('undici:client:connected', onMessage);
diagnosticsChannel.unsubscribe('undici:client:sendHeaders', onMessage);
diagnosticsChannel.unsubscribe('undici:request:trailers', onMessage);
server.close();
});

it('should support trace request by urllib:request and urllib:response', async () => {
let lastRequestOpaque: any;
let socket: any;
Expand Down

0 comments on commit bd19f6d

Please sign in to comment.