From 240968146e226df852b47838aa9ec91d4ed430c3 Mon Sep 17 00:00:00 2001
From: fengmk2 <suqian.yf@antgroup.com>
Date: Sat, 7 Dec 2024 18:50:54 +0800
Subject: [PATCH] fix: set opaque on request

---
 src/HttpClient.ts           |  2 +-
 src/Response.ts             |  2 +-
 src/diagnosticsChannel.ts   | 22 ++++++++++++----------
 src/fetch.ts                | 13 +++++++++----
 src/symbols.ts              |  1 +
 test/fetch.test.ts          | 24 ++++++++++++++++++------
 test/options.timing.test.ts |  7 ++++---
 7 files changed, 46 insertions(+), 25 deletions(-)

diff --git a/src/HttpClient.ts b/src/HttpClient.ts
index 1d0f0b1e..6da6611f 100644
--- a/src/HttpClient.ts
+++ b/src/HttpClient.ts
@@ -300,7 +300,7 @@ export class HttpClient extends EventEmitter {
       // socket assigned
       queuing: 0,
       // dns lookup time
-      // dnslookup: 0,
+      dnslookup: 0,
       // socket connected
       connected: 0,
       // request headers sent
diff --git a/src/Response.ts b/src/Response.ts
index 66618a8d..fcf4b615 100644
--- a/src/Response.ts
+++ b/src/Response.ts
@@ -28,7 +28,7 @@ export type Timing = {
   // socket assigned
   queuing: number;
   // dns lookup time
-  // dnslookup: number;
+  dnslookup: number;
   // socket connected
   connected: number;
   // request headers sent
diff --git a/src/diagnosticsChannel.ts b/src/diagnosticsChannel.ts
index 3092f85f..3529f3df 100644
--- a/src/diagnosticsChannel.ts
+++ b/src/diagnosticsChannel.ts
@@ -89,7 +89,9 @@ export function initDiagnosticsChannel() {
     const opaque = getRequestOpaque(request, kHandler);
     // ignore non HttpClient Request
     if (!opaque || !opaque[symbols.kRequestId]) return;
-    debug('[%s] Request#%d %s %s, path: %s, headers: %o',
+
+    Reflect.set(request, symbols.kRequestInternalOpaque, opaque);
+    debug('[%s] Request#%d %s %s, path: %s, headers: %j',
       name, opaque[symbols.kRequestId], request.method, request.origin, request.path, request.headers);
     if (!opaque[symbols.kEnableRequestTiming]) return;
     opaque[symbols.kRequestTiming].queuing = performanceTime(opaque[symbols.kRequestStartTime]);
@@ -114,10 +116,10 @@ export function initDiagnosticsChannel() {
       sock[symbols.kSocketConnectProtocol] = connectParams.protocol;
       sock[symbols.kSocketConnectHost] = connectParams.host;
       sock[symbols.kSocketConnectPort] = connectParams.port;
-      debug('[%s] Socket#%d connectError, connectParams: %o, error: %s, (sock: %o)',
+      debug('[%s] Socket#%d connectError, connectParams: %j, error: %s, (sock: %j)',
         name, sock[symbols.kSocketId], connectParams, (error as Error).message, formatSocket(sock));
     } else {
-      debug('[%s] connectError, connectParams: %o, error: %o',
+      debug('[%s] connectError, connectParams: %j, error: %o',
         name, connectParams, error);
     }
   });
@@ -136,13 +138,13 @@ export function initDiagnosticsChannel() {
     socket[symbols.kSocketConnectProtocol] = connectParams.protocol;
     socket[symbols.kSocketConnectHost] = connectParams.host;
     socket[symbols.kSocketConnectPort] = connectParams.port;
-    debug('[%s] Socket#%d connected (sock: %o)', name, socket[symbols.kSocketId], formatSocket(socket));
+    debug('[%s] Socket#%d connected (sock: %j)', name, socket[symbols.kSocketId], formatSocket(socket));
   });
 
   // This message is published right before the first byte of the request is written to the socket.
   subscribe('undici:client:sendHeaders', (message, name) => {
     const { request, socket } = message as DiagnosticsChannel.ClientSendHeadersMessage & { socket: SocketExtend };
-    const opaque = getRequestOpaque(request, kHandler);
+    const opaque = Reflect.get(request, symbols.kRequestInternalOpaque);
     if (!opaque || !opaque[symbols.kRequestId]) {
       debug('[%s] opaque not found', name);
       return;
@@ -151,7 +153,7 @@ export function initDiagnosticsChannel() {
     (socket[symbols.kHandledRequests] as number)++;
     // attach socket to opaque
     opaque[symbols.kRequestSocket] = socket;
-    debug('[%s] Request#%d send headers on Socket#%d (handled %d requests, sock: %o)',
+    debug('[%s] Request#%d send headers on Socket#%d (handled %d requests, sock: %j)',
       name, opaque[symbols.kRequestId], socket[symbols.kSocketId], socket[symbols.kHandledRequests],
       formatSocket(socket));
 
@@ -167,7 +169,7 @@ export function initDiagnosticsChannel() {
 
   subscribe('undici:request:bodySent', (message, name) => {
     const { request } = message as DiagnosticsChannel.RequestBodySentMessage;
-    const opaque = getRequestOpaque(request, kHandler);
+    const opaque = Reflect.get(request, symbols.kRequestInternalOpaque);
     if (!opaque || !opaque[symbols.kRequestId]) {
       debug('[%s] opaque not found', name);
       return;
@@ -181,7 +183,7 @@ export function initDiagnosticsChannel() {
   // This message is published after the response headers have been received, i.e. the response has been completed.
   subscribe('undici:request:headers', (message, name) => {
     const { request, response } = message as DiagnosticsChannel.RequestHeadersMessage;
-    const opaque = getRequestOpaque(request, kHandler);
+    const opaque = Reflect.get(request, symbols.kRequestInternalOpaque);
     if (!opaque || !opaque[symbols.kRequestId]) {
       debug('[%s] opaque not found', name);
       return;
@@ -191,7 +193,7 @@ export function initDiagnosticsChannel() {
     const socket = opaque[symbols.kRequestSocket];
     if (socket) {
       socket[symbols.kHandledResponses]++;
-      debug('[%s] Request#%d get %s response headers on Socket#%d (handled %d responses, sock: %o)',
+      debug('[%s] Request#%d get %s response headers on Socket#%d (handled %d responses, sock: %j)',
         name, opaque[symbols.kRequestId], response.statusCode, socket[symbols.kSocketId], socket[symbols.kHandledResponses],
         formatSocket(socket));
     } else {
@@ -206,7 +208,7 @@ export function initDiagnosticsChannel() {
   // This message is published after the response body and trailers have been received, i.e. the response has been completed.
   subscribe('undici:request:trailers', (message, name) => {
     const { request } = message as DiagnosticsChannel.RequestTrailersMessage;
-    const opaque = getRequestOpaque(request, kHandler);
+    const opaque = Reflect.get(request, symbols.kRequestInternalOpaque);
     if (!opaque || !opaque[symbols.kRequestId]) {
       debug('[%s] opaque not found', name);
       return;
diff --git a/src/fetch.ts b/src/fetch.ts
index 0165453f..b1f63bf2 100644
--- a/src/fetch.ts
+++ b/src/fetch.ts
@@ -1,4 +1,5 @@
 import { AsyncLocalStorage } from 'node:async_hooks';
+import { debuglog } from 'node:util';
 import {
   fetch as UndiciFetch,
   RequestInfo,
@@ -41,6 +42,8 @@ import { RawResponseWithMeta, SocketInfo } from './Response.js';
 import { IncomingHttpHeaders } from './IncomingHttpHeaders.js';
 import { BaseAgent, BaseAgentOptions } from './BaseAgent.js';
 
+const debug = debuglog('urllib:fetch');
+
 export interface UrllibRequestInit extends RequestInit {
   // default is true
   timing?: boolean;
@@ -137,7 +140,7 @@ export class FetchFactory {
       // socket assigned
       queuing: 0,
       // dns lookup time
-      // dnslookup: 0,
+      dnslookup: 0,
       // socket connected
       connected: 0,
       // request headers sent
@@ -218,8 +221,9 @@ export class FetchFactory {
         res = await UndiciFetch(input, init);
       });
     } catch (e: any) {
-      updateSocketInfo(socketInfo, internalOpaque /* , rawError */);
+      updateSocketInfo(socketInfo, internalOpaque, e);
       urllibResponse.rt = performanceTime(requestStartTime);
+      debug('Request#%d throw error: %s', requestId, e);
       channels.fetchResponse.publish({
         fetch: fetchMeta,
         error: e,
@@ -234,7 +238,7 @@ export class FetchFactory {
 
     // get undici internal response
     const state = getResponseState(res!);
-    updateSocketInfo(socketInfo, internalOpaque /* , rawError */);
+    updateSocketInfo(socketInfo, internalOpaque);
 
     urllibResponse.headers = convertHeader(res!.headers);
     urllibResponse.status = urllibResponse.statusCode = res!.status;
@@ -243,7 +247,8 @@ export class FetchFactory {
       urllibResponse.size = parseInt(urllibResponse.headers['content-length']);
     }
     urllibResponse.rt = performanceTime(requestStartTime);
-
+    debug('Request#%d got response, status: %s, headers: %j, timing: %j, socket: %j',
+      requestId, urllibResponse.status, urllibResponse.headers, timing, urllibResponse.socket);
     channels.fetchResponse.publish({
       fetch: fetchMeta,
       timingInfo: state.timingInfo,
diff --git a/src/symbols.ts b/src/symbols.ts
index 406a6636..7890ff41 100644
--- a/src/symbols.ts
+++ b/src/symbols.ts
@@ -17,5 +17,6 @@ export default {
   kEnableRequestTiming: Symbol('enable request timing or not'),
   kRequestTiming: Symbol('request timing'),
   kRequestOriginalOpaque: Symbol('request original opaque'),
+  kRequestInternalOpaque: Symbol('request internal opaque'),
   kErrorSocket: Symbol('socket of error'),
 };
diff --git a/test/fetch.test.ts b/test/fetch.test.ts
index 6171e220..8c186763 100644
--- a/test/fetch.test.ts
+++ b/test/fetch.test.ts
@@ -1,5 +1,6 @@
 import assert from 'node:assert/strict';
 import diagnosticsChannel from 'node:diagnostics_channel';
+import { setTimeout as sleep } from 'node:timers/promises';
 import { describe, it, beforeAll, afterAll } from 'vitest';
 import { startServer } from './fixtures/server.js';
 import {
@@ -20,7 +21,6 @@ describe('fetch.test.ts', () => {
     await close();
   });
 
-
   it('fetch should work', async () => {
     let requestDiagnosticsMessage: RequestDiagnosticsMessage;
     let responseDiagnosticsMessage: ResponseDiagnosticsMessage;
@@ -40,12 +40,13 @@ describe('fetch.test.ts', () => {
     });
     FetchFactory.setClientOptions({});
 
-    const response = await fetch(`${_url}html`);
+    let response = await fetch(`${_url}html`);
 
     assert(response);
     assert(requestDiagnosticsMessage!.request);
     assert(responseDiagnosticsMessage!.request);
     assert(responseDiagnosticsMessage!.response);
+    assert(responseDiagnosticsMessage!.response.socket.localAddress);
     assert([ '127.0.0.1', '::1' ].includes(responseDiagnosticsMessage!.response.socket.localAddress));
 
     assert(fetchDiagnosticsMessage!.fetch);
@@ -53,6 +54,13 @@ describe('fetch.test.ts', () => {
     assert(fetchResponseDiagnosticsMessage!.response);
     assert(fetchResponseDiagnosticsMessage!.timingInfo);
 
+    await sleep(1);
+    // again, keep alive
+    response = await fetch(`${_url}html`);
+    // console.log(responseDiagnosticsMessage!.response.socket);
+    assert(responseDiagnosticsMessage!.response.socket.handledRequests > 1);
+    assert(responseDiagnosticsMessage!.response.socket.handledResponses > 1);
+
     const stats = FetchFactory.getDispatcherPoolStats();
     assert(stats);
     assert(Object.keys(stats).length > 0);
@@ -77,17 +85,21 @@ describe('fetch.test.ts', () => {
     });
     FetchFactory.setClientOptions({});
 
-    try {
+    await assert.rejects(async () => {
       await fetch(`${_url}html?timeout=9999`, {
         signal: AbortSignal.timeout(100),
       });
-    } catch (error) {
-      console.log(error);
-    }
+    }, (err: any) => {
+      assert.equal(err.name, 'TimeoutError');
+      assert.equal(err.message, 'The operation was aborted due to timeout');
+      return true;
+    });
 
     assert(requestDiagnosticsMessage!.request);
     assert(responseDiagnosticsMessage!.request);
     assert(responseDiagnosticsMessage!.response);
+    // console.log(responseDiagnosticsMessage!.response.socket);
+    assert(responseDiagnosticsMessage!.response.socket.localAddress);
     assert([ '127.0.0.1', '::1' ].includes(responseDiagnosticsMessage!.response.socket.localAddress));
 
     assert(fetchDiagnosticsMessage!.fetch);
diff --git a/test/options.timing.test.ts b/test/options.timing.test.ts
index 680686c2..e1ae0c51 100644
--- a/test/options.timing.test.ts
+++ b/test/options.timing.test.ts
@@ -32,7 +32,7 @@ describe('options.timing.test.ts', () => {
     assert(res.timing.contentDownload > 0);
     assert(res.timing.contentDownload > res.timing.waiting);
     assert(res.timing.contentDownload <= res.rt);
-    assert(res.socket.handledResponses === 1);
+    assert.equal(res.socket.handledResponses, 1);
 
     // again connected should be zero
     await sleep(1);
@@ -45,13 +45,14 @@ describe('options.timing.test.ts', () => {
     res = response.res as RawResponseWithMeta;
     assert(res.timing.waiting > 0);
     assert(res.timing.queuing > 0);
-    assert(res.timing.connected === 0);
+    assert.equal(res.timing.connected, 0);
     assert(res.timing.requestHeadersSent > 0);
     assert(res.timing.requestSent > 0);
     assert(res.timing.contentDownload > 0);
     assert(res.timing.contentDownload > res.timing.waiting);
     assert(res.timing.contentDownload <= res.rt);
-    assert(res.socket.handledResponses === 2);
+    assert.equal(res.socket.handledResponses, 2);
+    // console.log(res.timing);
   });
 
   it('should timing = false work', async () => {