Skip to content

Commit b822ee6

Browse files
authored
Introduce support for persistent metadata watches (#1145)
* Introduce support for persistent metadata watches The `watch` (and `watch_metadata` respectively) functions on the Api type are fallible, and watches are not recovered. Errors may happen for any reason, such as network-induced errors, restarts (etcd can only cache so many resource versions), and so on. To get around these failures, we have a `watcher()` utility in the runtime crate that manages the underlying stream in a persistent way, recovering on failure. This change introduces support for persistent metadata watches, through a `metadata_watcher` function in the same crate. Watches may be established on any type of resource; the main difference is that the returned types no longer correspond to the type of the Api. Instead, a concrete metadata type is returned. To support this with no breaking changes and to allow for more maintainable code, a few utility functions and traits are introduced in the `runtime` crate. Signed-off-by: Matei David <[email protected]> * Run clippy Signed-off-by: Matei David <[email protected]> * Make closure arg generic Signed-off-by: Matei David <[email protected]> * Fix doc test Signed-off-by: Matei David <[email protected]> * Bump MSRV to 1.63.0 Signed-off-by: Matei David <[email protected]> * Rename AsyncFn to StepFn Signed-off-by: Matei David <[email protected]> * Add a compile-time typecheck and a meta example to dynamic watcher Signed-off-by: Matei David <[email protected]> * Rename watch_metadata to metadata_watcher and allow module rep Signed-off-by: Matei David <[email protected]> * Add trait to specialize Api calls instead of relying on closures Signed-off-by: Matei David <[email protected]> * Change meta watcher fn name in example Signed-off-by: Matei David <[email protected]> * Parse evar as 1 Signed-off-by: Matei David <[email protected]> * Refactor dynamic_watcher example Signed-off-by: Matei David <[email protected]> --------- Signed-off-by: Matei David <[email protected]>
1 parent 5afa31e commit b822ee6

File tree

5 files changed

+207
-21
lines changed

5 files changed

+207
-21
lines changed

examples/dynamic_watcher.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,23 @@
1-
use futures::{StreamExt, TryStreamExt};
1+
use futures::{Stream, StreamExt, TryStreamExt};
22
use kube::{
33
api::{Api, DynamicObject, GroupVersionKind, ListParams, ResourceExt},
4-
discovery::{self, Scope},
5-
runtime::{watcher, WatchStreamExt},
4+
discovery::{self, ApiCapabilities, Scope},
5+
runtime::{metadata_watcher, watcher, WatchStreamExt},
66
Client,
77
};
8+
use serde::de::DeserializeOwned;
89
use tracing::*;
910

10-
use std::env;
11+
use std::{env, fmt::Debug};
1112

1213
#[tokio::main]
1314
async fn main() -> anyhow::Result<()> {
1415
tracing_subscriber::fmt::init();
1516
let client = Client::try_default().await?;
1617

18+
// If WATCH_METADATA is set to "1", receive only the metadata for watched resources
19+
let watch_metadata = env::var("WATCH_METADATA").map(|s| s == "1").unwrap_or(false);
20+
1721
// Take dynamic resource identifiers:
1822
let group = env::var("GROUP").unwrap_or_else(|_| "clux.dev".into());
1923
let version = env::var("VERSION").unwrap_or_else(|_| "v1".into());
@@ -27,14 +31,29 @@ async fn main() -> anyhow::Result<()> {
2731
// Use the full resource info to create an Api with the ApiResource as its DynamicType
2832
let api = Api::<DynamicObject>::all_with(client, &ar);
2933

34+
// Start a metadata or a full resource watch
35+
if watch_metadata {
36+
handle_events(metadata_watcher(api, ListParams::default()), caps).await?
37+
} else {
38+
handle_events(watcher(api, ListParams::default()), caps).await?
39+
}
40+
41+
Ok(())
42+
}
43+
44+
async fn handle_events<K: kube::Resource + Clone + Debug + Send + DeserializeOwned + 'static>(
45+
stream: impl Stream<Item = watcher::Result<watcher::Event<K>>> + Send + 'static,
46+
api_caps: ApiCapabilities,
47+
) -> anyhow::Result<()> {
3048
// Fully compatible with kube-runtime
31-
let mut items = watcher(api, ListParams::default()).applied_objects().boxed();
49+
let mut items = stream.applied_objects().boxed();
3250
while let Some(p) = items.try_next().await? {
33-
if caps.scope == Scope::Cluster {
51+
if api_caps.scope == Scope::Cluster {
3452
info!("saw {}", p.name_any());
3553
} else {
3654
info!("saw {} in {}", p.name_any(), p.namespace().unwrap());
3755
}
3856
}
57+
3958
Ok(())
4059
}

kube-runtime/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ json-patch = "0.3.0"
3939
serde_json = "1.0.68"
4040
thiserror = "1.0.29"
4141
backoff = "0.4.0"
42+
async-trait = "0.1.64"
4243

4344
[dependencies.k8s-openapi]
4445
version = "0.17.0"

kube-runtime/src/controller/mod.rs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -895,17 +895,29 @@ mod tests {
895895
use crate::{
896896
applier,
897897
reflector::{self, ObjectRef},
898-
watcher, Controller,
898+
watcher::{self, metadata_watcher, watcher, Event},
899+
Controller,
899900
};
900-
use futures::{pin_mut, StreamExt, TryStreamExt};
901+
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
901902
use k8s_openapi::api::core::v1::ConfigMap;
902-
use kube_client::{core::ObjectMeta, Api};
903+
use kube_client::{core::ObjectMeta, Api, Resource};
904+
use serde::de::DeserializeOwned;
903905
use tokio::time::timeout;
904906

905907
fn assert_send<T: Send>(x: T) -> T {
906908
x
907909
}
908910

911+
// Used to typecheck that a type T is a generic type that implements Stream
912+
// and yields `watcher::Result<Event<K>>` items, generic over a resource `K`
913+
fn assert_stream<T, K>(x: T) -> T
914+
where
915+
T: Stream<Item = watcher::Result<Event<K>>> + Send,
916+
K: Resource + Clone + DeserializeOwned + std::fmt::Debug + Send + 'static,
917+
{
918+
x
919+
}
920+
909921
fn mock_type<T>() -> T {
910922
unimplemented!(
911923
"mock_type is not supposed to be called, only used for filling holes in type assertions"
@@ -924,6 +936,20 @@ mod tests {
924936
);
925937
}
926938

939+
// not #[test] because we don't want to actually run it, we just want to
940+
// assert that it typechecks
941+
//
942+
// will check that the return types of `watcher` and `metadata_watcher` do not drift
943+
// given an arbitrary K that implements `Resource` (e.g. ConfigMap)
944+
#[allow(dead_code, unused_must_use)]
945+
fn test_watcher_stream_type_drift() {
946+
assert_stream(watcher(mock_type::<Api<ConfigMap>>(), Default::default()));
947+
assert_stream(metadata_watcher(
948+
mock_type::<Api<ConfigMap>>(),
949+
Default::default(),
950+
));
951+
}
952+
927953
#[tokio::test]
928954
async fn applier_must_not_deadlock_if_reschedule_buffer_fills() {
929955
// This tests that `applier` handles reschedule queue backpressure correctly, by trying to flood it with no-op reconciles

kube-runtime/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,4 @@ pub use finalizer::finalizer;
3232
pub use reflector::reflector;
3333
pub use scheduler::scheduler;
3434
pub use utils::WatchStreamExt;
35-
pub use watcher::watcher;
35+
pub use watcher::{metadata_watcher, watcher};

kube-runtime/src/watcher.rs

Lines changed: 151 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,13 @@
33
//! See [`watcher`] for the primary entry point.
44
55
use crate::utils::ResetTimerBackoff;
6+
use async_trait::async_trait;
67
use backoff::{backoff::Backoff, ExponentialBackoff};
78
use derivative::Derivative;
89
use futures::{stream::BoxStream, Stream, StreamExt};
910
use kube_client::{
1011
api::{ListParams, Resource, ResourceExt, WatchEvent},
12+
core::{metadata::PartialObjectMeta, ObjectList},
1113
error::ErrorResponse,
1214
Api, Error as ClientErr,
1315
};
@@ -114,7 +116,7 @@ impl<K> Event<K> {
114116
#[derive(Derivative)]
115117
#[derivative(Debug)]
116118
/// The internal finite state machine driving the [`watcher`]
117-
enum State<K: Resource + Clone> {
119+
enum State<K> {
118120
/// The Watcher is empty, and the next [`poll`](Stream::poll_next) will start the initial LIST to get all existing objects
119121
Empty,
120122
/// The initial LIST was successful, so we should move on to starting the actual watch.
@@ -132,15 +134,85 @@ enum State<K: Resource + Clone> {
132134
},
133135
}
134136

137+
/// Used to control whether the watcher receives the full object, or only the
138+
/// metadata
139+
#[async_trait]
140+
trait ApiMode {
141+
type Value: Clone;
142+
143+
async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>>;
144+
async fn watch(
145+
&self,
146+
lp: &ListParams,
147+
version: &str,
148+
) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>>;
149+
}
150+
151+
/// A wrapper around the `Api` of a `Resource` type that when used by the
152+
/// watcher will return the entire (full) object
153+
struct FullObject<'a, K> {
154+
api: &'a Api<K>,
155+
}
156+
157+
#[async_trait]
158+
impl<K> ApiMode for FullObject<'_, K>
159+
where
160+
K: Clone + Debug + DeserializeOwned + Send + 'static,
161+
{
162+
type Value = K;
163+
164+
async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
165+
self.api.list(lp).await
166+
}
167+
168+
async fn watch(
169+
&self,
170+
lp: &ListParams,
171+
version: &str,
172+
) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
173+
self.api.watch(lp, version).await.map(StreamExt::boxed)
174+
}
175+
}
176+
177+
/// A wrapper around the `Api` of a `Resource` type that when used by the
178+
/// watcher will return only the metadata associated with an object
179+
struct MetaOnly<'a, K> {
180+
api: &'a Api<K>,
181+
}
182+
183+
#[async_trait]
184+
impl<K> ApiMode for MetaOnly<'_, K>
185+
where
186+
K: Clone + Debug + DeserializeOwned + Send + 'static,
187+
{
188+
type Value = PartialObjectMeta;
189+
190+
async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>> {
191+
self.api.list_metadata(lp).await
192+
}
193+
194+
async fn watch(
195+
&self,
196+
lp: &ListParams,
197+
version: &str,
198+
) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>> {
199+
self.api.watch_metadata(lp, version).await.map(StreamExt::boxed)
200+
}
201+
}
202+
135203
/// Progresses the watcher a single step, returning (event, state)
136204
///
137205
/// This function should be trampolined: if event == `None`
138206
/// then the function should be called again until it returns a Some.
139-
async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
140-
api: &Api<K>,
207+
async fn step_trampolined<A>(
208+
api: &A,
141209
list_params: &ListParams,
142-
state: State<K>,
143-
) -> (Option<Result<Event<K>>>, State<K>) {
210+
state: State<A::Value>,
211+
) -> (Option<Result<Event<A::Value>>>, State<A::Value>)
212+
where
213+
A: ApiMode,
214+
A::Value: Resource + 'static,
215+
{
144216
match state {
145217
State::Empty => match api.list(list_params).await {
146218
Ok(list) => {
@@ -164,7 +236,7 @@ async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send
164236
State::InitListed { resource_version } => match api.watch(list_params, &resource_version).await {
165237
Ok(stream) => (None, State::Watching {
166238
resource_version,
167-
stream: stream.boxed(),
239+
stream,
168240
}),
169241
Err(err) => {
170242
if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) {
@@ -234,11 +306,15 @@ async fn step_trampolined<K: Resource + Clone + DeserializeOwned + Debug + Send
234306
}
235307

236308
/// Trampoline helper for `step_trampolined`
237-
async fn step<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
238-
api: &Api<K>,
309+
async fn step<A>(
310+
api: &A,
239311
list_params: &ListParams,
240-
mut state: State<K>,
241-
) -> (Result<Event<K>>, State<K>) {
312+
mut state: State<A::Value>,
313+
) -> (Result<Event<A::Value>>, State<A::Value>)
314+
where
315+
A: ApiMode,
316+
A::Value: Resource + 'static,
317+
{
242318
loop {
243319
match step_trampolined(api, list_params, state).await {
244320
(Some(result), new_state) => return (result, new_state),
@@ -303,7 +379,71 @@ pub fn watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
303379
futures::stream::unfold(
304380
(api, list_params, State::Empty),
305381
|(api, list_params, state)| async {
306-
let (event, state) = step(&api, &list_params, state).await;
382+
let (event, state) = step(&FullObject { api: &api }, &list_params, state).await;
383+
Some((event, (api, list_params, state)))
384+
},
385+
)
386+
}
387+
388+
/// Watches a Kubernetes Resource for changes continuously and receives only the
389+
/// metadata
390+
///
391+
/// Compared to [`Api::watch`], this automatically tries to recover the stream upon errors.
392+
///
393+
/// Errors from the underlying watch are propagated, after which the stream will go into recovery mode on the next poll.
394+
/// You can apply your own backoff by not polling the stream for a duration after errors.
395+
/// Keep in mind that some [`TryStream`](futures::TryStream) combinators (such as
396+
/// [`try_for_each`](futures::TryStreamExt::try_for_each) and [`try_concat`](futures::TryStreamExt::try_concat))
397+
/// will terminate eagerly as soon as they receive an [`Err`].
398+
///
399+
/// This is intended to provide a safe and atomic input interface for a state store like a [`reflector`].
400+
/// Direct users may want to flatten composite events via [`WatchStreamExt`]:
401+
///
402+
/// ```no_run
403+
/// use kube::{
404+
/// api::{Api, ListParams, ResourceExt}, Client,
405+
/// runtime::{watcher, metadata_watcher, WatchStreamExt}
406+
/// };
407+
/// use k8s_openapi::api::core::v1::Pod;
408+
/// use futures::{StreamExt, TryStreamExt};
409+
/// #[tokio::main]
410+
/// async fn main() -> Result<(), watcher::Error> {
411+
/// let client = Client::try_default().await.unwrap();
412+
/// let pods: Api<Pod> = Api::namespaced(client, "apps");
413+
///
414+
/// metadata_watcher(pods, ListParams::default()).applied_objects()
415+
/// .try_for_each(|p| async move {
416+
/// println!("Applied: {}", p.name_any());
417+
/// Ok(())
418+
/// })
419+
/// .await?;
420+
/// Ok(())
421+
/// }
422+
/// ```
423+
/// [`WatchStreamExt`]: super::WatchStreamExt
424+
/// [`reflector`]: super::reflector::reflector
425+
/// [`Api::watch`]: kube_client::Api::watch
426+
///
427+
/// # Recovery
428+
///
429+
/// The stream will attempt to be recovered on the next poll after an [`Err`] is returned.
430+
/// This will normally happen immediately, but you can use [`StreamBackoff`](crate::utils::StreamBackoff)
431+
/// to introduce an artificial delay. [`default_backoff`] returns a suitable default set of parameters.
432+
///
433+
/// If the watch connection is interrupted, then `watcher` will attempt to restart the watch using the last
434+
/// [resource version](https://kubernetes.io/docs/reference/using-api/api-concepts/#efficient-detection-of-changes)
435+
/// that we have seen on the stream. If this is successful then the stream is simply resumed from where it left off.
436+
/// If this fails because the resource version is no longer valid then we start over with a new stream, starting with
437+
/// an [`Event::Restarted`]. The internal mechanics of recovery should be considered an implementation detail.
438+
#[allow(clippy::module_name_repetitions)]
439+
pub fn metadata_watcher<K: Resource + Clone + DeserializeOwned + Debug + Send + 'static>(
440+
api: Api<K>,
441+
list_params: ListParams,
442+
) -> impl Stream<Item = Result<Event<PartialObjectMeta>>> + Send {
443+
futures::stream::unfold(
444+
(api, list_params, State::Empty),
445+
|(api, list_params, state)| async {
446+
let (event, state) = step(&MetaOnly { api: &api }, &list_params, state).await;
307447
Some((event, (api, list_params, state)))
308448
},
309449
)

0 commit comments

Comments
 (0)