Skip to content

Commit 65994a3

Browse files
committed
Incrementally update sessionspaces current sessions
1 parent 05192af commit 65994a3

File tree

2 files changed

+60
-61
lines changed

2 files changed

+60
-61
lines changed

sessionspaces/src/main.rs

Lines changed: 60 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,12 @@ mod resources;
1212

1313
use crate::permissionables::Sessions;
1414
use clap::Parser;
15-
use ldap3::LdapConnAsync;
16-
use resources::update_resources;
17-
use sqlx::mysql::MySqlPoolOptions;
15+
use ldap3::{Ldap, LdapConnAsync};
16+
use resources::{create_configmap, create_namespace, delete_namespace};
17+
use sqlx::{mysql::MySqlPoolOptions, MySqlPool};
18+
use std::collections::BTreeSet;
1819
use tokio::time::{sleep_until, Instant};
19-
use tracing::warn;
20+
use tracing::{info, warn};
2021
use url::Url;
2122

2223
/// SessionSpaces periodically polls the authorization bundle server and applies templates to the cluster accordingly
@@ -31,9 +32,6 @@ struct Cli {
3132
/// The period to wait after a succesful bundle server request
3233
#[clap(long, env, default_value = "60s")]
3334
update_interval: humantime::Duration,
34-
/// The period to wait after an unsuccesful bundle server request
35-
#[arg(long, env, default_value = "10s")]
36-
retry_interval: humantime::Duration,
3735
/// The [`tracing::Level`] to log at
3836
#[arg(long, env="LOG_LEVEL", default_value_t=tracing::Level::INFO)]
3937
log_level: tracing::Level,
@@ -61,17 +59,64 @@ async fn main() {
6159
let mut request_at = Instant::now();
6260
loop {
6361
sleep_until(request_at).await;
64-
if let Ok(new_sessions) = async {
65-
let new_sessions = Sessions::fetch(&ispyb_pool, &mut ldap_connection).await?;
66-
update_resources(k8s_client.clone(), &current_sessions, &new_sessions).await?;
67-
Ok::<_, anyhow::Error>(new_sessions)
68-
}
62+
if let Ok(()) = perform_updates(
63+
&mut current_sessions,
64+
&ispyb_pool,
65+
&mut ldap_connection,
66+
&k8s_client,
67+
)
6968
.await
7069
{
71-
current_sessions = new_sessions;
7270
request_at = request_at.checked_add(*args.update_interval).unwrap();
73-
} else {
74-
request_at = request_at.checked_add(*args.retry_interval).unwrap();
7571
};
7672
}
7773
}
74+
75+
/// Fetches new [`Sessions`] and updates k8s resources according to the observed changes.
76+
async fn perform_updates(
77+
current_sessions: &mut Sessions,
78+
ispyb_pool: &MySqlPool,
79+
ldap_connection: &mut Ldap,
80+
k8s_client: &kube::Client,
81+
) -> Result<(), anyhow::Error> {
82+
let mut new_sessions = Sessions::fetch(ispyb_pool, ldap_connection).await?;
83+
84+
let current_session_names = current_sessions.keys().cloned().collect::<BTreeSet<_>>();
85+
let new_session_names = new_sessions.keys().cloned().collect::<BTreeSet<_>>();
86+
let to_update = current_session_names
87+
.union(&new_session_names)
88+
.collect::<BTreeSet<_>>();
89+
90+
info!("Updating {} SessionSpaces", to_update.len());
91+
for namespace in to_update.into_iter() {
92+
match (
93+
current_sessions.get(namespace),
94+
new_sessions.remove(namespace),
95+
) {
96+
(Some(_), None) => {
97+
info!("Deleting Namespace: {}", namespace);
98+
delete_namespace(namespace, k8s_client.clone()).await?;
99+
current_sessions.remove(namespace);
100+
}
101+
(None, Some(new_session)) => {
102+
info!(
103+
"Creating Namespace, {}, with Config: {}",
104+
namespace, new_session
105+
);
106+
create_namespace(namespace.clone(), k8s_client.clone()).await?;
107+
create_configmap(namespace, new_session.clone(), k8s_client.clone()).await?;
108+
current_sessions.insert(namespace.clone(), new_session);
109+
}
110+
(Some(current_session), Some(new_session)) if current_session != &new_session => {
111+
info!(
112+
"Updating Namespace, {}, with Config: {}",
113+
namespace, new_session
114+
);
115+
create_configmap(namespace, new_session.clone(), k8s_client.clone()).await?;
116+
current_sessions.insert(namespace.clone(), new_session);
117+
}
118+
(_, _) => {}
119+
}
120+
}
121+
Ok(())
122+
}

sessionspaces/src/resources/mod.rs

Lines changed: 0 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -12,49 +12,3 @@ pub use self::{
1212
config_maps::create_configmap,
1313
namespace::{create_namespace, delete_namespace},
1414
};
15-
use crate::permissionables::Sessions;
16-
use std::collections::BTreeSet;
17-
use tracing::{info, instrument};
18-
19-
/// Requests a new bundle from the bundle server and performs templating accordingly
20-
#[instrument(skip_all, err(level=tracing::Level::WARN))]
21-
pub async fn update_resources(
22-
k8s_client: kube::Client,
23-
current_sessions: &Sessions,
24-
new_sessions: &Sessions,
25-
) -> std::result::Result<(), anyhow::Error> {
26-
let current_session_names = current_sessions.keys().cloned().collect::<BTreeSet<_>>();
27-
let session_names = new_sessions.keys().cloned().collect::<BTreeSet<_>>();
28-
let to_update = current_session_names
29-
.union(&session_names)
30-
.collect::<BTreeSet<_>>();
31-
32-
info!("Updating {} SessionSpaces", to_update.len());
33-
for namespace in to_update.into_iter() {
34-
let session_info = new_sessions.get(namespace);
35-
let current_sesssion_info = current_sessions.get(namespace);
36-
match (current_sesssion_info, session_info) {
37-
(Some(_), None) => {
38-
info!("Deleting Namespace: {}", namespace);
39-
delete_namespace(namespace, k8s_client.clone()).await?;
40-
}
41-
(None, Some(session_info)) => {
42-
info!(
43-
"Creating Namespace, {}, with Config: {}",
44-
namespace, session_info
45-
);
46-
create_namespace(namespace.clone(), k8s_client.clone()).await?;
47-
create_configmap(namespace, session_info.clone(), k8s_client.clone()).await?;
48-
}
49-
(Some(current_info), Some(session_info)) if current_info != session_info => {
50-
info!(
51-
"Updating Namespace, {}, with Config: {}",
52-
namespace, session_info
53-
);
54-
create_configmap(namespace, session_info.clone(), k8s_client.clone()).await?;
55-
}
56-
(_, _) => {}
57-
}
58-
}
59-
Ok(())
60-
}

0 commit comments

Comments
 (0)