Skip to content

improve tracing logs #17

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions operator/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize};
use serde_json::json;
use sha3::{Digest, Sha3_256};
use std::{sync::Arc, time::Duration};
use tracing::error;
use tracing::{error, info, instrument};

use crate::{postgres::Postgres, Error, Network, State};

Expand Down Expand Up @@ -77,6 +77,7 @@ impl DbSyncPort {
.await
.map_err(Error::KubeError)?;

info!({ username }, "user created");
state.metrics.count_user_created(&ns, &self.spec.network);
};

Expand All @@ -88,9 +89,9 @@ impl DbSyncPort {
let ns = self.namespace().unwrap();
let username = self.status.as_ref().unwrap().username.clone();
pg.drop_user(&username).await?;
state
.metrics
.count_user_dropped(&ns, &self.spec.network);

info!({ username }, "user dropped");
state.metrics.count_user_dropped(&ns, &self.spec.network);
}

Ok(Action::await_change())
Expand All @@ -113,9 +114,9 @@ async fn reconcile(crd: Arc<DbSyncPort>, state: Arc<State>) -> Result<Action, Er
.map_err(|e| Error::FinalizerError(Box::new(e)))
}

fn error_policy(crd: Arc<DbSyncPort>, err: &Error, state: Arc<State>) -> Action {
error!("reconcile failed: {:?}", err);
state.metrics.reconcile_failure(&crd, err);
fn error_policy(crd: Arc<DbSyncPort>, error: &Error, state: Arc<State>) -> Action {
error!(error = error.to_string(), "reconcile failed");
state.metrics.reconcile_failure(&crd, error);
Action::requeue(Duration::from_secs(5))
}

Expand All @@ -135,12 +136,12 @@ async fn gen_username_hash(username: &str) -> Result<String, Error> {
Ok(bech32_truncated)
}

#[instrument("controller run", skip_all)]
pub async fn run(state: Arc<State>) -> Result<(), Error> {
info!("listening crds running");
let client = Client::try_default().await?;
let crds = Api::<DbSyncPort>::all(client.clone());

// let ctx = Context::new(client, state.clone());

Controller::new(crds, WatcherConfig::default().any_semantic())
.shutdown_on_signal()
.run(reconcile, error_policy, state)
Expand Down
10 changes: 7 additions & 3 deletions operator/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use actix_web::{
use dotenv::dotenv;
use prometheus::{Encoder, TextEncoder};
use std::{io, sync::Arc};
use tracing::{info, Level};

use ext_cardano_dbsync::{controller, metrics as metrics_collector, State};

Expand All @@ -25,6 +26,8 @@ async fn health(_: HttpRequest) -> impl Responder {
async fn main() -> io::Result<()> {
dotenv().ok();

tracing_subscriber::fmt().with_max_level(Level::INFO).init();

let state = Arc::new(State::try_new().await?);

let controller = tokio::spawn(controller::run(state.clone()));
Expand All @@ -39,14 +42,15 @@ async fn main() -> io::Result<()> {
.service(health)
.service(metrics)
})
.bind(addr)?;
.bind(&addr)?;
info!({ addr }, "metrics server running");

let signal = tokio::spawn(async {
tokio::signal::ctrl_c().await.expect("Fail to exit");
tokio::signal::ctrl_c().await.expect("fail to exit");
std::process::exit(0);
});

tokio::join!(controller, metrics_collector, server.run(), signal).2?;
tokio::join!(server.run(), controller, metrics_collector, signal).0?;

Ok(())
}
17 changes: 10 additions & 7 deletions operator/src/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use kube::{api::ListParams, Api, Client, Resource, ResourceExt};
use prometheus::{opts, IntCounterVec, Registry};
use std::{collections::HashMap, sync::Arc, thread::sleep};
use tracing::error;
use tracing::{error, info, instrument};

use crate::{get_config, postgres::UserStatements, DbSyncPort, Error, Network, State};

Expand Down Expand Up @@ -121,7 +121,10 @@ fn get_project_id(namespace: &str) -> String {
namespace.split_once("prj-").unwrap().1.into()
}

#[instrument("metrics collector run", skip_all)]
pub async fn run_metrics_collector(state: Arc<State>) -> Result<(), Error> {
info!("collecting metrics running");

let client = Client::try_default().await?;
let config = get_config();

Expand All @@ -130,9 +133,9 @@ pub async fn run_metrics_collector(state: Arc<State>) -> Result<(), Error> {
loop {
let crds_api = Api::<DbSyncPort>::all(client.clone());
let crds_result = crds_api.list(&ListParams::default()).await;
if let Err(err) = crds_result {
error!("Error to get k8s resources: {err}");
state.metrics.metrics_failure(&err.into());
if let Err(error) = crds_result {
error!(error = error.to_string(), "error to get k8s resources");
state.metrics.metrics_failure(&error.into());
continue;
}
let crds = crds_result.unwrap();
Expand All @@ -143,9 +146,9 @@ pub async fn run_metrics_collector(state: Arc<State>) -> Result<(), Error> {
let postgres = state.get_pg_by_network(&crd.spec.network);

let user_statements_result = postgres.find_metrics_by_user(&status.username).await;
if let Err(err) = user_statements_result {
error!("Error get user statements: {err}");
state.metrics.metrics_failure(&err);
if let Err(error) = user_statements_result {
error!(error = error.to_string(), "error get user statements");
state.metrics.metrics_failure(&error);
continue;
}

Expand Down