Skip to content

Commit

Permalink
Stream produced by watch_object will include an item when the objec…
Browse files Browse the repository at this point in the history
…t isn't in any initial list

fixes #1576
  • Loading branch information
markdingram committed Sep 11, 2024
1 parent 3d2471b commit e97bc20
Showing 1 changed file with 31 additions and 13 deletions.
44 changes: 31 additions & 13 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use kube_client::{
Api, Error as ClientErr,
};
use serde::de::DeserializeOwned;
use std::{clone::Clone, collections::VecDeque, fmt::Debug, time::Duration};
use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration};
use thiserror::Error;
use tracing::{debug, error, warn};

Expand Down Expand Up @@ -844,18 +844,36 @@ pub fn watch_object<K: Resource + Clone + DeserializeOwned + Debug + Send + 'sta
// filtering by object name in given scope, so there's at most one matching object
// footgun: Api::all may generate events from namespaced objects with the same name in different namespaces
let fields = format!("metadata.name={name}");
watcher(api, Config::default().fields(&fields)).filter_map(|event| async {
match event {
// Pass up `Some` for Found / Updated
Ok(Event::Apply(obj) | Event::InitApply(obj)) => Some(Ok(Some(obj))),
// Pass up `None` for Deleted
Ok(Event::Delete(_)) => Some(Ok(None)),
// Ignore marker events
Ok(Event::Init | Event::InitDone) => None,
// Bubble up errors
Err(err) => Some(Err(err)),
}
})
watcher(api, Config::default().fields(&fields))
// track whether the object was seen in each initial listing
.scan(false, |obj_seen, event| {
if matches!(event, Ok(Event::Init)) {
*obj_seen = false;
} else if matches!(event, Ok(Event::InitApply(_))) {
*obj_seen = true;
}
future::ready(Some((*obj_seen, event)))
})
.filter_map(|(obj_seen, event)| async move {
match event {
// Pass up `Some` for Found / Updated
Ok(Event::Apply(obj)) | Ok(Event::InitApply(obj)) => Some(Ok(Some(obj))),
// Pass up `None` for Deleted
Ok(Event::Delete(_)) => Some(Ok(None)),
// Ignore marker event
Ok(Event::Init) => None,
// Pass up `None` if the object wasn't seen in any initial list
Ok(Event::InitDone) => {
if obj_seen {
None
} else {
Some(Ok(None))
}
}
// Bubble up errors
Err(err) => Some(Err(err)),
}
})
}

/// Default watcher backoff inspired by Kubernetes' client-go.
Expand Down

0 comments on commit e97bc20

Please sign in to comment.