Skip to content

Commit

Permalink
Merge pull request #1243 from nightkr/feature/store-readiness
Browse files Browse the repository at this point in the history
Track store readiness
  • Loading branch information
nightkr committed Jul 6, 2023
2 parents bda3b90 + 047a73f commit db585dd
Show file tree
Hide file tree
Showing 6 changed files with 413 additions and 16 deletions.
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ serde_json = "1.0.68"
tokio = { version = "1.14.0", features = ["full", "test-util"] }
rand = "0.8.0"
schemars = "0.8.6"
tracing-subscriber = "0.3.17"

[dev-dependencies.k8s-openapi]
version = "0.18.0"
Expand Down
14 changes: 13 additions & 1 deletion kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
use self::runner::Runner;
use crate::{
reflector::{
reflector,
self, reflector,
store::{Store, Writer},
ObjectRef,
},
Expand Down Expand Up @@ -36,6 +36,8 @@ use tracing::{info_span, Instrument};
mod future_hash_map;
mod runner;

pub type RunnerError = runner::Error<reflector::store::WriterDropped>;

#[derive(Debug, Error)]
pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
#[error("tried to reconcile object {0} that was not found in local store")]
Expand All @@ -44,6 +46,8 @@ pub enum Error<ReconcilerErr: 'static, QueueErr: 'static> {
ReconcilerFailed(#[source] ReconcilerErr, ObjectRef<DynamicObject>),
#[error("event queue error")]
QueueError(#[source] QueueErr),
#[error("runner error")]
RunnerError(#[source] RunnerError),
}

/// Results of the reconciliation attempt
Expand Down Expand Up @@ -262,6 +266,7 @@ where
let (scheduler_tx, scheduler_rx) =
channel::mpsc::channel::<ScheduleRequest<ReconcileRequest<K>>>(APPLIER_REQUEUE_BUF_SIZE);
let error_policy = Arc::new(error_policy);
let delay_store = store.clone();
// Create a stream of ObjectRefs that need to be reconciled
trystream_try_via(
// input: stream combining scheduled tasks and user specified inputs event
Expand Down Expand Up @@ -319,6 +324,13 @@ where
None => future::err(Error::ObjectNotFound(request.obj_ref.erase())).right_future(),
}
})
.delay_tasks_until(async move {
tracing::debug!("applier runner held until store is ready");
let res = delay_store.wait_until_ready().await;
tracing::debug!("store is ready, starting runner");
res
})
.map(|runner_res| runner_res.unwrap_or_else(|err| Err(Error::RunnerError(err))))
.on_complete(async { tracing::debug!("applier runner terminated") })
},
)
Expand Down
185 changes: 173 additions & 12 deletions kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,20 @@
use super::future_hash_map::FutureHashMap;
use crate::scheduler::{ScheduleRequest, Scheduler};
use futures::{Future, Stream, StreamExt};
use futures::{future, Future, FutureExt, Stream, StreamExt};
use pin_project::pin_project;
use std::{
convert::Infallible,
hash::Hash,
pin::Pin,
task::{Context, Poll},
};
use thiserror::Error;

#[derive(Debug, Error)]
pub enum Error<ReadyErr> {
#[error("readiness gate failed to become ready")]
Readiness(#[source] ReadyErr),
}

/// Pulls items from a [`Scheduler`], and runs an action for each item in parallel,
/// while making sure to not process [equal](`Eq`) items multiple times at once.
Expand All @@ -15,11 +23,15 @@ use std::{
/// already being processed then it will be held pending until the current item
/// is finished.
#[pin_project]
pub struct Runner<T, R, F, MkF> {
pub struct Runner<T, R, F, MkF, Ready = future::Ready<Result<(), Infallible>>> {
#[pin]
scheduler: Scheduler<T, R>,
run_msg: MkF,
slots: FutureHashMap<T, F>,
#[pin]
ready_to_execute_after: future::Fuse<Ready>,
is_ready_to_execute: bool,
stopped: bool,
}

impl<T, R, F, MkF> Runner<T, R, F, MkF>
Expand All @@ -32,35 +44,70 @@ where
scheduler,
run_msg,
slots: FutureHashMap::default(),
ready_to_execute_after: future::ready(Ok(())).fuse(),
is_ready_to_execute: false,
stopped: false,
}
}

/// Wait for `ready_to_execute_after` to complete before starting to run any scheduled tasks.
///
/// `scheduler` will still be polled in the meantime.
pub fn delay_tasks_until<Ready, ReadyErr>(
self,
ready_to_execute_after: Ready,
) -> Runner<T, R, F, MkF, Ready>
where
Ready: Future<Output = Result<(), ReadyErr>>,
{
Runner {
scheduler: self.scheduler,
run_msg: self.run_msg,
slots: self.slots,
ready_to_execute_after: ready_to_execute_after.fuse(),
is_ready_to_execute: false,
stopped: false,
}
}
}

