diff --git a/operator/src/controller.rs b/operator/src/controller.rs index 95d046e..0fa9b76 100644 --- a/operator/src/controller.rs +++ b/operator/src/controller.rs @@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize}; use serde_json::json; use sha3::{Digest, Sha3_256}; use std::{sync::Arc, time::Duration}; -use tracing::error; +use tracing::{error, info, instrument}; use crate::{postgres::Postgres, Error, Network, State}; @@ -77,6 +77,7 @@ impl DbSyncPort { .await .map_err(Error::KubeError)?; + info!({ username }, "user created"); state.metrics.count_user_created(&ns, &self.spec.network); }; @@ -88,9 +89,9 @@ impl DbSyncPort { let ns = self.namespace().unwrap(); let username = self.status.as_ref().unwrap().username.clone(); pg.drop_user(&username).await?; - state - .metrics - .count_user_dropped(&ns, &self.spec.network); + + info!({ username }, "user dropped"); + state.metrics.count_user_dropped(&ns, &self.spec.network); } Ok(Action::await_change()) @@ -113,9 +114,9 @@ async fn reconcile(crd: Arc, state: Arc) -> Result, err: &Error, state: Arc) -> Action { - error!("reconcile failed: {:?}", err); - state.metrics.reconcile_failure(&crd, err); +fn error_policy(crd: Arc, error: &Error, state: Arc) -> Action { + error!(error = error.to_string(), "reconcile failed"); + state.metrics.reconcile_failure(&crd, error); Action::requeue(Duration::from_secs(5)) } @@ -135,12 +136,12 @@ async fn gen_username_hash(username: &str) -> Result { Ok(bech32_truncated) } +#[instrument("controller run", skip_all)] pub async fn run(state: Arc) -> Result<(), Error> { + info!("listening crds running"); let client = Client::try_default().await?; let crds = Api::::all(client.clone()); - // let ctx = Context::new(client, state.clone()); - Controller::new(crds, WatcherConfig::default().any_semantic()) .shutdown_on_signal() .run(reconcile, error_policy, state) diff --git a/operator/src/main.rs b/operator/src/main.rs index 3682a26..a09830e 100644 --- a/operator/src/main.rs +++ b/operator/src/main.rs @@ -4,6 +4,7 @@ use actix_web::{ use dotenv::dotenv; use prometheus::{Encoder, TextEncoder}; use std::{io, sync::Arc}; +use tracing::{info, Level}; use ext_cardano_dbsync::{controller, metrics as metrics_collector, State}; @@ -25,6 +26,8 @@ async fn health(_: HttpRequest) -> impl Responder { async fn main() -> io::Result<()> { dotenv().ok(); + tracing_subscriber::fmt().with_max_level(Level::INFO).init(); + let state = Arc::new(State::try_new().await?); let controller = tokio::spawn(controller::run(state.clone())); @@ -39,14 +42,15 @@ async fn main() -> io::Result<()> { .service(health) .service(metrics) }) - .bind(addr)?; + .bind(&addr)?; + info!({ addr }, "metrics server running"); let signal = tokio::spawn(async { - tokio::signal::ctrl_c().await.expect("Fail to exit"); + tokio::signal::ctrl_c().await.expect("fail to exit"); std::process::exit(0); }); - tokio::join!(controller, metrics_collector, server.run(), signal).2?; + tokio::join!(server.run(), controller, metrics_collector, signal).0?; Ok(()) } diff --git a/operator/src/metrics.rs b/operator/src/metrics.rs index 2e32836..087b266 100644 --- a/operator/src/metrics.rs +++ b/operator/src/metrics.rs @@ -1,7 +1,7 @@ use kube::{api::ListParams, Api, Client, Resource, ResourceExt}; use prometheus::{opts, IntCounterVec, Registry}; use std::{collections::HashMap, sync::Arc, thread::sleep}; -use tracing::error; +use tracing::{error, info, instrument}; use crate::{get_config, postgres::UserStatements, DbSyncPort, Error, Network, State}; @@ -121,7 +121,10 @@ fn get_project_id(namespace: &str) -> String { namespace.split_once("prj-").unwrap().1.into() } +#[instrument("metrics collector run", skip_all)] pub async fn run_metrics_collector(state: Arc) -> Result<(), Error> { + info!("collecting metrics running"); + let client = Client::try_default().await?; let config = get_config(); @@ -130,9 +133,9 @@ pub async fn run_metrics_collector(state: Arc) -> Result<(), Error> { loop { let crds_api = Api::::all(client.clone()); let crds_result = crds_api.list(&ListParams::default()).await; - if let Err(err) = crds_result { - error!("Error to get k8s resources: {err}"); - state.metrics.metrics_failure(&err.into()); + if let Err(error) = crds_result { + error!(error = error.to_string(), "error to get k8s resources"); + state.metrics.metrics_failure(&error.into()); continue; } let crds = crds_result.unwrap(); @@ -143,9 +146,9 @@ pub async fn run_metrics_collector(state: Arc) -> Result<(), Error> { let postgres = state.get_pg_by_network(&crd.spec.network); let user_statements_result = postgres.find_metrics_by_user(&status.username).await; - if let Err(err) = user_statements_result { - error!("Error get user statements: {err}"); - state.metrics.metrics_failure(&err); + if let Err(error) = user_statements_result { + error!(error = error.to_string(), "error get user statements"); + state.metrics.metrics_failure(&error); continue; }