Skip to content

Commit

Permalink
refactor(http/upgrade): Http11Upgrade is Clone (#3540)
Browse files Browse the repository at this point in the history
* refactor(http/upgrade): `Http11Upgrade::insert_half` matches on `self`

this is a noöp change, to set the stage for subsequent changes to the
internal model of `Http11Upgrade`.

this `inner` field will shortly be an option, and this will make it
easier to only follow these panicking branches when the inner lock is
`Some(_)`.

Signed-off-by: katelyn martin <[email protected]>

* refactor(http/upgrade): `Http11Upgade` stores an `Option<T>`

this commit hinges on this change to the upgrade middleware's `inner`
field.

we still retain a reference-counted copy of the `Inner` state, but now
we may store `None` here.

```
 pub struct Http11Upgrade {
     half: Half,
-    inner: Arc<Inner>,
+    inner: Option<Arc<Inner>>,
 }
```

a new branch is added to the `insert_half` method that consumes the
"sender" and inserts an upgrade future; when this is `None` it will do
nothing, rather than panicking.

Signed-off-by: katelyn martin <[email protected]>

* refactor(http/upgrade): `Half` marker is `Copy`

this type is an empty flag to indicate whether an `Http11Upgrade`
extension corresponds to the server or client half of the upgrade
future channel.

this type is made copy, to facilitate making the `Http11Upgrade`
extension safely cloneable.

Signed-off-by: katelyn martin <[email protected]>

* refactor(http/upgrade): `Http11Upgrade` is `Clone`

this commit makes `Http11Upgrade` a cloneable type.

see <linkerd/linkerd2#8733>.

in the 1.0 interface of the `http` crate, request and response
extensions must now satisfy a `Clone` bound.

`Http11Upgrade` was written before this was the case, and is very
intentionally designed around the idea that it *not* be cloneable.

`insert_half()` in particular could cause the proxy to panic if it were
to clone a request or response's extensions. it might call
`insert_half()` a second time, and discover that the `TryLock<T>` had
already been set.

moreover, holding on to a copy of the extensions would prevent the
`Drop` method for `Inner` from being called. This would cause
connections that negotiate an HTTP/1.1 upgrade to deadlock due to the
`OnUpgrade` futures never being polled, and failing to create a `Duplex`
that acts as the connection's I/O transport.

this commit makes use of the alterations to `Http11Upgrade` made in
previous commits, and adds a *safe* implementation of `Clone`. by
only shallowly copying the extension, we tie the upgrade glue to a
*specific* request/response.

the extension can be cloned, but any generated copies will be inert.

Signed-off-by: katelyn martin <[email protected]>

* chore(http/upgrade): fix broken intradoc links

Signed-off-by: katelyn martin <[email protected]>

* chore(http/upgrade): add `thiserror` dependency

Signed-off-by: katelyn martin <[email protected]>

* refactor(proxy/http): use `.await` syntax

`FutureExt::map_ok()` won't work if we try to return an error from this
block. the `and_then()` adaptor is used to chain futures, and also won't
work given a synchronous closure.

this can be done with the equivalent `.await` syntax, and leaves a nicer
hole for us to propagate other errors here, shortly.

Signed-off-by: katelyn martin <[email protected]>

* review(http/upgrade): propagate `insert_half()` failures

#3540 (comment)

Signed-off-by: katelyn martin <[email protected]>
Co-Authored-By: Oliver Gould <[email protected]>

* docs(http/upgrade): tweak comment

Signed-off-by: katelyn martin <[email protected]>

---------

Signed-off-by: katelyn martin <[email protected]>
Co-authored-by: Oliver Gould <[email protected]>
  • Loading branch information
cratelyn and olix0r authored Jan 22, 2025
1 parent b8d29a2 commit 76b5f10
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 20 deletions.
1 change: 1 addition & 0 deletions Cargo.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1859,6 +1859,7 @@ dependencies = [
"linkerd-io",
"linkerd-stack",
"pin-project",
"thiserror 2.0.11",
"tokio",
"tower",
"tracing",
Expand Down
1 change: 1 addition & 0 deletions linkerd/http/upgrade/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ hyper = { workspace = true, default-features = false, features = [
"client",
] }
pin-project = "1"
thiserror = "2"
tokio = { version = "1", default-features = false }
tower = { version = "0.4", default-features = false }
tracing = "0.1"
Expand Down
7 changes: 6 additions & 1 deletion linkerd/http/upgrade/src/glue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ impl<B> PinnedDrop for UpgradeBody<B> {
let this = self.project();
// If an HTTP/1 upgrade was wanted, send the upgrade future.
if let Some((upgrade, on_upgrade)) = this.upgrade.take() {
upgrade.insert_half(on_upgrade);
if let Err(error) = upgrade.insert_half(on_upgrade) {
tracing::warn!(
?error,
"upgrade body could not send upgrade future upon completion"
);
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion linkerd/http/upgrade/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ pub mod upgrade;
/// > fields' semantics. This includes but is not limited to:
/// >
/// > - `Proxy-Connection` (Appendix C.2.2 of [HTTP/1.1])
/// > - `Keep-Alive` (Section 19.7.1 of [RFC2068])
/// > - `Keep-Alive` (Section 19.7.1 of \[RFC2068\])
/// > - `TE` (Section 10.1.4)
/// > - `Transfer-Encoding` (Section 6.1 of [HTTP/1.1])
/// > - `Upgrade` (Section 7.8)
Expand Down
59 changes: 45 additions & 14 deletions linkerd/http/upgrade/src/upgrade.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ use try_lock::TryLock;
/// inserted into the `Request::extensions()`. If the HTTP1 client service
/// also detects an upgrade, the two `OnUpgrade` futures will be joined
/// together with the glue in this type.
// Note: this relies on there only having been 2 Inner clones, so don't
// implement `Clone` for this type.
pub struct Http11Upgrade {
half: Half,
inner: Arc<Inner>,
inner: Option<Arc<Inner>>,
}

/// A named "tuple" returned by [`Http11Upgade::halves()`] of the two halves of
Expand All @@ -50,7 +48,7 @@ struct Inner {
upgrade_drain_signal: Option<drain::Watch>,
}

#[derive(Debug)]
#[derive(Clone, Copy, Debug)]
enum Half {
Server,
Client,
Expand All @@ -63,6 +61,13 @@ pub struct Service<S> {
upgrade_drain_signal: drain::Watch,
}

#[derive(Debug, thiserror::Error)]
#[error("OnUpgrade future has already been inserted: half={half:?}")]
pub struct AlreadyInserted {
half: Half,
pub upgrade: OnUpgrade,
}

// === impl Http11Upgrade ===

impl Http11Upgrade {
Expand All @@ -80,35 +85,42 @@ impl Http11Upgrade {
Http11UpgradeHalves {
server: Http11Upgrade {
half: Half::Server,
inner: inner.clone(),
inner: Some(inner.clone()),
},
client: Http11Upgrade {
half: Half::Client,
inner,
inner: Some(inner.clone()),
},
}
}

pub fn insert_half(self, upgrade: OnUpgrade) {
match self.half {
Half::Server => {
let mut lock = self
.inner
pub fn insert_half(self, upgrade: OnUpgrade) -> Result<(), AlreadyInserted> {
match self {
Self {
inner: Some(inner),
half: Half::Server,
} => {
let mut lock = inner
.server
.try_lock()
.expect("only Half::Server touches server TryLock");
debug_assert!(lock.is_none());
*lock = Some(upgrade);
Ok(())
}
Half::Client => {
let mut lock = self
.inner
Self {
inner: Some(inner),
half: Half::Client,
} => {
let mut lock = inner
.client
.try_lock()
.expect("only Half::Client touches client TryLock");
debug_assert!(lock.is_none());
*lock = Some(upgrade);
Ok(())
}
Self { inner: None, half } => Err(AlreadyInserted { half, upgrade }),
}
}
}
Expand All @@ -121,6 +133,25 @@ impl fmt::Debug for Http11Upgrade {
}
}

/// An [`Http11Upgrade`] can be cloned.
///
/// NB: Only the original copy of this extension may insert an [`OnUpgrade`] future into its half
/// of the channel. Calling [`insert_half()`][Http11Upgrade::insert_half] on any clones of an
/// upgrade extension will result in an error.
// See the [`Drop`] implementation provided by `Inner` for more information.
impl Clone for Http11Upgrade {
fn clone(&self) -> Self {
Self {
half: self.half,
// We do *NOT* deeply clone our reference to `Inner`.
//
// `Http11Upgrade::insert_half()` and the `Inner` type's `Drop` glue rely on there only
// being one copy of the client and sender halves of the upgrade channel.
inner: None,
}
}
}

/// When both halves have dropped, check if both sides are inserted,
/// and if so, spawn the upgrade task.
impl Drop for Inner {
Expand Down
9 changes: 5 additions & 4 deletions linkerd/proxy/http/src/h1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ where
client.as_ref().unwrap().request(req)
};

Box::pin(rsp_fut.err_into().map_ok(move |mut rsp| {
Box::pin(async move {
let mut rsp = rsp_fut.await?;
if is_http_connect {
// Add an extension to indicate that this a response to a CONNECT request.
debug_assert!(
Expand All @@ -161,14 +162,14 @@ where
if is_upgrade(&rsp) {
trace!("Client response is HTTP/1.1 upgrade");
if let Some(upgrade) = upgrade {
upgrade.insert_half(hyper::upgrade::on(&mut rsp));
upgrade.insert_half(hyper::upgrade::on(&mut rsp))?;
}
} else {
linkerd_http_upgrade::strip_connection_headers(rsp.headers_mut());
}

rsp.map(BoxBody::new)
}))
Ok(rsp.map(BoxBody::new))
})
}
}

Expand Down

0 comments on commit 76b5f10

Please sign in to comment.