Skip to content

Log - modify export/processor to not require mutable self in shutdown and set-resource #2634

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
4 changes: 2 additions & 2 deletions opentelemetry-otlp/src/exporter/http/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ impl LogExporter for OtlpHttpClient {
}
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
let mut client_guard = self.client.lock().map_err(|e| {
OTelSdkError::InternalFailure(format!("Failed to acquire client lock: {}", e))
})?;
Expand All @@ -64,7 +64,7 @@ impl LogExporter for OtlpHttpClient {
Ok(())
}

fn set_resource(&mut self, resource: &opentelemetry_sdk::Resource) {
fn set_resource(&self, resource: &opentelemetry_sdk::Resource) {
self.resource = resource.into();
}
}
6 changes: 3 additions & 3 deletions opentelemetry-sdk/src/logs/export.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,15 +136,15 @@ pub trait LogExporter: Send + Sync + Debug {
) -> impl std::future::Future<Output = OTelSdkResult> + Send;

/// Shuts down the exporter.
fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}
#[cfg(feature = "spec_unstable_logs_enabled")]
/// Chek if logs are enabled.
/// Check if logs are enabled.
fn event_enabled(&self, _level: Severity, _target: &str, _name: &str) -> bool {
// By default, all logs are enabled
true
}
/// Set the resource for the exporter.
fn set_resource(&mut self, _resource: &Resource) {}
fn set_resource(&self, _resource: &Resource) {}
}
4 changes: 2 additions & 2 deletions opentelemetry-sdk/src/logs/in_memory_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,14 @@ impl LogExporter for InMemoryLogExporter {
}
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
if self.should_reset_on_shutdown {
self.reset();
}
Ok(())
}

