Skip to content

Commit f9c572c

Browse files
committed
feat: added collector to collect users statements metrics
1 parent b067e54 commit f9c572c

File tree

7 files changed

+166
-61
lines changed

7 files changed

+166
-61
lines changed

Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,13 @@ prometheus = "0.13.3"
2424
actix-web = "4.4.0"
2525

2626
[[bin]]
27-
doc = false
2827
name = "controller"
2928
path = "src/main.rs"
3029

3130
[[bin]]
32-
doc = false
3331
name = "crdgen"
3432
path = "src/crdgen.rs"
3533

3634
[lib]
37-
name = "controller"
3835
path = "src/lib.rs"
3936

src/controller.rs

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
use crate::{postgres::Postgres, Config, Error, Metrics, Result};
21
use futures::StreamExt;
32
use kube::{
43
api::{Patch, PatchParams},
@@ -17,6 +16,8 @@ use serde_json::json;
1716
use std::{sync::Arc, time::Duration};
1817
use tracing::error;
1918

19+
use crate::{postgres::Postgres, Config, Error, Metrics, State};
20+
2021
pub static DB_SYNC_PORT_FINALIZER: &str = "dbsyncports.demeter.run";
2122

2223
struct Context {
@@ -33,15 +34,6 @@ impl Context {
3334
}
3435
}
3536
}
36-
#[derive(Clone, Default)]
37-
pub struct State {
38-
registry: prometheus::Registry,
39-
}
40-
impl State {
41-
pub fn metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
42-
self.registry.gather()
43-
}
44-
}
4537

