Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add shared stream interfaces #1449

Merged
merged 40 commits into from
Apr 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
15ba09d
Add a simple controller example
mateiidavid Jan 31, 2024
fc2fc42
Merge branch 'main' of github.com:kube-rs/kube into matei/arc-watcher
mateiidavid Jan 31, 2024
1657ddc
Add shared stream controller example
mateiidavid Feb 23, 2024
98255dc
Try to get something working
mateiidavid Feb 23, 2024
3685b9e
Rm my notes
mateiidavid Feb 23, 2024
683e77d
Results or objectefs
mateiidavid Feb 29, 2024
af7a309
Working shared stream
mateiidavid Feb 29, 2024
8d4d694
Different way of doing it
mateiidavid Feb 29, 2024
8534770
Switch to async_broadcast
mateiidavid Mar 2, 2024
9bbe8e1
Remove old, unused code
mateiidavid Mar 2, 2024
3f874ce
Remove unused examples
mateiidavid Mar 2, 2024
1e1e347
Gotta state machine this stuff
mateiidavid Mar 7, 2024
15f6e1d
Take 1 with try_recv
mateiidavid Mar 8, 2024
49eaf12
try_recv take 2
mateiidavid Mar 8, 2024
e7aad76
Working on names next
mateiidavid Mar 11, 2024
b6ff97f
Ok surprising this worked
mateiidavid Mar 13, 2024
7a570fd
Write tests and rename file to reflect dispatch
mateiidavid Mar 25, 2024
0256cb0
WIP
mateiidavid Mar 25, 2024
74f09f7
WIP 2
mateiidavid Mar 26, 2024
2d5a3b0
Start working on store side
mateiidavid Mar 26, 2024
0cb816b
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid Mar 26, 2024
9bf111c
Tests are green
mateiidavid Mar 26, 2024
04a53d1
rm redundant trait bounds
mateiidavid Mar 26, 2024
6b5bd31
Update example with new interfaces
mateiidavid Mar 26, 2024
def0011
Add comments and a small todo
mateiidavid Mar 27, 2024
d69213a
Remove dispatch mod from utils
mateiidavid Mar 27, 2024
21dbbae
@clux's feedback
mateiidavid Apr 3, 2024
c7fc333
@clux's feedback
mateiidavid Apr 3, 2024
1b81f4c
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid Apr 3, 2024
c6d1027
Fix tests & clippy warns
mateiidavid Apr 3, 2024
a30f2e6
Run fmt
mateiidavid Apr 3, 2024
fef5d83
Update examples/shared_stream_controllers.rs
mateiidavid Apr 8, 2024
44c441e
@clux's feedback on examples
mateiidavid Apr 11, 2024
8347103
Fix name in ns
mateiidavid Apr 15, 2024
9f7edd1
Add comments and feature flags
mateiidavid Apr 15, 2024
e2399f1
Merge branch 'main' of github.com:kube-rs/kube into matei/fork-add-fu…
mateiidavid Apr 16, 2024
a14d6b4
Fix CI checks
mateiidavid Apr 16, 2024
de2eda1
Run rustfmt
mateiidavid Apr 16, 2024
276b75e
@clux's feedback
mateiidavid Apr 17, 2024
eca6be1
Run fmt
mateiidavid Apr 17, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ missing_docs = "deny"
ahash = "0.8"
anyhow = "1.0.71"
assert-json-diff = "2.0.2"
async-broadcast = "0.7.0"
async-stream = "0.3.5"
async-trait = "0.1.64"
backoff = "0.4.0"
base64 = "0.22.0"
Expand Down
4 changes: 4 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ crossterm = "0.27.0"
name = "configmapgen_controller"
path = "configmapgen_controller.rs"

[[example]]
name = "shared_stream_controllers"
path = "shared_stream_controllers.rs"

