From d7e8c73da94fc89fcbe338afd61bac2924378299 Mon Sep 17 00:00:00 2001 From: bodymindarts Date: Wed, 19 Feb 2025 11:33:36 +0100 Subject: [PATCH] chore: enable tracing across outbox handling --- Cargo.lock | 3 ++ core/customer/src/lib.rs | 6 +-- .../migrations/20240517074612_core_setup.sql | 1 + lana/customer-onboarding/Cargo.toml | 1 + lana/customer-onboarding/src/job.rs | 51 +++++++++++++++---- lib/kratos-admin/Cargo.toml | 1 + lib/kratos-admin/src/lib.rs | 1 + lib/outbox/Cargo.toml | 1 + lib/outbox/src/event.rs | 8 +++ lib/outbox/src/repo.rs | 19 +++++-- lib/tracing-utils/Cargo.toml | 1 + lib/tracing-utils/src/lib.rs | 46 +++++++++++++++++ 12 files changed, 122 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 47760cd49..74261569e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1156,6 +1156,7 @@ dependencies = [ "strum", "thiserror 1.0.69", "tokio", + "tracing", "uuid", ] @@ -2499,6 +2500,7 @@ dependencies = [ "serde", "serde_json", "thiserror 1.0.69", + "tracing", "uuid", ] @@ -3071,6 +3073,7 @@ dependencies = [ "sqlx", "tokio", "tokio-stream", + "tracing-utils", "uuid", ] diff --git a/core/customer/src/lib.rs b/core/customer/src/lib.rs index 301922039..e9b764ce6 100644 --- a/core/customer/src/lib.rs +++ b/core/customer/src/lib.rs @@ -136,7 +136,7 @@ where Ok(customer) } - #[instrument(name = "core_custorem.find_for_subject", skip(self))] + #[instrument(name = "customer.find_for_subject", skip(self))] pub async fn find_for_subject( &self, sub: &<::Audit as AuditSvc>::Subject, @@ -220,7 +220,7 @@ where } #[instrument( - name = "core_customer.update_authentication_id_for_customer", + name = "customer.update_authentication_id_for_customer", skip(self, authentication_id) )] pub async fn update_authentication_id_for_customer( @@ -247,7 +247,7 @@ where } #[instrument( - name = "core_customer.find_by_authentication_id", + name = "customer.find_by_authentication_id", skip(self, authentication_id) )] pub async fn find_by_authentication_id( diff --git a/lana/app/migrations/20240517074612_core_setup.sql b/lana/app/migrations/20240517074612_core_setup.sql index c70e2b0e9..8d446ee2f 100644 --- a/lana/app/migrations/20240517074612_core_setup.sql +++ b/lana/app/migrations/20240517074612_core_setup.sql @@ -362,6 +362,7 @@ CREATE TABLE persistent_outbox_events ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), sequence BIGSERIAL UNIQUE, payload JSONB, + tracing_context JSONB, recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW() ); diff --git a/lana/customer-onboarding/Cargo.toml b/lana/customer-onboarding/Cargo.toml index 2052dc19e..513d8fb14 100644 --- a/lana/customer-onboarding/Cargo.toml +++ b/lana/customer-onboarding/Cargo.toml @@ -23,6 +23,7 @@ kratos-admin = { path = "../../lib/kratos-admin" } sim-time = { workspace = true, optional = true } es-entity = { workspace = true, features = ["graphql"] } +tracing = { workspace = true } uuid = { workspace = true } strum = { workspace = true } serde = { workspace = true } diff --git a/lana/customer-onboarding/src/job.rs b/lana/customer-onboarding/src/job.rs index 16bcf350c..7a2ed3cd1 100644 --- a/lana/customer-onboarding/src/job.rs +++ b/lana/customer-onboarding/src/job.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use futures::StreamExt; use kratos_admin::KratosAdmin; +use tracing::instrument; use audit::AuditSvc; use authz::PermissionCheck; @@ -11,7 +12,7 @@ use deposit::{ CoreDepositAction, CoreDepositEvent, CoreDepositObject, GovernanceAction, GovernanceObject, }; use governance::GovernanceEvent; -use outbox::{Outbox, OutboxEventMarker}; +use outbox::{Outbox, OutboxEventMarker, PersistentOutboxEvent}; use job::*; @@ -145,16 +146,8 @@ where let mut stream = self.outbox.listen_persisted(Some(state.sequence)).await?; while let Some(message) = stream.next().await { - if let Some(CoreCustomerEvent::CustomerCreated { id, email }) = - &message.as_ref().as_event() - { - let authentication_id = self - .kratos_admin - .create_user::(email.clone()) - .await?; - self.customers - .update_authentication_id_for_customer(*id, authentication_id) - .await?; + if let Some(CoreCustomerEvent::CustomerCreated { .. }) = &message.as_ref().as_event() { + self.handle_customer_created_event(message.as_ref()).await?; } } @@ -162,3 +155,39 @@ where Ok(JobCompletion::RescheduleAt(now)) } } + +impl CustomerOnboardingJobRunner +where + Perms: PermissionCheck, + <::Audit as AuditSvc>::Action: + From + From + From, + <::Audit as AuditSvc>::Object: + From + From + From, + E: OutboxEventMarker + + OutboxEventMarker + + OutboxEventMarker, +{ + #[instrument( + name = "customer_onboarding.handle_customer_created_event", + skip(self, message) + )] + async fn handle_customer_created_event( + &self, + message: &PersistentOutboxEvent, + ) -> Result<(), Box> + where + E: OutboxEventMarker, + { + if let Some(CoreCustomerEvent::CustomerCreated { id, email }) = message.as_event() { + message.inject_trace_parent(); + let authentication_id = self + .kratos_admin + .create_user::(email.clone()) + .await?; + self.customers + .update_authentication_id_for_customer(*id, authentication_id) + .await?; + } + Ok(()) + } +} diff --git a/lib/kratos-admin/Cargo.toml b/lib/kratos-admin/Cargo.toml index b7ab03553..ceb79ab3a 100644 --- a/lib/kratos-admin/Cargo.toml +++ b/lib/kratos-admin/Cargo.toml @@ -14,3 +14,4 @@ thiserror = { workspace = true } serde = { workspace = true } serde_json = { workspace = true } ory-kratos-client = { workspace = true } +tracing = { workspace = true } diff --git a/lib/kratos-admin/src/lib.rs b/lib/kratos-admin/src/lib.rs index 6cbfeff4f..71e85939a 100644 --- a/lib/kratos-admin/src/lib.rs +++ b/lib/kratos-admin/src/lib.rs @@ -24,6 +24,7 @@ impl KratosAdmin { } } + #[tracing::instrument(name = "kratos_admin.create_user", skip(self))] pub async fn create_user(&self, email: String) -> Result where T: From, diff --git a/lib/outbox/Cargo.toml b/lib/outbox/Cargo.toml index e751223f6..5bb62a630 100644 --- a/lib/outbox/Cargo.toml +++ b/lib/outbox/Cargo.toml @@ -9,6 +9,7 @@ fail-on-warnings = [] [dependencies] es-entity = { workspace = true } +tracing-utils = { path = "../tracing-utils", features = ["persistence"] } serde = { workspace = true } serde_json = { workspace = true } diff --git a/lib/outbox/src/event.rs b/lib/outbox/src/event.rs index a50876fbd..bacbc99cc 100644 --- a/lib/outbox/src/event.rs +++ b/lib/outbox/src/event.rs @@ -53,6 +53,7 @@ where pub sequence: EventSequence, #[serde(bound = "T: DeserializeOwned")] pub payload: Option, + pub(crate) tracing_context: Option, pub recorded_at: chrono::DateTime, } @@ -65,6 +66,7 @@ where id: self.id, sequence: self.sequence, payload: self.payload.clone(), + tracing_context: self.tracing_context.clone(), recorded_at: self.recorded_at, } } @@ -84,6 +86,12 @@ where None } } + + pub fn inject_trace_parent(&self) { + if let Some(context) = &self.tracing_context { + tracing_utils::persistence::set_parent(context); + } + } } #[derive( diff --git a/lib/outbox/src/repo.rs b/lib/outbox/src/repo.rs index 6b20feb79..cf190af86 100644 --- a/lib/outbox/src/repo.rs +++ b/lib/outbox/src/repo.rs @@ -62,15 +62,20 @@ where return Ok(Vec::new()); } + let tracing_context = tracing_utils::persistence::extract(); + let tracing_json = + serde_json::to_value(&tracing_context).expect("Could not serialize tracing context"); + let rows = sqlx::query!( r#"WITH new_events AS ( - INSERT INTO persistent_outbox_events (payload) - SELECT unnest($1::jsonb[]) AS payload + INSERT INTO persistent_outbox_events (payload, tracing_context) + SELECT unnest($1::jsonb[]) AS payload, $2::jsonb AS tracing_context RETURNING id AS "id: OutboxEventId", sequence AS "sequence: EventSequence", recorded_at ) SELECT * FROM new_events "#, &serialized_events as _, + tracing_json ) .fetch_all(&mut **db) .await?; @@ -81,6 +86,7 @@ where id: row.id, sequence: row.sequence, recorded_at: row.recorded_at, + tracing_context: Some(tracing_context.clone()), payload: Some(payload), }) .collect::>(); @@ -101,6 +107,7 @@ where g.seq AS "sequence!: EventSequence", e.id AS "id?", e.payload AS "payload?", + e.tracing_context AS "tracing_context?", e.recorded_at AS "recorded_at?" FROM generate_series(LEAST($1 + 1, (SELECT max FROM max_sequence)), @@ -131,6 +138,9 @@ where payload: row .payload .map(|p| serde_json::from_value(p).expect("Could not deserialize payload")), + tracing_context: row + .tracing_context + .map(|p| serde_json::from_value(p).expect("Could not deserialize payload")), recorded_at: row.recorded_at.unwrap_or_default(), }); } @@ -142,7 +152,7 @@ where SELECT unnest($1::bigint[]) AS sequence ON CONFLICT (sequence) DO UPDATE SET sequence = EXCLUDED.sequence - RETURNING id, sequence AS "sequence!: EventSequence", payload, recorded_at + RETURNING id, sequence AS "sequence!: EventSequence", payload, tracing_context, recorded_at "#, &empty_ids as &[EventSequence] ) @@ -155,6 +165,9 @@ where payload: row .payload .map(|p| serde_json::from_value(p).expect("Could not deserialize payload")), + tracing_context: row + .tracing_context + .map(|p| serde_json::from_value(p).expect("Could not deserialize payload")), recorded_at: row.recorded_at, }); } diff --git a/lib/tracing-utils/Cargo.toml b/lib/tracing-utils/Cargo.toml index f779707c3..c1ef649a9 100644 --- a/lib/tracing-utils/Cargo.toml +++ b/lib/tracing-utils/Cargo.toml @@ -7,6 +7,7 @@ edition = "2021" fail-on-warnings = [] http = ["dep:opentelemetry-http"] +persistence = [] [dependencies] anyhow = { workspace = true } diff --git a/lib/tracing-utils/src/lib.rs b/lib/tracing-utils/src/lib.rs index a420776e3..32e0d0766 100644 --- a/lib/tracing-utils/src/lib.rs +++ b/lib/tracing-utils/src/lib.rs @@ -104,3 +104,49 @@ pub mod http { header_map } } + +#[cfg(feature = "persistence")] +pub mod persistence { + use serde::{Deserialize, Serialize}; + use tracing_opentelemetry::OpenTelemetrySpanExt; + + #[derive(Serialize, Deserialize, Debug, Clone)] + pub struct SerializableTraceContext { + pub traceparent: Option, + pub tracestate: Option, + } + + pub fn extract() -> SerializableTraceContext { + use opentelemetry::propagation::TextMapPropagator; + use opentelemetry_sdk::propagation::TraceContextPropagator; + + let mut carrier = std::collections::HashMap::new(); + let propagator = TraceContextPropagator::new(); + let current_context = tracing::Span::current().context(); + + propagator.inject_context(¤t_context, &mut carrier); + + SerializableTraceContext { + traceparent: carrier.get("traceparent").cloned(), + tracestate: carrier.get("tracestate").cloned(), + } + } + + pub fn set_parent(context: &SerializableTraceContext) { + use opentelemetry::propagation::TextMapPropagator; + use opentelemetry_sdk::propagation::TraceContextPropagator; + + let mut carrier = std::collections::HashMap::new(); + + if let Some(traceparent) = &context.traceparent { + carrier.insert("traceparent".to_string(), traceparent.clone()); + } + if let Some(tracestate) = &context.tracestate { + carrier.insert("tracestate".to_string(), tracestate.clone()); + } + + let propagator = TraceContextPropagator::new(); + let extracted_context = propagator.extract(&carrier); + tracing::Span::current().set_parent(extracted_context); + } +}