diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 30aa932d5..0703baa10 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -14,7 +14,7 @@ use kube_client::{ Api, Error as ClientErr, }; use serde::de::DeserializeOwned; -use std::{clone::Clone, collections::VecDeque, fmt::Debug, time::Duration}; +use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration}; use thiserror::Error; use tracing::{debug, error, warn}; @@ -844,18 +844,36 @@ pub fn watch_object Some(Ok(Some(obj))), - // Pass up `None` for Deleted - Ok(Event::Delete(_)) => Some(Ok(None)), - // Ignore marker events - Ok(Event::Init | Event::InitDone) => None, - // Bubble up errors - Err(err) => Some(Err(err)), - } - }) + watcher(api, Config::default().fields(&fields)) + // track whether the object was seen in each initial listing + .scan(false, |obj_seen, event| { + if matches!(event, Ok(Event::Init)) { + *obj_seen = false; + } else if matches!(event, Ok(Event::InitApply(_))) { + *obj_seen = true; + } + future::ready(Some((*obj_seen, event))) + }) + .filter_map(|(obj_seen, event)| async move { + match event { + // Pass up `Some` for Found / Updated + Ok(Event::Apply(obj)) | Ok(Event::InitApply(obj)) => Some(Ok(Some(obj))), + // Pass up `None` for Deleted + Ok(Event::Delete(_)) => Some(Ok(None)), + // Ignore marker event + Ok(Event::Init) => None, + // Pass up `None` if the object wasn't seen in any initial list + Ok(Event::InitDone) => { + if obj_seen { + None + } else { + Some(Ok(None)) + } + } + // Bubble up errors + Err(err) => Some(Err(err)), + } + }) } /// Default watcher backoff inspired by Kubernetes' client-go.