[[example]]
name = "crd_api"
path = "crd_api.rs"
Expand Down
187 changes: 187 additions & 0 deletions examples/shared_stream_controllers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
use std::{sync::Arc, time::Duration};

use futures::StreamExt;
use k8s_openapi::api::core::v1::{Pod, PodCondition};
use kube::{
api::{Patch, PatchParams},
runtime::{
controller::Action,
reflector::{self},
watcher, Config, Controller, WatchStreamExt,
},
Api, Client, ResourceExt,
};
use tracing::{debug, error, info, warn};

use thiserror::Error;

// Helper module that namespaces two constants describing a Kubernetes status condition
pub mod condition {
pub static UNDOCUMENTED_TYPE: &str = "UndocumentedPort";
pub static STATUS_TRUE: &str = "True";
}

const SUBSCRIBE_BUFFER_SIZE: usize = 256;

#[derive(Debug, Error)]
enum Error {
#[error("Failed to patch pod: {0}")]
WriteFailed(#[source] kube::Error),

#[error("Missing po field: {0}")]
MissingField(&'static str),
}

#[derive(Clone)]
struct Data {
client: Client,
}

/// A simple reconciliation function that will copy a pod's labels into the annotations.
async fn reconcile_metadata(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> {
let namespace = &pod.namespace().unwrap_or_default();
if namespace == "kube-system" {
return Ok(Action::await_change());
}

let mut pod = (*pod).clone();
pod.metadata.managed_fields = None;
// combine labels and annotations into a new map
let labels = pod.labels().clone().into_iter();
pod.annotations_mut().extend(labels);

let pod_api = Api::<Pod>::namespaced(
ctx.client.clone(),
pod.metadata
.namespace
.as_ref()
.ok_or_else(|| Error::MissingField(".metadata.name"))?,
);

pod_api
.patch(
&pod.name_any(),
&PatchParams::apply("controller-1"),
&Patch::Apply(&pod),
)
.await
.map_err(Error::WriteFailed)?;

Ok(Action::requeue(Duration::from_secs(300)))
}

/// Another reconiliation function that will add an 'UndocumentedPort' condition to pods that do
/// do not have any ports declared across all containers.
async fn reconcile_status(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> {
for container in pod.spec.clone().unwrap_or_default().containers.iter() {
if container.ports.clone().unwrap_or_default().len() != 0 {
debug!(name = %pod.name_any(), "Skipped updating pod with documented ports");
return Ok(Action::await_change());
}
}

let pod_api = Api::<Pod>::namespaced(
ctx.client.clone(),
pod.metadata
.namespace
.as_ref()
.ok_or_else(|| Error::MissingField(".metadata.name"))?,
);

let undocumented_condition = PodCondition {
type_: condition::UNDOCUMENTED_TYPE.into(),
status: condition::STATUS_TRUE.into(),
..Default::default()
};
let value = serde_json::json!({
"status": {
"name": pod.name_any(),
"kind": "Pod",
"conditions": vec![undocumented_condition]
}
});
pod_api
.patch_status(
&pod.name_any(),
&PatchParams::apply("controller-2"),
&Patch::Strategic(value),
)
.await
.map_err(Error::WriteFailed)?;

Ok(Action::requeue(Duration::from_secs(300)))
}

fn error_policy(obj: Arc<Pod>, error: &Error, _ctx: Arc<Data>) -> Action {
error!(%error, name = %obj.name_any(), "Failed reconciliation");
Action::requeue(Duration::from_secs(10))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();

let client = Client::try_default().await?;
let pods = Api::<Pod>::namespaced(client.clone(), "default");
let config = Config::default().concurrency(2);
let ctx = Arc::new(Data { client });

// Create a shared store with a predefined buffer that will be shared between subscribers.
let (reader, writer) = reflector::store_shared(SUBSCRIBE_BUFFER_SIZE);
// Before threading an object watch through the store, create a subscriber.
// Any number of subscribers can be created from one writer.
let subscriber = writer
.subscribe()
.expect("subscribers can only be created from shared stores");

// Reflect a stream of pod watch events into the store and apply a backoff. For subscribers to
// be able to consume updates, the reflector must be shared.
let pod_watch = watcher(pods.clone(), Default::default())
.default_backoff()
.reflect_shared(writer)
.for_each(|res| async move {
match res {
Ok(event) => debug!("Received event on root stream {event:?}"),
Err(error) => error!(%error, "Unexpected error when watching resource"),
}
});

// Create the first controller using the reconcile_metadata function. Controllers accept
// subscribers through a dedicated interface.
let metadata_controller = Controller::for_shared_stream(subscriber.clone(), reader)
.with_config(config.clone())
.shutdown_on_signal()
.run(reconcile_metadata, error_policy, ctx.clone())
.for_each(|res| async move {
match res {
Ok(v) => info!("Reconciled metadata {v:?}"),
Err(error) => warn!(%error, "Failed to reconcile metadata"),
}
});

// Subscribers can be used to get a read handle on the store, if the initial handle has been
// moved or dropped.
let reader = subscriber.reader();
// Create the second controller using the reconcile_status function.
let status_controller = Controller::for_shared_stream(subscriber, reader)
.with_config(config)
.shutdown_on_signal()
.run(reconcile_status, error_policy, ctx)
.for_each(|res| async move {
match res {
Ok(v) => info!("Reconciled status {v:?}"),
Err(error) => warn!(%error, "Failed to reconcile status"),
}
});

// Drive streams to readiness. The initial watch (that is reflected) needs to be driven to
// consume events from the API Server and forward them to subscribers.
//
// Both controllers will operate on shared objects.
tokio::select! {
_ = futures::future::join(metadata_controller, status_controller) => {},
_ = pod_watch => {}
}

Ok(())
}
2 changes: 2 additions & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ backoff.workspace = true
async-trait.workspace = true
hashbrown.workspace = true
k8s-openapi.workspace = true
async-broadcast.workspace = true
async-stream.workspace = true

[dev-dependencies]
kube = { path = "../kube", features = ["derive", "client", "runtime"], version = "<1.0.0, >=0.60.0" }
Expand Down
133 changes: 133 additions & 0 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,28 @@
})
}

/// Enqueues the object itself for reconciliation when the object is behind a
/// shared pointer
#[cfg(feature = "unstable-runtime-subscribe")]
fn trigger_self_shared<K, S>(
stream: S,
dyntype: K::DynamicType,
) -> impl Stream<Item = Result<ReconcileRequest<K>, S::Error>>
where
// Input stream has item as some Arc'd Resource (via
// Controller::for_shared_stream)
S: TryStream<Ok = Arc<K>>,
K: Resource,
K::DynamicType: Clone,
{
trigger_with(stream, move |obj| {
Some(ReconcileRequest {
obj_ref: ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()),
reason: ReconcileReason::ObjectUpdated,

Check warning on line 143 in kube-runtime/src/controller/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/controller/mod.rs#L140-L143

Added lines #L140 - L143 were not covered by tests
})
})
}

