Skip to content

Commit 394ca5e

Browse files
committed
add tokiocontext test
1 parent 0a82887 commit 394ca5e

File tree

1 file changed

+113
-8
lines changed

1 file changed

+113
-8
lines changed

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 113 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ impl BatchSpanProcessor {
248248
pub fn new<E>(
249249
mut exporter: E,
250250
config: BatchConfig,
251-
//ax_queue_size: usize,
251+
//max_queue_size: usize,
252252
//scheduled_delay: Duration,
253253
//shutdown_timeout: Duration,
254254
) -> Self
@@ -258,16 +258,21 @@ impl BatchSpanProcessor {
258258
let (message_sender, message_receiver) = sync_channel(config.max_queue_size);
259259

260260
let handle = thread::Builder::new()
261-
.name("BatchSpanProcessorDedicatedThread".to_string())
261+
.name("BatchSpanProcessorThread".to_string())
262262
.spawn(move || {
263263
let mut spans = Vec::new();
264+
spans.reserve(config.max_export_batch_size);
264265
let mut last_export_time = Instant::now();
265266

266267
loop {
267-
let timeout = config
268+
let remaining_time_option = config
268269
.scheduled_delay
269-
.saturating_sub(last_export_time.elapsed());
270-
match message_receiver.recv_timeout(timeout) {
270+
.checked_sub(last_export_time.elapsed());
271+
let remaining_time = match remaining_time_option {
272+
Some(remaining_time) => remaining_time,
273+
None => config.scheduled_delay,
274+
};
275+
match message_receiver.recv_timeout(remaining_time) {
271276
Ok(message) => match message {
272277
BatchMessage::ExportSpan(span) => {
273278
spans.push(span);
@@ -351,14 +356,17 @@ impl SpanProcessor for BatchSpanProcessor {
351356
/// Handles span end.
352357
fn on_end(&self, span: SpanData) {
353358
if self.is_shutdown.load(Ordering::Relaxed) {
354-
eprintln!("Processor is shutdown. Dropping span.");
359+
// this is a warning, as the user is trying to emit after the processor has been shutdown
360+
otel_warn!(
361+
name: "BatchSpanProcessor.Emit.ProcessorShutdown",
362+
);
355363
return;
356364
}
357365
let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
358366

359367
// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
360368
if result.is_err() {
361-
// Increment dropped logs count. The first time we have to drop a log,
369+
// Increment dropped span count. The first time we have to drop a span,
362370
// emit a warning.
363371
if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
364372
otel_warn!(name: "BatchSpanProcessorDedicatedThread.SpanDroppingStarted",
@@ -384,6 +392,14 @@ impl SpanProcessor for BatchSpanProcessor {
384392

385393
/// Shuts down the processor.
386394
fn shutdown(&self) -> TraceResult<()> {
395+
let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed);
396+
if dropped_spans > 0 {
397+
otel_warn!(
398+
name: "BatchSpanProcessor.LogsDropped",
399+
dropped_span_count = dropped_spans,
400+
message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
401+
);
402+
}
387403
if self.is_shutdown.swap(true, Ordering::Relaxed) {
388404
return Err(TraceError::Other("Processor already shutdown".into()));
389405
}
@@ -396,7 +412,12 @@ impl SpanProcessor for BatchSpanProcessor {
396412
.recv_timeout(self.shutdown_timeout)
397413
.map_err(|_| TraceError::ExportTimedOut(self.shutdown_timeout))?;
398414
if let Some(handle) = self.handle.lock().unwrap().take() {
399-
handle.join().expect("Failed to join thread");
415+
if let Err(err) = handle.join() {
416+
return Err(TraceError::Other(format!(
417+
"Background thread failed to join during shutdown. This may indicate a panic or unexpected termination: {:?}",
418+
err
419+
).into()));
420+
}
400421
}
401422
result
402423
}
@@ -1032,4 +1053,88 @@ mod tests {
10321053
Some(Value::from("test_service"))
10331054
);
10341055
}
1056+
1057+
#[tokio::test(flavor = "current_thread")]
1058+
async fn test_batch_processor_current_thread_runtime() {
1059+
let exporter = MockSpanExporter::new();
1060+
let exporter_shared = exporter.exported_spans.clone();
1061+
1062+
let config = BatchConfigBuilder::default()
1063+
.with_max_queue_size(5)
1064+
.with_max_export_batch_size(3)
1065+
.with_scheduled_delay(Duration::from_millis(50))
1066+
.build();
1067+
1068+
let processor = BatchSpanProcessor::new(exporter, config);
1069+
1070+
for _ in 0..4 {
1071+
let span = new_test_export_span_data();
1072+
processor.on_end(span);
1073+
}
1074+
1075+
tokio::time::sleep(Duration::from_millis(100)).await;
1076+
1077+
let exported_spans = exporter_shared.lock().unwrap();
1078+
assert_eq!(exported_spans.len(), 4);
1079+
}
1080+
1081+
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
1082+
async fn test_batch_processor_multi_thread_count_1_runtime() {
1083+
let exporter = MockSpanExporter::new();
1084+
let exporter_shared = exporter.exported_spans.clone();
1085+
1086+
let config = BatchConfigBuilder::default()
1087+
.with_max_queue_size(5)
1088+
.with_max_export_batch_size(3)
1089+
.with_scheduled_delay(Duration::from_millis(50))
1090+
.build();
1091+
1092+
let processor = BatchSpanProcessor::new(exporter, config);
1093+
1094+
for _ in 0..4 {
1095+
let span = new_test_export_span_data();
1096+
processor.on_end(span);
1097+
}
1098+
1099+
tokio::time::sleep(Duration::from_millis(100)).await;
1100+
1101+
let exported_spans = exporter_shared.lock().unwrap();
1102+
assert_eq!(exported_spans.len(), 4);
1103+
}
1104+
1105+
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
1106+
async fn test_batch_processor_multi_thread() {
1107+
let exporter = MockSpanExporter::new();
1108+
let exporter_shared = exporter.exported_spans.clone();
1109+
1110+
let config = BatchConfigBuilder::default()
1111+
.with_max_queue_size(20)
1112+
.with_max_export_batch_size(5)
1113+
.with_scheduled_delay(Duration::from_millis(50))
1114+
.build();
1115+
1116+
// Create the processor with the thread-safe exporter
1117+
let processor = Arc::new(BatchSpanProcessor::new(exporter, config));
1118+
1119+
let mut handles = vec![];
1120+
for _ in 0..10 {
1121+
let processor_clone = Arc::clone(&processor);
1122+
let handle = tokio::spawn(async move {
1123+
let span = new_test_export_span_data();
1124+
processor_clone.on_end(span);
1125+
});
1126+
handles.push(handle);
1127+
}
1128+
1129+
for handle in handles {
1130+
handle.await.unwrap();
1131+
}
1132+
1133+
// Allow time for batching and export
1134+
tokio::time::sleep(Duration::from_millis(200)).await;
1135+
1136+
// Verify exported spans
1137+
let exported_spans = exporter_shared.lock().unwrap();
1138+
assert_eq!(exported_spans.len(), 10);
1139+
}
10351140
}

0 commit comments

Comments
 (0)