1- use futures:: { future, StreamExt } ;
1+ use futures:: { future, stream , StreamExt } ;
22use k8s_openapi:: api:: {
33 apps:: v1:: Deployment ,
44 core:: v1:: { ConfigMap , Secret } ,
55} ;
66use kube:: {
7- runtime:: {
8- reflector,
9- reflector:: { ObjectRef , Store } ,
10- watcher, WatchStreamExt ,
11- } ,
12- Api , Client ,
7+ api:: { ApiResource , DynamicObject , GroupVersionKind } ,
8+ core:: TypedResource ,
9+ runtime:: { reflector:: store:: CacheWriter , watcher, WatchStreamExt } ,
10+ Api , Client , Resource ,
1311} ;
12+ use parking_lot:: RwLock ;
13+ use serde:: de:: DeserializeOwned ;
1414use std:: sync:: Arc ;
1515use tracing:: * ;
1616
17- // This does not work because Resource trait is not dyn safe.
18- /*
19- use std::any::TypeId;
2017use std:: collections:: HashMap ;
21- use k8s_openapi::NamespaceResourceScope;
22- use kube::api::{Resource, ResourceExt};
23- struct MultiStore {
24- stores: HashMap<TypeId, Store<dyn Resource<DynamicType = (), Scope = NamespaceResourceScope>>>,
25- }
26- impl MultiStore {
27- fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<K>> {
28- let oref = ObjectRef::<K>::new(name).within(ns);
29- if let Some(store) = self.stores.get(&TypeId::of::<K>()) {
30- store.get(oref)
31- } else {
32- None
33- }
34- }
35- }*/
3618
37- // explicit store can work
38- struct MultiStore {
39- deploys : Store < Deployment > ,
40- cms : Store < ConfigMap > ,
41- secs : Store < Secret > ,
19+ type Cache = Arc < RwLock < HashMap < LookupKey , Arc < DynamicObject > > > > ;
20+
21+ #[ derive( Default , Clone , Hash , PartialEq , Eq , Debug ) ]
22+ struct LookupKey {
23+ gvk : GroupVersionKind ,
24+ name : Option < String > ,
25+ namespace : Option < String > ,
4226}
43- // but using generics to help out won't because the K needs to be concretised
44- /*
45- impl MultiStore {
46- fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<Option<K>>> {
47- let oref = ObjectRef::<K>::new(name).within(ns);
48- let kind = K::kind(&()).to_owned();
49- match kind.as_ref() {
50- "Deployment" => self.deploys.get(&ObjectRef::new(name).within(ns)),
51- "ConfigMap" => self.cms.get(&ObjectRef::new(name).within(ns)),
52- "Secret" => self.secs.get(&ObjectRef::new(name).within(ns)),
53- _ => None,
27+
28+ impl LookupKey {
29+ fn new < R : TypedResource > ( resource : & R ) -> LookupKey {
30+ let meta = resource. meta ( ) ;
31+ LookupKey {
32+ gvk : resource. gvk ( ) ,
33+ name : meta. name . clone ( ) ,
34+ namespace : meta. namespace . clone ( ) ,
5435 }
55- None
5636 }
5737}
58- */
59- // so left with this
6038
61- impl MultiStore {
62- fn get_deploy ( & self , name : & str , ns : & str ) -> Option < Arc < Deployment > > {
63- self . deploys . get ( & ObjectRef :: < Deployment > :: new ( name ) . within ( ns ) )
64- }
39+ # [ derive ( Default , Clone ) ]
40+ struct MultiCache {
41+ store : Cache ,
42+ }
6543
66- fn get_secret ( & self , name : & str , ns : & str ) -> Option < Arc < Secret > > {
67- self . secs . get ( & ObjectRef :: < Secret > :: new ( name) . within ( ns) )
44+ impl MultiCache {
45+ fn get < K : Resource < DynamicType = impl Default > + DeserializeOwned + Clone > (
46+ & self ,
47+ name : & str ,
48+ ns : & str ,
49+ ) -> Option < Arc < K > > {
50+ let obj = self
51+ . store
52+ . read ( )
53+ . get ( & LookupKey {
54+ gvk : K :: gvk ( & Default :: default ( ) ) ,
55+ name : Some ( name. into ( ) ) ,
56+ namespace : if !ns. is_empty ( ) { Some ( ns. into ( ) ) } else { None } ,
57+ } ) ?
58+ . as_ref ( )
59+ . clone ( ) ;
60+ obj. try_parse ( ) . ok ( ) . map ( Arc :: new)
6861 }
62+ }
6963
70- fn get_cm ( & self , name : & str , ns : & str ) -> Option < Arc < ConfigMap > > {
71- self . cms . get ( & ObjectRef :: < ConfigMap > :: new ( name) . within ( ns) )
64+ impl CacheWriter < DynamicObject > for MultiCache {
65+ /// Applies a single watcher event to the store
66+ fn apply_watcher_event ( & mut self , event : & watcher:: Event < DynamicObject > ) {
67+ match event {
68+ watcher:: Event :: Init | watcher:: Event :: InitDone => { }
69+ watcher:: Event :: Delete ( obj) => {
70+ self . store . write ( ) . remove ( & LookupKey :: new ( obj) ) ;
71+ }
72+ watcher:: Event :: InitApply ( obj) | watcher:: Event :: Apply ( obj) => {
73+ self . store
74+ . write ( )
75+ . insert ( LookupKey :: new ( obj) , Arc :: new ( obj. clone ( ) ) ) ;
76+ }
77+ }
7278 }
7379}
7480
@@ -77,60 +83,62 @@ async fn main() -> anyhow::Result<()> {
7783 tracing_subscriber:: fmt:: init ( ) ;
7884 let client = Client :: try_default ( ) . await ?;
7985
80- let deploys: Api < Deployment > = Api :: default_namespaced ( client. clone ( ) ) ;
81- let cms: Api < ConfigMap > = Api :: default_namespaced ( client. clone ( ) ) ;
82- let secret: Api < Secret > = Api :: default_namespaced ( client. clone ( ) ) ;
86+ // multistore
87+ let mut combo_stream = stream:: select_all ( vec ! [ ] ) ;
88+ combo_stream. push (
89+ watcher:: watcher (
90+ Api :: all_with ( client. clone ( ) , & ApiResource :: erase :: < Deployment > ( & ( ) ) ) ,
91+ Default :: default ( ) ,
92+ )
93+ . boxed ( ) ,
94+ ) ;
95+ combo_stream. push (
96+ watcher:: watcher (
97+ Api :: all_with ( client. clone ( ) , & ApiResource :: erase :: < ConfigMap > ( & ( ) ) ) ,
98+ Default :: default ( ) ,
99+ )
100+ . boxed ( ) ,
101+ ) ;
83102
84- let ( dep_reader, dep_writer) = reflector:: store :: < Deployment > ( ) ;
85- let ( cm_reader, cm_writer) = reflector:: store :: < ConfigMap > ( ) ;
86- let ( sec_reader, sec_writer) = reflector:: store :: < Secret > ( ) ;
103+ // // Duplicate streams with narrowed down selection
104+ combo_stream. push (
105+ watcher:: watcher (
106+ Api :: default_namespaced_with ( client. clone ( ) , & ApiResource :: erase :: < Secret > ( & ( ) ) ) ,
107+ Default :: default ( ) ,
108+ )
109+ . boxed ( ) ,
110+ ) ;
111+ combo_stream. push (
112+ watcher:: watcher (
113+ Api :: all_with ( client. clone ( ) , & ApiResource :: erase :: < Secret > ( & ( ) ) ) ,
114+ Default :: default ( ) ,
115+ )
116+ . boxed ( ) ,
117+ ) ;
87118
88- let cfg = watcher:: Config :: default ( ) ;
89- let dep_watcher = watcher ( deploys, cfg. clone ( ) )
90- . reflect ( dep_writer)
91- . applied_objects ( )
92- . for_each ( |_| future:: ready ( ( ) ) ) ;
93- let cm_watcher = watcher ( cms, cfg. clone ( ) )
94- . reflect ( cm_writer)
119+ let multi_writer = MultiCache :: default ( ) ;
120+ let watcher = combo_stream
121+ . reflect ( multi_writer. clone ( ) )
95122 . applied_objects ( )
96123 . for_each ( |_| future:: ready ( ( ) ) ) ;
97- let sec_watcher = watcher ( secret, cfg)
98- . reflect ( sec_writer)
99- . applied_objects ( )
100- . for_each ( |_| future:: ready ( ( ) ) ) ;
101- // poll these forever
102-
103- // multistore
104- let stores = MultiStore {
105- deploys : dep_reader,
106- cms : cm_reader,
107- secs : sec_reader,
108- } ;
109124
110125 // simulate doing stuff with the stores from some other thread
111126 tokio:: spawn ( async move {
112- // Show state every 5 seconds of watching
113- info ! ( "waiting for them to be ready" ) ;
114- stores. deploys . wait_until_ready ( ) . await . unwrap ( ) ;
115- stores. cms . wait_until_ready ( ) . await . unwrap ( ) ;
116- stores. secs . wait_until_ready ( ) . await . unwrap ( ) ;
117- info ! ( "stores initialised" ) ;
118127 // can use helper accessors
119- info ! (
120- "common cm: {:?}" ,
121- stores. get_cm( "kube-root-ca.crt" , "kube-system" ) . unwrap( )
122- ) ;
123128 loop {
124129 tokio:: time:: sleep ( std:: time:: Duration :: from_secs ( 5 ) ) . await ;
130+ info ! ( "cache content: {:?}" , multi_writer. store. read( ) . keys( ) ) ;
131+ info ! (
132+ "common cm: {:?}" ,
133+ multi_writer. get:: <ConfigMap >( "kube-root-ca.crt" , "kube-system" )
134+ ) ;
125135 // access individual sub stores
126- info ! ( "Current deploys count: {}" , stores . deploys . state ( ) . len( ) ) ;
136+ info ! ( "Current objects count: {}" , multi_writer . store . read ( ) . len( ) ) ;
127137 }
128138 } ) ;
129- // info!("long watches starting");
139+ info ! ( "long watches starting" ) ;
130140 tokio:: select! {
131- r = dep_watcher => println!( "dep watcher exit: {r:?}" ) ,
132- r = cm_watcher => println!( "cm watcher exit: {r:?}" ) ,
133- r = sec_watcher => println!( "sec watcher exit: {r:?}" ) ,
141+ r = watcher => println!( "watcher exit: {r:?}" ) ,
134142 }
135143
136144 Ok ( ( ) )
0 commit comments