diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 4aa15ad28..ebab02b55 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -18,7 +18,7 @@ default = ["openssl-tls", "kubederive", "ws", "latest", "runtime"] kubederive = ["kube/derive"] openssl-tls = ["kube/client", "kube/openssl-tls"] rustls-tls = ["kube/client", "kube/rustls-tls"] -runtime = ["kube/runtime"] +runtime = ["kube/runtime", "kube/unstable-runtime"] ws = ["kube/ws"] latest = ["k8s-openapi/v1_26"] diff --git a/examples/node_reflector.rs b/examples/node_reflector.rs index 38c2f3a83..c0c9fbf51 100644 --- a/examples/node_reflector.rs +++ b/examples/node_reflector.rs @@ -1,8 +1,8 @@ -use futures::{StreamExt, TryStreamExt}; +use futures::{pin_mut, TryStreamExt}; use k8s_openapi::api::core::v1::Node; use kube::{ api::{Api, ResourceExt}, - runtime::{reflector, watcher, WatchStreamExt}, + runtime::{predicates, reflector, watcher, WatchStreamExt}, Client, }; use tracing::*; @@ -18,7 +18,9 @@ async fn main() -> anyhow::Result<()> { .timeout(10); // short watch timeout in this example let (reader, writer) = reflector::store(); - let rf = reflector(writer, watcher(nodes, wc)); + let rf = reflector(writer, watcher(nodes, wc)) + .applied_objects() + .predicate_filter(predicates::labels); // NB: requires an unstable feature // Periodically read our state in the background tokio::spawn(async move { @@ -29,10 +31,10 @@ async fn main() -> anyhow::Result<()> { } }); - // Drain and log applied events from the reflector - let mut rfa = rf.applied_objects().boxed(); - while let Some(event) = rfa.try_next().await? { - info!("saw {}", event.name_any()); + // Log applied events with changes from the reflector + pin_mut!(rf); + while let Some(node) = rf.try_next().await? { + info!("saw node {} with hitherto unseen labels", node.name_any()); } Ok(()) diff --git a/justfile b/justfile index a42f30274..3bc6c6d86 100644 --- a/justfile +++ b/justfile @@ -15,7 +15,7 @@ fmt: rustfmt +nightly --edition 2021 $(find . -type f -iname *.rs) doc: - RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --lib --workspace --features=derive,ws,oauth,jsonpatch,client,derive,runtime,admission,k8s-openapi/v1_26 --open + RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --lib --workspace --features=derive,ws,oauth,jsonpatch,client,derive,runtime,admission,k8s-openapi/v1_26,unstable-runtime --open deny: # might require rm Cargo.lock first to match CI diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index ec1e3f529..bbefb28ab 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -15,8 +15,9 @@ rust-version = "1.63.0" edition = "2021" [features] -unstable-runtime = ["unstable-runtime-subscribe"] +unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-predicates"] unstable-runtime-subscribe = [] +unstable-runtime-predicates = [] [package.metadata.docs.rs] features = ["k8s-openapi/v1_26", "unstable-runtime"] diff --git a/kube-runtime/src/lib.rs b/kube-runtime/src/lib.rs index d2f945f49..2f2afbb18 100644 --- a/kube-runtime/src/lib.rs +++ b/kube-runtime/src/lib.rs @@ -35,3 +35,6 @@ pub use reflector::reflector; pub use scheduler::scheduler; pub use utils::WatchStreamExt; pub use watcher::{metadata_watcher, watcher}; + +#[cfg(feature = "unstable-runtime-predicates")] pub use utils::predicates; +pub use wait::conditions; diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index d40dd295e..dddc00571 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -2,12 +2,15 @@ mod backoff_reset_timer; mod event_flatten; +#[cfg(feature = "unstable-runtime-predicates")] mod predicate; mod stream_backoff; #[cfg(feature = "unstable-runtime-subscribe")] pub mod stream_subscribe; mod watch_ext; pub use backoff_reset_timer::ResetTimerBackoff; pub use event_flatten::EventFlatten; +#[cfg(feature = "unstable-runtime-predicates")] +pub use predicate::{predicates, PredicateFilter}; pub use stream_backoff::StreamBackoff; #[cfg(feature = "unstable-runtime-subscribe")] pub use stream_subscribe::StreamSubscribe; diff --git a/kube-runtime/src/utils/predicate.rs b/kube-runtime/src/utils/predicate.rs new file mode 100644 index 000000000..5905365cb --- /dev/null +++ b/kube-runtime/src/utils/predicate.rs @@ -0,0 +1,163 @@ +use crate::{reflector::ObjectRef, watcher::Error}; +use core::{ + pin::Pin, + task::{Context, Poll}, +}; +use futures::{ready, Stream}; +use kube_client::Resource; +use pin_project::pin_project; +use std::{collections::HashMap, hash::Hash}; + +#[allow(clippy::pedantic)] +#[pin_project] +/// Stream returned by the [`predicate_filter`](super::WatchStreamExt::predicate_filter) method. +#[must_use = "streams do nothing unless polled"] +pub struct PredicateFilter { + #[pin] + stream: St, + predicate: Func, + cache: HashMap, u64>, +} +impl PredicateFilter +where + St: Stream>, + K: Resource, + F: Fn(&K) -> Option + 'static, +{ + pub(super) fn new(stream: St, predicate: F) -> Self { + Self { + stream, + predicate, + cache: HashMap::new(), + } + } +} +impl Stream for PredicateFilter +where + St: Stream>, + K: Resource, + K::DynamicType: Default + Eq + Hash, + F: Fn(&K) -> Option + 'static, +{ + type Item = Result; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut me = self.project(); + Poll::Ready(loop { + break match ready!(me.stream.as_mut().poll_next(cx)) { + Some(Ok(obj)) => { + if let Some(val) = (me.predicate)(&obj) { + let key = ObjectRef::from_obj(&obj); + let changed = if let Some(old) = me.cache.get(&key) { + *old != val + } else { + true + }; + if let Some(old) = me.cache.get_mut(&key) { + *old = val; + } else { + me.cache.insert(key, val); + } + if changed { + Some(Ok(obj)) + } else { + continue; + } + } else { + // if we can't evaluate predicate, always emit K + Some(Ok(obj)) + } + } + Some(Err(err)) => Some(Err(err)), + None => return Poll::Ready(None), + }; + }) + } +} + +pub mod predicates { + use kube_client::{Resource, ResourceExt}; + use std::{ + collections::hash_map::DefaultHasher, + hash::{Hash, Hasher}, + }; + + // See: https://github.com/kubernetes-sigs/controller-runtime/blob/v0.12.0/pkg/predicate/predicate.go + + fn hash(t: &T) -> u64 { + let mut hasher = DefaultHasher::new(); + t.hash(&mut hasher); + hasher.finish() + } + + /// Hash the generation of a Resource K + pub fn generation(obj: &K) -> Option { + obj.meta().generation.map(|g| hash(&g)) + } + + /// Hash the labels of a Resource K + pub fn labels(obj: &K) -> Option { + Some(hash(obj.labels())) + } + + /// Hash the annotations of a Resource K + pub fn annotations(obj: &K) -> Option { + Some(hash(obj.annotations())) + } +} + +#[cfg(test)] +pub(crate) mod tests { + use std::task::Poll; + + use super::{predicates, Error, PredicateFilter}; + use futures::{pin_mut, poll, stream, FutureExt, StreamExt}; + use kube_client::Resource; + use serde_json::json; + + #[tokio::test] + async fn predicate_filtering_hides_equal_predicate_values() { + use k8s_openapi::api::core::v1::Pod; + let mkobj = |gen: i32| { + let p: Pod = serde_json::from_value(json!({ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": "blog", + "generation": Some(gen), + }, + "spec": { + "containers": [{ + "name": "blog", + "image": "clux/blog:0.1.0" + }], + } + })) + .unwrap(); + p + }; + let data = stream::iter([ + Ok(mkobj(1)), + Err(Error::TooManyObjects), + Ok(mkobj(1)), + Ok(mkobj(2)), + ]); + let rx = PredicateFilter::new(data, predicates::generation); + pin_mut!(rx); + + // mkobj(1) passed through + let first = rx.next().now_or_never().unwrap().unwrap().unwrap(); + assert_eq!(first.meta().generation, Some(1)); + + // Error passed through + assert!(matches!( + poll!(rx.next()), + Poll::Ready(Some(Err(Error::TooManyObjects))) + )); + // (no repeat mkobj(1) - same generation) + // mkobj(2) next + let second = rx.next().now_or_never().unwrap().unwrap().unwrap(); + assert_eq!(second.meta().generation, Some(2)); + assert!(matches!(poll!(rx.next()), Poll::Ready(None))); + } +} diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 7cda50245..e2e3e1b50 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -1,9 +1,13 @@ +#[cfg(feature = "unstable-runtime-predicates")] +use crate::utils::predicate::PredicateFilter; #[cfg(feature = "unstable-runtime-subscribe")] use crate::utils::stream_subscribe::StreamSubscribe; use crate::{ utils::{event_flatten::EventFlatten, stream_backoff::StreamBackoff}, watcher, }; +#[cfg(feature = "unstable-runtime-predicates")] use kube_client::Resource; + use backoff::backoff::Backoff; use futures::{Stream, TryStream}; @@ -38,6 +42,43 @@ pub trait WatchStreamExt: Stream { EventFlatten::new(self, true) } + + /// Filter out a flattened stream on [`predicates`](crate::predicates). + /// + /// This will filter out repeat calls where the predicate returns the same result. + /// Common use case for this is to avoid repeat events for status updates + /// by filtering on []`predicates::generation`]. + /// + /// ## Usage + /// ```no_run + /// # use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; + /// use kube::{Api, Client, ResourceExt}; + /// use kube_runtime::{watcher, WatchStreamExt, predicates}; + /// use k8s_openapi::api::core::v1::Pod; + /// # async fn wrapper() -> Result<(), Box> { + /// # let client: kube::Client = todo!(); + /// let pods: Api = Api::default_namespaced(client); + /// let changed_pods = watcher(pods, watcher::Config::default()) + /// .applied_objects() + /// .predicate_filter(predicates::generation); + /// pin_mut!(changed_pods); + /// + /// while let Some(pod) = changed_pods.try_next().await? { + /// println!("saw Pod '{} with hitherto unseen generation", pod.name_any()); + /// } + /// # Ok(()) + /// # } + /// ``` + #[cfg(feature = "unstable-runtime-predicates")] + fn predicate_filter(self, predicate: F) -> PredicateFilter + where + Self: Stream> + Sized, + K: Resource + 'static, + F: Fn(&K) -> Option + 'static, + { + PredicateFilter::new(self, predicate) + } + /// Create a [`StreamSubscribe`] from a [`watcher()`] stream. /// /// The [`StreamSubscribe::subscribe()`] method which allows additional consumers @@ -103,3 +144,36 @@ pub trait WatchStreamExt: Stream { } impl WatchStreamExt for St where St: Stream {} + +// Compile tests +#[cfg(feature = "unstable-runtime-predicates")] +#[cfg(test)] +pub(crate) mod tests { + use super::*; + use crate::predicates; + use futures::StreamExt; + use k8s_openapi::api::core::v1::Pod; + use kube_client::{Api, Resource}; + + fn compile_type() -> T { + unimplemented!("not called - compile test only") + } + + pub fn assert_stream(x: T) -> T + where + T: Stream> + Send, + K: Resource + Clone + Send + 'static, + { + x + } + + // not #[test] because this is only a compile check verification + #[allow(dead_code, unused_must_use)] + fn test_watcher_stream_type_drift() { + let pred_watch = watcher(compile_type::>(), Default::default()) + .touched_objects() + .predicate_filter(predicates::generation) + .boxed(); + assert_stream(pred_watch); + } +} diff --git a/kube/Cargo.toml b/kube/Cargo.toml index e2211e64e..8124d64af 100644 --- a/kube/Cargo.toml +++ b/kube/Cargo.toml @@ -31,7 +31,7 @@ runtime = ["kube-runtime"] unstable-runtime = ["kube-runtime/unstable-runtime"] [package.metadata.docs.rs] -features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/v1_26"] +features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/v1_26", "unstable-runtime"] # Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature. rustdoc-args = ["--cfg", "docsrs"]