From b939cfe9977dd96dc448e40a87fdaf4301e79a55 Mon Sep 17 00:00:00 2001 From: Eyal Kalderon Date: Sat, 7 Sep 2019 18:45:10 +0800 Subject: [PATCH 1/4] Fix broken examples, remove use of RetryPolicy --- Cargo.toml | 1 + src/http-client.rs | 20 ++++++-------------- src/http-server.rs | 11 ++++------- 3 files changed, 11 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 09bffa8..f80c21d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,3 +18,4 @@ hyper = "0.12.27" tokio = "0.1.19" tower = { git = "https://github.com/tower-rs/tower" } tower-hyper = { git = "https://github.com/tower-rs/tower-hyper" } +tower-reconnect = { git = "https://github.com/tower-rs/tower" } diff --git a/src/http-client.rs b/src/http-client.rs index 52f4f64..b05ffa8 100644 --- a/src/http-client.rs +++ b/src/http-client.rs @@ -4,12 +4,9 @@ use hyper::{ Request, Response, Uri, }; use std::time::Duration; -use tower::{builder::ServiceBuilder, reconnect::Reconnect, Service, ServiceExt}; -use tower_hyper::{ - client::{Builder, Connect}, - retry::{Body, RetryPolicy}, - util::Connector, -}; +use tower::{builder::ServiceBuilder, Service, ServiceExt}; +use tower_hyper::{client::Connect, util::Connector, Body}; +use tower_reconnect::Reconnect; fn main() { let fut = futures::lazy(|| { @@ -22,25 +19,20 @@ fn main() { fn request() -> impl Future, Error = ()> { let connector = Connector::new(HttpConnector::new(1)); - let hyper = Connect::new(connector, Builder::new()); + let hyper = Connect::new(connector); - // RetryPolicy is a very simple policy that retries `n` times - // if the response has a 500 status code. Here, `n` is 5. - let policy = RetryPolicy::new(5); // We're calling the tower/examples/server.rs. let dst = Destination::try_from_uri(Uri::from_static("http://127.0.0.1:3000")).unwrap(); // Now, to build the service! We use two BufferLayers in order to: // - provide backpressure for the RateLimitLayer, and ConcurrencyLimitLayer - // - meet `RetryLayer`'s requirement that our service implement `Service + Clone` // - ..and to provide cheap clones on the service. let maker = ServiceBuilder::new() .buffer(5) .rate_limit(5, Duration::from_secs(1)) .concurrency_limit(5) - .retry(policy) .buffer(5) - .make_service(hyper); + .service(hyper); // `Reconnect` accepts a destination and a MakeService, creating a new service // any time the connection encounters an error. @@ -57,7 +49,7 @@ fn request() -> impl Future, Error = ()> { .map_err(|e| panic!("Service is not ready: {:?}", e)) .and_then(|mut c| { c.call(request) - .map(|res| res.map(|b| b.into_inner())) + // .map(|res| res.map(|b| b.into_inner())) .map_err(|e| panic!("{:?}", e)) }) } diff --git a/src/http-server.rs b/src/http-server.rs index c6e0f41..0a3a28d 100644 --- a/src/http-server.rs +++ b/src/http-server.rs @@ -2,7 +2,7 @@ use futures::{future, Future, Poll, Stream}; use hyper::{self, Body, Request, Response}; use tokio::net::TcpListener; use tower::{builder::ServiceBuilder, Service}; -use tower_hyper::{body::LiftBody, server::Server}; +use tower_hyper::server::Server; fn main() { hyper::rt::run(future::lazy(|| { @@ -11,10 +11,7 @@ fn main() { println!("Listening on http://{}", addr); - let maker = ServiceBuilder::new() - .concurrency_limit(5) - .make_service(MakeSvc); - + let maker = ServiceBuilder::new().concurrency_limit(5).service(MakeSvc); let server = Server::new(maker); bind.incoming() @@ -37,7 +34,7 @@ fn main() { } struct Svc; -impl Service>> for Svc { +impl Service> for Svc { type Response = Response<&'static str>; type Error = hyper::Error; type Future = future::FutureResult; @@ -46,7 +43,7 @@ impl Service>> for Svc { Ok(().into()) } - fn call(&mut self, _req: Request>) -> Self::Future { + fn call(&mut self, _req: Request) -> Self::Future { let res = Response::new("Hello World!"); future::ok(res) } From b2dbc4b052730e12bb389097efebebce33ddb77e Mon Sep 17 00:00:00 2001 From: Eyal Kalderon Date: Sat, 7 Sep 2019 18:47:27 +0800 Subject: [PATCH 2/4] Delete commented code --- src/http-client.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/http-client.rs b/src/http-client.rs index b05ffa8..f848fbc 100644 --- a/src/http-client.rs +++ b/src/http-client.rs @@ -47,9 +47,5 @@ fn request() -> impl Future, Error = ()> { client .ready() .map_err(|e| panic!("Service is not ready: {:?}", e)) - .and_then(|mut c| { - c.call(request) - // .map(|res| res.map(|b| b.into_inner())) - .map_err(|e| panic!("{:?}", e)) - }) + .and_then(|mut c| c.call(request).map_err(|e| panic!("{:?}", e))) } From 49b6c8b4cbf8e164c945beb09c9ecb33cf998206 Mon Sep 17 00:00:00 2001 From: Eyal Kalderon Date: Sat, 7 Sep 2019 18:49:53 +0800 Subject: [PATCH 3/4] Remove outdated reference to MakeService --- src/http-client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/http-client.rs b/src/http-client.rs index f848fbc..6153f3a 100644 --- a/src/http-client.rs +++ b/src/http-client.rs @@ -27,16 +27,16 @@ fn request() -> impl Future, Error = ()> { // Now, to build the service! We use two BufferLayers in order to: // - provide backpressure for the RateLimitLayer, and ConcurrencyLimitLayer // - ..and to provide cheap clones on the service. - let maker = ServiceBuilder::new() + let service = ServiceBuilder::new() .buffer(5) .rate_limit(5, Duration::from_secs(1)) .concurrency_limit(5) .buffer(5) .service(hyper); - // `Reconnect` accepts a destination and a MakeService, creating a new service + // `Reconnect` accepts a destination and a Service, creating a new service // any time the connection encounters an error. - let client = Reconnect::new(maker, dst); + let client = Reconnect::new(service, dst); let request = Request::builder() .method("GET") From 8c665550cfc43a573d8896311da311ddc596c192 Mon Sep 17 00:00:00 2001 From: Eyal Kalderon Date: Mon, 9 Sep 2019 01:32:52 +0800 Subject: [PATCH 4/4] Return Response from server --- src/http-server.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/http-server.rs b/src/http-server.rs index 0a3a28d..c34695f 100644 --- a/src/http-server.rs +++ b/src/http-server.rs @@ -35,7 +35,7 @@ fn main() { struct Svc; impl Service> for Svc { - type Response = Response<&'static str>; + type Response = Response; type Error = hyper::Error; type Future = future::FutureResult; @@ -44,7 +44,7 @@ impl Service> for Svc { } fn call(&mut self, _req: Request) -> Self::Future { - let res = Response::new("Hello World!"); + let res = Response::new(Body::from("Hello World!")); future::ok(res) } }