diff --git a/Cargo.toml b/Cargo.toml
index 8a90b9f..a6114fb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -24,16 +24,13 @@ prometheus = "0.13.3"
 actix-web = "4.4.0"
 
 [[bin]]
-doc = false
 name = "controller"
 path = "src/main.rs"
 
 [[bin]]
-doc = false
 name = "crdgen"
 path = "src/crdgen.rs"
 
 [lib]
-name = "controller"
 path = "src/lib.rs"
diff --git a/benchmark/.gitignore b/benchmark/.gitignore
new file mode 100644
index 0000000..2eea525
--- /dev/null
+++ b/benchmark/.gitignore
@@ -0,0 +1 @@
+.env
\ No newline at end of file
diff --git a/benchmark/Dockerfile b/benchmark/Dockerfile
new file mode 100644
index 0000000..95c69b6
--- /dev/null
+++ b/benchmark/Dockerfile
@@ -0,0 +1,5 @@
+FROM alpine:latest
+
+RUN apk --no-cache add postgresql-contrib
+
+ENTRYPOINT [ "pgbench" ]
\ No newline at end of file
diff --git a/benchmark/README.md b/benchmark/README.md
new file mode 100644
index 0000000..3fb8651
--- /dev/null
+++ b/benchmark/README.md
@@ -0,0 +1,68 @@
+# Benchmark
+
+This folder contains a configuration for running benchmarks against a Postgres dbsync database. Docker is used to run the pgbench tool, and `bench.sql` holds a set of common queries.
+
+## Build the Docker image
+
+The image must be built before it can be used:
+
+```bash
+docker build -t pgbench .
+```
+
+## Environment
+
+pgbench needs some environment variables to work, so create a `.env` file and set the variables below:
+
+| Key        | Value |
+| ---------- | ----- |
+| PGDATABASE |       |
+| PGHOST     |       |
+| PGPORT     |       |
+| PGUSER     |       |
+| PGPASSWORD |       |
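+
+For example, a `.env` pointing at a local database could look like this (placeholder values, adjust to your setup):
+
+```
+PGDATABASE=dbsync
+PGHOST=localhost
+PGPORT=5432
+PGUSER=postgres
+PGPASSWORD=secret
+```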
+
+## Run the benchmark
+
+To run the benchmark, run the Docker image built above, passing the pgbench parameters that fit your scenario.
+
+```bash
+docker run --env-file .env --network host --volume ./bench.sql:/bench.sql pgbench:latest -c 10 -T 5 -n -f /bench.sql
+```
+
+- `-c` number of concurrent clients
+- `-T` execution time (seconds)
+- `-n` skip the pre-test vacuum (required for custom scripts)
+- `-f` path of the script file
+
+More parameters:
+https://www.postgresql.org/docs/devel/pgbench.html
+
+### Metrics example
+
+Example of the output printed when the command finishes:
+
+```
+transaction type: /bench.sql
+scaling factor: 1
+query mode: simple
+number of clients: 4
+number of threads: 1
+maximum number of tries: 1
+duration: 10 s
+number of transactions actually processed: 16
+number of failed transactions: 0 (0.000%)
+latency average = 1562.050 ms
+initial connection time = 3951.848 ms
+tps = 2.560738 (without initial connection time)
+```
\ No newline at end of file
diff --git a/benchmark/bench.sql b/benchmark/bench.sql
new file mode 100644
index 0000000..4f05eff
--- /dev/null
+++ b/benchmark/bench.sql
@@ -0,0 +1,162 @@
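+--- Common read queries against the cardano-db-sync schema; pgbench executes
+--- this whole file as one transaction per client iteration.
+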
+--- Select latest epoch parameters
+select
+  *
+from
+  epoch_param ep
+order by
+  ep.epoch_no desc
+limit 1;
+
+--- Select assets by policy and apply filters by metadata
+with asset as (
+  select
+    ma_tx_mint.tx_id,
+    encode(multi_asset.name, 'escape') as name,
+    encode(multi_asset.policy, 'hex') as policy,
+    multi_asset.fingerprint
+  from
+    multi_asset
+  inner join
+    ma_tx_mint on ma_tx_mint.ident = multi_asset.id
+  where
+    multi_asset.policy = '\x8f80ebfaf62a8c33ae2adf047572604c74db8bc1daba2b43f9a65635'
+), metadata as (
+  select
+    tx_metadata.tx_id,
+    tx_metadata.json as metadata
+  from
+    tx_metadata
+  where
+    tx_metadata.tx_id in (select tx_id from asset)
+)
+select
+  *,
+  count(*) over () as count
+from
+  asset
+inner join
+  metadata on asset.tx_id = metadata.tx_id
+where
+  jsonb_path_query_array(metadata.metadata, '$.*.*.type') ?| array['Orc']
+order by
+  asset.name asc
+limit 20 offset 0;
+
+--- Select total assets by policy from stake address
+select
+  sum(ma_tx_out.quantity) as quantity,
+  encode(multi_asset.policy, 'hex') as policy
+from
+  utxo_view
+inner join
+  stake_address on stake_address.id = utxo_view.stake_address_id
+inner join
+  ma_tx_out on ma_tx_out.tx_out_id = utxo_view.id
+inner join
+  multi_asset on multi_asset.id = ma_tx_out.ident
+where
+  stake_address."view" = 'stake1u90nkx5yw6qkpas3kxa0wcql93axph8fetw20l0j2ntszucgg4rr2'
+  and multi_asset.policy = '\xb7761c472eef3b6e0505441efaf940892bb59c01be96070b0a0a89b3'
+group by multi_asset.policy;
+
+--- Select all assets from a stake address
+select
+  ma_tx_out.tx_out_id,
+  ma_tx_out.quantity,
+  encode(multi_asset.name, 'escape') as name,
+  encode(multi_asset.policy, 'hex') as policy,
+  multi_asset.fingerprint,
+  tx_metadata.json as metadata,
+  count(*) over () as count
+from
+  utxo_view
+inner join
+  stake_address on stake_address.id = utxo_view.stake_address_id
+inner join
+  ma_tx_out on ma_tx_out.tx_out_id = utxo_view.id
+inner join
+  multi_asset on multi_asset.id = ma_tx_out.ident
+inner join
+  ma_tx_mint on ma_tx_mint.ident = multi_asset.id
+inner join
+  tx_metadata on tx_metadata.tx_id = ma_tx_mint.tx_id
+where
+  stake_address.view = 'stake1u90nkx5yw6qkpas3kxa0wcql93axph8fetw20l0j2ntszucgg4rr2'
+order by
+  multi_asset.name asc
+limit 20 offset 1;
+
+--- Select all utxos from a stake address
+select
+  tx_out.id,
+  tx.hash,
+  tx_out.index,
+  tx_out.address,
+  tx_out.value
+from
+  tx_out
+left join
+  tx_in on tx_out.tx_id = tx_in.tx_out_id and tx_out.index::smallint = tx_in.tx_out_index::smallint
+left join
+  tx on tx.id = tx_out.tx_id
+left join
+  block on tx.block_id = block.id
+inner join
+  stake_address on stake_address.id = tx_out.stake_address_id
+where
+  tx_in.tx_in_id is null
+  and block.epoch_no is not null
+  and stake_address.view = 'stake1u90nkx5yw6qkpas3kxa0wcql93axph8fetw20l0j2ntszucgg4rr2';
+
+--- Select slot number of the most recent block
+select
+  slot_no
+from
+  block
+where
+  block_no is not null
+order by
+  block_no desc
+limit 1;
+
+--- Select current valid pools
+select
+  *
+from
+  pool_update
+where
+  registered_tx_id in (select max(registered_tx_id) from pool_update group by hash_id)
+  and not exists (
+    select
+      *
+    from
+      pool_retire
+    where
+      pool_retire.hash_id = pool_update.hash_id
+      and pool_retire.retiring_epoch <= (select max(epoch_no) from block)
+  );
+
+--- Select the stake address for a given Shelley address
+select
+  stake_address.id as stake_address_id,
+  tx_out.address,
+  stake_address.view as stake_address
+from
+  tx_out
+inner join
+  stake_address on tx_out.stake_address_id = stake_address.id
+where
+  address = 'addr1q8u4wgd8qplhxpt4xm2l8yagy5ng7veurwrns2ysh03zuh2l8vdgga5pvrmprvd67asp7tr6vrwwnjku5l7ly4xhq9esr9h59t';
+
+--- Select transaction outputs for specified transaction hash
+select
+  tx_out.*
+from
+  tx_out
+inner join
+  tx on tx_out.tx_id = tx.id
+where
+  tx.hash = '\xabd21556d9bb817d436e33a5fa32619702633dc809e707a5297566e9d74d57c1';
\ No newline at end of file
diff --git a/src/controller.rs b/src/controller.rs
index 4597e98..87f3c4f 100644
--- a/src/controller.rs
+++ b/src/controller.rs
@@ -1,7 +1,3 @@
-use crate::{
-    postgres::{self, user_already_exists, user_create, user_disable, user_enable},
-    Config, Error, Metrics, Result,
-};
 use futures::StreamExt;
 use kube::{
     api::{Patch, PatchParams},
@@ -20,6 +16,8 @@ use serde_json::json;
 use std::{sync::Arc, time::Duration};
 use tracing::error;
 
+use crate::{postgres::Postgres, Config, Error, Metrics, State};
+
 pub static DB_SYNC_PORT_FINALIZER: &str = "dbsyncports.demeter.run";
 
 struct Context {
@@ -36,15 +34,6 @@ impl Context {
         }
     }
 }
-#[derive(Clone, Default)]
-pub struct State {
-    registry: prometheus::Registry,
-}
-impl State {
-    pub fn metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
-        self.registry.gather()
-    }
-}
 
 #[derive(Debug, Clone, Deserialize, Serialize, JsonSchema)]
 pub enum Network {
@@ -75,11 +64,7 @@ impl DbSyncPort {
             .unwrap_or(false)
     }
 
-    async fn reconcile(
-        &self,
-        ctx: Arc<Context>,
-        pg_client: &mut tokio_postgres::Client,
-    ) -> Result<Action> {
+    async fn reconcile(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action, Error> {
         let client = ctx.client.clone();
         let ns = self.namespace().unwrap();
         let name = self.name_any();
@@ -89,9 +74,9 @@
         let password = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
 
         if !self.was_executed() {
-            match user_already_exists(pg_client, &username).await? {
-                true => user_enable(pg_client, &username, &password).await?,
-                false => user_create(pg_client, &username, &password).await?,
+            match pg.user_already_exists(&username).await? {
+                true => pg.user_enable(&username, &password).await?,
+                false => pg.user_create(&username, &password).await?,
             };
 
             let new_status = Patch::Apply(json!({
@@ -114,19 +99,15 @@ impl DbSyncPort {
         Ok(Action::requeue(Duration::from_secs(5 * 60)))
     }
 
-    async fn cleanup(
-        &self,
-        ctx: Arc<Context>,
-        pg_client: &mut tokio_postgres::Client,
-    ) -> Result<Action> {
+    async fn cleanup(&self, ctx: Arc<Context>, pg: &mut Postgres) -> Result<Action, Error> {
         let username = self.status.as_ref().unwrap().username.clone();
-        user_disable(pg_client, &username).await?;
+        pg.user_disable(&username).await?;
         ctx.metrics.count_user_deactivated(&username);
         Ok(Action::await_change())
     }
 }
 
-async fn reconcile(crd: Arc<DbSyncPort>, ctx: Arc<Context>) -> Result<Action> {
+async fn reconcile(crd: Arc<DbSyncPort>, ctx: Arc<Context>) -> Result<Action, Error> {
     let url = match crd.spec.network {
         Network::Mainnet => &ctx.config.db_url_mainnet,
         Network::Preprod => &ctx.config.db_url_preprod,
@@ -136,12 +117,13 @@ async fn reconcile(crd: Arc<DbSyncPort>, ctx: Arc<Context>) -> Result<Action, Error> {
     let ns = crd.namespace().unwrap();
     let crds: Api<DbSyncPort> = Api::namespaced(ctx.client.clone(), &ns);
 
-    let mut pg_client = postgres::connect(url).await?;
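+    // Connect to the dbsync instance that backs the network requested by this resource.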
+    let mut postgres = Postgres::new(url).await?;
 
     finalizer(&crds, DB_SYNC_PORT_FINALIZER, crd, |event| async {
         match event {
-            Event::Apply(crd) => crd.reconcile(ctx.clone(), &mut pg_client).await,
-            Event::Cleanup(crd) => crd.cleanup(ctx.clone(), &mut pg_client).await,
+            Event::Apply(crd) => crd.reconcile(ctx.clone(), &mut postgres).await,
+            Event::Cleanup(crd) => crd.cleanup(ctx.clone(), &mut postgres).await,
         }
     })
    .await
@@ -154,11 +136,11 @@ fn error_policy(crd: Arc<DbSyncPort>, err: &Error, ctx: Arc<Context>) -> Action {
     Action::requeue(Duration::from_secs(5))
 }
 
-pub async fn run(state: State, config: Config) -> Result<(), Error> {
+pub async fn run(state: Arc<State>, config: Config) -> Result<(), Error> {
     let client = Client::try_default().await?;
     let crds = Api::<DbSyncPort>::all(client.clone());
-    let metrics = Metrics::default().register(&state.registry).unwrap();
-    let ctx = Context::new(client, metrics, config);
+
+    let ctx = Context::new(client, state.metrics.clone(), config);
 
     Controller::new(crds, WatcherConfig::default().any_semantic())
         .shutdown_on_signal()
diff --git a/src/crdgen.rs b/src/crdgen.rs
index 57ad587..a3f9a52 100644
--- a/src/crdgen.rs
+++ b/src/crdgen.rs
@@ -1,3 +1,4 @@
+use ext_cardano_dbsync::controller;
 use kube::CustomResourceExt;
 
 fn main() {
diff --git a/src/lib.rs b/src/lib.rs
index bdef2a0..f667ac6 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,3 +1,6 @@
+use std::time::Duration;
+
+use prometheus::Registry;
 use thiserror::Error;
 
 #[derive(Error, Debug)]
@@ -10,6 +13,15 @@ pub enum Error {
 
     #[error("Finalizer Error: {0}")]
     FinalizerError(#[source] Box<kube::runtime::finalizer::Error<Error>>),
+
+    #[error("Env Error: {0}")]
+    EnvError(#[source] std::env::VarError),
+
+    #[error("Prometheus Error: {0}")]
+    PrometheusError(#[source] prometheus::Error),
+
+    #[error("Parse Int Error: {0}")]
+    ParseIntError(#[source] std::num::ParseIntError),
 }
 
 impl Error {
@@ -17,44 +29,81 @@ impl Error {
         format!("{self:?}").to_lowercase()
     }
 }
-
-pub type Result<T, E = Error> = std::result::Result<T, E>;
-
 impl From<tokio_postgres::Error> for Error {
     fn from(value: tokio_postgres::Error) -> Self {
         Error::PgError(value)
     }
 }
-
 impl From<kube::Error> for Error {
     fn from(value: kube::Error) -> Self {
         Error::KubeError(value)
     }
 }
+impl From<std::env::VarError> for Error {
+    fn from(value: std::env::VarError) -> Self {
+        Error::EnvError(value)
+    }
+}
+impl From<prometheus::Error> for Error {
+    fn from(value: prometheus::Error) -> Self {
+        Error::PrometheusError(value)
+    }
+}
+impl From<std::num::ParseIntError> for Error {
+    fn from(value: std::num::ParseIntError) -> Self {
+        Error::ParseIntError(value)
+    }
+}
+
+#[derive(Clone, Default)]
+pub struct State {
+    registry: Registry,
+    pub metrics: Metrics,
+}
+impl State {
+    pub fn new() -> Self {
+        let registry = Registry::default();
+        let metrics = Metrics::default().register(&registry).unwrap();
+        Self { registry, metrics }
+    }
+
+    pub fn metrics_collected(&self) -> Vec<prometheus::proto::MetricFamily> {
+        self.registry.gather()
+    }
+}
 
+#[derive(Clone)]
 pub struct Config {
     pub db_url_mainnet: String,
     pub db_url_preprod: String,
     pub db_url_preview: String,
+    pub metrics_delay: Duration,
+    pub dcu_base: u64,
 }
 
 impl Config {
-    pub fn new() -> Self {
-        Self {
-            db_url_mainnet: std::env::var("DB_URL_MAINNET").expect("DB_URL_MAINNET must be set"),
-            db_url_preprod: std::env::var("DB_URL_PREPROD").expect("DB_URL_PREPROD must be set"),
-            db_url_preview: std::env::var("DB_URL_PREVIEW").expect("DB_URL_PREVIEW must be set"),
-        }
-    }
-}
-impl Default for Config {
-    fn default() -> Self {
-        Self::new()
+    pub fn try_new() -> Result<Self, Error> {
+        let db_url_mainnet = std::env::var("DB_URL_MAINNET")?;
+        let db_url_preprod = std::env::var("DB_URL_PREPROD")?;
+        let db_url_preview = std::env::var("DB_URL_PREVIEW")?;
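+        // METRICS_DELAY is the collector loop interval, in whole seconds;
+        // DCU_BASE is the base unit intended for the (still to be implemented)
+        // DCU calculation in the metrics collector.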
+        let metrics_delay = Duration::from_secs(std::env::var("METRICS_DELAY")?.parse::<u64>()?);
+        let dcu_base = std::env::var("DCU_BASE")?.parse::<u64>()?;
+
+        Ok(Self {
+            db_url_mainnet,
+            db_url_preprod,
+            db_url_preview,
+            metrics_delay,
+            dcu_base,
+        })
     }
 }
 
 pub mod controller;
+pub mod metrics;
 pub mod postgres;
-pub use crate::controller::*;
-mod metrics;
-pub use metrics::Metrics;
+pub use controller::*;
+pub use metrics::*;
diff --git a/src/main.rs b/src/main.rs
index 610e1f8..36b36c5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,15 +1,16 @@
-use std::io;
-
 use actix_web::{
     get, middleware, web::Data, App, HttpRequest, HttpResponse, HttpServer, Responder,
 };
-use controller::{Config, State};
 use dotenv::dotenv;
 use prometheus::{Encoder, TextEncoder};
+use std::{io, sync::Arc};
+use tracing::error;
+
+use ext_cardano_dbsync::{controller, metrics as metrics_collector, Config, State};
 
 #[get("/metrics")]
-async fn metrics(c: Data<State>, _req: HttpRequest) -> impl Responder {
-    let metrics = c.metrics();
+async fn metrics(c: Data<Arc<State>>, _req: HttpRequest) -> impl Responder {
+    let metrics = c.metrics_collected();
     let encoder = TextEncoder::new();
     let mut buffer = vec![];
     encoder.encode(&metrics, &mut buffer).unwrap();
@@ -25,10 +26,11 @@ async fn main() -> io::Result<()> {
     dotenv().ok();
 
-    let state = State::default();
-    let config = Config::default();
+    let state = Arc::new(State::new());
+    let config = Config::try_new().unwrap();
 
-    let controller = controller::run(state.clone(), config);
+    let controller = controller::run(state.clone(), config.clone());
+    let metrics_collector = metrics_collector::run_metrics_collector(state.clone(), config.clone());
 
     let addr = std::env::var("ADDR").unwrap_or("0.0.0.0:8080".into());
 
@@ -41,7 +43,11 @@
     })
     .bind(addr)?;
 
-    tokio::join!(controller, server.run()).1?;
+    let result = tokio::join!(controller, metrics_collector, server.run()).1;
+    if let Err(err) = result {
+        error!("{err}");
+        std::process::exit(1)
+    }
 
     Ok(())
 }
diff --git a/src/metrics.rs b/src/metrics.rs
index 61ca56b..08492e8 100644
--- a/src/metrics.rs
+++ b/src/metrics.rs
@@ -1,6 +1,12 @@
-use crate::{DbSyncPort, Error};
 use kube::ResourceExt;
 use prometheus::{opts, IntCounterVec, Registry};
+use std::sync::Arc;
+use tracing::error;
+
+use crate::{
+    postgres::Postgres,
+    Config, DbSyncPort, Error, State,
+};
 
 #[derive(Clone)]
 pub struct Metrics {
@@ -68,3 +74,46 @@ impl Metrics {
         self.users_deactivated.with_label_values(&[username]).inc();
     }
 }
+
+pub async fn run_metrics_collector(state: Arc<State>, config: Config) -> Result<(), Error> {
+    let db_urls = vec![
+        config.db_url_mainnet.clone(),
+        config.db_url_preprod.clone(),
+        config.db_url_preview.clone(),
+    ];
+
+    loop {
+        for url in &db_urls {
+            let postgres_result = Postgres::new(url).await;
+            if let Err(err) = postgres_result {
+                error!("Error connecting to postgres: {err}");
+                continue;
+            }
+            let postgres = postgres_result.unwrap();
+
+            let user_statements_result = postgres.user_metrics().await;
+            if let Err(err) = user_statements_result {
+                error!("Error getting user statements: {err}");
+                continue;
+            }
+
+            let user_statements = user_statements_result.unwrap();
+            if user_statements.is_none() {
+                continue;
+            }
+
+            let user_statements = user_statements.unwrap();
+
+            // TODO: calculate dcu
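+            // A hypothetical sketch of the missing step (the pricing formula is
+            // not defined in this patch), assuming DCU scales with execution time:
+            //
+            //     for stmt in &user_statements {
+            //         let dcu = config.dcu_base * stmt.total_exec_time as u64;
+            //         // ...record dcu for stmt.usename on state.metrics
+            //     }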
+        }
+
+        tokio::time::sleep(config.metrics_delay).await;
+    }
+}
diff --git a/src/postgres.rs b/src/postgres.rs
index 7fcad96..71bd33c 100644
--- a/src/postgres.rs
+++ b/src/postgres.rs
@@ -1,82 +1,124 @@
-use tokio_postgres::{Client, NoTls};
+use tokio_postgres::{Client, NoTls, Row};
 
 use crate::Error;
 
-pub async fn connect(url: &str) -> Result<Client, Error> {
-    let (client, connection) = tokio_postgres::connect(url, NoTls).await?;
-
-    tokio::spawn(async move {
-        if let Err(e) = connection.await {
-            eprintln!("connection error: {}", e);
-        }
-    });
-
-    Ok(client)
+pub struct Postgres {
+    client: Client,
 }
 
-pub async fn user_create(client: &mut Client, username: &str, password: &str) -> Result<(), Error> {
-    let query_create_user = format!("create user \"{username}\" with password '{password}';");
-    let query_create_role =
-        format!("grant select on all tables in schema public to \"{username}\";");
+impl Postgres {
+    pub async fn new(url: &str) -> Result<Self, Error> {
+        let (client, connection) = tokio_postgres::connect(url, NoTls).await?;
 
-    let tx = client.transaction().await?;
+        tokio::spawn(async move {
+            if let Err(e) = connection.await {
+                eprintln!("connection error: {}", e);
+            }
+        });
 
-    let user_stmt = tx.prepare(&query_create_user).await?;
-    let user_result = tx.execute(&user_stmt, &[]).await;
-    if let Err(err) = user_result {
-        tx.rollback().await?;
-        return Err(Error::PgError(err));
+        Ok(Self { client })
     }
 
-    let role_stmt = tx.prepare(&query_create_role).await?;
-    let role_result = tx.execute(&role_stmt, &[]).await;
-    if let Err(err) = role_result {
-        tx.rollback().await?;
-        return Err(Error::PgError(err));
-    }
+    pub async fn user_create(&mut self, username: &str, password: &str) -> Result<(), Error> {
+        let query_create_user = format!("create user \"{username}\" with password '{password}';");
+        let query_create_role =
+            format!("grant select on all tables in schema public to \"{username}\";");
 
-pub async fn user_disable(client: &mut Client, username: &str) -> Result<(), Error> {
-    let query_revoke_login = format!("alter user \"{username}\" with nologin;");
+        let tx = self.client.transaction().await?;
 
-    let revoke_stmt = client.prepare(&query_revoke_login).await?;
-    client.execute(&revoke_stmt, &[]).await?;
+        let user_stmt = tx.prepare(&query_create_user).await?;
+        let user_result = tx.execute(&user_stmt, &[]).await;
+        if let Err(err) = user_result {
+            tx.rollback().await?;
+            return Err(Error::PgError(err));
+        }
 
-    Ok(())
-}
+        let role_stmt = tx.prepare(&query_create_role).await?;
+        let role_result = tx.execute(&role_stmt, &[]).await;
+        if let Err(err) = role_result {
+            tx.rollback().await?;
+            return Err(Error::PgError(err));
+        }
 
-pub async fn user_enable(client: &mut Client, username: &str, password: &str) -> Result<(), Error> {
-    let query_grant_login = format!("alter user \"{username}\" with login;");
-    let query_alter_password = format!("alter user \"{username}\" with password '{password}';");
+        tx.commit().await?;
+        Ok(())
+    }
 
-    let tx = client.transaction().await?;
+    pub async fn user_disable(&self, username: &str) -> Result<(), Error> {
+        let query_revoke_login = format!("alter user \"{username}\" with nologin;");
 
-    let login_stmt = tx.prepare(&query_grant_login).await?;
-    let login_result = tx.execute(&login_stmt, &[]).await;
-    if let Err(err) = login_result {
-        tx.rollback().await?;
-        return Err(Error::PgError(err));
+        let revoke_stmt = self.client.prepare(&query_revoke_login).await?;
+        self.client.execute(&revoke_stmt, &[]).await?;
 
-    let alter_stmt = tx.prepare(&query_alter_password).await?;
-    let alter_result = tx.execute(&alter_stmt, &[]).await;
-    if let Err(err) = alter_result {
-        tx.rollback().await?;
-        return Err(Error::PgError(err));
+        Ok(())
     }
 
-    tx.commit().await?;
-    Ok(())
-}
+    pub async fn user_enable(&mut self, username: &str, password: &str) -> Result<(), Error> {
+        let query_grant_login = format!("alter user \"{username}\" with login;");
+        let query_alter_password = format!("alter user \"{username}\" with password '{password}';");
+
+        let tx = self.client.transaction().await?;
+
+        let login_stmt = tx.prepare(&query_grant_login).await?;
+        let login_result = tx.execute(&login_stmt, &[]).await;
+        if let Err(err) = login_result {
+            tx.rollback().await?;
+            return Err(Error::PgError(err));
+        }
+
+        let alter_stmt = tx.prepare(&query_alter_password).await?;
+        let alter_result = tx.execute(&alter_stmt, &[]).await;
+        if let Err(err) = alter_result {
+            tx.rollback().await?;
+            return Err(Error::PgError(err));
+        }
+
+        tx.commit().await?;
+        Ok(())
     }
 
-pub async fn user_already_exists(client: &mut Client, username: &str) -> Result<bool, Error> {
-    let query = "select rolname from pg_roles where rolname = $1;";
+    pub async fn user_already_exists(&self, username: &str) -> Result<bool, Error> {
+        let query = "select rolname from pg_roles where rolname = $1;";
 
-    let user_stmt = client.prepare(query).await?;
-    let result = client.query_opt(&user_stmt, &[&username]).await?;
+        let user_stmt = self.client.prepare(query).await?;
+        let result = self.client.query_opt(&user_stmt, &[&username]).await?;
 
-    Ok(result.is_some())
+        Ok(result.is_some())
+    }
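+    // Requires the pg_stat_statements extension to be installed and enabled
+    // (shared_preload_libraries) on the target database.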
+    pub async fn user_metrics(&self) -> Result<Option<Vec<UserStatements>>, Error> {
+        let query_metrics = "SELECT
+            usename,
+            SUM(calls)::bigint AS total_queries,
+            SUM(total_exec_time)::float8 AS total_exec_time
+        FROM
+            pg_stat_statements
+        INNER JOIN
+            pg_catalog.pg_user ON pg_catalog.pg_user.usesysid = userid
+        GROUP BY
+            usename;";
+
+        let stmt = self.client.prepare(query_metrics).await?;
+        let result = self.client.query(&stmt, &[]).await?;
+
+        Ok((!result.is_empty()).then(|| result.iter().map(|r| r.into()).collect()))
+    }
+}
+
+// Aggregated pg_stat_statements data for one database user.
+pub struct UserStatements {
+    pub usename: String,
+    pub total_queries: i64,
+    pub total_exec_time: f64,
+}
+impl From<&Row> for UserStatements {
+    fn from(row: &Row) -> Self {
+        Self {
+            usename: row.get("usename"),
+            total_queries: row.get("total_queries"),
+            total_exec_time: row.get("total_exec_time"),
+        }
+    }
 }