Skip to content

Commit 5f4bdd4

Browse files
authored
feat: change db username and metrics collector (#14)
* feat: improved drop user and change username to hash * chore: added signal to exit * feat: updated pg connection and metrics collector * chore: changed metrics drop user fn name * chore: adjusted clippy
1 parent e5f3885 commit 5f4bdd4

File tree

9 files changed

+382
-229
lines changed

9 files changed

+382
-229
lines changed

operator/Cargo.lock

Lines changed: 62 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

operator/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,10 @@ tracing-subscriber = "0.3.18"
2222
rand = "0.8.5"
2323
prometheus = "0.13.3"
2424
actix-web = "4.4.0"
25+
bech32 = "0.9.1"
26+
sha3 = "0.10.8"
27+
lazy_static = "1.4.0"
28+
deadpool-postgres = "0.12.1"
2529

2630
[[bin]]
2731
name = "controller"

operator/src/config.rs

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use lazy_static::lazy_static;
2+
use std::{env, time::Duration};
3+
4+
lazy_static! {
5+
static ref CONTROLLER_CONFIG: Config = Config::from_env();
6+
}
7+
8+
pub fn get_config() -> &'static Config {
9+
&CONTROLLER_CONFIG
10+
}
11+
12+
#[derive(Debug, Clone)]
13+
pub struct Config {
14+
pub db_url_mainnet: String,
15+
pub db_url_preprod: String,
16+
pub db_url_preview: String,
17+
18+
pub dcu_per_second_mainnet: f64,
19+
pub dcu_per_second_preprod: f64,
20+
pub dcu_per_second_preview: f64,
21+
22+
pub metrics_delay: Duration,
23+
}
24+
25+
impl Config {
26+
pub fn from_env() -> Self {
27+
let db_url_mainnet = env::var("DB_URL_MAINNET").expect("DB_URL_MAINNET must be set");
28+
let db_url_preprod = env::var("DB_URL_PREPROD").expect("DB_URL_PREPROD must be set");
29+
let db_url_preview = env::var("DB_URL_PREVIEW").expect("DB_URL_PREVIEW must be set");
30+
31+
let metrics_delay = Duration::from_secs(
32+
env::var("METRICS_DELAY")
33+
.expect("METRICS_DELAY must be set")
34+
.parse::<u64>()
35+
.expect("METRICS_DELAY must be a number"),
36+
);
37+
38+
let dcu_per_second_mainnet = env::var("DCU_PER_SECOND_MAINNET")
39+
.expect("DCU_PER_SECOND_MAINNET must be set")
40+
.parse::<f64>()
41+
.expect("DCU_PER_SECOND_MAINNET must be a number");
42+
let dcu_per_second_preprod = env::var("DCU_PER_SECOND_PREPROD")
43+
.expect("DCU_PER_SECOND_PREPROD must be set")
44+
.parse::<f64>()
45+
.expect("DCU_PER_SECOND_PREPROD must be a number");
46+
let dcu_per_second_preview = env::var("DCU_PER_SECOND_PREVIEW")
47+
.expect("DCU_PER_SECOND_PREVIEW must be set")
48+
.parse::<f64>()
49+
.expect("DCU_PER_SECOND_PREVIEW must be a number");
50+
51+
Self {
52+
db_url_mainnet,
53+
db_url_preprod,
54+
db_url_preview,
55+
metrics_delay,
56+
dcu_per_second_mainnet,
57+
dcu_per_second_preprod,
58+
dcu_per_second_preview,
59+
}
60+
}
61+
}

operator/src/controller.rs

