add controller::Config and debounce period to scheduler (#1265)
* add `controller::Config` and debounce period to scheduler

Add `controller::Config` to allow configuring the behavior of the
controller. Introduce a debounce period for the scheduler to allow for
deduplication of requests. By default, the debounce period is set to 1
second.
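
Below is a minimal sketch of the intended wiring (hypothetical `ConfigMap` reconciler; the two-second window is illustrative, and note that the final diff derives `Default` for `Config`, so the debounce stays zero unless set explicitly):

```rust
use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{Api, Client};
use kube_runtime::{controller::Action, watcher, Config, Controller};

// Stub reconciler and error policy; real controllers put their logic here.
async fn reconcile(_obj: Arc<ConfigMap>, _ctx: Arc<()>) -> Result<Action, kube::Error> {
    Ok(Action::await_change())
}

fn error_policy(_obj: Arc<ConfigMap>, _err: &kube::Error, _ctx: Arc<()>) -> Action {
    Action::requeue(Duration::from_secs(60))
}

#[tokio::main]
async fn main() -> Result<(), kube::Error> {
    let client = Client::try_default().await?;
    let cms: Api<ConfigMap> = Api::default_namespaced(client);
    Controller::new(cms, watcher::Config::default())
        // Collapse bursts of triggers for the same object into one reconcile call.
        .with_config(Config::default().debounce(Duration::from_secs(2)))
        .run(reconcile, error_policy, Arc::new(()))
        .for_each(|res| async move { println!("reconciled: {res:?}") })
        .await;
    Ok(())
}
```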

Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>

* add `scheduler_debounced()` to configure debounce for scheduler

Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>

* address clippy warnings

Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>

* improve tests and docs

Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>

* make `controller::Config::debounce()` a builder

Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>

---------

Signed-off-by: Sanskar Jaiswal <jaiswalsanskar078@gmail.com>
Co-authored-by: Eirik A <sszynrae@gmail.com>
aryan9600 and clux committed Aug 8, 2023
1 parent a6499b7 commit 5e98a92
Showing 5 changed files with 148 additions and 25 deletions.
2 changes: 1 addition & 1 deletion kube-client/src/client/mod.rs
@@ -156,7 +156,7 @@ impl Client {
// Error requesting
.or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
// Error from another middleware
.unwrap_or_else(|err| Error::Service(err))
.unwrap_or_else(Error::Service)
})?;
Ok(res)
}
69 changes: 54 additions & 15 deletions kube-runtime/src/controller/mod.rs
@@ -7,9 +7,9 @@ use crate::{
store::{Store, Writer},
ObjectRef,
},
scheduler::{scheduler, ScheduleRequest},
scheduler::{debounced_scheduler, ScheduleRequest},
utils::{trystream_try_via, CancelableJoinHandle, KubeRuntimeStreamExt, StreamBackoff, WatchStreamExt},
watcher::{self, metadata_watcher, watcher, Config, DefaultBackoff},
watcher::{self, metadata_watcher, watcher, DefaultBackoff},
};
use backoff::backoff::Backoff;
use derivative::Derivative;
@@ -246,12 +246,14 @@ const APPLIER_REQUEUE_BUF_SIZE: usize = 100;
///
/// This is the "hard-mode" version of [`Controller`], which allows you some more customization
/// (such as triggering from arbitrary [`Stream`]s), at the cost of being a bit more verbose.
#[allow(clippy::needless_pass_by_value)]
pub fn applier<K, QueueStream, ReconcilerFut, Ctx>(
mut reconciler: impl FnMut(Arc<K>, Arc<Ctx>) -> ReconcilerFut,
error_policy: impl Fn(Arc<K>, &ReconcilerFut::Error, Arc<Ctx>) -> Action,
context: Arc<Ctx>,
store: Store<K>,
queue: QueueStream,
config: Config,
) -> impl Stream<Item = Result<(ObjectRef<K>, Action), Error<ReconcilerFut::Error, QueueStream::Error>>>
where
K: Clone + Resource + 'static,
@@ -276,7 +278,7 @@ where
.map_err(Error::QueueError)
.map_ok(|request| ScheduleRequest {
message: request.into(),
run_at: Instant::now() + Duration::from_millis(1),
run_at: Instant::now(),
})
.on_complete(async move {
// On error: scheduler has already been shut down and there is nothing for us to do
@@ -291,7 +293,7 @@
)),
// all the Oks from the select get passed through the scheduler stream, and are then executed
move |s| {
Runner::new(scheduler(s), move |request| {
Runner::new(debounced_scheduler(s, config.debounce), move |request| {
let request = request.clone();
match store.get(&request.obj_ref) {
Some(obj) => {
@@ -417,6 +419,31 @@
}
}

/// Accumulates all options that can be used on a [`Controller`] invocation.
#[derive(Clone, Debug, Default)]
pub struct Config {
debounce: Duration,
}

impl Config {
/// The debounce duration used to deduplicate reconciliation requests.
///
/// When set to a non-zero duration, debouncing is enabled in the [`Scheduler`], resulting
/// in __trailing edge debouncing__ of reconciler requests.
/// This option can help to reduce the amount of unnecessary reconciler calls
/// when using multiple controller relations, or during rapid phase transitions.
///
/// ## Warning
/// This option delays (and keeps delaying) reconcile requests for objects while
/// the object is updated. It can **permanently hide** updates from your reconciler
/// if set too high on objects that are updated frequently (like nodes).
#[must_use]
pub fn debounce(mut self, debounce: Duration) -> Self {
self.debounce = debounce;
self
}
}
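
A quick sketch of the builder in use (the one-second value is illustrative):

```rust
use std::time::Duration;
use kube_runtime::controller::Config;

fn controller_config() -> Config {
    // Trailing-edge debounce: a reconcile request is only emitted once its
    // object has seen no new triggers for a full second.
    Config::default().debounce(Duration::from_secs(1))
}
```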

/// Controller for a Resource `K`
///
/// A controller is an infinite stream of objects to be reconciled.
@@ -505,6 +532,7 @@
forceful_shutdown_selector: Vec<BoxFuture<'static, ()>>,
dyntype: K::DynamicType,
reader: Store<K>,
config: Config,
}

impl<K> Controller<K>
@@ -516,11 +544,11 @@
///
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
///
/// The [`Config`] controls to the possible subset of objects of `K` that you want to manage
/// The [`watcher::Config`] controls the possible subset of objects of `K` that you want to manage
/// and receive reconcile events for.
/// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
/// For the full set of objects `K` in the given `Api` scope, you can use [`watcher::Config::default`].
#[must_use]
pub fn new(main_api: Api<K>, wc: Config) -> Self
pub fn new(main_api: Api<K>, wc: watcher::Config) -> Self
where
K::DynamicType: Default,
{
@@ -531,17 +559,17 @@
///
/// Takes an [`Api`] object that determines how the `Controller` listens for changes to the `K`.
///
/// The [`Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
/// The [`watcher::Config`] lets you define a possible subset of objects of `K` that you want the [`Api`]
/// to watch - in the Api's configured scope - and receive reconcile events for.
/// For the full set of objects `K` in the given `Api` scope, you can use [`Config::default`].
///
/// This variant constructor is for [`dynamic`] types found through discovery. Prefer [`Controller::new`] for static types.
///
/// [`Config`]: crate::watcher::Config
/// [`watcher::Config`]: crate::watcher::Config
/// [`Api`]: kube_client::Api
/// [`dynamic`]: kube_client::core::dynamic
/// [`Config::default`]: crate::watcher::Config::default
pub fn new_with(main_api: Api<K>, wc: Config, dyntype: K::DynamicType) -> Self {
pub fn new_with(main_api: Api<K>, wc: watcher::Config, dyntype: K::DynamicType) -> Self {
let writer = Writer::<K>::new(dyntype.clone());
let reader = writer.as_reader();
let mut trigger_selector = stream::SelectAll::new();
@@ -564,6 +592,7 @@
],
dyntype,
reader,
config: Default::default(),
}
}

@@ -649,9 +678,17 @@
],
dyntype,
reader,
config: Default::default(),
}
}

/// Specify the configuration for the controller's behavior.
#[must_use]
pub fn with_config(mut self, config: Config) -> Self {
self.config = config;
self
}

/// Specify the backoff policy for "trigger" watches
///
/// This includes the core watch, as well as auxiliary watches introduced by [`Self::owns`] and [`Self::watches`].
@@ -683,7 +720,7 @@
pub fn owns<Child: Clone + Resource<DynamicType = ()> + DeserializeOwned + Debug + Send + 'static>(
self,
api: Api<Child>,
wc: Config,
wc: watcher::Config,
) -> Self {
self.owns_with(api, (), wc)
}
@@ -696,7 +733,7 @@
mut self,
api: Api<Child>,
dyntype: Child::DynamicType,
wc: Config,
wc: watcher::Config,
) -> Self
where
Child::DynamicType: Debug + Eq + Hash + Clone,
@@ -847,7 +884,7 @@
pub fn watches<Other, I>(
self,
api: Api<Other>,
wc: Config,
wc: watcher::Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
@@ -867,7 +904,7 @@
mut self,
api: Api<Other>,
dyntype: Other::DynamicType,
wc: Config,
wc: watcher::Config,
mapper: impl Fn(Other) -> I + Sync + Send + 'static,
) -> Self
where
@@ -1214,6 +1251,7 @@
self.reader,
StreamBackoff::new(self.trigger_selector, self.trigger_backoff)
.take_until(future::select_all(self.graceful_shutdown_selector)),
self.config,
)
.take_until(futures::future::select_all(self.forceful_shutdown_selector))
}
@@ -1228,7 +1266,7 @@ mod tests {
applier,
reflector::{self, ObjectRef},
watcher::{self, metadata_watcher, watcher, Event},
Controller,
Config, Controller,
};
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use k8s_openapi::api::core::v1::ConfigMap;
@@ -1307,6 +1345,7 @@ mod tests {
Arc::new(()),
store_rx,
queue_rx.map(Result::<_, Infallible>::Ok),
Config::default(),
);
pin_mut!(applier);
for i in 0..items {
2 changes: 2 additions & 0 deletions kube-runtime/src/controller/runner.rs
@@ -158,6 +158,8 @@ mod tests {
let mut count = 0;
let (mut sched_tx, sched_rx) = mpsc::unbounded();
let mut runner = Box::pin(
// The debounce period needs to be zero because a debounce period > 0
// would lead to the second request being discarded.
Runner::new(scheduler(sched_rx), |_| {
count += 1;
// Panic if this ref is already held, to simulate some unsafe action..
2 changes: 1 addition & 1 deletion kube-runtime/src/lib.rs
@@ -29,7 +29,7 @@ pub mod utils;
pub mod wait;
pub mod watcher;

pub use controller::{applier, Controller};
pub use controller::{applier, Config, Controller};
pub use finalizer::finalizer;
pub use reflector::reflector;
pub use scheduler::scheduler;
98 changes: 90 additions & 8 deletions kube-runtime/src/scheduler.rs
@@ -8,6 +8,7 @@ use std::{
hash::Hash,
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use tokio::time::Instant;
use tokio_util::time::delay_queue::{self, DelayQueue};
@@ -44,15 +45,22 @@ pub struct Scheduler<T, R> {
/// Incoming queue of scheduling requests.
#[pin]
requests: Fuse<R>,
/// Debounce time to allow for deduplication of requests. It is added to the request's
/// initial expiration time. If another request with the same message arrives before
/// the request expires, it is added to the new request's expiration time. This allows
/// a request to be emitted only if the scheduler is "uninterrupted" for the configured
/// debounce period. Its primary purpose is to deduplicate requests that expire instantly.
debounce: Duration,
}

impl<T, R: Stream> Scheduler<T, R> {
fn new(requests: R) -> Self {
fn new(requests: R, debounce: Duration) -> Self {
Self {
queue: DelayQueue::new(),
scheduled: HashMap::new(),
pending: HashSet::new(),
requests: requests.fuse(),
debounce,
}
}
}
@@ -67,12 +75,15 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
return;
}
match self.scheduled.entry(request.message) {
// If the new request is supposed to run earlier than the current entry's scheduled
// time (e.g. the new request is user-triggered and the current entry is the
// reconciler's usual retry), then give priority to the new request.
Entry::Occupied(mut old_entry) if old_entry.get().run_at >= request.run_at => {
// Old entry will run after the new request, so replace it.
let entry = old_entry.get_mut();
// TODO: this should add a little delay here to actually debounce
self.queue.reset_at(&entry.queue_key, request.run_at);
entry.run_at = request.run_at;
self.queue
.reset_at(&entry.queue_key, request.run_at + *self.debounce);
entry.run_at = request.run_at + *self.debounce;
old_entry.replace_key();
}
Entry::Occupied(_old_entry) => {
@@ -82,8 +93,8 @@
// No old entry, we're free to go!
let message = entry.key().clone();
entry.insert(ScheduledEntry {
run_at: request.run_at,
queue_key: self.queue.insert_at(message, request.run_at),
run_at: request.run_at + *self.debounce,
queue_key: self.queue.insert_at(message, request.run_at + *self.debounce),
});
}
}
@@ -203,14 +214,29 @@
///
/// The [`Scheduler`] terminates as soon as `requests` does.
pub fn scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(requests: S) -> Scheduler<T, S> {
Scheduler::new(requests)
Scheduler::new(requests, Duration::ZERO)
}

/// Stream transformer that delays and deduplicates [`Stream`] items.
///
/// The debounce period lets the scheduler deduplicate requests that ask to be
/// emitted instantly, by making sure the configured period of time elapses
/// without any new duplicate request before the message is actually emitted.
///
/// For more info, see [`scheduler()`].
#[allow(clippy::module_name_repetitions)]
pub fn debounced_scheduler<T: Eq + Hash + Clone, S: Stream<Item = ScheduleRequest<T>>>(
requests: S,
debounce: Duration,
) -> Scheduler<T, S> {
Scheduler::new(requests, debounce)
}
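
A standalone sketch of driving `debounced_scheduler` directly, mirroring the channel wiring used by the tests below (the `String` message type and two-second window are illustrative):

```rust
use std::time::Duration;

use futures::{channel::mpsc, SinkExt, StreamExt};
use kube_runtime::scheduler::{debounced_scheduler, ScheduleRequest};
use tokio::time::Instant;

#[tokio::main]
async fn main() {
    let (mut tx, rx) = mpsc::unbounded::<ScheduleRequest<String>>();
    let scheduler = debounced_scheduler(rx, Duration::from_secs(2));

    // Two requests for the same message inside the debounce window
    // collapse into a single emission.
    for _ in 0..2 {
        tx.send(ScheduleRequest {
            message: "my-object".to_string(),
            run_at: Instant::now(),
        })
        .await
        .unwrap();
    }
    drop(tx); // close the input so the scheduler terminates after draining

    let emitted: Vec<String> = scheduler.collect().await;
    assert_eq!(emitted, vec!["my-object".to_string()]);
}
```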

#[cfg(test)]
mod tests {
use crate::utils::KubeRuntimeStreamExt;

use super::{scheduler, ScheduleRequest};
use super::{debounced_scheduler, scheduler, ScheduleRequest};
use derivative::Derivative;
use futures::{channel::mpsc, future, pin_mut, poll, stream, FutureExt, SinkExt, StreamExt};
use std::task::Poll;
@@ -447,4 +473,60 @@ mod tests {
);
assert_eq!(scheduler.map(|msg| msg.0).collect::<Vec<_>>().await, vec![1]);
}

#[tokio::test]
async fn scheduler_should_add_debounce_to_a_request() {
pause();

let now = Instant::now();
let (mut sched_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<SingletonMessage>>();
let mut scheduler = debounced_scheduler(sched_rx, Duration::from_secs(2));

sched_tx
.send(ScheduleRequest {
message: SingletonMessage(1),
run_at: now,
})
.await
.unwrap();
advance(Duration::from_secs(1)).await;
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(3)).await;
assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().0, 1);
}

#[tokio::test]
async fn scheduler_should_dedup_message_within_debounce_period() {
pause();

let mut now = Instant::now();
let (mut sched_tx, sched_rx) = mpsc::unbounded::<ScheduleRequest<SingletonMessage>>();
let mut scheduler = debounced_scheduler(sched_rx, Duration::from_secs(3));

sched_tx
.send(ScheduleRequest {
message: SingletonMessage(1),
run_at: now,
})
.await
.unwrap();
assert!(poll!(scheduler.next()).is_pending());
advance(Duration::from_secs(1)).await;

now = Instant::now();
sched_tx
.send(ScheduleRequest {
message: SingletonMessage(2),
run_at: now,
})
.await
.unwrap();
// Check that the initial request was indeed deduplicated.
advance(Duration::from_millis(2500)).await;
assert!(poll!(scheduler.next()).is_pending());

advance(Duration::from_secs(3)).await;
assert_eq!(scheduler.next().now_or_never().unwrap().unwrap().0, 2);
assert!(poll!(scheduler.next()).is_pending());
}
}
