Skip to content

Commit 904f1f4

Browse files
authored
feat: improve tracing logs (#17)
1 parent aa4fae2 commit 904f1f4

File tree

3 files changed

+27
-19
lines changed

3 files changed

+27
-19
lines changed

operator/src/controller.rs

+10-9
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use serde::{Deserialize, Serialize};
1616
use serde_json::json;
1717
use sha3::{Digest, Sha3_256};
1818
use std::{sync::Arc, time::Duration};
19-
use tracing::error;
19+
use tracing::{error, info, instrument};
2020

2121
use crate::{postgres::Postgres, Error, Network, State};
2222

@@ -77,6 +77,7 @@ impl DbSyncPort {
7777
.await
7878
.map_err(Error::KubeError)?;
7979

80+
info!({ username }, "user created");
8081
state.metrics.count_user_created(&ns, &self.spec.network);
8182
};
8283

@@ -88,9 +89,9 @@ impl DbSyncPort {
8889
let ns = self.namespace().unwrap();
8990
let username = self.status.as_ref().unwrap().username.clone();
9091
pg.drop_user(&username).await?;
91-
state
92-
.metrics
93-
.count_user_dropped(&ns, &self.spec.network);
92+
93+
info!({ username }, "user dropped");
94+
state.metrics.count_user_dropped(&ns, &self.spec.network);
9495
}
9596

9697
Ok(Action::await_change())
@@ -113,9 +114,9 @@ async fn reconcile(crd: Arc<DbSyncPort>, state: Arc<State>) -> Result<Action, Er
113114
.map_err(|e| Error::FinalizerError(Box::new(e)))
114115
}
115116

116-
fn error_policy(crd: Arc<DbSyncPort>, err: &Error, state: Arc<State>) -> Action {
117-
error!("reconcile failed: {:?}", err);
118-
state.metrics.reconcile_failure(&crd, err);
117+
fn error_policy(crd: Arc<DbSyncPort>, error: &Error, state: Arc<State>) -> Action {
118+
error!(error = error.to_string(), "reconcile failed");
119+
state.metrics.reconcile_failure(&crd, error);
119120
Action::requeue(Duration::from_secs(5))
120121
}
121122

@@ -135,12 +136,12 @@ async fn gen_username_hash(username: &str) -> Result<String, Error> {
135136
Ok(bech32_truncated)
136137
}
137138

139+
#[instrument("controller run", skip_all)]
138140
pub async fn run(state: Arc<State>) -> Result<(), Error> {
141+
info!("listening crds running");
139142
let client = Client::try_default().await?;
140143
let crds = Api::<DbSyncPort>::all(client.clone());
141144

142-
// let ctx = Context::new(client, state.clone());
143-
144145
Controller::new(crds, WatcherConfig::default().any_semantic())
145146
.shutdown_on_signal()
146147
.run(reconcile, error_policy, state)

operator/src/main.rs

+7-3
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use actix_web::{
44
use dotenv::dotenv;
55
use prometheus::{Encoder, TextEncoder};
66
use std::{io, sync::Arc};
7+
use tracing::{info, Level};
78

89
use ext_cardano_dbsync::{controller, metrics as metrics_collector, State};
910

@@ -25,6 +26,8 @@ async fn health(_: HttpRequest) -> impl Responder {
2526
async fn main() -> io::Result<()> {
2627
dotenv().ok();
2728

29+
tracing_subscriber::fmt().with_max_level(Level::INFO).init();
30+
2831
let state = Arc::new(State::try_new().await?);
2932

3033
let controller = tokio::spawn(controller::run(state.clone()));
@@ -39,14 +42,15 @@ async fn main() -> io::Result<()> {
3942
.service(health)
4043
.service(metrics)
4144
})
42-
.bind(addr)?;
45+
.bind(&addr)?;
46+
info!({ addr }, "metrics server running");
4347

4448
let signal = tokio::spawn(async {
45-
tokio::signal::ctrl_c().await.expect("Fail to exit");
49+
tokio::signal::ctrl_c().await.expect("fail to exit");
4650
std::process::exit(0);
4751
});
4852

49-
tokio::join!(controller, metrics_collector, server.run(), signal).2?;
53+
tokio::join!(server.run(), controller, metrics_collector, signal).0?;
5054

5155
Ok(())
5256
}

operator/src/metrics.rs

+10-7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use kube::{api::ListParams, Api, Client, Resource, ResourceExt};
22
use prometheus::{opts, IntCounterVec, Registry};
33
use std::{collections::HashMap, sync::Arc, thread::sleep};
4-
use tracing::error;
4+
use tracing::{error, info, instrument};
55

66
use crate::{get_config, postgres::UserStatements, DbSyncPort, Error, Network, State};
77

@@ -121,7 +121,10 @@ fn get_project_id(namespace: &str) -> String {
121121
namespace.split_once("prj-").unwrap().1.into()
122122
}
123123

124+
#[instrument("metrics collector run", skip_all)]
124125
pub async fn run_metrics_collector(state: Arc<State>) -> Result<(), Error> {
126+
info!("collecting metrics running");
127+
125128
let client = Client::try_default().await?;
126129
let config = get_config();
127130

@@ -130,9 +133,9 @@ pub async fn run_metrics_collector(state: Arc<State>) -> Result<(), Error> {
130133
loop {
131134
let crds_api = Api::<DbSyncPort>::all(client.clone());
132135
let crds_result = crds_api.list(&ListParams::default()).await;
133-
if let Err(err) = crds_result {
134-
error!("Error to get k8s resources: {err}");
135-
state.metrics.metrics_failure(&err.into());
136+
if let Err(error) = crds_result {
137+
error!(error = error.to_string(), "error to get k8s resources");
138+
state.metrics.metrics_failure(&error.into());
136139
continue;
137140
}
138141
let crds = crds_result.unwrap();
@@ -143,9 +146,9 @@ pub async fn run_metrics_collector(state: Arc<State>) -> Result<(), Error> {
143146
let postgres = state.get_pg_by_network(&crd.spec.network);
144147

145148
let user_statements_result = postgres.find_metrics_by_user(&status.username).await;
146-
if let Err(err) = user_statements_result {
147-
error!("Error get user statements: {err}");
148-
state.metrics.metrics_failure(&err);
149+
if let Err(error) = user_statements_result {
150+
error!(error = error.to_string(), "error get user statements");
151+
state.metrics.metrics_failure(&error);
149152
continue;
150153
}
151154

0 commit comments

Comments
 (0)