Skip to content

Commit

Permalink
Implement k8s event to remove auth key inside the ogmios proxy (#26)
Browse files Browse the repository at this point in the history
* chore: adjusted handlers

* chore: added update keys for k8s events
  • Loading branch information
paulobressan authored Feb 28, 2024
1 parent e5f9889 commit 38a0181
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 32 deletions.
49 changes: 34 additions & 15 deletions proxy/src/auth.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::{collections::HashMap, sync::Arc};

use futures_util::TryStreamExt;
use operator::{
Expand All @@ -8,32 +8,24 @@ use operator::{
watcher::{self, Config},
WatchStreamExt,
},
Api, Client,
Api, Client, ResourceExt,
},
OgmiosPort,
};
use tokio::{pin, sync::RwLock};
use tracing::error;

use crate::State;
use crate::{Consumer, State};

pub async fn start(state: Arc<RwLock<State>>) {
let client = Client::try_default()
.await
.expect("failed to create kube client");

let api = Api::<OgmiosPort>::all(client.clone());
let result = api.list(&ListParams::default()).await;
if let Err(err) = result {
error!(error = err.to_string(), "error to get crds");
std::process::exit(1);
}
update_auth(state.clone(), api.clone()).await;

for crd in result.unwrap().items.iter() {
state.write().await.add_auth_token(crd);
}

let stream = watcher::watcher(api, Config::default()).applied_objects();
let stream = watcher::watcher(api.clone(), Config::default()).touched_objects();
pin!(stream);

loop {
Expand All @@ -42,8 +34,35 @@ pub async fn start(state: Arc<RwLock<State>>) {
error!(error = err.to_string(), "fail crd auth watcher");
continue;
}
if let Some(crd) = result.unwrap() {
state.write().await.add_auth_token(&crd);

update_auth(state.clone(), api.clone()).await;
}
}

async fn update_auth(state: Arc<RwLock<State>>, api: Api<OgmiosPort>) {
let result = api.list(&ListParams::default()).await;
if let Err(err) = result {
error!(
error = err.to_string(),
"error to get crds while updating auth keys"
);
return;
}

let mut consumers = HashMap::new();
for crd in result.unwrap().items.iter() {
if crd.status.is_some() {
let network = crd.spec.network.to_string();
let version = crd.spec.version;
let auth_token = crd.status.as_ref().unwrap().auth_token.clone();
let namespace = crd.metadata.namespace.as_ref().unwrap().clone();
let port_name = crd.name_any();

let hash_key = format!("{}.{}.{}", network, version, auth_token);
let consumer = Consumer::new(namespace, port_name);

consumers.insert(hash_key, consumer);
}
}
state.write().await.consumers = consumers;
}
17 changes: 0 additions & 17 deletions proxy/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
use config::Config;
use dotenv::dotenv;
use metrics::Metrics;
use operator::kube::ResourceExt;
use operator::OgmiosPort;
use prometheus::Registry;
use regex::Regex;
use std::collections::HashMap;
Expand Down Expand Up @@ -76,21 +74,6 @@ impl State {
})
}

pub fn add_auth_token(&mut self, crd: &OgmiosPort) {
if crd.status.is_some() {
let network = crd.spec.network.to_string();
let version = crd.spec.version;
let auth_token = crd.status.as_ref().unwrap().auth_token.clone();
let namespace = crd.metadata.namespace.as_ref().unwrap().clone();
let port_name = crd.name_any();

let hash_key = format!("{}.{}.{}", network, version, auth_token);
let consumer = Consumer::new(namespace, port_name);

self.consumers.insert(hash_key, consumer);
}
}

pub fn get_auth_token(&self, network: &str, version: &str, token: &str) -> Option<Consumer> {
let hash_key = format!("{}.{}.{}", network, version, token);
self.consumers.get(&hash_key).cloned()
Expand Down

0 comments on commit 38a0181

Please sign in to comment.