/// Enqueues any mapper returned `K` types for reconciliation
fn trigger_others<S, K, I>(
stream: S,
Expand Down Expand Up @@ -703,6 +725,117 @@
}
}

/// This is the same as [`Controller::for_stream`]. Instead of taking an
/// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
/// streams can be created out-of-band by subscribing on a store `Writer`.
/// Through this interface, multiple controllers can use the same root
/// (shared) input stream of resources to keep memory overheads smaller.
///
/// **N.B**: This constructor requires an
/// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
/// feature.
///
/// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
/// need to share the stream.
///
/// ## Warning:
///
/// You **must** ensure the root stream (i.e. stream created through a `reflector()`)
/// is driven to readiness independently of this controller to ensure the
/// watcher never deadlocks.
///
/// # Example:
///
/// ```no_run
/// # use futures::StreamExt;
/// # use k8s_openapi::api::apps::v1::Deployment;
/// # use kube::runtime::controller::{Action, Controller};
/// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt};
/// # use kube::{Api, Client, Error, ResourceExt};
/// # use std::sync::Arc;
/// # async fn reconcile(_: Arc<Deployment>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
/// # fn error_policy(_: Arc<Deployment>, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() }
/// # async fn doc(client: kube::Client) {
/// let api: Api<Deployment> = Api::default_namespaced(client);
/// let (reader, writer) = reflector::store_shared(128);
/// let subscriber = writer
/// .subscribe()
/// .expect("subscribers can only be created from shared stores");
/// let deploys = watcher(api, watcher::Config::default())
/// .default_backoff()
/// .reflect(writer)
/// .applied_objects()
/// .for_each(|ev| async move {
/// match ev {
/// Ok(obj) => tracing::info!("got obj {obj:?}"),
/// Err(error) => tracing::error!(%error, "received error")
/// }
/// });
///
/// let controller = Controller::for_shared_stream(subscriber, reader)
/// .run(reconcile, error_policy, Arc::new(()))
/// .for_each(|ev| async move {
/// tracing::info!("reconciled {ev:?}")
/// });
///
/// // Drive streams using a select statement
/// tokio::select! {
/// _ = deploys => {},
/// _ = controller => {},
/// }
/// # }
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn for_shared_stream(trigger: impl Stream<Item = Arc<K>> + Send + 'static, reader: Store<K>) -> Self
where
K::DynamicType: Default,
{
Self::for_shared_stream_with(trigger, reader, Default::default())

Check warning on line 792 in kube-runtime/src/controller/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/controller/mod.rs#L792

Added line #L792 was not covered by tests
}

/// This is the same as [`Controller::for_stream`]. Instead of taking an
/// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared
/// streams can be created out-of-band by subscribing on a store `Writer`.
/// Through this interface, multiple controllers can use the same root
/// (shared) input stream of resources to keep memory overheads smaller.
///
/// **N.B**: This constructor requires an
/// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21)
/// feature.
///
/// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not
/// need to share the stream.
///
/// This variant constructor is used for [`dynamic`] types found through
/// discovery. Prefer [`Controller::for_shared_stream`] for static types (i.e.
/// known at compile time).
///
/// [`dynamic`]: kube_client::core::dynamic
#[cfg(feature = "unstable-runtime-subscribe")]
pub fn for_shared_stream_with(
trigger: impl Stream<Item = Arc<K>> + Send + 'static,
reader: Store<K>,
dyntype: K::DynamicType,
) -> Self {
let mut trigger_selector = stream::SelectAll::new();
let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed();
trigger_selector.push(self_watcher);

Check warning on line 821 in kube-runtime/src/controller/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/controller/mod.rs#L819-L821

Added lines #L819 - L821 were not covered by tests
Self {
trigger_selector,
trigger_backoff: Box::<DefaultBackoff>::default(),
graceful_shutdown_selector: vec![

Check warning on line 825 in kube-runtime/src/controller/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/controller/mod.rs#L824-L825

Added lines #L824 - L825 were not covered by tests
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
],
forceful_shutdown_selector: vec![

Check warning on line 829 in kube-runtime/src/controller/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/controller/mod.rs#L829

Added line #L829 was not covered by tests
// Fallback future, ensuring that we never terminate if no additional futures are added to the selector
future::pending().boxed(),
],
dyntype,
reader,
config: Default::default(),

Check warning on line 835 in kube-runtime/src/controller/mod.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/controller/mod.rs#L835

Added line #L835 was not covered by tests
}
}

/// Specify the configuration for the controller's behavior.
#[must_use]
pub fn with_config(mut self, config: Config) -> Self {
Expand Down
Loading
Loading