4638
#[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
4739
pub enum Network {
@@ -72,7 +64,7 @@ impl DbSyncPort {
7264
.unwrap_or(false)
7365
}
7466

75-
async fn reconcile(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action> {
67+
async fn reconcile(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action, Error> {
7668
let client = ctx.client.clone();
7769
let ns = self.namespace().unwrap();
7870
let name = self.name_any();
@@ -107,15 +99,15 @@ impl DbSyncPort {
10799
Ok(Action::requeue(Duration::from_secs(5 * 60)))
108100
}
109101

110-
async fn cleanup(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action> {
102+
async fn cleanup(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action, Error> {
111103
let username = self.status.as_ref().unwrap().username.clone();
112104
pg.user_disable(&username).await?;
113105
ctx.metrics.count_user_deactivated(&username);
114106
Ok(Action::await_change())
115107
}
116108
}
117109

118-
async fn reconcile(crd: Arc<DbSyncPort>, ctx: Arc<Context>) -> Result<Action> {
110+
async fn reconcile(crd: Arc<DbSyncPort>, ctx: Arc<Context>) -> Result<Action, Error> {
119111
let url = match crd.spec.network {
120112
Network::Mainnet => &ctx.config.db_url_mainnet,
121113
Network::Preprod => &ctx.config.db_url_preprod,
@@ -143,11 +135,11 @@ fn error_policy(crd: Arc<DbSyncPort>, err: &Error, ctx: Arc<Context>) -> Action
143135
Action::requeue(Duration::from_secs(5))
144136
}
145137

146-
pub async fn run(state: State, config: Config) -> Result<(), Error> {
138+
pub async fn run(state: Arc<State>, config: Config) -> Result<(), Error> {
147139
let client = Client::try_default().await?;
148140
let crds = Api::<DbSyncPort>::all(client.clone());
149-
let metrics = Metrics::default().register(&state.registry).unwrap();
150-
let ctx = Context::new(client, metrics, config);
141+
142+
let ctx = Context::new(client, state.metrics.clone(), config);
151143

152144
Controller::new(crds, WatcherConfig::default().any_semantic())
153145
.shutdown_on_signal()

src/crdgen.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use ext_cardano_dbsync::controller;
12
use kube::CustomResourceExt;
23

34
fn main() {

src/lib.rs

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
use std::time::Duration;
2+
3+
use prometheus::Registry;
14
use thiserror::Error;
25

36
#[derive(Error, Debug)]
@@ -10,51 +13,94 @@ pub enum Error {
1013

1114
#[error("Finalizer Error: {0}")]
1215
FinalizerError(#[source] Box<kube::runtime::finalizer::Error<Error>>),
16+
17+
#[error("Env Error: {0}")]
18+
EnvError(#[source] std::env::VarError),
19+
20+
#[error("Prometheus Error: {0}")]
21+
PrometheusError(#[source] prometheus::Error),
22+
23+
#[error("Parse Int Error: {0}")]
24+
ParseIntError(#[source] std::num::ParseIntError),
1325
}
1426

1527
impl Error {
1628
pub fn metric_label(&self) -> String {
1729
format!("{self:?}").to_lowercase()
1830
}
1931
}
20-
21-
pub type Result<T, E = Error> = std::result::Result<T, E>;
22-
2332
impl From<tokio_postgres::Error> for Error {
2433
fn from(value: tokio_postgres::Error) -> Self {
2534
Error::PgError(value)
2635
}
2736
}
28-
2937
impl From<kube::Error> for Error {
3038
fn from(value: kube::Error) -> Self {
3139
Error::KubeError(value)
3240
}
3341
}
42+
impl From<std::env::VarError> for Error {
43+
fn from(value: std::env::VarError) -> Self {
44+
Error::EnvError(value)
45+
}
46+
}
47+
impl From<prometheus::Error> for Error {
48+
fn from(value: prometheus::Error) -> Self {
49+
Error::PrometheusError(value)
50+
}
51+
}
52+
impl From<std::num::ParseIntError> for Error {
53+
fn from(value: std::num::ParseIntError) -> Self {
54+
Error::ParseIntError(value)
55+
}
56+
}
57+
58+
#[derive(Clone, Default)]
59+
pub struct State {
60+
registry: Registry,
61+
pub metrics: Metrics,
62+
}
63+
impl State {
64+
pub fn new() -> Self {
65+
let registry = Registry::default();
66+
let metrics = Metrics::default().register(&registry).unwrap();
67+
Self { registry, metrics }
68+
}
69+
70+
pub fn metrics_collected(&self) -> Vec<prometheus::proto::MetricFamily> {
71+
self.registry.gather()
72+
}
73+
}
3474

75+
#[derive(Clone)]
3576
pub struct Config {
3677
pub db_url_mainnet: String,
3778
pub db_url_preprod: String,
3879
pub db_url_preview: String,
80+
pub metrics_delay: Duration,
81+
pub dcu_base: u64,
3982
}
4083
impl Config {
41-
pub fn new() -> Self {
42-
Self {
43-
db_url_mainnet: std::env::var("DB_URL_MAINNET").expect("DB_URL_MAINNET must be set"),
44-
db_url_preprod: std::env::var("DB_URL_PREPROD").expect("DB_URL_PREPROD must be set"),
45-
db_url_preview: std::env::var("DB_URL_PREVIEW").expect("DB_URL_PREVIEW must be set"),
46-
}
47-
}
48-
}
49-
impl Default for Config {
50-
fn default() -> Self {
51-
Self::new()
84+
pub fn try_new() -> Result<Self, Error> {
85+
let db_url_mainnet = std::env::var("DB_URL_MAINNET")?;
86+
let db_url_preprod = std::env::var("DB_URL_PREPROD")?;
87+
let db_url_preview = std::env::var("DB_URL_PREVIEW")?;
88+
let metrics_delay = Duration::from_secs(std::env::var("METRICS_DELAY")?.parse::<u64>()?);
89+
let dcu_base = std::env::var("DCU_BASE")?.parse::<u64>()?;
90+
91+
Ok(Self {
92+
db_url_mainnet,
93+
db_url_preprod,
94+
db_url_preview,
95+
metrics_delay,
96+
dcu_base,
97+
})
5298
}
5399
}
54100

55101
pub mod controller;
102+
pub mod metrics;
56103
pub mod postgres;
57-
pub use crate::controller::*;
58104

59-
mod metrics;
60-
pub use metrics::Metrics;
105+
pub use controller::*;
106+
pub use metrics::*;

src/main.rs

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
1-
use std::io;
2-
31
use actix_web::{
42
get, middleware, web::Data, App, HttpRequest, HttpResponse, HttpServer, Responder,
53
};
6-
use controller::{Config, State};
74
use dotenv::dotenv;
85
use prometheus::{Encoder, TextEncoder};
6+
use std::{io, sync::Arc};
7+
use tracing::error;
8+
9+
use ext_cardano_dbsync::{controller, metrics as metrics_collector, Config, State};
910

1011
#[get("/metrics")]
11-
async fn metrics(c: Data<State>, _req: HttpRequest) -> impl Responder {
12-
let metrics = c.metrics();
12+
async fn metrics(c: Data<Arc<State>>, _req: HttpRequest) -> impl Responder {
13+
let metrics = c.metrics_collected();
1314
let encoder = TextEncoder::new();
1415
let mut buffer = vec![];
1516
encoder.encode(&metrics, &mut buffer).unwrap();
@@ -25,10 +26,11 @@ async fn health(_: HttpRequest) -> impl Responder {
2526
async fn main() -> io::Result<()> {
2627
dotenv().ok();
2728

28-
let state = State::default();
29-
let config = Config::default();
29+
let state = Arc::new(State::new());
30+
let config = Config::try_new().unwrap();
3031

31-
let controller = controller::run(state.clone(), config);
32+
let controller = controller::run(state.clone(), config.clone());
33+
let metrics_collector = metrics_collector::run_metrics_collector(state.clone(), config.clone());
3234

3335
let addr = std::env::var("ADDR").unwrap_or("0.0.0.0:8080".into());
3436

@@ -41,7 +43,11 @@ async fn main() -> io::Result<()> {
4143
})
4244
.bind(addr)?;
4345

44-
tokio::join!(controller, server.run()).1?;
46+
let result = tokio::join!(controller, metrics_collector, server.run()).1;
47+
if let Err(err) = result {
48+
error!("{err}");
49+
std::process::exit(1)
50+
}
4551

4652
Ok(())
4753
}

src/metrics.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,12 @@
1-
use crate::{DbSyncPort, Error};
21
use kube::ResourceExt;
32
use prometheus::{opts, IntCounterVec, Registry};
3+
use std::{sync::Arc, thread::sleep};
4+
use tracing::error;
5+
6+
use crate::{
7+
postgres::Postgres,
8+
Config, DbSyncPort, Error, State,
9+
};
410

511
#[derive(Clone)]
612
pub struct Metrics {
@@ -68,3 +74,39 @@ impl Metrics {
6874
self.users_deactivated.with_label_values(&[username]).inc();
6975
}
7076
}
77+
78+
pub async fn run_metrics_collector(state: Arc<State>, config: Config) -> Result<(), Error> {
79+
let db_urls = &vec![
80+
config.db_url_mainnet,
81+
config.db_url_preprod,
82+
config.db_url_preview,
83+
];
84+
85+
loop {
86+
for url in db_urls {
87+
let postgres_result = Postgres::new(url).await;
88+
if let Err(err) = postgres_result {
89+
error!("Error to connect postgres: {err}");
90+
continue;
91+
}
92+
let postgres = postgres_result.unwrap();
93+
94+
let user_statements_result = postgres.user_metrics().await;
95+
if let Err(err) = user_statements_result {
96+
error!("Error get user statements: {err}");
97+
continue;
98+
}
99+
100+
let user_statements = user_statements_result.unwrap();
101+
if user_statements.is_none() {
102+
continue;
103+
}
104+
105+
let user_statements = user_statements.unwrap();
106+
107+
// TODO: calculate dcu
108+
}
109+
110+
sleep(config.metrics_delay)
111+
}
112+
}

src/postgres.rs

Lines changed: 35 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,7 @@
1-
use tokio_postgres::{Client, NoTls};
1+
use tokio_postgres::{Client, NoTls, Row};
22

33
use crate::Error;
44

5-
// const QUERY_GET_METRICS: &str = "
6-
// SELECT
7-
// usename,
8-
// SUM(calls) AS total_queries,
9-
// SUM(total_exec_time) AS total_exec_time
10-
// FROM
11-
// pg_stat_statements
12-
// inner join
13-
// pg_catalog.pg_user on pg_catalog.pg_user.usesysid = userid
14-
// GROUP BY
15-
// usename;";
16-
175
pub struct Postgres {
186
client: Client,
197
}
@@ -97,6 +85,39 @@ impl Postgres {
9785

9886
Ok(result.is_some())
9987
}
88+
89+
pub async fn user_metrics(&self) -> Result<Option<Vec<UserStatements>>, Error> {
90+
let query_metrics = "SELECT
91+
usename,
92+
SUM(calls) AS total_queries,
93+
SUM(total_exec_time) AS total_exec_time
94+
FROM
95+
pg_stat_statements
96+
inner join
97+
pg_catalog.pg_user on pg_catalog.pg_user.usesysid = userid
98+
GROUP BY
99+
usename;";
100+
101+
let stmt = self.client.prepare(&query_metrics).await?;
102+
let result = self.client.query(&stmt, &[]).await?;
103+
104+
Ok(result
105+
.is_empty()
106+
.then_some(result.iter().map(|r| r.into()).collect()))
107+
}
100108
}
101109

102-
pub async fn get_metrics() {}
110+
pub struct UserStatements {
111+
pub usename: String,
112+
pub total_queries: u32,
113+
pub total_exec_time: u32,
114+
}
115+
impl From<&Row> for UserStatements {
116+
fn from(row: &Row) -> Self {
117+
Self {
118+
usename: row.get("usename"),
119+
total_queries: row.get("total_queries"),
120+
total_exec_time: row.get("total_exec_time"),
121+
}
122+
}
123+
}

0 commit comments

Comments
 (0)