impl<T, R, F, MkF> Stream for Runner<T, R, F, MkF>
impl<T, R, F, MkF, Ready, ReadyErr> Stream for Runner<T, R, F, MkF, Ready>
where
T: Eq + Hash + Clone + Unpin,
R: Stream<Item = ScheduleRequest<T>>,
F: Future + Unpin,
MkF: FnMut(&T) -> F,
Ready: Future<Output = Result<(), ReadyErr>>,
{
type Item = F::Output;
type Item = Result<F::Output, Error<ReadyErr>>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if *this.stopped {
return Poll::Ready(None);
}
let slots = this.slots;
let scheduler = &mut this.scheduler;
let has_active_slots = match slots.poll_next_unpin(cx) {
Poll::Ready(Some(result)) => return Poll::Ready(Some(result)),
Poll::Ready(Some(result)) => return Poll::Ready(Some(Ok(result))),
Poll::Ready(None) => false,
Poll::Pending => true,
};
match this.ready_to_execute_after.poll(cx) {
Poll::Ready(Ok(())) => *this.is_ready_to_execute = true,
Poll::Ready(Err(err)) => {
*this.stopped = true;
return Poll::Ready(Some(Err(Error::Readiness(err))));
}
Poll::Pending => {}
}
loop {
// Try to take take a new message that isn't already being processed
// leave the already-processing ones in the queue, so that we can take them once
// we're free again.
let next_msg_poll = scheduler
.as_mut()
.hold_unless(|msg| !slots.contains_key(msg))
.hold_unless(|msg| *this.is_ready_to_execute && !slots.contains_key(msg))
.poll_next_unpin(cx);
match next_msg_poll {
Poll::Ready(Some(msg)) => {
Expand Down Expand Up @@ -88,17 +135,25 @@ where

#[cfg(test)]
mod tests {
use super::Runner;
use crate::scheduler::{scheduler, ScheduleRequest};
use super::{Error, Runner};
use crate::{
scheduler::{scheduler, ScheduleRequest},
utils::delayed_init::{self, DelayedInit},
};
use futures::{
channel::{mpsc, oneshot},
future, poll, SinkExt, StreamExt,
future, poll, stream, FutureExt, SinkExt, StreamExt, TryStreamExt,
};
use std::{
cell::RefCell,
collections::{HashMap, HashSet},
sync::Mutex,
time::Duration,
};
use std::{cell::RefCell, time::Duration};
use tokio::{
runtime::Handle,
task::yield_now,
time::{pause, sleep, timeout, Instant},
time::{error::Elapsed, pause, sleep, timeout, Instant},
};

#[tokio::test]
Expand Down Expand Up @@ -170,8 +225,114 @@ mod tests {
// a timeout here *should* mean that the background task isn't getting awoken properly
// when the new message is ready.
assert_eq!(
timeout(Duration::from_secs(1), result_rx).await.unwrap().unwrap(),
timeout(Duration::from_secs(1), result_rx)
.await
.unwrap()
.unwrap()
.transpose()
.unwrap(),
Some(8)
);
}

#[tokio::test]
async fn runner_should_wait_for_readiness() {
let is_ready = Mutex::new(false);
let (delayed_init, ready) = DelayedInit::<()>::new();
let mut runner = Box::pin(
Runner::new(
scheduler(
stream::iter([ScheduleRequest {
message: 1u8,
run_at: Instant::now(),
}])
.chain(stream::pending()),
),
|msg| {
assert!(*is_ready.lock().unwrap());
future::ready(*msg)
},
)
.delay_tasks_until(ready.get()),
);
assert!(poll!(runner.next()).is_pending());
*is_ready.lock().unwrap() = true;
delayed_init.init(());
assert_eq!(runner.next().await.transpose().unwrap(), Some(1));
}

#[tokio::test]
async fn runner_should_dedupe_while_waiting_for_readiness() {
let is_ready = Mutex::new(false);
let (delayed_init, ready) = DelayedInit::<()>::new();
let mut runner = Box::pin(
Runner::new(
scheduler(
stream::iter([
ScheduleRequest {
message: 'a',
run_at: Instant::now(),
},
ScheduleRequest {
message: 'b',
run_at: Instant::now(),
},
ScheduleRequest {
message: 'a',
run_at: Instant::now(),
},
])
.chain(stream::pending()),
),
|msg| {
assert!(*is_ready.lock().unwrap());
future::ready(*msg)
},
)
.delay_tasks_until(ready.get()),
);
assert!(poll!(runner.next()).is_pending());
*is_ready.lock().unwrap() = true;
delayed_init.init(());
let mut message_counts = HashMap::new();
assert!(timeout(
Duration::from_secs(1),
runner.try_for_each(|msg| {
*message_counts.entry(msg).or_default() += 1;
async { Ok(()) }
})
)
.await
.is_err());
assert_eq!(message_counts, HashMap::from([('a', 1), ('b', 1)]));
}

#[tokio::test]
async fn runner_should_report_readiness_errors() {
let (delayed_init, ready) = DelayedInit::<()>::new();
let mut runner = Box::pin(
Runner::new(
scheduler(
stream::iter([ScheduleRequest {
message: (),
run_at: Instant::now(),
}])
.chain(stream::pending()),
),
|()| {
panic!("run_msg should never be invoked if readiness gate fails");
// It's "useless", but it helps to direct rustc to the correct types
#[allow(unreachable_code)]
future::ready(())
},
)
.delay_tasks_until(ready.get()),
);
assert!(poll!(runner.next()).is_pending());
drop(delayed_init);
assert!(matches!(
runner.try_collect::<Vec<_>>().await.unwrap_err(),
Error::Readiness(delayed_init::InitDropped)
));
}
}
Loading

0 comments on commit db585dd

Please sign in to comment.