diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml
index 9e16a29e9..8814219a8 100644
--- a/kube-runtime/Cargo.toml
+++ b/kube-runtime/Cargo.toml
@@ -54,6 +54,7 @@ serde_json = "1.0.68"
 tokio = { version = "1.14.0", features = ["full", "test-util"] }
 rand = "0.8.0"
 schemars = "0.8.6"
+tracing-subscriber = "0.3.17"
 
 [dev-dependencies.k8s-openapi]
 version = "0.18.0"
diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs
index ee7be9f44..f4a03315d 100644
--- a/kube-runtime/src/controller/mod.rs
+++ b/kube-runtime/src/controller/mod.rs
@@ -3,7 +3,7 @@
 use self::runner::Runner;
 use crate::{
     reflector::{
-        reflector,
+        self, reflector,
         store::{Store, Writer},
         ObjectRef,
     },
@@ -36,6 +36,8 @@ use tracing::{info_span, Instrument};
 mod future_hash_map;
 mod runner;
 
+pub type RunnerError = runner::Error<reflector::store::WriterDropped>;
+
 #[derive(Debug, Error)]
 pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
     #[error("tried to reconcile object {0} that was not found in local store")]
@@ -44,6 +46,8 @@ pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
     ObjectNotFound(ObjectRef<DynamicObject>),
     #[error("reconciler for object {1} failed")]
     ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),
     #[error("event queue error")]
     QueueError(#[source] QueueErr),
+    #[error("runner error")]
+    RunnerError(#[source] RunnerError),
 }
 
 /// Results of the reconciliation attempt
@@ -262,6 +266,7 @@ where
     let (scheduler_tx, scheduler_rx) =
         channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
     let error_policy = Arc::new(error_policy);
+    let delay_store = store.clone();
     // Create a stream of ObjectRefs that need to be reconciled
     trystream_try_via(
         // input: stream combining scheduled tasks and user specified inputs event
@@ -319,6 +324,13 @@ where
                     None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
                 }
             })
+            .delay_tasks_until(async move {
+                tracing::debug!("applier runner held until store is ready");
+                let res = delay_store.wait_until_ready().await;
+                tracing::debug!("store is ready, starting runner");
+                res
+            })
+            .map(|runner_res| runner_res.unwrap_or_else(|err| Err(Error::RunnerError(err))))
             .on_complete(async { tracing::debug!("applier runner terminated") })
         },
     )
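Taken together, the hunks above mean reconcilers only start once the local cache has been primed. A minimal sketch of that readiness contract in isolation — a hypothetical demo binary, not part of this diff, assuming tokio plus kube-runtime at this revision and k8s-openapi with a Kubernetes version feature enabled:

```rust
use k8s_openapi::api::core::v1::ConfigMap;
use kube_runtime::{reflector::store::Writer, watcher};

#[tokio::main]
async fn main() {
    // Writer::default() works for ConfigMap because its DynamicType is ().
    let mut writer = Writer::<ConfigMap>::default();
    let store = writer.as_reader();

    let gate = tokio::spawn(async move {
        // Resolves only after the writer applies its first watcher event;
        // this is the same future the applier now passes to delay_tasks_until.
        store.wait_until_ready().await.expect("writer dropped");
        println!("store is ready");
    });

    // Any first event marks the store ready, even an empty relist.
    writer.apply_watcher_event(&watcher::Event::Restarted(vec![]));
    gate.await.unwrap();
}
```

diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs
index 4d6021a42..b9290d33d 100644
--- a/kube-runtime/src/controller/runner.rs
+++ b/kube-runtime/src/controller/runner.rs
@@ -1,12 +1,20 @@
 use super::future_hash_map::FutureHashMap;
 use crate::scheduler::{ScheduleRequest, Scheduler};
-use futures::{Future, Stream, StreamExt};
+use futures::{future, Future, FutureExt, Stream, StreamExt};
 use pin_project::pin_project;
 use std::{
+    convert::Infallible,
     hash::Hash,
     pin::Pin,
     task::{Context, Poll},
 };
+use thiserror::Error;
+
+#[derive(Debug, Error)]
+pub enum Error<ReadyErr> {
+    #[error("readiness gate failed to become ready")]
+    Readiness(#[source] ReadyErr),
+}
 
 /// Pulls items from a [`Scheduler`], and runs an action for each item in parallel,
 /// while making sure to not process [equal](`Eq`) items multiple times at once.
@@ -15,11 +23,15 @@ use std::{
 ///
 /// When an item is to be processed, but an equal item is
 /// already being processed then it will be held pending until the current item
 /// is finished.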
 #[pin_project]
-pub struct Runner<T, R, F, MkF> {
+pub struct Runner<T, R, F, MkF, Ready = future::Ready<Result<(), Infallible>>> {
     #[pin]
     scheduler: Scheduler<T, R>,
     run_msg: MkF,
     slots: FutureHashMap<T, F>,
+    #[pin]
+    ready_to_execute_after: future::Fuse<Ready>,
+    is_ready_to_execute: bool,
+    stopped: bool,
 }
 
 impl<T, R, F, MkF> Runner<T, R, F, MkF>
@@ -32,35 +44,70 @@ where
             scheduler,
             run_msg,
             slots: FutureHashMap::default(),
+            ready_to_execute_after: future::ready(Ok(())).fuse(),
+            is_ready_to_execute: false,
+            stopped: false,
+        }
+    }
+
+    /// Wait for `ready_to_execute_after` to complete before starting to run any scheduled tasks.
+    ///
+    /// `scheduler` will still be polled in the meantime.
+    pub fn delay_tasks_until<Ready, ReadyErr>(
+        self,
+        ready_to_execute_after: Ready,
+    ) -> Runner<T, R, F, MkF, Ready>
+    where
+        Ready: Future<Output = Result<(), ReadyErr>>,
+    {
+        Runner {
+            scheduler: self.scheduler,
+            run_msg: self.run_msg,
+            slots: self.slots,
+            ready_to_execute_after: ready_to_execute_after.fuse(),
+            is_ready_to_execute: false,
+            stopped: false,
         }
     }
 }
 
-impl<T, R, F, MkF> Stream for Runner<T, R, F, MkF>
+impl<T, R, F, MkF, Ready, ReadyErr> Stream for Runner<T, R, F, MkF, Ready>
 where
     T: Eq + Hash + Clone + Unpin,
     R: Stream<Item = ScheduleRequest<T>>,
     F: Future + Unpin,
     MkF: FnMut(&T) -> F,
+    Ready: Future<Output = Result<(), ReadyErr>>,
 {
-    type Item = F::Output;
+    type Item = Result<F::Output, Error<ReadyErr>>;
 
     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
         let mut this = self.project();
+        if *this.stopped {
+            return Poll::Ready(None);
+        }
         let slots = this.slots;
         let scheduler = &mut this.scheduler;
         let has_active_slots = match slots.poll_next_unpin(cx) {
-            Poll::Ready(Some(result)) => return Poll::Ready(Some(result)),
+            Poll::Ready(Some(result)) => return Poll::Ready(Some(Ok(result))),
             Poll::Ready(None) => false,
             Poll::Pending => true,
         };
+        match this.ready_to_execute_after.poll(cx) {
+            Poll::Ready(Ok(())) => *this.is_ready_to_execute = true,
+            Poll::Ready(Err(err)) => {
+                *this.stopped = true;
+                return Poll::Ready(Some(Err(Error::Readiness(err))));
+            }
+            Poll::Pending => {}
+        }
         loop {
             // Try to take a new message that isn't already being processed
             // leave the already-processing ones in the queue, so that we can take them once
             // we're free again.
             let next_msg_poll = scheduler
                 .as_mut()
-                .hold_unless(|msg| !slots.contains_key(msg))
+                .hold_unless(|msg| *this.is_ready_to_execute && !slots.contains_key(msg))
                 .poll_next_unpin(cx);
             match next_msg_poll {
                 Poll::Ready(Some(msg)) => {
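The struct change keeps `Runner::new` source-compatible by defaulting `Ready` to an already-resolved future; `delay_tasks_until` then rebinds that parameter. A standalone sketch of the defaulted-generic pattern, using only `std` and `futures` (all names here are hypothetical, not part of the diff):

```rust
use std::{convert::Infallible, future::Future};

use futures::future::{self, Fuse, FutureExt};

// The gate defaults to a future that is already resolved with Ok(()).
#[allow(dead_code)]
struct Gated<Ready = future::Ready<Result<(), Infallible>>> {
    gate: Fuse<Ready>,
}

impl Gated {
    fn new() -> Self {
        // future::ready resolves on its first poll, so by default the gate
        // is open immediately, matching the pre-change Runner behaviour.
        Gated {
            gate: future::ready(Ok(())).fuse(),
        }
    }
}

impl<Ready> Gated<Ready> {
    // Rebinding the type parameter is what delay_tasks_until does as well.
    fn gate_on<R2: Future>(self, gate: R2) -> Gated<R2> {
        Gated { gate: gate.fuse() }
    }
}

fn main() {
    let _open_immediately = Gated::new();
    let _held_forever = Gated::new().gate_on(future::pending::<Result<(), Infallible>>());
}
```

@@ -88,17 +135,25 @@ where
 #[cfg(test)]
 mod tests {
-    use super::Runner;
-    use crate::scheduler::{scheduler, ScheduleRequest};
+    use super::{Error, Runner};
+    use crate::{
+        scheduler::{scheduler, ScheduleRequest},
+        utils::delayed_init::{self, DelayedInit},
+    };
     use futures::{
         channel::{mpsc, oneshot},
-        future, poll, SinkExt, StreamExt,
+        future, poll, stream, FutureExt, SinkExt, StreamExt, TryStreamExt,
+    };
+    use std::{
+        cell::RefCell,
+        collections::{HashMap, HashSet},
+        sync::Mutex,
+        time::Duration,
     };
-    use std::{cell::RefCell, time::Duration};
     use tokio::{
         runtime::Handle,
         task::yield_now,
-        time::{pause, sleep, timeout, Instant},
+        time::{error::Elapsed, pause, sleep, timeout, Instant},
     };
 
@@ -170,8 +225,114 @@ mod tests {
         // a timeout here *should* mean that the background task isn't getting awoken properly
         // when the new message is ready.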
         assert_eq!(
-            timeout(Duration::from_secs(1), result_rx).await.unwrap().unwrap(),
+            timeout(Duration::from_secs(1), result_rx)
+                .await
+                .unwrap()
+                .unwrap()
+                .transpose()
+                .unwrap(),
             Some(8)
         );
     }
+
+    #[tokio::test]
+    async fn runner_should_wait_for_readiness() {
+        let is_ready = Mutex::new(false);
+        let (delayed_init, ready) = DelayedInit::<()>::new();
+        let mut runner = Box::pin(
+            Runner::new(
+                scheduler(
+                    stream::iter([ScheduleRequest {
+                        message: 1u8,
+                        run_at: Instant::now(),
+                    }])
+                    .chain(stream::pending()),
+                ),
+                |msg| {
+                    assert!(*is_ready.lock().unwrap());
+                    future::ready(*msg)
+                },
+            )
+            .delay_tasks_until(ready.get()),
+        );
+        assert!(poll!(runner.next()).is_pending());
+        *is_ready.lock().unwrap() = true;
+        delayed_init.init(());
+        assert_eq!(runner.next().await.transpose().unwrap(), Some(1));
+    }
+
+    #[tokio::test]
+    async fn runner_should_dedupe_while_waiting_for_readiness() {
+        let is_ready = Mutex::new(false);
+        let (delayed_init, ready) = DelayedInit::<()>::new();
+        let mut runner = Box::pin(
+            Runner::new(
+                scheduler(
+                    stream::iter([
+                        ScheduleRequest {
+                            message: 'a',
+                            run_at: Instant::now(),
+                        },
+                        ScheduleRequest {
+                            message: 'b',
+                            run_at: Instant::now(),
+                        },
+                        ScheduleRequest {
+                            message: 'a',
+                            run_at: Instant::now(),
+                        },
+                    ])
+                    .chain(stream::pending()),
+                ),
+                |msg| {
+                    assert!(*is_ready.lock().unwrap());
+                    future::ready(*msg)
+                },
+            )
+            .delay_tasks_until(ready.get()),
+        );
+        assert!(poll!(runner.next()).is_pending());
+        *is_ready.lock().unwrap() = true;
+        delayed_init.init(());
+        let mut message_counts = HashMap::new();
+        assert!(timeout(
+            Duration::from_secs(1),
+            runner.try_for_each(|msg| {
+                *message_counts.entry(msg).or_default() += 1;
+                async { Ok(()) }
+            })
+        )
+        .await
+        .is_err());
+        assert_eq!(message_counts, HashMap::from([('a', 1), ('b', 1)]));
+    }
+
+    #[tokio::test]
+    async fn runner_should_report_readiness_errors() {
+        let (delayed_init, ready) = DelayedInit::<()>::new();
+        let mut runner = Box::pin(
+            Runner::new(
+                scheduler(
+                    stream::iter([ScheduleRequest {
+                        message: (),
+                        run_at: Instant::now(),
+                    }])
+                    .chain(stream::pending()),
+                ),
+                |()| {
+                    panic!("run_msg should never be invoked if readiness gate fails");
+                    // It's "useless", but it helps to direct rustc to the correct types
+                    #[allow(unreachable_code)]
+                    future::ready(())
+                },
+            )
+            .delay_tasks_until(ready.get()),
+        );
+        assert!(poll!(runner.next()).is_pending());
+        drop(delayed_init);
+        assert!(matches!(
+            runner.try_collect::<Vec<_>>().await.unwrap_err(),
+            Error::Readiness(delayed_init::InitDropped)
+        ));
+    }
 }
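The reworked assertion above leans on `Option::transpose`: the runner's items are now `Result`s, so "next item, if any" is an `Option<Result<..>>`, which `transpose` flips into a `Result<Option<..>, _>` so a single `unwrap` strips the error layer. In isolation (plain std, no kube types):

```rust
fn main() {
    // What the runner's stream hands back after this change: maybe-an-item,
    // where the item itself may be a readiness error.
    let next: Option<Result<u8, &str>> = Some(Ok(8));
    // transpose: Option<Result<T, E>> -> Result<Option<T>, E>
    assert_eq!(next.transpose().unwrap(), Some(8));

    // A readiness failure propagates as the outer Err instead.
    let failed: Option<Result<u8, &str>> = Some(Err("readiness gate failed"));
    assert!(failed.transpose().is_err());
}
```

diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs
index 49032934c..02cac1a8e 100644
--- a/kube-runtime/src/reflector/store.rs
+++ b/kube-runtime/src/reflector/store.rs
@@ -1,10 +1,14 @@
 use super::ObjectRef;
-use crate::watcher;
+use crate::{
+    utils::delayed_init::{self, DelayedInit},
+    watcher,
+};
 use ahash::AHashMap;
 use derivative::Derivative;
 use kube_client::Resource;
 use parking_lot::RwLock;
 use std::{fmt::Debug, hash::Hash, sync::Arc};
+use thiserror::Error;
 
 type Cache<K> = Arc<RwLock<AHashMap<ObjectRef<K>, Arc<K>>>>;
 
@@ -12,14 +16,15 @@ type Cache<K> = Arc<RwLock<AHashMap<ObjectRef<K>, Arc<K>>>>;
 /// This is exclusive since it's not safe to share a single `Store` between multiple reflectors.
 /// In particular, `Restarted` events will clobber the state of other connected reflectors.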
-#[derive(Debug, Derivative)]
-#[derivative(Default(bound = "K::DynamicType: Default"))]
+#[derive(Debug)]
 pub struct Writer<K: 'static + Resource>
 where
     K::DynamicType: Eq + Hash,
 {
     store: Cache<K>,
     dyntype: K::DynamicType,
+    ready_tx: Option<delayed_init::Initializer<()>>,
+    ready_rx: Arc<DelayedInit<()>>,
 }
 
 impl<K> Writer<K>
 where
@@ -31,9 +36,12 @@ where
     /// If the dynamic type is default-able (for example when writer is used with
     /// `k8s_openapi` types) you can use `Default` instead.
     pub fn new(dyntype: K::DynamicType) -> Self {
+        let (ready_tx, ready_rx) = DelayedInit::new();
         Writer {
             store: Default::default(),
             dyntype,
+            ready_tx: Some(ready_tx),
+            ready_rx: Arc::new(ready_rx),
         }
     }
 
@@ -45,6 +53,7 @@ where
     pub fn as_reader(&self) -> Store<K> {
         Store {
             store: self.store.clone(),
+            ready_rx: self.ready_rx.clone(),
         }
     }
 
@@ -73,6 +82,20 @@ where
                 *self.store.write() = new_objs;
             }
         }
+
+        // Mark as ready after the first event, "releasing" any calls to Store::wait_until_ready()
+        if let Some(ready_tx) = self.ready_tx.take() {
+            ready_tx.init(())
+        }
     }
 }
+impl<K> Default for Writer<K>
+where
+    K: Resource + Clone + 'static,
+    K::DynamicType: Default + Eq + Hash + Clone,
+{
+    fn default() -> Self {
+        Self::new(K::DynamicType::default())
+    }
+}
 
@@ -89,12 +112,27 @@ pub struct Store<K: 'static + Resource>
 where
     K::DynamicType: Hash + Eq,
 {
     store: Cache<K>,
+    ready_rx: Arc<DelayedInit<()>>,
 }
 
+#[derive(Debug, Error)]
+#[error("writer was dropped before store became ready")]
+pub struct WriterDropped(delayed_init::InitDropped);
+
 impl<K: 'static + Clone + Resource> Store<K>
 where
     K::DynamicType: Eq + Hash + Clone,
 {
+    /// Wait for the store to be populated by Kubernetes.
+    ///
+    /// Note that this will _not_ await the source calling the associated [`Writer`] (such as the [`reflector`]).
+    ///
+    /// # Errors
+    /// Returns an error if the [`Writer`] was dropped before any value was written.
+    pub async fn wait_until_ready(&self) -> Result<(), WriterDropped> {
+        self.ready_rx.get().await.map_err(WriterDropped)
+    }
+
     /// Retrieve a `clone()` of the entry referred to by `key`, if it is in the cache.
     ///
     /// `key.namespace` is ignored for cluster-scoped resources.
@@ -236,6 +274,7 @@ mod tests {
             },
             ..ConfigMap::default()
         };
+        #[allow(clippy::redundant_clone)] // false positive
         let mut nsed_cm = cm.clone();
         nsed_cm.metadata.namespace = Some("ns".to_string());
         let mut store_w = Writer::default();
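A hedged sketch of the failure mode the new `WriterDropped` error covers (hypothetical test in a dependent crate, not part of the diff): dropping the `Writer` before any event means readiness can never be signalled, so `wait_until_ready` errors instead of hanging forever.

```rust
use k8s_openapi::api::core::v1::ConfigMap;
use kube_runtime::reflector::store::Writer;

#[tokio::test]
async fn ready_fails_when_writer_is_dropped() {
    let writer = Writer::<ConfigMap>::default();
    let store = writer.as_reader();
    // No event was ever applied, so readiness can no longer be signalled.
    drop(writer);
    assert!(store.wait_until_ready().await.is_err());
}
```

diff --git a/kube-runtime/src/utils/delayed_init.rs b/kube-runtime/src/utils/delayed_init.rs
new file mode 100644
index 000000000..01509ee2d
--- /dev/null
+++ b/kube-runtime/src/utils/delayed_init.rs
@@ -0,0 +1,183 @@
+use std::{fmt::Debug, sync::Mutex, task::Poll};
+
+use derivative::Derivative;
+use futures::{channel, Future, FutureExt};
+use thiserror::Error;
+use tracing::trace;
+
+/// The sending counterpart to a [`DelayedInit`]
+pub struct Initializer<T>(channel::oneshot::Sender<T>);
+impl<T> Initializer<T> {
+    /// Sends `value` to the linked [`DelayedInit`].
+    pub fn init(self, value: T) {
+        // oneshot::Sender::send fails if no recipients remain, this is not really a relevant
+        // case to signal for our use case
+        let _ = self.0.send(value);
+    }
+}
+impl<T> Debug for Initializer<T> {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("delayed_init::Initializer")
+            .finish_non_exhaustive()
+    }
+}
+
+/// A value that must be initialized by an external writer
+///
+/// Can be considered equivalent to a [`channel::oneshot`] channel, except that
+/// the value produced is retained for subsequent calls to [`Self::get`].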
+#[derive(Derivative)]
+#[derivative(Debug)]
+pub struct DelayedInit<T> {
+    state: Mutex<ReceiverState<T>>,
+}
+#[derive(Debug)]
+enum ReceiverState<T> {
+    Waiting(channel::oneshot::Receiver<T>),
+    Ready(Result<T, InitDropped>),
+}
+impl<T> DelayedInit<T> {
+    /// Returns an empty `DelayedInit` that has no value, along with a linked [`Initializer`]
+    #[must_use]
+    pub fn new() -> (Initializer<T>, Self) {
+        let (tx, rx) = channel::oneshot::channel();
+        (Initializer(tx), DelayedInit {
+            state: Mutex::new(ReceiverState::Waiting(rx)),
+        })
+    }
+}
+impl<T: Clone> DelayedInit<T> {
+    /// Wait for the value to be available and then return it
+    ///
+    /// Calling `get` again if a value has already been returned is guaranteed to return (a clone of)
+    /// the same value.
+    ///
+    /// # Errors
+    ///
+    /// Fails if the associated [`Initializer`] has been dropped before calling [`Initializer::init`].
+    pub async fn get(&self) -> Result<T, InitDropped> {
+        Get(self).await
+    }
+}
+
+// Using a manually implemented future because we don't want to hold the lock across poll calls,
+// since that would mean that a reader that is never polled again could stall all other readers
+struct Get<'a, T>(&'a DelayedInit<T>);
+impl<'a, T> Future for Get<'a, T>
+where
+    T: Clone,
+{
+    type Output = Result<T, InitDropped>;
+
+    #[tracing::instrument(name = "DelayedInit::get", level = "trace", skip(self, cx))]
+    fn poll(
+        self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        let mut state = self.0.state.lock().unwrap();
+        trace!("got lock");
+        match &mut *state {
+            ReceiverState::Waiting(rx) => {
+                trace!("channel still active, polling");
+                if let Poll::Ready(value) = rx.poll_unpin(cx).map_err(|_| InitDropped) {
+                    trace!("got value on slow path, memoizing");
+                    *state = ReceiverState::Ready(value.clone());
+                    Poll::Ready(value)
+                } else {
+                    trace!("channel is still pending");
+                    Poll::Pending
+                }
+            }
+            ReceiverState::Ready(v) => {
+                trace!("slow path but value was already initialized, returning memoized value");
+                Poll::Ready(v.clone())
+            }
+        }
+    }
+}
+
+#[derive(Debug, Error, Clone, Copy, PartialEq, Eq)]
+#[error("initializer was dropped before value was initialized")]
+pub struct InitDropped;
+
+#[cfg(test)]
+mod tests {
+    use std::{
+        sync::{Arc, Mutex},
+        task::Poll,
+    };
+
+    use futures::{pin_mut, poll};
+    use tracing::Level;
+    use tracing_subscriber::util::SubscriberInitExt;
+
+    use crate::utils::delayed_init::ReceiverState;
+
+    use super::DelayedInit;
+
+    fn setup_tracing() -> tracing::dispatcher::DefaultGuard {
+        tracing_subscriber::fmt()
+            .with_max_level(Level::TRACE)
+            .with_test_writer()
+            .finish()
+            .set_default()
+    }
+
+    #[tokio::test]
+    async fn must_allow_single_reader() {
+        let _tracing = setup_tracing();
+        let (tx, rx) = DelayedInit::<u8>::new();
+        let get1 = rx.get();
+        pin_mut!(get1);
+        assert_eq!(poll!(get1.as_mut()), Poll::Pending);
+        tx.init(1);
+        assert_eq!(poll!(get1), Poll::Ready(Ok(1)));
+    }
+
+    #[tokio::test]
+    async fn must_allow_concurrent_readers_while_waiting() {
+        let _tracing = setup_tracing();
+        let (tx, rx) = DelayedInit::<u8>::new();
+        let get1 = rx.get();
+        let get2 = rx.get();
+        let get3 = rx.get();
+        pin_mut!(get1, get2, get3);
+        assert_eq!(poll!(get1.as_mut()), Poll::Pending);
+        assert_eq!(poll!(get2.as_mut()), Poll::Pending);
+        assert_eq!(poll!(get3.as_mut()), Poll::Pending);
+        tx.init(1);
+        assert_eq!(poll!(get1), Poll::Ready(Ok(1)));
+        assert_eq!(poll!(get2), Poll::Ready(Ok(1)));
+        assert_eq!(poll!(get3), Poll::Ready(Ok(1)));
+    }
+
+    #[tokio::test]
+    async fn must_allow_reading_after_init() {
+        let _tracing = setup_tracing();
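Because `DelayedInit` memoizes the delivered value, it behaves like a oneshot channel with fan-out. A hypothetical in-crate test sketch (the module is `pub(crate)`, so this would live inside kube-runtime; not part of the diff):

```rust
use std::sync::Arc;

use crate::utils::delayed_init::DelayedInit;

#[tokio::test]
async fn delayed_init_fans_out_to_all_readers() {
    let (tx, rx) = DelayedInit::<String>::new();
    let rx = Arc::new(rx);
    // Subscribe three readers before the value exists.
    let waiters: Vec<_> = (0..3)
        .map(|_| {
            let rx = Arc::clone(&rx);
            tokio::spawn(async move { rx.get().await })
        })
        .collect();
    tx.init("ready".to_owned());
    for waiter in waiters {
        assert_eq!(waiter.await.unwrap(), Ok("ready".to_owned()));
    }
    // Late readers get the memoized value without a live Initializer.
    assert_eq!(rx.get().await, Ok("ready".to_owned()));
}
```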
+        let (tx, rx) = DelayedInit::<u8>::new();
+        let get1 = rx.get();
+        pin_mut!(get1);
+        assert_eq!(poll!(get1.as_mut()), Poll::Pending);
+        tx.init(1);
+        assert_eq!(poll!(get1), Poll::Ready(Ok(1)));
+        assert_eq!(rx.get().await, Ok(1));
+        assert_eq!(rx.get().await, Ok(1));
+    }
+
+    #[tokio::test]
+    async fn must_allow_concurrent_readers_in_any_order() {
+        let _tracing = setup_tracing();
+        let (tx, rx) = DelayedInit::<u8>::new();
+        let get1 = rx.get();
+        let get2 = rx.get();
+        let get3 = rx.get();
+        pin_mut!(get1, get2, get3);
+        assert_eq!(poll!(get1.as_mut()), Poll::Pending);
+        assert_eq!(poll!(get2.as_mut()), Poll::Pending);
+        assert_eq!(poll!(get3.as_mut()), Poll::Pending);
+        tx.init(1);
+        assert_eq!(poll!(get3), Poll::Ready(Ok(1)));
+        assert_eq!(poll!(get2), Poll::Ready(Ok(1)));
+        assert_eq!(poll!(get1), Poll::Ready(Ok(1)));
+    }
+}
diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs
index f2bc2a866..41d9d10ec 100644
--- a/kube-runtime/src/utils/mod.rs
+++ b/kube-runtime/src/utils/mod.rs
@@ -1,6 +1,7 @@
 //! Helpers for manipulating built-in streams
 
 mod backoff_reset_timer;
+pub(crate) mod delayed_init;
 mod event_flatten;
 #[cfg(feature = "unstable-runtime-predicates")] mod predicate;
 mod stream_backoff;