Skip to content

Commit

Permalink
replace futures crate api with standard library (kube-rs#1461)
Browse files Browse the repository at this point in the history
* replace futures crate api with standard library

Signed-off-by: tottoto <[email protected]>

* Move pin value in examples to independent sentence

Signed-off-by: tottoto <[email protected]>

---------

Signed-off-by: tottoto <[email protected]>
  • Loading branch information
tottoto authored Apr 5, 2024
1 parent cb77247 commit dac48d9
Show file tree
Hide file tree
Showing 22 changed files with 117 additions and 127 deletions.
7 changes: 7 additions & 0 deletions clippy.toml
Original file line number Diff line number Diff line change
@@ -1 +1,8 @@
disallowed-names = []
disallowed-macros = [
"futures::ready", # use instead `std::task::ready`
"futures::pin_mut", # use instead `std::pin::pin`
]
disallowed-methods = [
"futures::future::ready", # use instead `std::future::ready`
]
6 changes: 4 additions & 2 deletions examples/event_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use futures::{pin_mut, TryStreamExt};
use std::pin::pin;

use futures::TryStreamExt;
use k8s_openapi::{
api::{core::v1::ObjectReference, events::v1::Event},
apimachinery::pkg::apis::meta::v1::Time,
Expand Down Expand Up @@ -37,7 +39,7 @@ async fn main() -> anyhow::Result<()> {
}
}
let event_stream = watcher(events, conf).default_backoff().applied_objects();
pin_mut!(event_stream);
let mut event_stream = pin!(event_stream);

println!("{0:<6} {1:<15} {2:<55} {3}", "AGE", "REASON", "OBJECT", "MESSAGE");
while let Some(ev) = event_stream.try_next().await? {
Expand Down
6 changes: 4 additions & 2 deletions examples/node_reflector.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use futures::{pin_mut, TryStreamExt};
use std::pin::pin;

use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Node;
use kube::{
api::{Api, ResourceExt},
Expand All @@ -23,6 +25,7 @@ async fn main() -> anyhow::Result<()> {
.reflect(writer)
.applied_objects()
.predicate_filter(predicates::labels.combine(predicates::annotations)); // NB: requires an unstable feature
let mut stream = pin!(stream);

// Periodically read our state in the background
tokio::spawn(async move {
Expand All @@ -35,7 +38,6 @@ async fn main() -> anyhow::Result<()> {
});

// Log applied events with changes from the reflector
pin_mut!(stream);
while let Some(node) = stream.try_next().await? {
info!("saw node {} with new labels/annots", node.name_any());
}
Expand Down
6 changes: 4 additions & 2 deletions examples/node_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use futures::{pin_mut, TryStreamExt};
use std::pin::pin;

use futures::TryStreamExt;
use k8s_openapi::api::core::v1::{Event, Node};
use kube::{
api::{Api, ListParams, ResourceExt},
Expand All @@ -21,8 +23,8 @@ async fn main() -> anyhow::Result<()> {
watcher::Config::default()
};
let obs = watcher(nodes, wc).default_backoff().applied_objects();
let mut obs = pin!(obs);

pin_mut!(obs);
while let Some(n) = obs.try_next().await? {
check_for_node_failures(&client, n).await?;
}
Expand Down
4 changes: 3 additions & 1 deletion examples/pod_reflector.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::pin::pin;

use futures::TryStreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::{
Expand Down Expand Up @@ -40,7 +42,7 @@ async fn main() -> anyhow::Result<()> {
.reflect(writer)
.applied_objects()
.predicate_filter(predicates::resource_version); // NB: requires an unstable feature
futures::pin_mut!(stream);
let mut stream = pin!(stream);

while let Some(pod) = stream.try_next().await? {
info!("saw {}", pod.name_any());
Expand Down
5 changes: 2 additions & 3 deletions kube-client/src/client/middleware/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ impl<S> Layer<S> for AuthLayer {
mod tests {
use super::*;

use std::{matches, sync::Arc};
use std::{matches, pin::pin, sync::Arc};

use chrono::{Duration, Utc};
use futures::pin_mut;
use http::{header::AUTHORIZATION, HeaderValue, Request, Response};
use secrecy::SecretString;
use tokio::sync::Mutex;
Expand All @@ -52,7 +51,7 @@ mod tests {

let spawned = tokio::spawn(async move {
// Receive the requests and respond
pin_mut!(handle);
let mut handle = pin!(handle);
let (request, send) = handle.next_request().await.expect("service not called");
assert_eq!(
request.headers().get(AUTHORIZATION).unwrap(),
Expand Down
7 changes: 4 additions & 3 deletions kube-client/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically
//! retrieve the resources served by the kubernetes API.
use either::{Either, Left, Right};
use futures::{self, AsyncBufRead, StreamExt, TryStream, TryStreamExt};
use futures::{AsyncBufRead, StreamExt, TryStream, TryStreamExt};
use http::{self, Request, Response};
use http_body_util::BodyExt;
#[cfg(feature = "ws")] use hyper_util::rt::TokioIo;
Expand Down Expand Up @@ -492,9 +492,10 @@ impl TryFrom<Config> for Client {

#[cfg(test)]
mod tests {
use std::pin::pin;

use crate::{client::Body, Api, Client};

use futures::pin_mut;
use http::{Request, Response};
use k8s_openapi::api::core::v1::Pod;
use tower_test::mock;
Expand All @@ -511,7 +512,7 @@ mod tests {
let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
let spawned = tokio::spawn(async move {
// Receive a request for pod and respond with some data
pin_mut!(handle);
let mut handle = pin!(handle);
let (request, send) = handle.next_request().await.expect("service not called");
assert_eq!(request.method(), http::Method::GET);
assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test");
Expand Down
4 changes: 2 additions & 2 deletions kube-runtime/src/controller/future_hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,10 @@ where

#[cfg(test)]
mod tests {
use std::task::Poll;
use std::{future, task::Poll};

use super::FutureHashMap;
use futures::{channel::mpsc, future, poll, StreamExt};
use futures::{channel::mpsc, poll, StreamExt};

#[tokio::test]
async fn fhm_should_forward_all_values_and_shut_down() {
Expand Down
19 changes: 10 additions & 9 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@ use derivative::Derivative;
use futures::{
channel,
future::{self, BoxFuture},
ready, stream, Future, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
stream, FutureExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
};
use kube_client::api::{Api, DynamicObject, Resource};
use pin_project::pin_project;
use serde::de::DeserializeOwned;
use std::{
fmt::{Debug, Display},
future::Future,
hash::Hash,
sync::Arc,
task::Poll,
task::{ready, Poll},
time::Duration,
};
use stream::BoxStream;
Expand Down Expand Up @@ -326,7 +327,8 @@ where
.instrument(reconciler_span)
.left_future()
}
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
None => std::future::ready(Err(Error::ObjectNotFound(request.obj_ref.erase())))
.right_future(),
}
},
)
Expand Down Expand Up @@ -1155,7 +1157,7 @@ where
/// use kube::{Api, Client, ResourceExt};
/// use kube_runtime::{
/// controller::{Controller, Action},
/// watcher,
/// watcher,
/// };
/// use std::{convert::Infallible, sync::Arc};
/// Controller::new(
Expand Down Expand Up @@ -1274,7 +1276,7 @@ where

#[cfg(test)]
mod tests {
use std::{convert::Infallible, sync::Arc, time::Duration};
use std::{convert::Infallible, pin::pin, sync::Arc, time::Duration};

use super::{Action, APPLIER_REQUEUE_BUF_SIZE};
use crate::{
Expand All @@ -1283,7 +1285,7 @@ mod tests {
watcher::{self, metadata_watcher, watcher, Event},
Config, Controller,
};
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
use kube_client::{core::ObjectMeta, Api, Resource};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -1348,7 +1350,7 @@ mod tests {

let (queue_tx, queue_rx) = futures::channel::mpsc::unbounded::<ObjectRef<ConfigMap>>();
let (store_rx, mut store_tx) = reflector::store();
let applier = applier(
let mut applier = pin!(applier(
|obj, _| {
Box::pin(async move {
// Try to flood the rescheduling buffer buffer by just putting it back in the queue immediately
Expand All @@ -1361,8 +1363,7 @@ mod tests {
store_rx,
queue_rx.map(Result::<_, Infallible>::Ok),
Config::default(),
);
pin_mut!(applier);
));
for i in 0..items {
let obj = ConfigMap {
metadata: ObjectMeta {
Expand Down
8 changes: 4 additions & 4 deletions kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use super::future_hash_map::FutureHashMap;
use crate::scheduler::{ScheduleRequest, Scheduler};
use futures::{future, Future, FutureExt, Stream, StreamExt};
use futures::{FutureExt, Stream, StreamExt};
use pin_project::pin_project;
use std::{
convert::Infallible,
future::{self, Future},
hash::Hash,
pin::Pin,
task::{Context, Poll},
Expand All @@ -29,7 +30,7 @@ pub struct Runner<T, R, F, MkF, Ready = future::Ready<Result<(), Infallible>>> {
run_msg: MkF,
slots: FutureHashMap<T, F>,
#[pin]
ready_to_execute_after: future::Fuse<Ready>,
ready_to_execute_after: futures::future::Fuse<Ready>,
is_ready_to_execute: bool,
stopped: bool,
max_concurrent_executions: u16,
Expand Down Expand Up @@ -163,8 +164,7 @@ mod tests {
};
use futures::{
channel::{mpsc, oneshot},
future::{self},
poll, stream, Future, SinkExt, StreamExt, TryStreamExt,
future, poll, stream, Future, SinkExt, StreamExt, TryStreamExt,
};
use std::{
cell::RefCell,
Expand Down
3 changes: 2 additions & 1 deletion kube-runtime/src/reflector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ pub use store::{store, Store};
/// or [controller-rs](https://github.com/kube-rs/controller-rs) for the similar controller integration with [actix-web](https://actix.rs/).
///
/// ```no_run
/// use std::future::ready;
/// use k8s_openapi::api::core::v1::Node;
/// use kube::runtime::{reflector, watcher, WatchStreamExt, watcher::Config};
/// use futures::{StreamExt, future::ready};
/// use futures::StreamExt;
/// # use kube::api::Api;
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
Expand Down
19 changes: 8 additions & 11 deletions kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,8 +286,8 @@ mod tests {

use super::{debounced_scheduler, scheduler, ScheduleRequest};
use derivative::Derivative;
use futures::{channel::mpsc, future, pin_mut, poll, stream, FutureExt, SinkExt, StreamExt};
use std::task::Poll;
use futures::{channel::mpsc, future, poll, stream, FutureExt, SinkExt, StreamExt};
use std::{pin::pin, task::Poll};
use tokio::time::{advance, pause, sleep, Duration, Instant};

fn unwrap_poll<T>(poll: Poll<T>) -> T {
Expand Down Expand Up @@ -379,7 +379,7 @@ mod tests {
#[tokio::test]
async fn scheduler_should_emit_items_as_requested() {
pause();
let scheduler = scheduler(
let mut scheduler = pin!(scheduler(
stream::iter(vec![
ScheduleRequest {
message: 1_u8,
Expand All @@ -391,8 +391,7 @@ mod tests {
},
])
.on_complete(sleep(Duration::from_secs(5))),
);
pin_mut!(scheduler);
));
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(2)).await;
assert_eq!(scheduler.next().now_or_never().unwrap().unwrap(), 1);
Expand All @@ -406,7 +405,7 @@ mod tests {
#[tokio::test]
async fn scheduler_dedupe_should_keep_earlier_item() {
pause();
let scheduler = scheduler(
let mut scheduler = pin!(scheduler(
stream::iter(vec![
ScheduleRequest {
message: (),
Expand All @@ -418,8 +417,7 @@ mod tests {
},
])
.on_complete(sleep(Duration::from_secs(5))),
);
pin_mut!(scheduler);
));
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(2)).await;
scheduler.next().now_or_never().unwrap().unwrap();
Expand All @@ -430,7 +428,7 @@ mod tests {
#[tokio::test]
async fn scheduler_dedupe_should_replace_later_item() {
pause();
let scheduler = scheduler(
let mut scheduler = pin!(scheduler(
stream::iter(vec![
ScheduleRequest {
message: (),
Expand All @@ -442,8 +440,7 @@ mod tests {
},
])
.on_complete(sleep(Duration::from_secs(5))),
);
pin_mut!(scheduler);
));
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(2)).await;
scheduler.next().now_or_never().unwrap().unwrap();
Expand Down
24 changes: 10 additions & 14 deletions kube-runtime/src/utils/delayed_init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ pub struct InitDropped;

#[cfg(test)]
mod tests {
use std::task::Poll;
use std::{pin::pin, task::Poll};

use super::DelayedInit;
use futures::{pin_mut, poll};
use futures::poll;
use tracing::Level;
use tracing_subscriber::util::SubscriberInitExt;

Expand All @@ -121,8 +121,7 @@ mod tests {
async fn must_allow_single_reader() {
let _tracing = setup_tracing();
let (tx, rx) = DelayedInit::<u8>::new();
let get1 = rx.get();
pin_mut!(get1);
let mut get1 = pin!(rx.get());
assert_eq!(poll!(get1.as_mut()), Poll::Pending);
tx.init(1);
assert_eq!(poll!(get1), Poll::Ready(Ok(1)));
Expand All @@ -132,10 +131,9 @@ mod tests {
async fn must_allow_concurrent_readers_while_waiting() {
let _tracing = setup_tracing();
let (tx, rx) = DelayedInit::<u8>::new();
let get1 = rx.get();
let get2 = rx.get();
let get3 = rx.get();
pin_mut!(get1, get2, get3);
let mut get1 = pin!(rx.get());
let mut get2 = pin!(rx.get());
let mut get3 = pin!(rx.get());
assert_eq!(poll!(get1.as_mut()), Poll::Pending);
assert_eq!(poll!(get2.as_mut()), Poll::Pending);
assert_eq!(poll!(get3.as_mut()), Poll::Pending);
Expand All @@ -149,8 +147,7 @@ mod tests {
async fn must_allow_reading_after_init() {
let _tracing = setup_tracing();
let (tx, rx) = DelayedInit::<u8>::new();
let get1 = rx.get();
pin_mut!(get1);
let mut get1 = pin!(rx.get());
assert_eq!(poll!(get1.as_mut()), Poll::Pending);
tx.init(1);
assert_eq!(poll!(get1), Poll::Ready(Ok(1)));
Expand All @@ -162,10 +159,9 @@ mod tests {
async fn must_allow_concurrent_readers_in_any_order() {
let _tracing = setup_tracing();
let (tx, rx) = DelayedInit::<u8>::new();
let get1 = rx.get();
let get2 = rx.get();
let get3 = rx.get();
pin_mut!(get1, get2, get3);
let mut get1 = pin!(rx.get());
let mut get2 = pin!(rx.get());
let mut get3 = pin!(rx.get());
assert_eq!(poll!(get1.as_mut()), Poll::Pending);
assert_eq!(poll!(get2.as_mut()), Poll::Pending);
assert_eq!(poll!(get3.as_mut()), Poll::Pending);
Expand Down
Loading

0 comments on commit dac48d9

Please sign in to comment.