Lines changed: 47 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use bech32::ToBase32;
12
use futures::StreamExt;
23
use kube::{
34
api::{Patch, PatchParams},
@@ -13,28 +14,14 @@ use rand::distributions::{Alphanumeric, DistString};
1314
use schemars::JsonSchema;
1415
use serde::{Deserialize, Serialize};
1516
use serde_json::json;
17+
use sha3::{Digest, Sha3_256};
1618
use std::{sync::Arc, time::Duration};
1719
use tracing::error;
1820

19-
use crate::{postgres::Postgres, Config, Error, Metrics, Network, State};
21+
use crate::{postgres::Postgres, Error, Network, State};
2022

2123
pub static DB_SYNC_PORT_FINALIZER: &str = "dbsyncports.demeter.run";
2224

23-
struct Context {
24-
pub client: Client,
25-
pub metrics: Metrics,
26-
pub config: Config,
27-
}
28-
impl Context {
29-
pub fn new(client: Client, metrics: Metrics, config: Config) -> Self {
30-
Self {
31-
client,
32-
metrics,
33-
config,
34-
}
35-
}
36-
}
37-
3825
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
3926
#[kube(
4027
kind = "DbSyncPort",
@@ -64,22 +51,19 @@ impl DbSyncPort {
6451
.unwrap_or(false)
6552
}
6653

67-
async fn reconcile(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action, Error> {
68-
let client = ctx.client.clone();
54+
async fn reconcile(&self, state: Arc<State>, pg: &Postgres) -> Result<Action, Error> {
55+
let client = state.kube_client.clone();
6956
let ns = self.namespace().unwrap();
7057
let name = self.name_any();
7158
let crds: Api<DbSyncPort> = Api::namespaced(client, &ns);
7259

73-
let username = format!("{name}.{ns}");
60+
let username = gen_username_hash(&format!("{name}.{ns}")).await?;
7461
let password = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
7562

7663
if !self.was_executed() {
77-
match pg.user_already_exists(&username).await? {
78-
true => pg.user_enable(&username, &password).await?,
79-
false => pg.user_create(&username, &password).await?,
80-
};
64+
pg.create_user(&username, &password).await?;
8165

82-
let new_status = Patch::Apply(json!({
66+
let status = Patch::Apply(json!({
8367
"apiVersion": "demeter.run/v1alpha1",
8468
"kind": "DbSyncPort",
8569
"status": DbSyncPortStatus {
@@ -89,61 +73,77 @@ impl DbSyncPort {
8973
}));
9074

9175
let ps = PatchParams::apply("cntrlr").force();
92-
crds.patch_status(&name, &ps, &new_status)
76+
crds.patch_status(&name, &ps, &status)
9377
.await
9478
.map_err(Error::KubeError)?;
9579

96-
ctx.metrics.count_user_created(&username);
80+
state.metrics.count_user_created(&ns, &self.spec.network);
9781
};
9882

99-
Ok(Action::requeue(Duration::from_secs(5 * 60)))
83+
Ok(Action::await_change())
10084
}
10185

102-
async fn cleanup(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action, Error> {
103-
let username = self.status.as_ref().unwrap().username.clone();
104-
pg.user_disable(&username).await?;
105-
ctx.metrics.count_user_deactivated(&username);
86+
async fn cleanup(&self, state: Arc<State>, pg: &Postgres) -> Result<Action, Error> {
87+
if self.was_executed() {
88+
let ns = self.namespace().unwrap();
89+
let username = self.status.as_ref().unwrap().username.clone();
90+
pg.drop_user(&username).await?;
91+
state
92+
.metrics
93+
.count_user_droped(&ns, &self.spec.network);
94+
}
95+
10696
Ok(Action::await_change())
10797
}
10898
}
10999

110-
async fn reconcile(crd: Arc<DbSyncPort>, ctx: Arc<Context>) -> Result<Action, Error> {
111-
let url = match crd.spec.network {
112-
Network::Mainnet => &ctx.config.db_url_mainnet,
113-
Network::Preprod => &ctx.config.db_url_preprod,
114-
Network::Preview => &ctx.config.db_url_preview,
115-
};
116-
100+
async fn reconcile(crd: Arc<DbSyncPort>, state: Arc<State>) -> Result<Action, Error> {
117101
let ns = crd.namespace().unwrap();
118-
let crds: Api<DbSyncPort> = Api::namespaced(ctx.client.clone(), &ns);
102+
let crds: Api<DbSyncPort> = Api::namespaced(state.kube_client.clone(), &ns);
119103

120-
let mut postgres = Postgres::new(url).await?;
104+
let postgres = state.get_pg_by_network(&crd.spec.network);
121105

122106
finalizer(&crds, DB_SYNC_PORT_FINALIZER, crd, |event| async {
123107
match event {
124-
Event::Apply(crd) => crd.reconcile(ctx.clone(), &mut postgres).await,
125-
Event::Cleanup(crd) => crd.cleanup(ctx.clone(), &mut postgres).await,
108+
Event::Apply(crd) => crd.reconcile(state.clone(), &postgres).await,
109+
Event::Cleanup(crd) => crd.cleanup(state.clone(), &postgres).await,
126110
}
127111
})
128112
.await
129113
.map_err(|e| Error::FinalizerError(Box::new(e)))
130114
}
131115

132-
fn error_policy(crd: Arc<DbSyncPort>, err: &Error, ctx: Arc<Context>) -> Action {
116+
fn error_policy(crd: Arc<DbSyncPort>, err: &Error, state: Arc<State>) -> Action {
133117
error!("reconcile failed: {:?}", err);
134-
ctx.metrics.reconcile_failure(&crd, err);
118+
state.metrics.reconcile_failure(&crd, err);
135119
Action::requeue(Duration::from_secs(5))
136120
}
137121

138-
pub async fn run(state: Arc<State>, config: Config) -> Result<(), Error> {
122+
async fn gen_username_hash(username: &str) -> Result<String, Error> {
123+
let mut hasher = Sha3_256::new();
124+
hasher.update(username);
125+
let sha256_hash = hasher.finalize();
126+
127+
let bech32_hash = bech32::encode(
128+
"dmtr_dbsync",
129+
sha256_hash.to_base32(),
130+
bech32::Variant::Bech32,
131+
)?;
132+
133+
let bech32_truncated: String = bech32_hash.chars().take(32).collect();
134+
135+
Ok(bech32_truncated)
136+
}
137+
138+
pub async fn run(state: Arc<State>) -> Result<(), Error> {
139139
let client = Client::try_default().await?;
140140
let crds = Api::<DbSyncPort>::all(client.clone());
141141

142-
let ctx = Context::new(client, state.metrics.clone(), config);
142+
// let ctx = Context::new(client, state.clone());
143143

144144
Controller::new(crds, WatcherConfig::default().any_semantic())
145145
.shutdown_on_signal()
146-
.run(reconcile, error_policy, Arc::new(ctx))
146+
.run(reconcile, error_policy, state)
147147
.for_each(|_| futures::future::ready(()))
148148
.await;
149149

0 commit comments

Comments
 (0)