diff --git a/kube-client/src/client/mod.rs b/kube-client/src/client/mod.rs
index 62d3de510..83d87e74a 100644
--- a/kube-client/src/client/mod.rs
+++ b/kube-client/src/client/mod.rs
@@ -156,7 +156,7 @@ impl Client {
                     // Error requesting
                     .or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
                     // Error from another middleware
-                    .unwrap_or_else(|err| Error::Service(err))
+                    .unwrap_or_else(Error::Service)
             })?;
         Ok(res)
     }
diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs
index 7ab3bc95e..ec254d4bc 100644
--- a/kube-runtime/src/controller/mod.rs
+++ b/kube-runtime/src/controller/mod.rs
@@ -7,9 +7,9 @@ use crate::{
         store::{Store, Writer},
         ObjectRef,
     },
-    scheduler::{scheduler, ScheduleRequest},
+    scheduler::{debounced_scheduler, ScheduleRequest},
     utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
-    watcher::{self, metadata_watcher, watcher, Config, DefaultBackoff},
+    watcher::{self, metadata_watcher, watcher, DefaultBackoff},
 };
 use backoff::backoff::Backoff;
 use derivative::Derivative;
@@ -246,12 +246,14 @@ const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
 ///
 /// This is the "hard-mode" version of [`Controller`], which allows you some more customization
 /// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
+#[allow(clippy::needless_pass_by_value)]
 pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
     mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
     error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
     context: Arc<Ctx>,
     store: Store<K>,
     queue: QueueStream,
+    config: Config,
 ) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
 where
     K: Clone + Resource + 'static,
@@ -276,7 +278,7 @@ where
             .map_err(Error::QueueError)
             .map_ok(|request| ScheduleRequest {
                 message: request.into(),
-                run_at: Instant::now() + Duration::from_millis(1),
+                run_at: Instant::now(),
             })
             .on_complete(async move {
                 // On error: scheduler has already been shut down and there is nothing for us to do
@@ -291,7 +293,7 @@ where
         )),
         // all the Oks from the select gets passed through the scheduler stream, and are then executed
         move |s| {
-            Runner::new(scheduler(s), move |request| {
+            Runner::new(debounced_scheduler(s, config.debounce), move |request| {
                 let request = request.clone();
                 match store.get(&request.obj_ref) {
                     Some(obj) => {
@@ -417,6 +419,31 @@ where
     }
 }
 
+/// Accumulates all options that can be used on a [`Controller`] invocation.
+#[derive(Clone, Debug, Default)]
+pub struct Config {
+    debounce: Duration,
+}
+
+impl Config {
+    /// The debounce duration used to deduplicate reconciliation requests.
+    ///
+    /// When set to a non-zero duration, debouncing is enabled in the [`Scheduler`], resulting
+    /// in __trailing edge debouncing__ of reconciler requests.
+    /// This option can help to reduce the number of unnecessary reconciler calls
+    /// when using multiple controller relations, or during rapid phase transitions.
+    ///
+    /// ## Warning
+    /// This option delays (and keeps delaying) reconcile requests for objects while
+    /// the object is updated. It can **permanently hide** updates from your reconciler
+    /// if set too high on objects that are updated frequently (like nodes).
+    #[must_use]
+    pub fn debounce(mut self, debounce: Duration) -> Self {
+        self.debounce = debounce;
+        self
+    }
+}
+
 /// Controller for a Resource `K`
 ///
 /// A controller is an infinite stream of objects to be reconciled.
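For reviewers, a minimal sketch (not part of the patch) of how the new knob is meant to be used; the 500ms value is purely illustrative:

```rust
use std::time::Duration;
use kube_runtime::Config;

fn main() {
    // Trailing-edge debounce: a reconcile request is only emitted once the object has
    // been "quiet" for a full debounce window; every matching request that arrives
    // earlier pushes the emission back by another window.
    let config = Config::default().debounce(Duration::from_millis(500));
    // `config` is then passed as the final argument to `applier`, or attached to a
    // `Controller` via the `with_config` builder added further down in this diff.
    let _ = config;
}
```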
@@ -505,6 +532,7 @@ where
     forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
     dyntype: K::DynamicType,
     reader: Store<K>,
+    config: Config,
 }
 
 impl<K> Controller<K>
@@ -515,12 +543,12 @@ where
     /// Create a Controller on a type `K`
     ///
     /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
     ///
-    /// The [`Config`] controls to the possible subset of objects of `K` that you want to manage
+    /// The [`watcher::Config`] controls the possible subset of objects of `K` that you want to manage
     /// and receive reconcile events for.
-    /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
+    /// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
     #[must_use]
-    pub fn new(main_api: Api<K>, wc: Config) -> Self
+    pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
     where
         K::DynamicType: Default,
     {
@@ -531,17 +559,17 @@ where
     ///
     /// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
     ///
-    /// The [`Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
+    /// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
     /// to watch - in the Api's configured scope - and receive reconcile events for.
     /// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
     ///
     /// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
     ///
-    /// [`Config`]: crate::watcher::Config
+    /// [`watcher::Config`]: crate::watcher::Config
     /// [`Api`]: kube_client::Api
     /// [`dynamic`]: kube_client::core::dynamic
     /// [`Config::default`]: crate::watcher::Config::default
-    pub fn new_with(main_api: Api<K>, wc: Config, dyntype: K::DynamicType) -> Self {
+    pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
         let writer = Writer::<K>::new(dyntype.clone());
         let reader = writer.as_reader();
         let mut trigger_selector = stream::SelectAll::new();
@@ -564,6 +592,7 @@ where
             ],
             dyntype,
             reader,
+            config: Default::default(),
         }
     }
 
@@ -649,9 +678,17 @@ where
             ],
             dyntype,
             reader,
+            config: Default::default(),
         }
     }
 
+    /// Specify the configuration for the controller's behavior.
+    #[must_use]
+    pub fn with_config(mut self, config: Config) -> Self {
+        self.config = config;
+        self
+    }
+
     /// Specify the backoff policy for "trigger" watches
     ///
     /// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
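To make the new builder concrete, here is a small, self-contained usage sketch (not part of the patch); the reconciler, error policy, and the one-second debounce are placeholders chosen for illustration:

```rust
use std::{sync::Arc, time::Duration};
use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{Api, Client};
use kube_runtime::{controller::Action, watcher, Config, Controller};

#[derive(Debug)]
struct ReconcileError;
impl std::fmt::Display for ReconcileError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("reconcile failed")
    }
}
impl std::error::Error for ReconcileError {}

// Placeholder reconciler: do nothing and requeue after 30 minutes.
async fn reconcile(_obj: Arc<ConfigMap>, _ctx: Arc<()>) -> Result<Action, ReconcileError> {
    Ok(Action::requeue(Duration::from_secs(30 * 60)))
}

fn error_policy(_obj: Arc<ConfigMap>, _err: &ReconcileError, _ctx: Arc<()>) -> Action {
    Action::requeue(Duration::from_secs(60))
}

async fn run(client: Client) {
    let cms: Api<ConfigMap> = Api::default_namespaced(client);
    Controller::new(cms, watcher::Config::default())
        // 1s debounce is an illustrative value; see the warning on `Config::debounce`.
        .with_config(Config::default().debounce(Duration::from_secs(1)))
        .run(reconcile, error_policy, Arc::new(()))
        .for_each(|res| async move {
            if let Err(err) = res {
                eprintln!("reconcile failed: {err:?}");
            }
        })
        .await;
}
```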
@@ -683,7 +720,7 @@ where
     pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
         self,
         api: Api<Child>,
-        wc: Config,
+        wc: watcher::Config,
     ) -> Self {
         self.owns_with(api, (), wc)
     }
@@ -696,7 +733,7 @@ where
         mut self,
         api: Api<Child>,
         dyntype: Child::DynamicType,
-        wc: Config,
+        wc: watcher::Config,
     ) -> Self
     where
         Child::DynamicType: Debug + Eq + Hash + Clone,
@@ -847,7 +884,7 @@ where
     pub fn watches<Other, I>(
         self,
         api: Api<Other>,
-        wc: Config,
+        wc: watcher::Config,
         mapper: impl Fn(Other) -> I + Sync + Send + 'static,
     ) -> Self
     where
@@ -867,7 +904,7 @@ where
         mut self,
         api: Api<Other>,
         dyntype: Other::DynamicType,
-        wc: Config,
+        wc: watcher::Config,
         mapper: impl Fn(Other) -> I + Sync + Send + 'static,
     ) -> Self
     where
@@ -1214,6 +1251,7 @@ where
             self.reader,
             StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
                 .take_until(future::select_all(self.graceful_shutdown_selector)),
+            self.config,
         )
         .take_until(futures::future::select_all(self.forceful_shutdown_selector))
     }
@@ -1228,7 +1266,7 @@ mod tests {
         applier,
         reflector::{self, ObjectRef},
         watcher::{self, metadata_watcher, watcher, Event},
-        Controller,
+        Config, Controller,
     };
     use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
     use k8s_openapi::api::core::v1::ConfigMap;
@@ -1307,6 +1345,7 @@ mod tests {
             Arc::new(()),
             store_rx,
             queue_rx.map(Result::<_, Infallible>::Ok),
+            Config::default(),
         );
         pin_mut!(applier);
         for i in 0..items {
diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs
index 02fe35a74..3e33e8df3 100644
--- a/kube-runtime/src/controller/runner.rs
+++ b/kube-runtime/src/controller/runner.rs
@@ -158,6 +158,8 @@ mod tests {
         let mut count = 0;
         let (mut sched_tx, sched_rx) = mpsc::unbounded();
         let mut runner = Box::pin(
+            // The debounce period needs to be zero because a debounce period > 0
+            // will lead to the second request being discarded.
             Runner::new(scheduler(sched_rx), |_| {
                 count += 1;
                 // Panic if this ref is already held, to simulate some unsafe action..
diff --git a/kube-runtime/src/lib.rs b/kube-runtime/src/lib.rs
index 2df6d039d..f59d6d607 100644
--- a/kube-runtime/src/lib.rs
+++ b/kube-runtime/src/lib.rs
@@ -29,7 +29,7 @@ pub mod utils;
 pub mod wait;
 pub mod watcher;
 
-pub use controller::{applier, Controller};
+pub use controller::{applier, Config, Controller};
 pub use finalizer::finalizer;
 pub use reflector::reflector;
 pub use scheduler::scheduler;
diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs
index 7ff1cd2e1..3899207ff 100644
--- a/kube-runtime/src/scheduler.rs
+++ b/kube-runtime/src/scheduler.rs
@@ -8,6 +8,7 @@ use std::{
     hash::Hash,
     pin::Pin,
     task::{Context, Poll},
+    time::Duration,
 };
 use tokio::time::Instant;
 use tokio_util::time::delay_queue::{self, DelayQueue};
@@ -44,15 +45,22 @@ pub struct Scheduler<T, R> {
     /// Incoming queue of scheduling requests.
     #[pin]
     requests: Fuse<R>,
+    /// Debounce time to allow for deduplication of requests. It is added to the request's
+    /// initial expiration time. If another request with the same message arrives before
+    /// the request expires, it is added to the new request's expiration time. This allows
+    /// for a request to be emitted if the scheduler is "uninterrupted" for the configured
+    /// debounce period. Its primary purpose is to deduplicate requests that expire instantly.
+    debounce: Duration,
 }
 
 impl<T, R: Stream> Scheduler<T, R> {
-    fn new(requests: R) -> Self {
+    fn new(requests: R, debounce: Duration) -> Self {
         Self {
             queue: DelayQueue::new(),
             scheduled: HashMap::new(),
             pending: HashSet::new(),
             requests: requests.fuse(),
+            debounce,
         }
     }
 }
@@ -67,12 +75,15 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
             return;
         }
         match self.scheduled.entry(request.message) {
+            // If the new request is supposed to be earlier than the current entry's scheduled
+            // time (e.g. the new request is user triggered and the current entry is the
+            // reconciler's usual retry), then give priority to the new request.
             Entry::Occupied(mut old_entry) if old_entry.get().run_at >= request.run_at => {
                 // Old entry will run after the new request, so replace it..
                 let entry = old_entry.get_mut();
-                // TODO: this should add a little delay here to actually debounce
-                self.queue.reset_at(&entry.queue_key, request.run_at);
-                entry.run_at = request.run_at;
+                self.queue
+                    .reset_at(&entry.queue_key, request.run_at + *self.debounce);
+                entry.run_at = request.run_at + *self.debounce;
                 old_entry.replace_key();
             }
             Entry::Occupied(_old_entry) => {
@@ -82,8 +93,8 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
                 // No old entry, we're free to go!
                 let message = entry.key().clone();
                 entry.insert(ScheduledEntry {
-                    run_at: request.run_at,
-                    queue_key: self.queue.insert_at(message, request.run_at),
+                    run_at: request.run_at + *self.debounce,
+                    queue_key: self.queue.insert_at(message, request.run_at + *self.debounce),
                 });
             }
         }
@@ -203,14 +214,29 @@ where
 ///
 /// The [`Scheduler`] terminates as soon as `requests` does.
 pub fn scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(requests: S) -> Scheduler<T, S> {
-    Scheduler::new(requests)
+    Scheduler::new(requests, Duration::ZERO)
+}
+
+/// Stream transformer that delays and deduplicates [`Stream`] items.
+///
+/// The debounce period lets the scheduler deduplicate requests that ask to be
+/// emitted instantly, by making sure we wait for the configured period of time
+/// to receive an uninterrupted request before actually emitting it.
+///
+/// For more info, see [`scheduler()`].
+#[allow(clippy::module_name_repetitions)]
+pub fn debounced_scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(
+    requests: S,
+    debounce: Duration,
+) -> Scheduler<T, S> {
+    Scheduler::new(requests, debounce)
 }
 
 #[cfg(test)]
 mod tests {
     use crate::utils::KubeRuntimeStreamExt;
 
-    use super::{scheduler, ScheduleRequest};
+    use super::{debounced_scheduler, scheduler, ScheduleRequest};
     use derivative::Derivative;
     use futures::{channel::mpsc, future, pin_mut, poll, stream, FutureExt, SinkExt, StreamExt};
     use std::task::Poll;
@@ -447,4 +473,60 @@ mod tests {
         );
         assert_eq!(scheduler.map(|msg| msg.0).collect::<Vec<_>>().await, vec![1]);
     }
+
+    #[tokio::test]
+    async fn scheduler_should_add_debounce_to_a_request() {
+        pause();
+
+        let now = Instant::now();
+        let (mut sched_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<SingletonMessage>>();
+        let mut scheduler = debounced_scheduler(sched_rx, Duration::from_secs(2));
+
+        sched_tx
+            .send(ScheduleRequest {
+                message: SingletonMessage(1),
+                run_at: now,
+            })
+            .await
+            .unwrap();
+        advance(Duration::from_secs(1)).await;
+        assert!(poll!(scheduler.next()).is_pending());
+        advance(Duration::from_secs(3)).await;
+        assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().0, 1);
+    }
+
+    #[tokio::test]
+    async fn scheduler_should_dedup_message_within_debounce_period() {
+        pause();
+
+        let mut now = Instant::now();
+        let (mut sched_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<SingletonMessage>>();
+        let mut scheduler = debounced_scheduler(sched_rx, Duration::from_secs(3));
+
+        sched_tx
+            .send(ScheduleRequest {
+                message: SingletonMessage(1),
+                run_at: now,
+            })
+            .await
+            .unwrap();
+        assert!(poll!(scheduler.next()).is_pending());
+        advance(Duration::from_secs(1)).await;
+
+        now = Instant::now();
+        sched_tx
+            .send(ScheduleRequest {
+                message: SingletonMessage(2),
+                run_at: now,
+            })
+            .await
+            .unwrap();
+        // Check that the initial request was indeed deduplicated.
+        advance(Duration::from_millis(2500)).await;
+        assert!(poll!(scheduler.next()).is_pending());
+
+        advance(Duration::from_secs(3)).await;
+        assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().0, 2);
+        assert!(poll!(scheduler.next()).is_pending());
+    }
 }
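For completeness, the new transformer can also be driven directly, outside the controller machinery. A short sketch using real time rather than the paused clock of the tests above; the message and the 200ms debounce are illustrative:

```rust
use std::time::Duration;
use futures::{channel::mpsc, SinkExt, StreamExt};
use kube_runtime::scheduler::{debounced_scheduler, ScheduleRequest};
use tokio::time::Instant;

#[tokio::main]
async fn main() {
    let (mut tx, rx) = mpsc::unbounded();
    // 200ms trailing-edge debounce, chosen only for illustration.
    let scheduler = debounced_scheduler(rx, Duration::from_millis(200));

    let now = Instant::now();
    tx.send(ScheduleRequest { message: "refresh", run_at: now }).await.unwrap();
    tx.send(ScheduleRequest { message: "refresh", run_at: now }).await.unwrap();
    drop(tx); // close the request stream so the scheduler terminates once drained

    // Both requests target the same message, so only one "refresh" is emitted,
    // roughly one debounce period after the last request was seen.
    let emitted: Vec<_> = scheduler.collect().await;
    assert_eq!(emitted, vec!["refresh"]);
}
```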