Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add predicates to allow filtering watcher streams #911

Merged
merged 20 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ default = ["openssl-tls", "kubederive", "ws", "latest", "runtime"]
kubederive = ["kube/derive"]
openssl-tls = ["kube/client", "kube/openssl-tls"]
rustls-tls = ["kube/client", "kube/rustls-tls"]
runtime = ["kube/runtime"]
runtime = ["kube/runtime", "kube/unstable-runtime"]
ws = ["kube/ws"]
latest = ["k8s-openapi/v1_26"]

Expand Down
16 changes: 9 additions & 7 deletions examples/node_reflector.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use futures::{StreamExt, TryStreamExt};
use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::Node;
use kube::{
api::{Api, ResourceExt},
runtime::{reflector, watcher, WatchStreamExt},
runtime::{predicates, reflector, watcher, WatchStreamExt},
Client,
};
use tracing::*;
Expand All @@ -18,7 +18,9 @@ async fn main() -> anyhow::Result<()> {
.timeout(10); // short watch timeout in this example

let (reader, writer) = reflector::store();
let rf = reflector(writer, watcher(nodes, wc));
let rf = reflector(writer, watcher(nodes, wc))
.applied_objects()
.predicate_filter(predicates::labels); // NB: requires an unstable feature

// Periodically read our state in the background
tokio::spawn(async move {
Expand All @@ -29,10 +31,10 @@ async fn main() -> anyhow::Result<()> {
}
});

// Drain and log applied events from the reflector
let mut rfa = rf.applied_objects().boxed();
while let Some(event) = rfa.try_next().await? {
info!("saw {}", event.name_any());
// Log applied events with changes from the reflector
pin_mut!(rf);
while let Some(node) = rf.try_next().await? {
info!("saw node {} with hitherto unseen labels", node.name_any());
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion justfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fmt:
rustfmt +nightly --edition 2021 $(find . -type f -iname *.rs)

doc:
RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --lib --workspace --features=derive,ws,oauth,jsonpatch,client,derive,runtime,admission,k8s-openapi/v1_26 --open
RUSTDOCFLAGS="--cfg docsrs" cargo +nightly doc --lib --workspace --features=derive,ws,oauth,jsonpatch,client,derive,runtime,admission,k8s-openapi/v1_26,unstable-runtime --open

deny:
# might require rm Cargo.lock first to match CI
Expand Down
3 changes: 2 additions & 1 deletion kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ rust-version = "1.63.0"
edition = "2021"

[features]
unstable-runtime = ["unstable-runtime-subscribe"]
unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-predicates"]
unstable-runtime-subscribe = []
unstable-runtime-predicates = []

[package.metadata.docs.rs]
features = ["k8s-openapi/v1_26", "unstable-runtime"]
Expand Down
3 changes: 3 additions & 0 deletions kube-runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,6 @@ pub use reflector::reflector;
pub use scheduler::scheduler;
pub use utils::WatchStreamExt;
pub use watcher::{metadata_watcher, watcher};

#[cfg(feature = "unstable-runtime-predicates")] pub use utils::predicates;
pub use wait::conditions;
3 changes: 3 additions & 0 deletions kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@

mod backoff_reset_timer;
mod event_flatten;
#[cfg(feature = "unstable-runtime-predicates")] mod predicate;
mod stream_backoff;
#[cfg(feature = "unstable-runtime-subscribe")] pub mod stream_subscribe;
mod watch_ext;

pub use backoff_reset_timer::ResetTimerBackoff;
pub use event_flatten::EventFlatten;
#[cfg(feature = "unstable-runtime-predicates")]
pub use predicate::{predicates, PredicateFilter};
pub use stream_backoff::StreamBackoff;
#[cfg(feature = "unstable-runtime-subscribe")]
pub use stream_subscribe::StreamSubscribe;
Expand Down
163 changes: 163 additions & 0 deletions kube-runtime/src/utils/predicate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
use crate::{reflector::ObjectRef, watcher::Error};
use core::{
pin::Pin,
task::{Context, Poll},
};
use futures::{ready, Stream};
use kube_client::Resource;
use pin_project::pin_project;
use std::{collections::HashMap, hash::Hash};

#[allow(clippy::pedantic)]
#[pin_project]
/// Stream returned by the [`predicate_filter`](super::WatchStreamExt::predicate_filter) method.
#[must_use = "streams do nothing unless polled"]
pub struct PredicateFilter<St, K: Resource, Func> {
#[pin]
stream: St,
predicate: Func,
cache: HashMap<ObjectRef<K>, u64>,
}
impl<St, K, F> PredicateFilter<St, K, F>
where
St: Stream<Item = Result<K, Error>>,
K: Resource,
F: Fn(&K) -> Option<u64> + 'static,
{
pub(super) fn new(stream: St, predicate: F) -> Self {
Self {
stream,
predicate,
cache: HashMap::new(),
}
}
}
impl<St, K, F> Stream for PredicateFilter<St, K, F>
where
St: Stream<Item = Result<K, Error>>,
K: Resource,
K::DynamicType: Default + Eq + Hash,
F: Fn(&K) -> Option<u64> + 'static,
{
type Item = Result<K, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.project();
Poll::Ready(loop {
break match ready!(me.stream.as_mut().poll_next(cx)) {
Some(Ok(obj)) => {
if let Some(val) = (me.predicate)(&obj) {
let key = ObjectRef::from_obj(&obj);
let changed = if let Some(old) = me.cache.get(&key) {
*old != val
} else {
true
};
if let Some(old) = me.cache.get_mut(&key) {
*old = val;
} else {
me.cache.insert(key, val);
}
if changed {
Some(Ok(obj))
} else {
continue;
}
} else {
// if we can't evaluate predicate, always emit K
Some(Ok(obj))
}
}
Some(Err(err)) => Some(Err(err)),
None => return Poll::Ready(None),
};
})
}
}

pub mod predicates {
use kube_client::{Resource, ResourceExt};
use std::{
collections::hash_map::DefaultHasher,
hash::{Hash, Hasher},
};

// See: https://github.com/kubernetes-sigs/controller-runtime/blob/v0.12.0/pkg/predicate/predicate.go

fn hash<T: Hash>(t: &T) -> u64 {
let mut hasher = DefaultHasher::new();
t.hash(&mut hasher);
hasher.finish()
}

/// Hash the generation of a Resource K
pub fn generation<K: Resource>(obj: &K) -> Option<u64> {
obj.meta().generation.map(|g| hash(&g))
}

/// Hash the labels of a Resource K
pub fn labels<K: Resource>(obj: &K) -> Option<u64> {
Some(hash(obj.labels()))
}

/// Hash the annotations of a Resource K
pub fn annotations<K: Resource>(obj: &K) -> Option<u64> {
Some(hash(obj.annotations()))
}
}

#[cfg(test)]
pub(crate) mod tests {
use std::task::Poll;

use super::{predicates, Error, PredicateFilter};
use futures::{pin_mut, poll, stream, FutureExt, StreamExt};
use kube_client::Resource;
use serde_json::json;

#[tokio::test]
async fn predicate_filtering_hides_equal_predicate_values() {
use k8s_openapi::api::core::v1::Pod;
let mkobj = |gen: i32| {
let p: Pod = serde_json::from_value(json!({
"apiVersion": "v1",
"kind": "Pod",
"metadata": {
"name": "blog",
"generation": Some(gen),
},
"spec": {
"containers": [{
"name": "blog",
"image": "clux/blog:0.1.0"
}],
}
}))
.unwrap();
p
};
let data = stream::iter([
Ok(mkobj(1)),
Err(Error::TooManyObjects),
Ok(mkobj(1)),
Ok(mkobj(2)),
]);
let rx = PredicateFilter::new(data, predicates::generation);
pin_mut!(rx);

// mkobj(1) passed through
let first = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(first.meta().generation, Some(1));

// Error passed through
assert!(matches!(
poll!(rx.next()),
Poll::Ready(Some(Err(Error::TooManyObjects)))
));
// (no repeat mkobj(1) - same generation)
// mkobj(2) next
let second = rx.next().now_or_never().unwrap().unwrap().unwrap();
assert_eq!(second.meta().generation, Some(2));
assert!(matches!(poll!(rx.next()), Poll::Ready(None)));
}
}
74 changes: 74 additions & 0 deletions kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
#[cfg(feature = "unstable-runtime-predicates")]
use crate::utils::predicate::PredicateFilter;
#[cfg(feature = "unstable-runtime-subscribe")]
use crate::utils::stream_subscribe::StreamSubscribe;
use crate::{
utils::{event_flatten::EventFlatten, stream_backoff::StreamBackoff},
watcher,
};
#[cfg(feature = "unstable-runtime-predicates")] use kube_client::Resource;

use backoff::backoff::Backoff;
use futures::{Stream, TryStream};

Expand Down Expand Up @@ -38,6 +42,43 @@ pub trait WatchStreamExt: Stream {
EventFlatten::new(self, true)
}


/// Filter out a flattened stream on [`predicates`](crate::predicates).
///
/// This will filter out repeat calls where the predicate returns the same result.
/// Common use case for this is to avoid repeat events for status updates
/// by filtering on []`predicates::generation`].
///
/// ## Usage
/// ```no_run
/// # use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
/// use kube::{Api, Client, ResourceExt};
/// use kube_runtime::{watcher, WatchStreamExt, predicates};
/// use k8s_openapi::api::core::v1::Pod;
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
/// let pods: Api<Pod> = Api::default_namespaced(client);
/// let changed_pods = watcher(pods, watcher::Config::default())
/// .applied_objects()
/// .predicate_filter(predicates::generation);
/// pin_mut!(changed_pods);
///
/// while let Some(pod) = changed_pods.try_next().await? {
/// println!("saw Pod '{} with hitherto unseen generation", pod.name_any());
/// }
/// # Ok(())
/// # }
/// ```
#[cfg(feature = "unstable-runtime-predicates")]
fn predicate_filter<K, F>(self, predicate: F) -> PredicateFilter<Self, K, F>
where
Self: Stream<Item = Result<K, watcher::Error>> + Sized,
K: Resource + 'static,
F: Fn(&K) -> Option<u64> + 'static,
{
PredicateFilter::new(self, predicate)
}

/// Create a [`StreamSubscribe`] from a [`watcher()`] stream.
///
/// The [`StreamSubscribe::subscribe()`] method which allows additional consumers
Expand Down Expand Up @@ -103,3 +144,36 @@ pub trait WatchStreamExt: Stream {
}

impl<St: ?Sized> WatchStreamExt for St where St: Stream {}

// Compile tests
#[cfg(feature = "unstable-runtime-predicates")]
#[cfg(test)]
pub(crate) mod tests {
use super::*;
use crate::predicates;
use futures::StreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube_client::{Api, Resource};

fn compile_type<T>() -> T {
unimplemented!("not called - compile test only")
}

pub fn assert_stream<T, K>(x: T) -> T
where
T: Stream<Item = watcher::Result<K>> + Send,
K: Resource + Clone + Send + 'static,
{
x
}

// not #[test] because this is only a compile check verification
#[allow(dead_code, unused_must_use)]
fn test_watcher_stream_type_drift() {
let pred_watch = watcher(compile_type::<Api<Pod>>(), Default::default())
.touched_objects()
.predicate_filter(predicates::generation)
.boxed();
assert_stream(pred_watch);
}
}
2 changes: 1 addition & 1 deletion kube/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ runtime = ["kube-runtime"]
unstable-runtime = ["kube-runtime/unstable-runtime"]

[package.metadata.docs.rs]
features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/v1_26"]
features = ["client", "rustls-tls", "openssl-tls", "derive", "ws", "oauth", "jsonpatch", "admission", "runtime", "k8s-openapi/v1_26", "unstable-runtime"]
# Define the configuration attribute `docsrs`. Used to enable `doc_cfg` feature.
rustdoc-args = ["--cfg", "docsrs"]

Expand Down