diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 8a82cd001..f08cacca8 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -31,6 +31,7 @@ snafu = { version = "0.6.8", features = ["futures"] } either = "1.6.0" # Some configuration tweaking require reqwest atm reqwest = { version = "0.10.7", default-features = false, features = ["json", "gzip", "stream"] } +backoff = "0.2.1" [[example]] name = "configmapgen_controller" diff --git a/examples/configmapgen_controller.rs b/examples/configmapgen_controller.rs index 9fe5b5429..ba7ec2dbd 100644 --- a/examples/configmapgen_controller.rs +++ b/examples/configmapgen_controller.rs @@ -97,13 +97,6 @@ async fn reconcile(generator: ConfigMapGenerator, ctx: Context) -> Result< }) } -/// The controller triggers this on reconcile errors -fn error_policy(_error: &Error, _ctx: Context) -> ReconcilerAction { - ReconcilerAction { - requeue_after: Some(Duration::from_secs(1)), - } -} - // Data we want access to in error/reconcile calls struct Data { client: Client, @@ -120,7 +113,11 @@ async fn main() -> Result<()> { Controller::new(cmgs, ListParams::default()) .owns(cms, ListParams::default()) - .run(reconcile, error_policy, Context::new(Data { client })) + .run( + reconcile, + || backoff::backoff::Constant::new(Duration::from_secs(1)), + Context::new(Data { client }), + ) .for_each(|res| async move { match res { Ok(o) => info!("reconciled {:?}", o), diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 6175ede3e..aff75f97b 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -23,6 +23,7 @@ pin-project = "0.4.23" tokio = { version = "0.2.21", features = ["time"] } snafu = { version = "0.6.8", features = ["futures"] } dashmap = "3.11.10" +backoff = "0.2.1" [features] default = ["native-tls"] diff --git a/kube-runtime/src/controller.rs b/kube-runtime/src/controller.rs index aeec69724..82e84d166 100644 --- a/kube-runtime/src/controller.rs +++ b/kube-runtime/src/controller.rs @@ -2,12 +2,13 @@ use crate::{ reflector::{ reflector, store::{Store, Writer}, - ErasedResource, ObjectRef, + ObjectRef, RuntimeResource, }, scheduler::{self, scheduler, ScheduleRequest}, utils::{try_flatten_applied, try_flatten_touched, trystream_try_via}, watcher::{self, watcher}, }; +use backoff::backoff::Backoff; use derivative::Derivative; use futures::{ channel, future, @@ -17,17 +18,23 @@ use futures::{ use kube::api::{Api, ListParams, Meta}; use serde::de::DeserializeOwned; use snafu::{futures::TryStreamExt as SnafuTryStreamExt, Backtrace, OptionExt, ResultExt, Snafu}; -use std::{sync::Arc, time::Duration}; +use std::{collections::HashMap, fmt::Debug, marker::PhantomData, sync::Arc, time::Duration}; use stream::BoxStream; use tokio::time::Instant; -#[derive(Snafu, Debug)] -pub enum Error { +#[derive(Snafu, Derivative)] +#[derivative(Debug(bound = ""))] +pub enum Error< + K: RuntimeResource, + ReconcilerErr: std::error::Error + 'static, + QueueErr: std::error::Error + 'static, +> { ObjectNotFound { - obj_ref: ObjectRef, + obj_ref: ObjectRef, backtrace: Backtrace, }, ReconcilerFailed { + obj_ref: ObjectRef, source: ReconcilerErr, backtrace: Backtrace, }, @@ -92,6 +99,75 @@ where }) } +/// A policy for when to retry reconciliation, after an error has occurred. +pub trait ErrorPolicy { + type Err; + type K: RuntimeResource; + type Ctx; + + /// Notifies that the state for an object should be removed, for example if it is reconciled successfully. + fn reset_object(&mut self, obj_ref: &ObjectRef, ctx: Context); + /// Queries for when to next retry after an error. + fn on_error( + &mut self, + obj_ref: ObjectRef, + error: &Self::Err, + ctx: Context, + ) -> ReconcilerAction; +} + +/// Retries errors based on a `Backoff` policy. +/// +/// A separate backoff tracker is used for each object, and it is +/// reset whenever the object is reconciled successfully. +#[derive(Debug)] +pub struct BackoffErrorPolicy { + make_backoff: MkBackoff, + backoffs: HashMap, B>, + _err: PhantomData, + _ctx: PhantomData, +} + +impl B, B: Backoff, K: RuntimeResource, Err, Ctx> + BackoffErrorPolicy +{ + fn new(make_backoff: MkBackoff) -> Self { + BackoffErrorPolicy { + make_backoff, + backoffs: HashMap::new(), + _err: PhantomData, + _ctx: PhantomData, + } + } +} + +impl B, B: Backoff, K: RuntimeResource, Err, Ctx> ErrorPolicy + for BackoffErrorPolicy +{ + type Err = Err; + type K = K; + type Ctx = Ctx; + + fn reset_object(&mut self, obj_ref: &ObjectRef, _ctx: Context) { + self.backoffs.remove(obj_ref); + } + + fn on_error( + &mut self, + obj_ref: ObjectRef, + _error: &Self::Err, + _ctx: Context, + ) -> ReconcilerAction { + let obj_backoff = self + .backoffs + .entry(obj_ref) + .or_insert_with(&mut self.make_backoff); + ReconcilerAction { + requeue_after: obj_backoff.next_backoff(), + } + } +} + /// A context data type that's passed through to the controllers callbacks /// /// Context gets passed to both the `reconciler` and the `error_policy` callbacks. @@ -133,11 +209,11 @@ impl Context { /// (such as triggering from arbitrary `Stream`s), at the cost of some more verbosity. pub fn applier( mut reconciler: impl FnMut(K, Context) -> ReconcilerFut, - mut error_policy: impl FnMut(&ReconcilerFut::Error, Context) -> ReconcilerAction, + mut error_policy: impl ErrorPolicy, context: Context, store: Store, queue: QueueStream, -) -> impl Stream, ReconcilerAction), Error>> +) -> impl Stream, ReconcilerAction), Error>> where K: Clone + Meta + 'static, ReconcilerFut: TryFuture, @@ -175,22 +251,51 @@ where }) // then reconcile every object .and_then(move |(obj_ref, obj)| { - reconciler(obj, context.clone()) // TODO: add a context argument to the reconcile + reconciler(obj, context.clone()) .into_future() // TryFuture -> impl Future - .map(|result| (obj_ref, result)) // turn into pair and ok wrap - .map(Ok) // (this lets us deal with errors from reconciler below) + .map(|result| match result { + Ok(action) => Ok((obj_ref, action)), + Err(err) => Err(err).context(ReconcilerFailed { obj_ref }), + }) }) // finally, for each completed reconcile call: - .and_then(move |(obj_ref, reconciler_result)| { - let ReconcilerAction { requeue_after } = match &reconciler_result { - Ok(action) => action.clone(), // do what user told us - Err(err) => error_policy(err, err_context.clone()), // reconciler fn call failed + .then(move |reconciler_result| { + let (obj_ref, action, error) = match reconciler_result { + // tell the error policy about the success (to reset backoff timers, for example) + Ok((obj_ref, action)) => { + error_policy.reset_object(&obj_ref, err_context.clone()); + (obj_ref.clone(), action, None) + } + // reconciler fn call failed + Err(Error::ReconcilerFailed { + obj_ref, + source, + backtrace, + }) => ( + obj_ref.clone(), + error_policy.on_error(obj_ref.clone(), &source, err_context.clone()), + Some(Error::ReconcilerFailed { + obj_ref, + source, + backtrace, + }), + ), + // object was deleted, fake a "success" to the error policy, so that it can clean up any bookkeeping and avoid leaking memory + Err(Error::ObjectNotFound { obj_ref, backtrace }) => { + error_policy.reset_object(&obj_ref, err_context.clone()); + ( + obj_ref.clone(), + ReconcilerAction { requeue_after: None }, + Some(Error::ObjectNotFound { obj_ref, backtrace }), + ) + } + // Upstream or internal error, propagate + Err(_) => return future::Either::Left(future::ready(reconciler_result)), }; - // we should always requeue at some point in case of network errors ^ let mut scheduler_tx = scheduler_tx.clone(); - async move { + future::Either::Right(async move { // Transmit the requeue request to the scheduler (picked up again at top) - if let Some(delay) = requeue_after { + if let Some(delay) = action.requeue_after { scheduler_tx .send(ScheduleRequest { message: obj_ref.clone(), @@ -199,11 +304,11 @@ where .await .expect("Message could not be sent to scheduler_rx"); } - // NB: no else clause ^ because we don't allow not requeuing atm. - reconciler_result - .map(|action| (obj_ref, action)) - .context(ReconcilerFailed) - } + match error { + None => Ok((obj_ref, action)), + Some(err) => Err(err), + } + }) }) } @@ -245,12 +350,6 @@ where /// requeue_after: Some(Duration::from_secs(300)), /// }) /// } -/// /// an error handler that will be called when the reconciler fails -/// fn error_policy(_error: &Error, _ctx: Context<()>) -> ReconcilerAction { -/// ReconcilerAction { -/// requeue_after: Some(Duration::from_secs(60)), -/// } -/// } /// /// /// something to drive the controller /// #[tokio::main] @@ -261,7 +360,7 @@ where /// let cms = Api::::all(client.clone()); /// Controller::new(cmgs, ListParams::default()) /// .owns(cms, ListParams::default()) -/// .run(reconcile, error_policy, context) +/// .run(reconcile, || backoff::backoff::Constant::new(Duration::from_secs(60)), context) /// .for_each(|res| async move { /// match res { /// Ok(o) => println!("reconciled {:?}", o), @@ -346,18 +445,25 @@ where /// This creates a stream from all builder calls and starts an applier with /// a specified `reconciler` and `error_policy` callbacks. Each of these will be called /// with a configurable `Context`. - pub fn run( + pub fn run( self, reconciler: impl FnMut(K, Context) -> ReconcilerFut, - error_policy: impl FnMut(&ReconcilerFut::Error, Context) -> ReconcilerAction, + make_backoff: impl Fn() -> B, context: Context, - ) -> impl Stream, ReconcilerAction), Error>> + ) -> impl Stream, ReconcilerAction), Error>> where K: Clone + Meta + 'static, ReconcilerFut: TryFuture, ReconcilerFut::Error: std::error::Error + 'static, + B: Backoff, { - applier(reconciler, error_policy, context, self.reader, self.selector) + applier( + reconciler, + BackoffErrorPolicy::new(make_backoff), + context, + self.reader, + self.selector, + ) } } @@ -367,6 +473,7 @@ mod tests { use crate::Controller; use k8s_openapi::api::core::v1::ConfigMap; use kube::Api; + use snafu::Snafu; fn assert_send(x: T) -> T { x @@ -378,13 +485,16 @@ mod tests { ) } + #[derive(Snafu, Debug)] + enum NoError {} + // not #[test] because we don't want to actually run it, we just want to assert that it typechecks #[allow(dead_code, unused_must_use)] fn test_controller_should_be_send() { assert_send( Controller::new(mock_type::>(), Default::default()).run( - |_, _| async { Ok(mock_type::()) }, - |_: &std::io::Error, _| mock_type::(), + |_, _| async { Ok::<_, NoError>(mock_type::()) }, + || backoff::backoff::Zero {}, Context::new(()), ), );