Skip to content

Commit

Permalink
chore: enable tracing across outbox handling
Browse files Browse the repository at this point in the history
  • Loading branch information
bodymindarts committed Feb 19, 2025
1 parent 01d2dab commit d7e8c73
Show file tree
Hide file tree
Showing 12 changed files with 122 additions and 17 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions core/customer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ where
Ok(customer)
}

#[instrument(name = "core_custorem.find_for_subject", skip(self))]
#[instrument(name = "customer.find_for_subject", skip(self))]
pub async fn find_for_subject(
&self,
sub: &<<Perms as PermissionCheck>::Audit as AuditSvc>::Subject,
Expand Down Expand Up @@ -220,7 +220,7 @@ where
}

#[instrument(
name = "core_customer.update_authentication_id_for_customer",
name = "customer.update_authentication_id_for_customer",
skip(self, authentication_id)
)]
pub async fn update_authentication_id_for_customer(
Expand All @@ -247,7 +247,7 @@ where
}

#[instrument(
name = "core_customer.find_by_authentication_id",
name = "customer.find_by_authentication_id",
skip(self, authentication_id)
)]
pub async fn find_by_authentication_id(
Expand Down
1 change: 1 addition & 0 deletions lana/app/migrations/20240517074612_core_setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ CREATE TABLE persistent_outbox_events (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
sequence BIGSERIAL UNIQUE,
payload JSONB,
tracing_context JSONB,
recorded_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
seen_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Expand Down
1 change: 1 addition & 0 deletions lana/customer-onboarding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ kratos-admin = { path = "../../lib/kratos-admin" }
sim-time = { workspace = true, optional = true }
es-entity = { workspace = true, features = ["graphql"] }

tracing = { workspace = true }
uuid = { workspace = true }
strum = { workspace = true }
serde = { workspace = true }
Expand Down
51 changes: 40 additions & 11 deletions lana/customer-onboarding/src/job.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use async_trait::async_trait;
use futures::StreamExt;
use kratos_admin::KratosAdmin;
use tracing::instrument;

use audit::AuditSvc;
use authz::PermissionCheck;
Expand All @@ -11,7 +12,7 @@ use deposit::{
CoreDepositAction, CoreDepositEvent, CoreDepositObject, GovernanceAction, GovernanceObject,
};
use governance::GovernanceEvent;
use outbox::{Outbox, OutboxEventMarker};
use outbox::{Outbox, OutboxEventMarker, PersistentOutboxEvent};

use job::*;

Expand Down Expand Up @@ -145,20 +146,48 @@ where
let mut stream = self.outbox.listen_persisted(Some(state.sequence)).await?;

while let Some(message) = stream.next().await {
if let Some(CoreCustomerEvent::CustomerCreated { id, email }) =
&message.as_ref().as_event()
{
let authentication_id = self
.kratos_admin
.create_user::<AuthenticationId>(email.clone())
.await?;
self.customers
.update_authentication_id_for_customer(*id, authentication_id)
.await?;
if let Some(CoreCustomerEvent::CustomerCreated { .. }) = &message.as_ref().as_event() {
self.handle_customer_created_event(message.as_ref()).await?;
}
}

let now = crate::time::now();
Ok(JobCompletion::RescheduleAt(now))
}
}

impl<Perms, E> CustomerOnboardingJobRunner<Perms, E>
where
Perms: PermissionCheck,
<<Perms as PermissionCheck>::Audit as AuditSvc>::Action:
From<CoreCustomerAction> + From<CoreDepositAction> + From<GovernanceAction>,
<<Perms as PermissionCheck>::Audit as AuditSvc>::Object:
From<CustomerObject> + From<CoreDepositObject> + From<GovernanceObject>,
E: OutboxEventMarker<CoreCustomerEvent>
+ OutboxEventMarker<CoreDepositEvent>
+ OutboxEventMarker<GovernanceEvent>,
{
#[instrument(
name = "customer_onboarding.handle_customer_created_event",
skip(self, message)
)]
async fn handle_customer_created_event(
&self,
message: &PersistentOutboxEvent<E>,
) -> Result<(), Box<dyn std::error::Error>>
where
E: OutboxEventMarker<CoreCustomerEvent>,
{
if let Some(CoreCustomerEvent::CustomerCreated { id, email }) = message.as_event() {
message.inject_trace_parent();
let authentication_id = self
.kratos_admin
.create_user::<AuthenticationId>(email.clone())
.await?;
self.customers
.update_authentication_id_for_customer(*id, authentication_id)
.await?;
}
Ok(())
}
}
1 change: 1 addition & 0 deletions lib/kratos-admin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,4 @@ thiserror = { workspace = true }
serde = { workspace = true }
serde_json = { workspace = true }
ory-kratos-client = { workspace = true }
tracing = { workspace = true }
1 change: 1 addition & 0 deletions lib/kratos-admin/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ impl KratosAdmin {
}
}

#[tracing::instrument(name = "kratos_admin.create_user", skip(self))]
pub async fn create_user<T>(&self, email: String) -> Result<T, KratosAdminError>
where
T: From<Uuid>,
Expand Down
1 change: 1 addition & 0 deletions lib/outbox/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ fail-on-warnings = []

[dependencies]
es-entity = { workspace = true }
tracing-utils = { path = "../tracing-utils", features = ["persistence"] }

