diff --git a/Cargo.lock b/Cargo.lock index 238d831641..81c3308e62 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1859,6 +1859,7 @@ dependencies = [ "linkerd-io", "linkerd-stack", "pin-project", + "thiserror 2.0.11", "tokio", "tower", "tracing", diff --git a/linkerd/http/upgrade/Cargo.toml b/linkerd/http/upgrade/Cargo.toml index 1787fddb6e..81cc2b380e 100644 --- a/linkerd/http/upgrade/Cargo.toml +++ b/linkerd/http/upgrade/Cargo.toml @@ -17,6 +17,7 @@ http = { workspace = true } http-body = { workspace = true } hyper = { workspace = true, default-features = false, features = ["deprecated", "client"] } pin-project = "1" +thiserror = "2" tokio = { version = "1", default-features = false } tower = { version = "0.4", default-features = false } tracing = "0.1" diff --git a/linkerd/http/upgrade/src/glue.rs b/linkerd/http/upgrade/src/glue.rs index 459099e835..90bc58df79 100644 --- a/linkerd/http/upgrade/src/glue.rs +++ b/linkerd/http/upgrade/src/glue.rs @@ -126,7 +126,12 @@ impl PinnedDrop for UpgradeBody { let this = self.project(); // If an HTTP/1 upgrade was wanted, send the upgrade future. if let Some((upgrade, on_upgrade)) = this.upgrade.take() { - upgrade.insert_half(on_upgrade); + if let Err(error) = upgrade.insert_half(on_upgrade) { + tracing::warn!( + ?error, + "upgrade body could not send upgrade future upon completion" + ); + } } } } diff --git a/linkerd/http/upgrade/src/lib.rs b/linkerd/http/upgrade/src/lib.rs index ae09d3d6fe..d3167d7518 100644 --- a/linkerd/http/upgrade/src/lib.rs +++ b/linkerd/http/upgrade/src/lib.rs @@ -44,7 +44,7 @@ pub mod upgrade; /// > fields' semantics. This includes but is not limited to: /// > /// > - `Proxy-Connection` (Appendix C.2.2 of [HTTP/1.1]) -/// > - `Keep-Alive` (Section 19.7.1 of [RFC2068]) +/// > - `Keep-Alive` (Section 19.7.1 of \[RFC2068\]) /// > - `TE` (Section 10.1.4) /// > - `Transfer-Encoding` (Section 6.1 of [HTTP/1.1]) /// > - `Upgrade` (Section 7.8) diff --git a/linkerd/http/upgrade/src/upgrade.rs b/linkerd/http/upgrade/src/upgrade.rs index 39a498336e..14bbc6ffa1 100644 --- a/linkerd/http/upgrade/src/upgrade.rs +++ b/linkerd/http/upgrade/src/upgrade.rs @@ -22,11 +22,9 @@ use try_lock::TryLock; /// inserted into the `Request::extensions()`. If the HTTP1 client service /// also detects an upgrade, the two `OnUpgrade` futures will be joined /// together with the glue in this type. -// Note: this relies on there only having been 2 Inner clones, so don't -// implement `Clone` for this type. pub struct Http11Upgrade { half: Half, - inner: Arc, + inner: Option>, } /// A named "tuple" returned by [`Http11Upgade::halves()`] of the two halves of @@ -50,7 +48,7 @@ struct Inner { upgrade_drain_signal: Option, } -#[derive(Debug)] +#[derive(Clone, Copy, Debug)] enum Half { Server, Client, @@ -63,6 +61,13 @@ pub struct Service { upgrade_drain_signal: drain::Watch, } +#[derive(Debug, thiserror::Error)] +#[error("OnUpgrade future has already been inserted: half={half:?}")] +pub struct AlreadyInserted { + half: Half, + pub upgrade: OnUpgrade, +} + // === impl Http11Upgrade === impl Http11Upgrade { @@ -80,35 +85,42 @@ impl Http11Upgrade { Http11UpgradeHalves { server: Http11Upgrade { half: Half::Server, - inner: inner.clone(), + inner: Some(inner.clone()), }, client: Http11Upgrade { half: Half::Client, - inner, + inner: Some(inner.clone()), }, } } - pub fn insert_half(self, upgrade: OnUpgrade) { - match self.half { - Half::Server => { - let mut lock = self - .inner + pub fn insert_half(self, upgrade: OnUpgrade) -> Result<(), AlreadyInserted> { + match self { + Self { + inner: Some(inner), + half: Half::Server, + } => { + let mut lock = inner .server .try_lock() .expect("only Half::Server touches server TryLock"); debug_assert!(lock.is_none()); *lock = Some(upgrade); + Ok(()) } - Half::Client => { - let mut lock = self - .inner + Self { + inner: Some(inner), + half: Half::Client, + } => { + let mut lock = inner .client .try_lock() .expect("only Half::Client touches client TryLock"); debug_assert!(lock.is_none()); *lock = Some(upgrade); + Ok(()) } + Self { inner: None, half } => Err(AlreadyInserted { half, upgrade }), } } } @@ -121,6 +133,25 @@ impl fmt::Debug for Http11Upgrade { } } +/// An [`Http11Upgrade`] can be cloned. +/// +/// NB: Only the original copy of this extension may insert an [`OnUpgrade`] future into its half +/// of the channel. Calling [`insert_half()`][Http11Upgrade::insert_half] on any clones of an +/// upgrade extension will result in an error. +// See the [`Drop`] implementation provided by `Inner` for more information. +impl Clone for Http11Upgrade { + fn clone(&self) -> Self { + Self { + half: self.half, + // We do *NOT* deeply clone our reference to `Inner`. + // + // `Http11Upgrade::insert_half()` and the `Inner` type's `Drop` glue rely on there only + // being one copy of the client and sender halves of the upgrade channel. + inner: None, + } + } +} + /// When both halves have dropped, check if both sides are inserted, /// and if so, spawn the upgrade task. impl Drop for Inner { diff --git a/linkerd/proxy/http/src/h1.rs b/linkerd/proxy/http/src/h1.rs index becb3730eb..be3d3fbb1c 100644 --- a/linkerd/proxy/http/src/h1.rs +++ b/linkerd/proxy/http/src/h1.rs @@ -136,7 +136,8 @@ where client.as_ref().unwrap().request(req) }; - Box::pin(rsp_fut.err_into().map_ok(move |mut rsp| { + Box::pin(async move { + let mut rsp = rsp_fut.await?; if is_http_connect { // Add an extension to indicate that this a response to a CONNECT request. debug_assert!( @@ -161,14 +162,14 @@ where if is_upgrade(&rsp) { trace!("Client response is HTTP/1.1 upgrade"); if let Some(upgrade) = upgrade { - upgrade.insert_half(hyper::upgrade::on(&mut rsp)); + upgrade.insert_half(hyper::upgrade::on(&mut rsp))?; } } else { linkerd_http_upgrade::strip_connection_headers(rsp.headers_mut()); } - rsp.map(BoxBody::new) - })) + Ok(rsp.map(BoxBody::new)) + }) } }