diff --git a/ext/fetch/23_request.js b/ext/fetch/23_request.js index 22c17d6d23b841..61cac22d2e2aaa 100644 --- a/ext/fetch/23_request.js +++ b/ext/fetch/23_request.js @@ -281,11 +281,11 @@ class Request { if (signal === undefined) { const signal = newSignal(); this[_signalCache] = signal; - return signal; - } + this[_request].onCancel?.(() => { + signal[signalAbort](signalAbortError); + }); - if (!signal.aborted && this[_request].isCancelled) { - signal[signalAbort](signalAbortError); + return signal; } return signal; diff --git a/ext/http/00_serve.ts b/ext/http/00_serve.ts index 8cfd7ad535e58b..7bf83e49c3dac1 100644 --- a/ext/http/00_serve.ts +++ b/ext/http/00_serve.ts @@ -11,10 +11,10 @@ import { op_http_cancel, op_http_close, op_http_close_after_finish, - op_http_get_request_cancelled, op_http_get_request_headers, op_http_get_request_method_and_url, op_http_read_request_body, + op_http_request_on_cancel, op_http_serve, op_http_serve_on, op_http_set_promise_complete, @@ -375,11 +375,16 @@ class InnerRequest { return this.#external; } - get isCancelled() { + onCancel(callback) { if (this.#external === null) { - return true; + callback(); + return; } - return op_http_get_request_cancelled(this.#external); + + PromisePrototypeThen( + op_http_request_on_cancel(this.#external), + callback, + ); } } diff --git a/ext/http/http_next.rs b/ext/http/http_next.rs index 326478fe7c244e..c55e868352dd33 100644 --- a/ext/http/http_next.rs +++ b/ext/http/http_next.rs @@ -708,6 +708,19 @@ pub fn op_http_get_request_cancelled(external: *const c_void) -> bool { http.cancelled() } +#[op2(async)] +pub async fn op_http_request_on_cancel(external: *const c_void) { + let http = + // SAFETY: op is called with external. + unsafe { clone_external!(external, "op_http_request_on_cancel") }; + let (tx, rx) = tokio::sync::oneshot::channel(); + + http.on_cancel(tx); + drop(http); + + rx.await.ok(); +} + /// Returned promise resolves when body streaming finishes. /// Call [`op_http_close_after_finish`] when done with the external. #[op2(async)] diff --git a/ext/http/lib.rs b/ext/http/lib.rs index 9d71e3ad3ccee8..49893b1b921f21 100644 --- a/ext/http/lib.rs +++ b/ext/http/lib.rs @@ -112,6 +112,7 @@ deno_core::extension!( http_next::op_http_close_after_finish, http_next::op_http_get_request_header, http_next::op_http_get_request_headers, + http_next::op_http_request_on_cancel, http_next::op_http_get_request_method_and_url, http_next::op_http_get_request_cancelled, http_next::op_http_read_request_body, diff --git a/ext/http/service.rs b/ext/http/service.rs index 75f93d77c21684..ce24dea43f372c 100644 --- a/ext/http/service.rs +++ b/ext/http/service.rs @@ -27,6 +27,7 @@ use std::rc::Rc; use std::task::Context; use std::task::Poll; use std::task::Waker; +use tokio::sync::oneshot; pub type Request = hyper::Request; pub type Response = hyper::Response; @@ -211,6 +212,7 @@ pub struct UpgradeUnavailableError; struct HttpRecordInner { server_state: SignallingRc, + closed_channel: Option>, request_info: HttpConnectionProperties, request_parts: http::request::Parts, request_body: Option, @@ -276,6 +278,7 @@ impl HttpRecord { response_body_finished: false, response_body_waker: None, trailers: None, + closed_channel: None, been_dropped: false, finished: false, needs_close_after_finish: false, @@ -312,6 +315,10 @@ impl HttpRecord { RefMut::map(self.self_mut(), |inner| &mut inner.needs_close_after_finish) } + pub fn on_cancel(&self, sender: oneshot::Sender<()>) { + self.self_mut().closed_channel = Some(sender); + } + fn recycle(self: Rc) { assert!( Rc::strong_count(&self) == 1, @@ -390,6 +397,9 @@ impl HttpRecord { inner.been_dropped = true; // The request body might include actual resources. inner.request_body.take(); + if let Some(closed_channel) = inner.closed_channel.take() { + let _ = closed_channel.send(()); + } } /// Complete this record, potentially expunging it if it is fully complete (ie: cancelled as well). diff --git a/tests/unit/serve_test.ts b/tests/unit/serve_test.ts index 9822a0ce2ab0b1..7d8c6ca06d1d37 100644 --- a/tests/unit/serve_test.ts +++ b/tests/unit/serve_test.ts @@ -4299,3 +4299,31 @@ Deno.test({ assert(cancelled); }); + +Deno.test({ + name: "AbortSignal event aborted when request is cancelled", +}, async () => { + const { promise, resolve } = Promise.withResolvers(); + + const server = Deno.serve({ + hostname: "0.0.0.0", + port: servePort, + onListen: () => resolve(), + }, async (request) => { + const { promise: promiseAbort, resolve: resolveAbort } = Promise + .withResolvers(); + request.signal.addEventListener("abort", () => resolveAbort()); + assert(!request.signal.aborted); + + await promiseAbort; + + return new Response("Ok"); + }); + + await promise; + await fetch(`http://localhost:${servePort}/`, { + signal: AbortSignal.timeout(100), + }).catch(() => {}); + + await server.shutdown(); +});