Skip to content

Commit b067e54

Browse files
committed
chore: improved postgres functions
1 parent ac139cc commit b067e54

File tree

2 files changed

+88
-79
lines changed

2 files changed

+88
-79
lines changed

src/controller.rs

Lines changed: 10 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,4 @@
1-
use crate::{
2-
postgres::{self, user_already_exists, user_create, user_disable, user_enable},
3-
Config, Error, Metrics, Result,
4-
};
1+
use crate::{postgres::Postgres, Config, Error, Metrics, Result};
52
use futures::StreamExt;
63
use kube::{
74
api::{Patch, PatchParams},
@@ -75,11 +72,7 @@ impl DbSyncPort {
7572
.unwrap_or(false)
7673
}
7774

78-
async fn reconcile(
79-
&self,
80-
ctx: Arc<Context>,
81-
pg_client: &mut tokio_postgres::Client,
82-
) -> Result<Action> {
75+
async fn reconcile(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action> {
8376
let client = ctx.client.clone();
8477
let ns = self.namespace().unwrap();
8578
let name = self.name_any();
@@ -89,9 +82,9 @@ impl DbSyncPort {
8982
let password = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
9083

9184
if !self.was_executed() {
92-
match user_already_exists(pg_client, &username).await? {
93-
true => user_enable(pg_client, &username, &password).await?,
94-
false => user_create(pg_client, &username, &password).await?,
85+
match pg.user_already_exists(&username).await? {
86+
true => pg.user_enable(&username, &password).await?,
87+
false => pg.user_create(&username, &password).await?,
9588
};
9689

9790
let new_status = Patch::Apply(json!({
@@ -114,13 +107,9 @@ impl DbSyncPort {
114107
Ok(Action::requeue(Duration::from_secs(5 * 60)))
115108
}
116109

117-
async fn cleanup(
118-
&self,
119-
ctx: Arc<Context>,
120-
pg_client: &mut tokio_postgres::Client,
121-
) -> Result<Action> {
110+
async fn cleanup(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action> {
122111
let username = self.status.as_ref().unwrap().username.clone();
123-
user_disable(pg_client, &username).await?;
112+
pg.user_disable(&username).await?;
124113
ctx.metrics.count_user_deactivated(&username);
125114
Ok(Action::await_change())
126115
}
@@ -136,12 +125,12 @@ async fn reconcile(crd: Arc<DbSyncPort>, ctx: Arc<Context>) -> Result<Action> {
136125
let ns = crd.namespace().unwrap();
137126
let crds: Api<DbSyncPort> = Api::namespaced(ctx.client.clone(), &ns);
138127

139-
let mut pg_client = postgres::connect(url).await?;
128+
let mut postgres = Postgres::new(url).await?;
140129

141130
finalizer(&crds, DB_SYNC_PORT_FINALIZER, crd, |event| async {
142131
match event {
143-
Event::Apply(crd) => crd.reconcile(ctx.clone(), &mut pg_client).await,
144-
Event::Cleanup(crd) => crd.cleanup(ctx.clone(), &mut pg_client).await,
132+
Event::Apply(crd) => crd.reconcile(ctx.clone(), &mut postgres).await,
133+
Event::Cleanup(crd) => crd.cleanup(ctx.clone(), &mut postgres).await,
145134
}
146135
})
147136
.await

src/postgres.rs

Lines changed: 78 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2,81 +2,101 @@ use tokio_postgres::{Client, NoTls};
22

33
use crate::Error;
44

5-
pub async fn connect(url: &str) -> Result<Client, Error> {
6-
let (client, connection) = tokio_postgres::connect(url, NoTls).await?;
7-
8-
tokio::spawn(async move {
9-
if let Err(e) = connection.await {
10-
eprintln!("connection error: {}", e);
11-
}
12-
});
13-
14-
Ok(client)
5+
// const QUERY_GET_METRICS: &str = "
6+
// SELECT
7+
// usename,
8+
// SUM(calls) AS total_queries,
9+
// SUM(total_exec_time) AS total_exec_time
10+
// FROM
11+
// pg_stat_statements
12+
// inner join
13+
// pg_catalog.pg_user on pg_catalog.pg_user.usesysid = userid
14+
// GROUP BY
15+
// usename;";
16+
17+
pub struct Postgres {
18+
client: Client,
1519
}
1620

17-
pub async fn user_create(client: &mut Client, username: &str, password: &str) -> Result<(), Error> {
18-
let query_create_user = format!("create user \"{username}\" with password '{password}';");
19-
let query_create_role =
20-
format!("grant select on all tables in schema public to \"{username}\";");
21+
impl Postgres {
22+
pub async fn new(url: &str) -> Result<Self, Error> {
23+
let (client, connection) = tokio_postgres::connect(url, NoTls).await?;
2124

22-
let tx = client.transaction().await?;
25+
tokio::spawn(async move {
26+
if let Err(e) = connection.await {
27+
eprintln!("connection error: {}", e);
28+
}
29+
});
2330

24-
let user_stmt = tx.prepare(&query_create_user).await?;
25-
let user_result = tx.execute(&user_stmt, &[]).await;
26-
if let Err(err) = user_result {
27-
tx.rollback().await?;
28-
return Err(Error::PgError(err));
31+
Ok(Self { client })
2932
}
3033

31-
let role_stmt = tx.prepare(&query_create_role).await?;
32-
let role_result = tx.execute(&role_stmt, &[]).await;
33-
if let Err(err) = role_result {
34-
tx.rollback().await?;
35-
return Err(Error::PgError(err));
36-
}
34+
pub async fn user_create(&mut self, username: &str, password: &str) -> Result<(), Error> {
35+
let query_create_user = format!("create user \"{username}\" with password '{password}';");
36+
let query_create_role =
37+
format!("grant select on all tables in schema public to \"{username}\";");
3738

38-
tx.commit().await?;
39-
Ok(())
40-
}
39+
let tx = self.client.transaction().await?;
4140

42-
pub async fn user_disable(client: &mut Client, username: &str) -> Result<(), Error> {
43-
let query_revoke_login = format!("alter user \"{username}\" with nologin;");
41+
let user_stmt = tx.prepare(&query_create_user).await?;
42+
let user_result = tx.execute(&user_stmt, &[]).await;
43+
if let Err(err) = user_result {
44+
tx.rollback().await?;
45+
return Err(Error::PgError(err));
46+
}
4447

45-
let revoke_stmt = client.prepare(&query_revoke_login).await?;
46-
client.execute(&revoke_stmt, &[]).await?;
48+
let role_stmt = tx.prepare(&query_create_role).await?;
49+
let role_result = tx.execute(&role_stmt, &[]).await;
50+
if let Err(err) = role_result {
51+
tx.rollback().await?;
52+
return Err(Error::PgError(err));
53+
}
4754

48-
Ok(())
49-
}
55+
tx.commit().await?;
56+
Ok(())
57+
}
5058

51-
pub async fn user_enable(client: &mut Client, username: &str, password: &str) -> Result<(), Error> {
52-
let query_grant_login = format!("alter user \"{username}\" with login;");
53-
let query_alter_password = format!("alter user \"{username}\" with password '{password}';");
59+
pub async fn user_disable(&self, username: &str) -> Result<(), Error> {
60+
let query_revoke_login = format!("alter user \"{username}\" with nologin;");
5461

55-
let tx = client.transaction().await?;
62+
let revoke_stmt = self.client.prepare(&query_revoke_login).await?;
63+
self.client.execute(&revoke_stmt, &[]).await?;
5664

57-
let login_stmt = tx.prepare(&query_grant_login).await?;
58-
let login_result = tx.execute(&login_stmt, &[]).await;
59-
if let Err(err) = login_result {
60-
tx.rollback().await?;
61-
return Err(Error::PgError(err));
65+
Ok(())
6266
}
6367

64-
let alter_stmt = tx.prepare(&query_alter_password).await?;
65-
let alter_result = tx.execute(&alter_stmt, &[]).await;
66-
if let Err(err) = alter_result {
67-
tx.rollback().await?;
68-
return Err(Error::PgError(err));
69-
}
68+
pub async fn user_enable(&mut self, username: &str, password: &str) -> Result<(), Error> {
69+
let query_grant_login = format!("alter user \"{username}\" with login;");
70+
let query_alter_password = format!("alter user \"{username}\" with password '{password}';");
7071

71-
tx.commit().await?;
72-
Ok(())
73-
}
72+
let tx = self.client.transaction().await?;
7473

75-
pub async fn user_already_exists(client: &mut Client, username: &str) -> Result<bool, Error> {
76-
let query = "select rolname from pg_roles where rolname = $1;";
74+
let login_stmt = tx.prepare(&query_grant_login).await?;
75+
let login_result = tx.execute(&login_stmt, &[]).await;
76+
if let Err(err) = login_result {
77+
tx.rollback().await?;
78+
return Err(Error::PgError(err));
79+
}
7780

78-
let user_stmt = client.prepare(query).await?;
79-
let result = client.query_opt(&user_stmt, &[&username]).await?;
81+
let alter_stmt = tx.prepare(&query_alter_password).await?;
82+
let alter_result = tx.execute(&alter_stmt, &[]).await;
83+
if let Err(err) = alter_result {
84+
tx.rollback().await?;
85+
return Err(Error::PgError(err));
86+
}
87+
88+
tx.commit().await?;
89+
Ok(())
90+
}
8091

81-
Ok(result.is_some())
92+
pub async fn user_already_exists(&self, username: &str) -> Result<bool, Error> {
93+
let query = "select rolname from pg_roles where rolname = $1;";
94+
95+
let user_stmt = self.client.prepare(query).await?;
96+
let result = self.client.query_opt(&user_stmt, &[&username]).await?;
97+
98+
Ok(result.is_some())
99+
}
82100
}
101+
102+
pub async fn get_metrics() {}

0 commit comments

Comments
 (0)