Skip to content

Commit 0f38bd4

Browse files
committed
initial commit
1 parent c841e53 commit 0f38bd4

File tree

1 file changed

+163
-67
lines changed

1 file changed

+163
-67
lines changed

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 163 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ pub trait SpanProcessor: Send + Sync + std::fmt::Debug {
8585
/// `on_end` is called after a `Span` is ended (i.e., the end timestamp is
8686
/// already set). This method is called synchronously within the `Span::end`
8787
/// API, therefore it should not block or throw an exception.
88+
/// TODO - This method should take reference to `SpanData`
8889
fn on_end(&self, span: SpanData);
8990
/// Force the spans lying in the cache to be exported.
9091
fn force_flush(&self) -> TraceResult<()>;
@@ -163,6 +164,7 @@ impl SpanProcessor for SimpleSpanProcessor {
163164
}
164165
}
165166

167+
use crate::export::trace::ExportResult;
166168
/// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
167169
/// in batches to the configured `SpanExporter`. This processor is ideal for
168170
/// high-throughput environments, as it minimizes the overhead of exporting spans
@@ -217,12 +219,10 @@ impl SpanProcessor for SimpleSpanProcessor {
217219
/// provider.shutdown();
218220
/// }
219221
/// ```
220-
use futures_executor::block_on;
221222
use std::sync::mpsc::sync_channel;
223+
use std::sync::mpsc::Receiver;
222224
use std::sync::mpsc::RecvTimeoutError;
223225
use std::sync::mpsc::SyncSender;
224-
use std::sync::mpsc::Receiver;
225-
use crate::export::trace::ExportResult;
226226

227227
/// Messages exchanged between the main thread and the background thread.
228228
#[allow(clippy::large_enum_variant)]
@@ -248,6 +248,7 @@ pub struct BatchSpanProcessor {
248248
export_span_message_sent: Arc<AtomicBool>,
249249
current_batch_size: Arc<AtomicUsize>,
250250
max_export_batch_size: usize,
251+
max_queue_size: usize,
251252
}
252253

253254
impl BatchSpanProcessor {
@@ -262,7 +263,7 @@ impl BatchSpanProcessor {
262263
where
263264
E: SpanExporter + Send + 'static,
264265
{
265-
let (message_sender, message_receiver) = sync_channel::<SpanData>(config.max_queue_size);
266+
let (span_sender, span_receiver) = sync_channel::<SpanData>(config.max_queue_size);
266267
let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
267268
let max_queue_size = config.max_queue_size;
268269
let max_export_batch_size = config.max_export_batch_size;
@@ -281,40 +282,6 @@ impl BatchSpanProcessor {
281282
let mut spans = Vec::with_capacity(config.max_export_batch_size);
282283
let mut last_export_time = Instant::now();
283284
let current_batch_size = current_batch_size_for_thread;
284-
// This method gets upto `max_export_batch_size` amount of spans from the channel and exports them.
285-
// It returns the result of the export operation.
286-
// It expects the span vec to be empty when it's called.
287-
#[inline]
288-
fn get_spans_and_export<E>(
289-
spans_receiver: &Receiver<SpanData>,
290-
exporter: &E,
291-
spans: &mut Vec<SpanData>,
292-
last_export_time: &mut Instant,
293-
current_batch_size: &AtomicUsize,
294-
config: &BatchConfig,
295-
) -> ExportResult
296-
where
297-
E: SpanExporter + Send + Sync + 'static,
298-
{
299-
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
300-
while let Ok(log) = spans_receiver.try_recv() {
301-
spans.push(log);
302-
if spans.len() == config.max_export_batch_size {
303-
break;
304-
}
305-
}
306-
307-
let count_of_logs = spans.len(); // Count of logs that will be exported
308-
let result = export_with_timeout_sync(
309-
config.max_export_timeout,
310-
exporter,
311-
spans,
312-
last_export_time,
313-
); // This method clears the logs vec after exporting
314-
315-
current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
316-
result
317-
}
318285
loop {
319286
let remaining_time_option = config
320287
.scheduled_delay
@@ -325,47 +292,71 @@ impl BatchSpanProcessor {
325292
};
326293
match message_receiver.recv_timeout(remaining_time) {
327294
Ok(message) => match message {
328-
BatchMessage::ExportSpan(span) => {
295+
BatchMessage::ExportSpan(export_span_message_sent) => {
329296
otel_debug!(
330297
name: "BatchSpanProcessor.ExportingDueToBatchSize",
331298
);
332-
spans.push(span);
333-
if spans.len() >= config.max_queue_size
334-
|| last_export_time.elapsed() >= config.scheduled_delay
335-
{
336-
if let Err(err) = block_on(exporter.export(spans.split_off(0)))
337-
{
338-
otel_error!(
339-
name: "BatchSpanProcessor.ExportError",
340-
error = format!("{}", err)
341-
);
342-
}
343-
last_export_time = Instant::now();
344-
}
299+
let _ = Self::get_spans_and_export(
300+
&span_receiver,
301+
&mut exporter,
302+
&mut spans,
303+
&mut last_export_time,
304+
&current_batch_size,
305+
&config,
306+
);
307+
// Reset the export span message sent flag now it has has been processed.
308+
export_span_message_sent.store(false, Ordering::Relaxed);
345309
}
346310
BatchMessage::ForceFlush(sender) => {
347-
let result = block_on(exporter.export(spans.split_off(0)));
311+
otel_debug!(name: "BatchSpanProcessor.ExportingDueToForceFlush");
312+
let result = Self::get_spans_and_export(
313+
&span_receiver,
314+
&mut exporter,
315+
&mut spans,
316+
&mut last_export_time,
317+
&current_batch_size,
318+
&config,
319+
);
348320
let _ = sender.send(result);
349321
}
350322
BatchMessage::Shutdown(sender) => {
351-
let result = block_on(exporter.export(spans.split_off(0)));
323+
otel_debug!(name: "BatchSpanProcessor.ExportingDueToShutdown");
324+
let result = Self::get_spans_and_export(
325+
&span_receiver,
326+
&mut exporter,
327+
&mut spans,
328+
&mut last_export_time,
329+
&current_batch_size,
330+
&config,
331+
);
352332
let _ = sender.send(result);
333+
334+
otel_debug!(
335+
name: "BatchSpanProcessor.ThreadExiting",
336+
reason = "ShutdownRequested"
337+
);
338+
//
339+
// break out the loop and return from the current background thread.
340+
//
353341
break;
354342
}
355343
BatchMessage::SetResource(resource) => {
356344
exporter.set_resource(&resource);
357345
}
358346
},
359347
Err(RecvTimeoutError::Timeout) => {
360-
if last_export_time.elapsed() >= config.scheduled_delay {
361-
if let Err(err) = block_on(exporter.export(spans.split_off(0))) {
362-
otel_error!(
363-
name: "BatchSpanProcessor.ExportError",
364-
error = format!("{}", err)
365-
);
366-
}
367-
last_export_time = Instant::now();
368-
}
348+
otel_debug!(
349+
name: "BatchLogProcessor.ExportingDueToTimer",
350+
);
351+
352+
let _ = Self::get_spans_and_export(
353+
&span_receiver,
354+
&mut exporter,
355+
&mut spans,
356+
&mut last_export_time,
357+
&current_batch_size,
358+
&config,
359+
);
369360
}
370361
Err(RecvTimeoutError::Disconnected) => {
371362
// Channel disconnected, only thing to do is break
@@ -385,12 +376,17 @@ impl BatchSpanProcessor {
385376
.expect("Failed to spawn thread"); //TODO: Handle thread spawn failure
386377

387378
Self {
379+
span_sender,
388380
message_sender,
389381
handle: Mutex::new(Some(handle)),
390382
forceflush_timeout: Duration::from_secs(5), // TODO: make this configurable
391383
shutdown_timeout: Duration::from_secs(5), // TODO: make this configurable
392384
is_shutdown: AtomicBool::new(false),
393385
dropped_span_count: Arc::new(AtomicUsize::new(0)),
386+
max_queue_size,
387+
export_span_message_sent: Arc::new(AtomicBool::new(false)),
388+
current_batch_size,
389+
max_export_batch_size,
394390
}
395391
}
396392

@@ -404,6 +400,72 @@ impl BatchSpanProcessor {
404400
config: BatchConfig::default(),
405401
}
406402
}
403+
404+
// This method gets upto `max_export_batch_size` amount of spans from the channel and exports them.
405+
// It returns the result of the export operation.
406+
// It expects the span vec to be empty when it's called.
407+
#[inline]
408+
fn get_spans_and_export<E>(
409+
spans_receiver: &Receiver<SpanData>,
410+
exporter: &mut E,
411+
spans: &mut Vec<SpanData>,
412+
last_export_time: &mut Instant,
413+
current_batch_size: &AtomicUsize,
414+
config: &BatchConfig,
415+
) -> ExportResult
416+
where
417+
E: SpanExporter + Send + Sync + 'static,
418+
{
419+
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
420+
while let Ok(log) = spans_receiver.try_recv() {
421+
spans.push(log);
422+
if spans.len() == config.max_export_batch_size {
423+
break;
424+
}
425+
}
426+
427+
let count_of_logs = spans.len(); // Count of logs that will be exported
428+
let result = Self::export_with_timeout_sync(
429+
config.max_export_timeout,
430+
exporter,
431+
spans,
432+
last_export_time,
433+
); // This method clears the logs vec after exporting
434+
435+
current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
436+
result
437+
}
438+
439+
#[allow(clippy::vec_box)]
440+
fn export_with_timeout_sync<E>(
441+
_: Duration, // TODO, enforcing timeout in exporter.
442+
exporter: &mut E,
443+
batch: &mut Vec<SpanData>,
444+
last_export_time: &mut Instant,
445+
) -> ExportResult
446+
where
447+
E: SpanExporter + Send + Sync + 'static,
448+
{
449+
*last_export_time = Instant::now();
450+
451+
if batch.is_empty() {
452+
return TraceResult::Ok(());
453+
}
454+
455+
let export = exporter.export(batch.split_off(0));
456+
let export_result = futures_executor::block_on(export);
457+
458+
match export_result {
459+
Ok(_) => TraceResult::Ok(()),
460+
Err(err) => {
461+
otel_error!(
462+
name: "BatchLogProcessor.ExportError",
463+
error = format!("{}", err)
464+
);
465+
TraceResult::Err(err)
466+
}
467+
}
468+
}
407469
}
408470

409471
impl SpanProcessor for BatchSpanProcessor {
@@ -418,10 +480,11 @@ impl SpanProcessor for BatchSpanProcessor {
418480
// this is a warning, as the user is trying to emit after the processor has been shutdown
419481
otel_warn!(
420482
name: "BatchSpanProcessor.Emit.ProcessorShutdown",
483+
message = "BatchSpanProcessor has been shutdown. No further spans will be emitted."
421484
);
422485
return;
423486
}
424-
let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
487+
let result = self.span_sender.try_send(span);
425488

426489
if result.is_err() {
427490
// Increment dropped span count. The first time we have to drop a span,
@@ -431,6 +494,36 @@ impl SpanProcessor for BatchSpanProcessor {
431494
message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
432495
}
433496
}
497+
// At this point, sending the log record to the data channel was successful.
498+
// Increment the current batch size and check if it has reached the max export batch size.
499+
if self.current_batch_size.fetch_add(1, Ordering::Relaxed) + 1 >= self.max_export_batch_size
500+
{
501+
// Check if the a control message for exporting logs is already sent to the worker thread.
502+
// If not, send a control message to export logs.
503+
// `export_span_message_sent` is set to false ONLY when the worker thread has processed the control message.
504+
505+
if !self.export_span_message_sent.load(Ordering::Relaxed) {
506+
// This is a cost-efficient check as atomic load operations do not require exclusive access to cache line.
507+
// Perform atomic swap to `export_log_message_sent` ONLY when the atomic load operation above returns false.
508+
// Atomic swap/compare_exchange operations require exclusive access to cache line on most processor architectures.
509+
// We could have used compare_exchange as well here, but it's more verbose than swap.
510+
if !self.export_span_message_sent.swap(true, Ordering::Relaxed) {
511+
match self.message_sender.try_send(BatchMessage::ExportSpan(
512+
self.export_span_message_sent.clone(),
513+
)) {
514+
Ok(_) => {
515+
// Control message sent successfully.
516+
}
517+
Err(_err) => {
518+
// TODO: Log error
519+
// If the control message could not be sent, reset the `export_log_message_sent` flag.
520+
self.export_span_message_sent
521+
.store(false, Ordering::Relaxed);
522+
}
523+
}
524+
}
525+
}
526+
}
434527
}
435528

436529
/// Flushes all pending spans.
@@ -450,17 +543,20 @@ impl SpanProcessor for BatchSpanProcessor {
450543

451544
/// Shuts down the processor.
452545
fn shutdown(&self) -> TraceResult<()> {
546+
if self.is_shutdown.swap(true, Ordering::Relaxed) {
547+
return Err(TraceError::Other("Processor already shutdown".into()));
548+
}
453549
let dropped_spans = self.dropped_span_count.load(Ordering::Relaxed);
550+
let max_queue_size = self.max_queue_size;
454551
if dropped_spans > 0 {
455552
otel_warn!(
456553
name: "BatchSpanProcessor.LogsDropped",
457554
dropped_span_count = dropped_spans,
555+
max_queue_size = max_queue_size,
458556
message = "Spans were dropped due to a queue being full or other error. The count represents the total count of spans dropped in the lifetime of this BatchSpanProcessor. Consider increasing the queue size and/or decrease delay between intervals."
459557
);
460558
}
461-
if self.is_shutdown.swap(true, Ordering::Relaxed) {
462-
return Err(TraceError::Other("Processor already shutdown".into()));
463-
}
559+
464560
let (sender, receiver) = sync_channel(1);
465561
self.message_sender
466562
.try_send(BatchMessage::Shutdown(sender))

0 commit comments

Comments
 (0)