serde = { workspace = true }
serde_json = { workspace = true }
Expand Down
8 changes: 8 additions & 0 deletions lib/outbox/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ where
pub sequence: EventSequence,
#[serde(bound = "T: DeserializeOwned")]
pub payload: Option<T>,
pub(crate) tracing_context: Option<tracing_utils::persistence::SerializableTraceContext>,
pub recorded_at: chrono::DateTime<chrono::Utc>,
}

Expand All @@ -65,6 +66,7 @@ where
id: self.id,
sequence: self.sequence,
payload: self.payload.clone(),
tracing_context: self.tracing_context.clone(),
recorded_at: self.recorded_at,
}
}
Expand All @@ -84,6 +86,12 @@ where
None
}
}

pub fn inject_trace_parent(&self) {
if let Some(context) = &self.tracing_context {
tracing_utils::persistence::set_parent(context);
}
}
}

#[derive(
Expand Down
19 changes: 16 additions & 3 deletions lib/outbox/src/repo.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,15 +62,20 @@ where
return Ok(Vec::new());
}

let tracing_context = tracing_utils::persistence::extract();
let tracing_json =
serde_json::to_value(&tracing_context).expect("Could not serialize tracing context");

let rows = sqlx::query!(
r#"WITH new_events AS (
INSERT INTO persistent_outbox_events (payload)
SELECT unnest($1::jsonb[]) AS payload
INSERT INTO persistent_outbox_events (payload, tracing_context)
SELECT unnest($1::jsonb[]) AS payload, $2::jsonb AS tracing_context
RETURNING id AS "id: OutboxEventId", sequence AS "sequence: EventSequence", recorded_at
)
SELECT * FROM new_events
"#,
&serialized_events as _,
tracing_json
)
.fetch_all(&mut **db)
.await?;
Expand All @@ -81,6 +86,7 @@ where
id: row.id,
sequence: row.sequence,
recorded_at: row.recorded_at,
tracing_context: Some(tracing_context.clone()),
payload: Some(payload),
})
.collect::<Vec<_>>();
Expand All @@ -101,6 +107,7 @@ where
g.seq AS "sequence!: EventSequence",
e.id AS "id?",
e.payload AS "payload?",
e.tracing_context AS "tracing_context?",
e.recorded_at AS "recorded_at?"
FROM
generate_series(LEAST($1 + 1, (SELECT max FROM max_sequence)),
Expand Down Expand Up @@ -131,6 +138,9 @@ where
payload: row
.payload
.map(|p| serde_json::from_value(p).expect("Could not deserialize payload")),
tracing_context: row
.tracing_context
.map(|p| serde_json::from_value(p).expect("Could not deserialize payload")),
recorded_at: row.recorded_at.unwrap_or_default(),
});
}
Expand All @@ -142,7 +152,7 @@ where
SELECT unnest($1::bigint[]) AS sequence
ON CONFLICT (sequence) DO UPDATE
SET sequence = EXCLUDED.sequence
RETURNING id, sequence AS "sequence!: EventSequence", payload, recorded_at
RETURNING id, sequence AS "sequence!: EventSequence", payload, tracing_context, recorded_at
"#,
&empty_ids as &[EventSequence]
)
Expand All @@ -155,6 +165,9 @@ where
payload: row
.payload
.map(|p| serde_json::from_value(p).expect("Could not deserialize payload")),
tracing_context: row
.tracing_context
.map(|p| serde_json::from_value(p).expect("Could not deserialize payload")),
recorded_at: row.recorded_at,
});
}
Expand Down
1 change: 1 addition & 0 deletions lib/tracing-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ edition = "2021"

fail-on-warnings = []
http = ["dep:opentelemetry-http"]
persistence = []

[dependencies]
anyhow = { workspace = true }
Expand Down
46 changes: 46 additions & 0 deletions lib/tracing-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,3 +104,49 @@ pub mod http {
header_map
}
}

#[cfg(feature = "persistence")]
pub mod persistence {
use serde::{Deserialize, Serialize};
use tracing_opentelemetry::OpenTelemetrySpanExt;

#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SerializableTraceContext {
pub traceparent: Option<String>,
pub tracestate: Option<String>,
}

pub fn extract() -> SerializableTraceContext {
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;

let mut carrier = std::collections::HashMap::new();
let propagator = TraceContextPropagator::new();
let current_context = tracing::Span::current().context();

propagator.inject_context(&current_context, &mut carrier);

SerializableTraceContext {
traceparent: carrier.get("traceparent").cloned(),
tracestate: carrier.get("tracestate").cloned(),
}
}

pub fn set_parent(context: &SerializableTraceContext) {
use opentelemetry::propagation::TextMapPropagator;
use opentelemetry_sdk::propagation::TraceContextPropagator;

let mut carrier = std::collections::HashMap::new();

if let Some(traceparent) = &context.traceparent {
carrier.insert("traceparent".to_string(), traceparent.clone());
}
if let Some(tracestate) = &context.tracestate {
carrier.insert("tracestate".to_string(), tracestate.clone());
}

let propagator = TraceContextPropagator::new();
let extracted_context = propagator.extract(&carrier);
tracing::Span::current().set_parent(extracted_context);
}
}

0 comments on commit d7e8c73

Please sign in to comment.