diff --git a/clippy.toml b/clippy.toml index dd2f76ff7..eff324504 100644 --- a/clippy.toml +++ b/clippy.toml @@ -1 +1,8 @@ disallowed-names = [] +disallowed-macros = [ + "futures::ready", # use instead `std::task::ready` + "futures::pin_mut", # use instead `std::pin::pin` +] +disallowed-methods = [ + "futures::future::ready", # use instead `std::future::ready` +] diff --git a/examples/event_watcher.rs b/examples/event_watcher.rs index dcdd64b55..d05de1188 100644 --- a/examples/event_watcher.rs +++ b/examples/event_watcher.rs @@ -1,4 +1,6 @@ -use futures::{pin_mut, TryStreamExt}; +use std::pin::pin; + +use futures::TryStreamExt; use k8s_openapi::{ api::{core::v1::ObjectReference, events::v1::Event}, apimachinery::pkg::apis::meta::v1::Time, @@ -37,7 +39,7 @@ async fn main() -> anyhow::Result<()> { } } let event_stream = watcher(events, conf).default_backoff().applied_objects(); - pin_mut!(event_stream); + let mut event_stream = pin!(event_stream); println!("{0:<6} {1:<15} {2:<55} {3}", "AGE", "REASON", "OBJECT", "MESSAGE"); while let Some(ev) = event_stream.try_next().await? { diff --git a/examples/node_reflector.rs b/examples/node_reflector.rs index 32339bdf3..4444e5757 100644 --- a/examples/node_reflector.rs +++ b/examples/node_reflector.rs @@ -1,4 +1,6 @@ -use futures::{pin_mut, TryStreamExt}; +use std::pin::pin; + +use futures::TryStreamExt; use k8s_openapi::api::core::v1::Node; use kube::{ api::{Api, ResourceExt}, @@ -23,6 +25,7 @@ async fn main() -> anyhow::Result<()> { .reflect(writer) .applied_objects() .predicate_filter(predicates::labels.combine(predicates::annotations)); // NB: requires an unstable feature + let mut stream = pin!(stream); // Periodically read our state in the background tokio::spawn(async move { @@ -35,7 +38,6 @@ async fn main() -> anyhow::Result<()> { }); // Log applied events with changes from the reflector - pin_mut!(stream); while let Some(node) = stream.try_next().await? { info!("saw node {} with new labels/annots", node.name_any()); } diff --git a/examples/node_watcher.rs b/examples/node_watcher.rs index a27756a26..4ed4c0d2e 100644 --- a/examples/node_watcher.rs +++ b/examples/node_watcher.rs @@ -1,4 +1,6 @@ -use futures::{pin_mut, TryStreamExt}; +use std::pin::pin; + +use futures::TryStreamExt; use k8s_openapi::api::core::v1::{Event, Node}; use kube::{ api::{Api, ListParams, ResourceExt}, @@ -21,8 +23,8 @@ async fn main() -> anyhow::Result<()> { watcher::Config::default() }; let obs = watcher(nodes, wc).default_backoff().applied_objects(); + let mut obs = pin!(obs); - pin_mut!(obs); while let Some(n) = obs.try_next().await? { check_for_node_failures(&client, n).await?; } diff --git a/examples/pod_reflector.rs b/examples/pod_reflector.rs index ab41f791f..a93af1b6b 100644 --- a/examples/pod_reflector.rs +++ b/examples/pod_reflector.rs @@ -1,3 +1,5 @@ +use std::pin::pin; + use futures::TryStreamExt; use k8s_openapi::api::core::v1::Pod; use kube::{ @@ -40,7 +42,7 @@ async fn main() -> anyhow::Result<()> { .reflect(writer) .applied_objects() .predicate_filter(predicates::resource_version); // NB: requires an unstable feature - futures::pin_mut!(stream); + let mut stream = pin!(stream); while let Some(pod) = stream.try_next().await? { info!("saw {}", pod.name_any()); diff --git a/kube-client/src/client/middleware/mod.rs b/kube-client/src/client/middleware/mod.rs index a30906c76..e6ef1bbe4 100644 --- a/kube-client/src/client/middleware/mod.rs +++ b/kube-client/src/client/middleware/mod.rs @@ -27,10 +27,9 @@ impl Layer for AuthLayer { mod tests { use super::*; - use std::{matches, sync::Arc}; + use std::{matches, pin::pin, sync::Arc}; use chrono::{Duration, Utc}; - use futures::pin_mut; use http::{header::AUTHORIZATION, HeaderValue, Request, Response}; use secrecy::SecretString; use tokio::sync::Mutex; @@ -52,7 +51,7 @@ mod tests { let spawned = tokio::spawn(async move { // Receive the requests and respond - pin_mut!(handle); + let mut handle = pin!(handle); let (request, send) = handle.next_request().await.expect("service not called"); assert_eq!( request.headers().get(AUTHORIZATION).unwrap(), diff --git a/kube-client/src/client/mod.rs b/kube-client/src/client/mod.rs index 300a3bb81..6b65e0941 100644 --- a/kube-client/src/client/mod.rs +++ b/kube-client/src/client/mod.rs @@ -8,7 +8,7 @@ //! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically //! retrieve the resources served by the kubernetes API. use either::{Either, Left, Right}; -use futures::{self, AsyncBufRead, StreamExt, TryStream, TryStreamExt}; +use futures::{AsyncBufRead, StreamExt, TryStream, TryStreamExt}; use http::{self, Request, Response}; use http_body_util::BodyExt; #[cfg(feature = "ws")] use hyper_util::rt::TokioIo; @@ -492,9 +492,10 @@ impl TryFrom for Client { #[cfg(test)] mod tests { + use std::pin::pin; + use crate::{client::Body, Api, Client}; - use futures::pin_mut; use http::{Request, Response}; use k8s_openapi::api::core::v1::Pod; use tower_test::mock; @@ -511,7 +512,7 @@ mod tests { let (mock_service, handle) = mock::pair::, Response>(); let spawned = tokio::spawn(async move { // Receive a request for pod and respond with some data - pin_mut!(handle); + let mut handle = pin!(handle); let (request, send) = handle.next_request().await.expect("service not called"); assert_eq!(request.method(), http::Method::GET); assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test"); diff --git a/kube-runtime/src/controller/future_hash_map.rs b/kube-runtime/src/controller/future_hash_map.rs index 67256c1b2..e3382dad6 100644 --- a/kube-runtime/src/controller/future_hash_map.rs +++ b/kube-runtime/src/controller/future_hash_map.rs @@ -77,10 +77,10 @@ where #[cfg(test)] mod tests { - use std::task::Poll; + use std::{future, task::Poll}; use super::FutureHashMap; - use futures::{channel::mpsc, future, poll, StreamExt}; + use futures::{channel::mpsc, poll, StreamExt}; #[tokio::test] async fn fhm_should_forward_all_values_and_shut_down() { diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index ef4c662a7..b449d5ecc 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -16,16 +16,17 @@ use derivative::Derivative; use futures::{ channel, future::{self, BoxFuture}, - ready, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt, + stream, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt, }; use kube_client::api::{Api, DynamicObject, Resource}; use pin_project::pin_project; use serde::de::DeserializeOwned; use std::{ fmt::{Debug, Display}, + future::Future, hash::Hash, sync::Arc, - task::Poll, + task::{ready, Poll}, time::Duration, }; use stream::BoxStream; @@ -326,7 +327,8 @@ where .instrument(reconciler_span) .left_future() } - None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(), + None => std::future::ready(Err(Error::ObjectNotFound(request.obj_ref.erase()))) + .right_future(), } }, ) @@ -1155,7 +1157,7 @@ where /// use kube::{Api, Client, ResourceExt}; /// use kube_runtime::{ /// controller::{Controller, Action}, - /// watcher, + /// watcher, /// }; /// use std::{convert::Infallible, sync::Arc}; /// Controller::new( @@ -1274,7 +1276,7 @@ where #[cfg(test)] mod tests { - use std::{convert::Infallible, sync::Arc, time::Duration}; + use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration}; use super::{Action, APPLIER_REQUEUE_BUF_SIZE}; use crate::{ @@ -1283,7 +1285,7 @@ mod tests { watcher::{self, metadata_watcher, watcher, Event}, Config, Controller, }; - use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; + use futures::{Stream, StreamExt, TryStreamExt}; use k8s_openapi::api::core::v1::ConfigMap; use kube_client::{core::ObjectMeta, Api, Resource}; use serde::de::DeserializeOwned; @@ -1348,7 +1350,7 @@ mod tests { let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::>(); let (store_rx, mut store_tx) = reflector::store(); - let applier = applier( + let mut applier = pin!(applier( |obj, _| { Box::pin(async move { // Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately @@ -1361,8 +1363,7 @@ mod tests { store_rx, queue_rx.map(Result::<_, Infallible>::Ok), Config::default(), - ); - pin_mut!(applier); + )); for i in 0..items { let obj = ConfigMap { metadata: ObjectMeta { diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs index 95ed6f7e4..81900aa83 100644 --- a/kube-runtime/src/controller/runner.rs +++ b/kube-runtime/src/controller/runner.rs @@ -1,9 +1,10 @@ use super::future_hash_map::FutureHashMap; use crate::scheduler::{ScheduleRequest, Scheduler}; -use futures::{future, Future, FutureExt, Stream, StreamExt}; +use futures::{FutureExt, Stream, StreamExt}; use pin_project::pin_project; use std::{ convert::Infallible, + future::{self, Future}, hash::Hash, pin::Pin, task::{Context, Poll}, @@ -29,7 +30,7 @@ pub struct Runner>> { run_msg: MkF, slots: FutureHashMap, #[pin] - ready_to_execute_after: future::Fuse, + ready_to_execute_after: futures::future::Fuse, is_ready_to_execute: bool, stopped: bool, max_concurrent_executions: u16, @@ -163,8 +164,7 @@ mod tests { }; use futures::{ channel::{mpsc, oneshot}, - future::{self}, - poll, stream, Future, SinkExt, StreamExt, TryStreamExt, + future, poll, stream, Future, SinkExt, StreamExt, TryStreamExt, }; use std::{ cell::RefCell, diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index 42a7d7431..c25b76721 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -31,9 +31,10 @@ pub use store::{store, Store}; /// or [controller-rs](https://github.com/kube-rs/controller-rs) for the similar controller integration with [actix-web](https://actix.rs/). /// /// ```no_run +/// use std::future::ready; /// use k8s_openapi::api::core::v1::Node; /// use kube::runtime::{reflector, watcher, WatchStreamExt, watcher::Config}; -/// use futures::{StreamExt, future::ready}; +/// use futures::StreamExt; /// # use kube::api::Api; /// # async fn wrapper() -> Result<(), Box> { /// # let client: kube::Client = todo!(); diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs index f972dec34..4c009bce2 100644 --- a/kube-runtime/src/scheduler.rs +++ b/kube-runtime/src/scheduler.rs @@ -286,8 +286,8 @@ mod tests { use super::{debounced_scheduler, scheduler, ScheduleRequest}; use derivative::Derivative; - use futures::{channel::mpsc, future, pin_mut, poll, stream, FutureExt, SinkExt, StreamExt}; - use std::task::Poll; + use futures::{channel::mpsc, future, poll, stream, FutureExt, SinkExt, StreamExt}; + use std::{pin::pin, task::Poll}; use tokio::time::{advance, pause, sleep, Duration, Instant}; fn unwrap_poll(poll: Poll) -> T { @@ -379,7 +379,7 @@ mod tests { #[tokio::test] async fn scheduler_should_emit_items_as_requested() { pause(); - let scheduler = scheduler( + let mut scheduler = pin!(scheduler( stream::iter(vec![ ScheduleRequest { message: 1_u8, @@ -391,8 +391,7 @@ mod tests { }, ]) .on_complete(sleep(Duration::from_secs(5))), - ); - pin_mut!(scheduler); + )); assert!(poll!(scheduler.next()).is_pending()); advance(Duration::from_secs(2)).await; assert_eq!(scheduler.next().now_or_never().unwrap().unwrap(), 1); @@ -406,7 +405,7 @@ mod tests { #[tokio::test] async fn scheduler_dedupe_should_keep_earlier_item() { pause(); - let scheduler = scheduler( + let mut scheduler = pin!(scheduler( stream::iter(vec![ ScheduleRequest { message: (), @@ -418,8 +417,7 @@ mod tests { }, ]) .on_complete(sleep(Duration::from_secs(5))), - ); - pin_mut!(scheduler); + )); assert!(poll!(scheduler.next()).is_pending()); advance(Duration::from_secs(2)).await; scheduler.next().now_or_never().unwrap().unwrap(); @@ -430,7 +428,7 @@ mod tests { #[tokio::test] async fn scheduler_dedupe_should_replace_later_item() { pause(); - let scheduler = scheduler( + let mut scheduler = pin!(scheduler( stream::iter(vec![ ScheduleRequest { message: (), @@ -442,8 +440,7 @@ mod tests { }, ]) .on_complete(sleep(Duration::from_secs(5))), - ); - pin_mut!(scheduler); + )); assert!(poll!(scheduler.next()).is_pending()); advance(Duration::from_secs(2)).await; scheduler.next().now_or_never().unwrap().unwrap(); diff --git a/kube-runtime/src/utils/delayed_init.rs b/kube-runtime/src/utils/delayed_init.rs index d544ae1a9..493947a23 100644 --- a/kube-runtime/src/utils/delayed_init.rs +++ b/kube-runtime/src/utils/delayed_init.rs @@ -102,10 +102,10 @@ pub struct InitDropped; #[cfg(test)] mod tests { - use std::task::Poll; + use std::{pin::pin, task::Poll}; use super::DelayedInit; - use futures::{pin_mut, poll}; + use futures::poll; use tracing::Level; use tracing_subscriber::util::SubscriberInitExt; @@ -121,8 +121,7 @@ mod tests { async fn must_allow_single_reader() { let _tracing = setup_tracing(); let (tx, rx) = DelayedInit::::new(); - let get1 = rx.get(); - pin_mut!(get1); + let mut get1 = pin!(rx.get()); assert_eq!(poll!(get1.as_mut()), Poll::Pending); tx.init(1); assert_eq!(poll!(get1), Poll::Ready(Ok(1))); @@ -132,10 +131,9 @@ mod tests { async fn must_allow_concurrent_readers_while_waiting() { let _tracing = setup_tracing(); let (tx, rx) = DelayedInit::::new(); - let get1 = rx.get(); - let get2 = rx.get(); - let get3 = rx.get(); - pin_mut!(get1, get2, get3); + let mut get1 = pin!(rx.get()); + let mut get2 = pin!(rx.get()); + let mut get3 = pin!(rx.get()); assert_eq!(poll!(get1.as_mut()), Poll::Pending); assert_eq!(poll!(get2.as_mut()), Poll::Pending); assert_eq!(poll!(get3.as_mut()), Poll::Pending); @@ -149,8 +147,7 @@ mod tests { async fn must_allow_reading_after_init() { let _tracing = setup_tracing(); let (tx, rx) = DelayedInit::::new(); - let get1 = rx.get(); - pin_mut!(get1); + let mut get1 = pin!(rx.get()); assert_eq!(poll!(get1.as_mut()), Poll::Pending); tx.init(1); assert_eq!(poll!(get1), Poll::Ready(Ok(1))); @@ -162,10 +159,9 @@ mod tests { async fn must_allow_concurrent_readers_in_any_order() { let _tracing = setup_tracing(); let (tx, rx) = DelayedInit::::new(); - let get1 = rx.get(); - let get2 = rx.get(); - let get3 = rx.get(); - pin_mut!(get1, get2, get3); + let mut get1 = pin!(rx.get()); + let mut get2 = pin!(rx.get()); + let mut get3 = pin!(rx.get()); assert_eq!(poll!(get1.as_mut()), Poll::Pending); assert_eq!(poll!(get2.as_mut()), Poll::Pending); assert_eq!(poll!(get3.as_mut()), Poll::Pending); diff --git a/kube-runtime/src/utils/event_flatten.rs b/kube-runtime/src/utils/event_flatten.rs index b4834662b..b73a80b44 100644 --- a/kube-runtime/src/utils/event_flatten.rs +++ b/kube-runtime/src/utils/event_flatten.rs @@ -1,9 +1,9 @@ use crate::watcher::{Error, Event}; use core::{ pin::Pin, - task::{Context, Poll}, + task::{ready, Context, Poll}, }; -use futures::{ready, Stream, TryStream}; +use futures::{Stream, TryStream}; use pin_project::pin_project; #[pin_project] @@ -58,10 +58,10 @@ where #[cfg(test)] pub(crate) mod tests { - use std::task::Poll; + use std::{pin::pin, task::Poll}; use super::{Error, Event, EventFlatten}; - use futures::{pin_mut, poll, stream, StreamExt}; + use futures::{poll, stream, StreamExt}; #[tokio::test] async fn watches_applies_uses_correct_eventflattened_stream() { @@ -74,8 +74,7 @@ pub(crate) mod tests { Err(Error::TooManyObjects), Ok(Event::Applied(2)), ]); - let rx = EventFlatten::new(data, false); - pin_mut!(rx); + let mut rx = pin!(EventFlatten::new(data, false)); assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(0))))); assert!(matches!(poll!(rx.next()), Poll::Ready(Some(Ok(1))))); // NB: no Deleted events here diff --git a/kube-runtime/src/utils/event_modify.rs b/kube-runtime/src/utils/event_modify.rs index 8dfa2275a..09fb4685e 100644 --- a/kube-runtime/src/utils/event_modify.rs +++ b/kube-runtime/src/utils/event_modify.rs @@ -46,10 +46,10 @@ where #[cfg(test)] pub(crate) mod test { - use std::{task::Poll, vec}; + use std::{pin::pin, task::Poll, vec}; use super::{Error, Event, EventModify}; - use futures::{pin_mut, poll, stream, StreamExt}; + use futures::{poll, stream, StreamExt}; #[tokio::test] async fn eventmodify_modifies_innner_value_of_event() { @@ -58,10 +58,9 @@ pub(crate) mod test { Err(Error::TooManyObjects), Ok(Event::Restarted(vec![10])), ]); - let ev_modify = EventModify::new(st, |x| { + let mut ev_modify = pin!(EventModify::new(st, |x| { *x += 1; - }); - pin_mut!(ev_modify); + })); assert!(matches!( poll!(ev_modify.next()), diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index bdf85b227..b5039fab7 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -22,14 +22,13 @@ pub use stream_subscribe::StreamSubscribe; pub use watch_ext::WatchStreamExt; use futures::{ - pin_mut, stream::{self, Peekable}, Future, FutureExt, Stream, StreamExt, TryStream, TryStreamExt, }; use pin_project::pin_project; use std::{ fmt::Debug, - pin::Pin, + pin::{pin, Pin}, sync::{Arc, Mutex}, task::Poll, }; @@ -77,8 +76,7 @@ where // TODO: remove #[allow] once fix reaches nightly. let inner = this.inner.lock().unwrap(); let mut inner = Pin::new(inner); - let inner_peek = inner.as_mut().peek(); - pin_mut!(inner_peek); + let inner_peek = pin!(inner.as_mut().peek()); match inner_peek.poll(cx) { Poll::Ready(Some(x_ref)) => { if (this.should_consume_item)(x_ref) { diff --git a/kube-runtime/src/utils/predicate.rs b/kube-runtime/src/utils/predicate.rs index 80f37e4ca..e94f64960 100644 --- a/kube-runtime/src/utils/predicate.rs +++ b/kube-runtime/src/utils/predicate.rs @@ -1,9 +1,9 @@ use crate::{reflector::ObjectRef, watcher::Error}; use core::{ pin::Pin, - task::{Context, Poll}, + task::{ready, Context, Poll}, }; -use futures::{ready, Stream}; +use futures::Stream; use kube_client::Resource; use pin_project::pin_project; use std::{ @@ -195,10 +195,10 @@ pub mod predicates { #[cfg(test)] pub(crate) mod tests { - use std::task::Poll; + use std::{pin::pin, task::Poll}; use super::{predicates, Error, PredicateFilter}; - use futures::{pin_mut, poll, stream, FutureExt, StreamExt}; + use futures::{poll, stream, FutureExt, StreamExt}; use kube_client::Resource; use serde_json::json; @@ -229,8 +229,7 @@ pub(crate) mod tests { Ok(mkobj(1)), Ok(mkobj(2)), ]); - let rx = PredicateFilter::new(data, predicates::generation); - pin_mut!(rx); + let mut rx = pin!(PredicateFilter::new(data, predicates::generation)); // mkobj(1) passed through let first = rx.next().now_or_never().unwrap().unwrap().unwrap(); diff --git a/kube-runtime/src/utils/reflect.rs b/kube-runtime/src/utils/reflect.rs index 43fa65c2a..00c216218 100644 --- a/kube-runtime/src/utils/reflect.rs +++ b/kube-runtime/src/utils/reflect.rs @@ -54,11 +54,11 @@ where #[cfg(test)] pub(crate) mod test { - use std::{task::Poll, vec}; + use std::{pin::pin, task::Poll, vec}; use super::{Error, Event, Reflect}; use crate::reflector; - use futures::{pin_mut, poll, stream, StreamExt}; + use futures::{poll, stream, StreamExt}; use k8s_openapi::api::core::v1::Pod; fn testpod(name: &str) -> Pod { @@ -78,8 +78,7 @@ pub(crate) mod test { ]); let (reader, writer) = reflector::store(); - let reflect = Reflect::new(st, writer); - pin_mut!(reflect); + let mut reflect = pin!(Reflect::new(st, writer)); assert_eq!(reader.len(), 0); assert!(matches!( diff --git a/kube-runtime/src/utils/stream_backoff.rs b/kube-runtime/src/utils/stream_backoff.rs index efb8d6096..01c6c4292 100644 --- a/kube-runtime/src/utils/stream_backoff.rs +++ b/kube-runtime/src/utils/stream_backoff.rs @@ -1,7 +1,7 @@ -use std::{pin::Pin, task::Poll}; +use std::{future::Future, pin::Pin, task::Poll}; use backoff::backoff::Backoff; -use futures::{Future, Stream, TryStream}; +use futures::{Stream, TryStream}; use pin_project::pin_project; use tokio::time::{sleep, Instant, Sleep}; @@ -96,19 +96,18 @@ impl Stream for StreamBackoff { #[cfg(test)] pub(crate) mod tests { - use std::{task::Poll, time::Duration}; + use std::{pin::pin, task::Poll, time::Duration}; use super::StreamBackoff; use backoff::backoff::Backoff; - use futures::{channel::mpsc, pin_mut, poll, stream, StreamExt}; + use futures::{channel::mpsc, poll, stream, StreamExt}; #[tokio::test] async fn stream_should_back_off() { tokio::time::pause(); let tick = Duration::from_secs(1); let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]); - let rx = StreamBackoff::new(rx, backoff::backoff::Constant::new(tick)); - pin_mut!(rx); + let mut rx = pin!(StreamBackoff::new(rx, backoff::backoff::Constant::new(tick))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0)))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(1)))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Err(2)))); @@ -124,8 +123,7 @@ pub(crate) mod tests { tokio::time::pause(); let (tx, rx) = mpsc::unbounded(); // let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3)]); - let rx = StreamBackoff::new(rx, LinearBackoff::new(Duration::from_secs(2))); - pin_mut!(rx); + let mut rx = pin!(StreamBackoff::new(rx, LinearBackoff::new(Duration::from_secs(2)))); tx.unbounded_send(Ok(0)).unwrap(); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Ok(0)))); tx.unbounded_send(Ok(1)).unwrap(); diff --git a/kube-runtime/src/utils/stream_subscribe.rs b/kube-runtime/src/utils/stream_subscribe.rs index 45ad18b9b..2f0be443d 100644 --- a/kube-runtime/src/utils/stream_subscribe.rs +++ b/kube-runtime/src/utils/stream_subscribe.rs @@ -99,15 +99,16 @@ impl std::error::Error for Error {} #[cfg(test)] mod tests { + use std::pin::pin; + use super::*; - use futures::{pin_mut, poll, stream, StreamExt}; + use futures::{poll, stream, StreamExt}; #[tokio::test] async fn stream_subscribe_continues_to_propagate_values() { let rx = stream::iter([Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]); - let rx = StreamSubscribe::new(rx); + let mut rx = pin!(StreamSubscribe::new(rx)); - pin_mut!(rx); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(0))))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Ok(1))))); assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(Err(2))))); @@ -120,14 +121,10 @@ mod tests { async fn all_subscribers_get_events() { let events = [Ok(0), Ok(1), Err(2), Ok(3), Ok(4)]; let rx = stream::iter(events); - let rx = StreamSubscribe::new(rx); - - let rx_s1 = rx.subscribe(); - let rx_s2 = rx.subscribe(); + let mut rx = pin!(StreamSubscribe::new(rx)); - pin_mut!(rx); - pin_mut!(rx_s1); - pin_mut!(rx_s2); + let mut rx_s1 = pin!(rx.subscribe()); + let mut rx_s2 = pin!(rx.subscribe()); // Subscribers are pending until we start consuming the stream assert_eq!(poll!(rx_s1.next()), Poll::Pending, "rx_s1"); @@ -150,12 +147,9 @@ mod tests { async fn subscribers_can_catch_up_to_the_main_stream() { let events = (0..CHANNEL_CAPACITY).map(Ok::<_, ()>).collect::>(); let rx = stream::iter(events.clone()); - let rx = StreamSubscribe::new(rx); - - let rx_s1 = rx.subscribe(); + let mut rx = pin!(StreamSubscribe::new(rx)); - pin_mut!(rx); - pin_mut!(rx_s1); + let mut rx_s1 = pin!(rx.subscribe()); for item in events.clone() { assert_eq!(poll!(rx.next()), Poll::Ready(Some(Arc::new(item))), "rx",); @@ -177,12 +171,9 @@ mod tests { let overflow = 5; let events = (0..max_capacity + overflow).collect::>(); let rx = stream::iter(events.clone()); - let rx = StreamSubscribe::new(rx); + let mut rx = pin!(StreamSubscribe::new(rx)); - let rx_s1 = rx.subscribe(); - - pin_mut!(rx); - pin_mut!(rx_s1); + let mut rx_s1 = pin!(rx.subscribe()); // Consume the entire stream, overflowing the inner channel for _ in events { @@ -208,14 +199,10 @@ mod tests { let overflow = 5; let events = (0..max_capacity + overflow).collect::>(); let rx = stream::iter(events.clone()); - let rx = StreamSubscribe::new(rx); - - let rx_s1 = rx.subscribe(); - let rx_s2 = rx.subscribe(); + let mut rx = pin!(StreamSubscribe::new(rx)); - pin_mut!(rx); - pin_mut!(rx_s1); - pin_mut!(rx_s2); + let mut rx_s1 = pin!(rx.subscribe()); + let mut rx_s2 = pin!(rx.subscribe()); for event in events { assert_eq!(poll!(rx_s1.next()), Poll::Pending, "rx_s1"); diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 6f9994586..fca3a4a4d 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -61,20 +61,20 @@ pub trait WatchStreamExt: Stream { /// Stream shorthand for `stream.map_ok(|event| { event.modify(f) })`. /// /// ```no_run - /// # use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; + /// # use std::pin::pin; + /// # use futures::{Stream, StreamExt, TryStreamExt}; /// # use kube::{Api, Client, ResourceExt}; /// # use kube_runtime::{watcher, WatchStreamExt}; /// # use k8s_openapi::api::apps::v1::Deployment; /// # async fn wrapper() -> Result<(), Box> { /// # let client: kube::Client = todo!(); /// let deploys: Api = Api::all(client); - /// let truncated_deploy_stream = watcher(deploys, watcher::Config::default()) + /// let mut truncated_deploy_stream = pin!(watcher(deploys, watcher::Config::default()) /// .modify(|deploy| { /// deploy.managed_fields_mut().clear(); /// deploy.status = None; /// }) - /// .applied_objects(); - /// pin_mut!(truncated_deploy_stream); + /// .applied_objects()); /// /// while let Some(d) = truncated_deploy_stream.try_next().await? { /// println!("Truncated Deployment: '{:?}'", serde_json::to_string(&d)?); @@ -100,17 +100,17 @@ pub trait WatchStreamExt: Stream { /// /// ## Usage /// ```no_run - /// # use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; + /// # use std::pin::pin; + /// # use futures::{Stream, StreamExt, TryStreamExt}; /// use kube::{Api, Client, ResourceExt}; /// use kube_runtime::{watcher, WatchStreamExt, predicates}; /// use k8s_openapi::api::apps::v1::Deployment; /// # async fn wrapper() -> Result<(), Box> { /// # let client: kube::Client = todo!(); /// let deploys: Api = Api::default_namespaced(client); - /// let changed_deploys = watcher(deploys, watcher::Config::default()) + /// let mut changed_deploys = pin!(watcher(deploys, watcher::Config::default()) /// .applied_objects() - /// .predicate_filter(predicates::generation); - /// pin_mut!(changed_deploys); + /// .predicate_filter(predicates::generation)); /// /// while let Some(d) = changed_deploys.try_next().await? { /// println!("saw Deployment '{} with hitherto unseen generation", d.name_any()); @@ -200,7 +200,7 @@ pub trait WatchStreamExt: Stream { /// /// ## Usage /// ```no_run - /// # use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; + /// # use futures::{Stream, StreamExt, TryStreamExt}; /// # use std::time::Duration; /// # use tracing::{info, warn}; /// use kube::{Api, Client, ResourceExt}; diff --git a/kube-runtime/src/wait.rs b/kube-runtime/src/wait.rs index e657cbb36..e7451cb60 100644 --- a/kube-runtime/src/wait.rs +++ b/kube-runtime/src/wait.rs @@ -1,4 +1,6 @@ //! Waits for objects to reach desired states +use std::{future, pin::pin}; + use futures::TryStreamExt; use kube_client::{Api, Resource}; use serde::de::DeserializeOwned; @@ -52,11 +54,10 @@ where K: Clone + Debug + Send + DeserializeOwned + Resource + 'static, { // Skip updates until the condition is satisfied. - let stream = watch_object(api, name).try_skip_while(|obj| { + let mut stream = pin!(watch_object(api, name).try_skip_while(|obj| { let matches = cond.matches_object(obj.as_ref()); - futures::future::ok(!matches) - }); - futures::pin_mut!(stream); + future::ready(Ok(!matches)) + })); // Then take the first update that satisfies the condition. let obj = stream