fn set_resource(&mut self, resource: &Resource) {
fn set_resource(&self, resource: &Resource) {
let mut res_guard = self.resource.lock().expect("Resource lock poisoned");
*res_guard = resource.clone();
}
Expand Down
77 changes: 21 additions & 56 deletions opentelemetry-sdk/src/logs/log_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,53 +135,31 @@ pub trait LogProcessor: Send + Sync + Debug {
/// ```
#[derive(Debug)]
pub struct SimpleLogProcessor<T: LogExporter> {
exporter: Mutex<T>,
is_shutdown: AtomicBool,
exporter: T,
}

impl<T: LogExporter> SimpleLogProcessor<T> {
pub(crate) fn new(exporter: T) -> Self {
SimpleLogProcessor {
exporter: Mutex::new(exporter),
is_shutdown: AtomicBool::new(false),
exporter: exporter
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe the purpose of Mutex lock was to ensure that the multiple exports are not invoked concurrently.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good point! We need to enforce that spec requirement.
Will see how to do it without Mutex.

}
}
}

impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
fn emit(&self, record: &mut SdkLogRecord, instrumentation: &InstrumentationScope) {
// noop after shutdown
if self.is_shutdown.load(std::sync::atomic::Ordering::Relaxed) {
// this is a warning, as the user is trying to log after the processor has been shutdown
otel_warn!(
name: "SimpleLogProcessor.Emit.ProcessorShutdown",
let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
let result = futures_executor::block_on(self.exporter.export(LogBatch::new(log_tuple)));
if result.is_err() {
otel_error!(
name: "SimpleLogProcessor.Emit.ExportError",
error = format!("{}", result.err().unwrap())
);
return;
}

let result = self
.exporter
.lock()
.map_err(|_| OTelSdkError::InternalFailure("SimpleLogProcessor mutex poison".into()))
.and_then(|exporter| {
let log_tuple = &[(record as &SdkLogRecord, instrumentation)];
futures_executor::block_on(exporter.export(LogBatch::new(log_tuple)))
});
// Handle errors with specific static names
match result {
Err(OTelSdkError::InternalFailure(_)) => {
// logging as debug as this is not a user error
otel_debug!(
name: "SimpleLogProcessor.Emit.MutexPoisoning",
);
}
Err(err) => {
otel_error!(
name: "SimpleLogProcessor.Emit.ExportError",
error = format!("{}",err)
);
}
_ => {}
else {
otel_debug!(
name: "SimpleLogProcessor.Emit.Success",
);
}
}

Expand All @@ -190,21 +168,11 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
}

fn shutdown(&self) -> OTelSdkResult {
self.is_shutdown
.store(true, std::sync::atomic::Ordering::Relaxed);
if let Ok(mut exporter) = self.exporter.lock() {
exporter.shutdown()
} else {
Err(OTelSdkError::InternalFailure(
"SimpleLogProcessor mutex poison at shutdown".into(),
))
}
self.exporter.shutdown()
}

fn set_resource(&self, resource: &Resource) {
if let Ok(mut exporter) = self.exporter.lock() {
exporter.set_resource(resource);
}
self.exporter.set_resource(resource);
}
}

Expand Down Expand Up @@ -481,7 +449,7 @@ impl LogProcessor for BatchLogProcessor {
}

impl BatchLogProcessor {
pub(crate) fn new<E>(mut exporter: E, config: BatchConfig) -> Self
pub(crate) fn new<E>(exporter: E, config: BatchConfig) -> Self
where
E: LogExporter + Send + Sync + 'static,
{
Expand Down Expand Up @@ -909,11 +877,11 @@ mod tests {
async { Ok(()) }
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

fn set_resource(&mut self, resource: &Resource) {
fn set_resource(&self, resource: &Resource) {
self.resource
.lock()
.map(|mut res_opt| {
Expand Down Expand Up @@ -1171,17 +1139,14 @@ mod tests {
let instrumentation: InstrumentationScope = Default::default();

processor.emit(&mut record, &instrumentation);

processor.shutdown().unwrap();

let is_shutdown = processor
.is_shutdown
.load(std::sync::atomic::Ordering::Relaxed);
assert!(is_shutdown);

// Emit after shutdown.
processor.emit(&mut record, &instrumentation);

assert_eq!(1, exporter.get_emitted_logs().unwrap().len())
// SimpleProcessor does not do anything to check if logs
// are flowing after shutdown.
assert_eq!(2, exporter.get_emitted_logs().unwrap().len())
}

#[tokio::test(flavor = "current_thread")]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -325,11 +325,11 @@ mod tests {
async { Ok(()) }
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
Ok(())
}

fn set_resource(&mut self, resource: &Resource) {
fn set_resource(&self, resource: &Resource) {
self.resource
.lock()
.map(|mut res_opt| {
Expand Down
2 changes: 1 addition & 1 deletion opentelemetry-stdout/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ opentelemetry_sdk = { path = "../opentelemetry-sdk", features = ["rt-tokio", "me
opentelemetry-appender-tracing = { path = "../opentelemetry-appender-tracing"}
opentelemetry-semantic-conventions = { path = "../opentelemetry-semantic-conventions" }
tracing = { workspace = true, features = ["std"]}
tracing-subscriber = { workspace = true, features = ["registry", "std"] }
tracing-subscriber = { workspace = true, features = ["env-filter","registry", "std"] }
tokio = { workspace = true, features = ["full"] }
once_cell = { workspace = true }
11 changes: 9 additions & 2 deletions opentelemetry-stdout/examples/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,22 @@ fn init_metrics() -> opentelemetry_sdk::metrics::SdkMeterProvider {
fn init_logs() -> opentelemetry_sdk::logs::SdkLoggerProvider {
use opentelemetry_appender_tracing::layer;
use opentelemetry_sdk::logs::SdkLoggerProvider;
use tracing_subscriber::prelude::*;
use tracing_subscriber::{prelude::*, EnvFilter};

let exporter = opentelemetry_stdout::LogExporter::default();
let provider: SdkLoggerProvider = SdkLoggerProvider::builder()
.with_simple_exporter(exporter)
.with_resource(RESOURCE.clone())
.build();
let filter_otel = EnvFilter::new("info")
.add_directive("hyper=off".parse().unwrap())
.add_directive("opentelemetry=off".parse().unwrap())
.add_directive("tonic=off".parse().unwrap())
.add_directive("h2=off".parse().unwrap())
.add_directive("reqwest=off".parse().unwrap());
let layer = layer::OpenTelemetryTracingBridge::new(&provider);
tracing_subscriber::registry().with(layer).init();
let otel_layer = layer.with_filter(filter_otel);
tracing_subscriber::registry().with(otel_layer).init();
provider
}

Expand Down
18 changes: 10 additions & 8 deletions opentelemetry-stdout/src/logs/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,20 @@ use core::fmt;
use opentelemetry_sdk::error::{OTelSdkError, OTelSdkResult};
use opentelemetry_sdk::logs::LogBatch;
use opentelemetry_sdk::Resource;
use std::sync::atomic;
use std::sync::{atomic, Mutex};
use std::sync::atomic::Ordering;

/// An OpenTelemetry exporter that writes Logs to stdout on export.
pub struct LogExporter {
resource: Resource,
resource: Mutex<Resource>,
is_shutdown: atomic::AtomicBool,
resource_emitted: atomic::AtomicBool,
}

impl Default for LogExporter {
fn default() -> Self {
LogExporter {
resource: Resource::builder().build(),
resource: Mutex::new(Resource::builder().build()),
is_shutdown: atomic::AtomicBool::new(false),
resource_emitted: atomic::AtomicBool::new(false),
}
Expand Down Expand Up @@ -49,10 +49,11 @@ impl opentelemetry_sdk::logs::LogExporter for LogExporter {
print_logs(batch);
} else {
println!("Resource");
if let Some(schema_url) = self.resource.schema_url() {
let resource = self.resource.lock().unwrap();
if let Some(schema_url) = resource.schema_url() {
println!("\t Resource SchemaUrl: {:?}", schema_url);
}
self.resource.iter().for_each(|(k, v)| {
resource.iter().for_each(|(k, v)| {
println!("\t -> {}={:?}", k, v);
});
print_logs(batch);
Expand All @@ -63,13 +64,14 @@ impl opentelemetry_sdk::logs::LogExporter for LogExporter {
}
}

fn shutdown(&mut self) -> OTelSdkResult {
fn shutdown(&self) -> OTelSdkResult {
self.is_shutdown.store(true, atomic::Ordering::SeqCst);
Ok(())
}

fn set_resource(&mut self, res: &opentelemetry_sdk::Resource) {
self.resource = res.clone();
fn set_resource(&self, res: &opentelemetry_sdk::Resource) {
let mut res_guard = self.resource.lock().unwrap();
*res_guard = res.clone();
}
}

Expand Down
Loading