Skip to content

Commit

Permalink
Updated hyper and http (#237)
Browse files Browse the repository at this point in the history
* Updated cargo lock file to latest versions

* Fixed clippy warnings

* Updated dashmap to new release

* Updated hyper and http in test_common

- Major changes. Many of the utility functions have been moved to the new hyper-util
- See https://hyper.rs/guides/1/upgrading/

* Updated root Cargo.toml to latest hyper and http

- Broken build. Still more to go.

* Updated root Cargo.toml to latest hyper and http

- Broken build. Still more to go.

* Updated code to handle hyper v1 and http v1

* Ignore clippy warning that breaks code if fixed as suggested

* Updated hyper, http, and lock file to latest

- Removed minor version constraint on hyper and http
- Added approved license BSD-2 Clause to the deny.toml

* Cleaned up code

- Removed unneeded dependencies
- Removed commented out code
- Cleaned up BoxBody type in test_common
  • Loading branch information
tkmcmaster authored Jul 31, 2024
1 parent 519f780 commit 376feed
Show file tree
Hide file tree
Showing 13 changed files with 286 additions and 146 deletions.
253 changes: 176 additions & 77 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 5 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ for_each_parallel = { path = "./lib/for_each_parallel" }
futures = "0.3"
futures-timer = "3"
hdrhistogram = "7"
http = "0.2"
hyper = { version = "0.14", features = ["client", "http1", "http2", "stream"] }
hyper-tls = "0.5"
http = "1"
hyper = { version = "1", features = ["client", "http1", "http2"] }
hyper-tls = "0.6"
hyper-util = { version = "0.1", features = ["tokio", "client", "http1", "http2"] }
http-body-util = "0.1"
itertools = "0.13"
mod_interval = { path = "./lib/mod_interval" }
native-tls = "0.2"
Expand Down
1 change: 1 addition & 0 deletions deny.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ private = { ignore = true }
version = 2
allow = [
"Apache-2.0",
"BSD-2-Clause",
"BSD-3-Clause",
"MIT",
"Unicode-DFS-2016",
Expand Down
2 changes: 1 addition & 1 deletion lib/config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ doctest = false
base64 = "0.22"
ether = { path = "../either" }
futures = "0.3"
http = "0.2"
http = "1"
itertools = "0.13"
jsonpath_lib = "0.3.0"
percent-encoding = "2"
Expand Down
7 changes: 5 additions & 2 deletions lib/test_common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ doctest = false
path = "test_common.rs"

[dependencies]
bytes = "1"
futures = "0.3"
futures-timer = "3"
hyper = { version = "0.14", features = ["server"] }
http = "0.2"
hyper = { version = "1", features = ["http1", "http2"] }
hyper-util = { version = "0.1", features = ["tokio", "server", "http1", "http2"] }
http = "1"
http-body-util = "0.1"
parking_lot = "0.12"
tokio = { version = "1", features = ["full"] }
url = "2"
Expand Down
58 changes: 39 additions & 19 deletions lib/test_common/test_common.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,23 @@
use log::{debug, info};
use std::{future::Future, io, str::FromStr, sync::Arc, time::Duration};
use std::{future::Future, io, net::SocketAddr, str::FromStr, sync::Arc, time::Duration};
use tokio::net::TcpListener;

use bytes::Bytes;
use futures::{channel::oneshot, future::select, FutureExt};
use futures_timer::Delay;
use http::{header, StatusCode};
use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
Body, Error, Request, Response, Server,
use http_body_util::{BodyExt, Empty};
use hyper::{body::Incoming as Body, service::service_fn, Error, Request, Response};
use hyper_util::{
rt::{TokioExecutor, TokioIo},
server::conn::auto::Builder as HyperBuilder,
};
use parking_lot::Mutex;
use url::Url;

async fn echo_route(req: Request<Body>) -> Response<Body> {
type HyperBody = http_body_util::combinators::BoxBody<Bytes, Error>;

async fn echo_route(req: Request<Body>) -> Response<HyperBody> {
let headers = req.headers();
let content_type = headers
.get(header::CONTENT_TYPE)
Expand All @@ -36,24 +41,24 @@ async fn echo_route(req: Request<Body>) -> Response<Body> {
if echo.is_some() {
debug!("Echo Body = {}", echo.clone().unwrap_or_default());
}
let mut response = match (req.method(), echo) {
let mut response: Response<HyperBody> = match (req.method(), echo) {
(&http::Method::GET, Some(b)) => Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, content_type)
.body(b.into())
.body(b.map_err(|never| match never {}).boxed())
.unwrap(),
(&http::Method::POST, _) | (&http::Method::PUT, _) => Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, content_type)
.body(req.into_body())
.body(req.into_body().boxed())
.unwrap(),
_ => Response::builder()
.status(StatusCode::NO_CONTENT)
.body(Body::empty())
.body(empty())
.unwrap(),
};
let ms = wait.and_then(|c| FromStr::from_str(&c).ok()).unwrap_or(0);
let old_body = std::mem::replace(response.body_mut(), Body::empty());
let old_body = std::mem::replace(response.body_mut(), empty());
if ms > 0 {
debug!("waiting {} ms", ms);
}
Expand All @@ -62,13 +67,22 @@ async fn echo_route(req: Request<Body>) -> Response<Body> {
response
}

pub fn start_test_server(
fn empty() -> HyperBody {
Empty::<Bytes>::new()
.map_err(|never| match never {})
.boxed()
}

pub async fn start_test_server(
port: Option<u16>,
) -> (u16, oneshot::Sender<()>, impl Future<Output = ()>) {
let port = port.unwrap_or(0);
let address = ([127, 0, 0, 1], port).into();
let address: SocketAddr = ([127, 0, 0, 1], port).into();

let listener = TcpListener::bind(address).await.unwrap();
let local_addr = listener.local_addr().unwrap();

let make_svc = make_service_fn(|_: &AddrStream| async {
let server = tokio::spawn(async move {
let service = service_fn(|req: Request<Body>| async {
debug!("{:?}", req);
let method = req.method().to_string();
Expand All @@ -78,7 +92,7 @@ pub fn start_test_server(
"/" => echo_route(req).await,
_ => Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::empty())
.body(empty())
.unwrap(),
};
debug!("{:?}", response);
Expand All @@ -92,14 +106,20 @@ pub fn start_test_server(
);
Ok::<_, Error>(response)
});
Ok::<_, Error>(service)

loop {
let (stream, _) = listener.accept().await.unwrap();
let stream = TokioIo::new(stream);
tokio::task::spawn(async move {
let builder = HyperBuilder::new(TokioExecutor::new());
builder.serve_connection(stream, service).await.unwrap();
});
}
});

let (tx, rx) = oneshot::channel();

let server = Server::bind(&address).serve(make_svc);

let port = server.local_addr().port();
let port = local_addr.port();

let future = select(server, rx);

Expand Down
2 changes: 1 addition & 1 deletion src/bin/test_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ fn main() {
rt.block_on(async {
let port = std::env::var("PORT").ok().and_then(|s| s.parse().ok());
debug!("port = {}", port.unwrap_or_default());
let (port, rx, handle) = start_test_server(port);
let (port, rx, handle) = start_test_server(port).await;

println!("Listening on port {port}");

Expand Down
20 changes: 14 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,15 @@ use futures::{
stream, FutureExt, Stream, StreamExt,
};
use futures_timer::Delay;
use hyper::{client::HttpConnector, Body, Client};
use http_body_util::combinators::BoxBody;
use hyper_tls::HttpsConnector;
use hyper_util::{
client::legacy::{
connect::{dns::GaiResolver, HttpConnector},
Client,
},
rt::TokioExecutor,
};
use itertools::Itertools;
use line_writer::{blocking_writer, MsgType};
use log::{debug, error, info, warn};
Expand Down Expand Up @@ -57,6 +64,8 @@ use std::{
time::{Duration, Instant},
};

type Body = BoxBody<bytes::Bytes, std::io::Error>;

struct Endpoints {
// yaml index of the endpoint, (endpoint tags, builder)
inner: Vec<(
Expand Down Expand Up @@ -1139,16 +1148,15 @@ fn create_load_test_future(

pub(crate) fn create_http_client(
keepalive: Duration,
) -> Result<
Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>,
TestError,
> {
) -> Result<Client<HttpsConnector<HttpConnector<GaiResolver>>, Body>, TestError> {
let mut http = HttpConnector::new();
http.set_keepalive(Some(keepalive));
http.set_reuse_address(true);
http.enforce_http(false);
let https = HttpsConnector::from((http, TlsConnector::new()?.into()));
Ok(Client::builder().set_host(false).build::<_, Body>(https))
Ok(Client::builder(TokioExecutor::new())
.set_host(false)
.build::<_, Body>(https))
}

type ProvidersResult = Result<(BTreeMap<String, providers::Provider>, BTreeSet<String>), TestError>;
Expand Down
35 changes: 24 additions & 11 deletions src/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@ use futures::{
sink::SinkExt,
stream, FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
};
use http_body_util::{combinators::BoxBody, BodyExt, StreamBody};
use hyper::{
client::HttpConnector,
header::{Entry as HeaderEntry, HeaderName, HeaderValue, CONTENT_DISPOSITION},
Body as HyperBody, Client, Method, Response,
Method, Response,
};
use hyper_tls::HttpsConnector;
use hyper_util::client::legacy::{
connect::{dns::GaiResolver, HttpConnector},
Client,
};
use rand::distributions::{Alphanumeric, Distribution};
use select_any::select_any;
use serde_json as json;
Expand Down Expand Up @@ -55,6 +59,8 @@ use std::{
time::{Duration, Instant},
};

type HyperBody = BoxBody<Bytes, std::io::Error>;

#[derive(Clone)]
pub struct AutoReturn {
send_option: EndpointProvidesSendOptions,
Expand Down Expand Up @@ -203,8 +209,7 @@ pub struct BuilderContext {
#[allow(dead_code)]
pub config_path: PathBuf,
// the http client
pub client:
Arc<Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>>,
pub client: Arc<Client<HttpsConnector<HttpConnector<GaiResolver>>, HyperBody>>,
// a mapping of names to their prospective providers
pub providers: Arc<BTreeMap<String, providers::Provider>>,
// a mapping of names to their prospective loggers
Expand Down Expand Up @@ -498,7 +503,7 @@ fn multipart_body_as_hyper_body(
let piece_stream = future::ok(Bytes::from(piece_data)).into_stream();
tweak_path(&mut body, &multipart_body.path);
let a = create_file_hyper_body(body).map_ok(move |(bytes, body)| {
let stream = piece_stream.chain(body).a();
let stream = piece_stream.chain(body.into_data_stream()).a();
(bytes + piece_data_bytes, stream)
});
Either::A(a)
Expand Down Expand Up @@ -539,7 +544,9 @@ fn multipart_body_as_hyper_body(
.flatten()
.chain(stream::once(future::ok(closing_boundary)));

(bytes, HyperBody::wrap_stream(stream))
let body: HyperBody =
BodyExt::boxed(StreamBody::new(stream.map_ok(hyper::body::Frame::data)));
(bytes, body)
});
Ok(ret)
}
Expand Down Expand Up @@ -569,7 +576,9 @@ async fn create_file_hyper_body(filename: String) -> Result<(u64, HyperBody), Te
}
});

let body = HyperBody::wrap_stream(stream);
let body: HyperBody = BodyExt::boxed(StreamBody::new(
stream.map_ok(|x| hyper::body::Frame::data(x.into())),
));
Ok((bytes, body))
}

Expand All @@ -592,7 +601,7 @@ fn body_template_as_hyper_body(
);
return Either3::A(future::ready(r).and_then(|x| x));
}
BodyTemplate::None => return Either3::B(future::ok((0, HyperBody::empty()))),
BodyTemplate::None => return Either3::B(future::ok((0, BoxBody::default()))),
BodyTemplate::String(t) => t,
};
let mut body = match template.evaluate(Cow::Borrowed(template_values.as_json()), None) {
Expand All @@ -609,7 +618,10 @@ fn body_template_as_hyper_body(
if copy_body_value {
*body_value = Some(body.clone());
}
Either3::B(future::ok((body.as_bytes().len() as u64, body.into())))
Either3::B(future::ok((
body.as_bytes().len() as u64,
body.map_err(|never| match never {}).boxed(),
)))
}
}

Expand All @@ -622,7 +634,7 @@ pub type StatsTx = futures_channel::UnboundedSender<stats::StatsMessage>;

pub struct Endpoint {
body: BodyTemplate,
client: Arc<Client<HttpsConnector<HttpConnector<hyper::client::connect::dns::GaiResolver>>>>,
client: Arc<Client<HttpsConnector<HttpConnector<GaiResolver>>, HyperBody>>,
headers: Vec<(String, Template)>,
max_parallel_requests: Option<NonZeroUsize>,
method: Method,
Expand Down Expand Up @@ -919,7 +931,8 @@ mod tests {
let (_, body) = create_file_hyper_body("tests/test.jpg".to_string())
.await
.unwrap();
body.map(|b| stream::iter(b.unwrap()))
body.into_data_stream()
.map(|b| stream::iter(b.unwrap()))
.flatten()
.collect::<Vec<_>>()
.await
Expand Down
Loading

0 comments on commit 376feed

Please sign in to comment.