Skip to content

Commit c841e53

Browse files
committed
initial commit
1 parent 68af3bb commit c841e53

File tree

1 file changed

+52
-3
lines changed

1 file changed

+52
-3
lines changed

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 52 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -221,12 +221,15 @@ use futures_executor::block_on;
221221
use std::sync::mpsc::sync_channel;
222222
use std::sync::mpsc::RecvTimeoutError;
223223
use std::sync::mpsc::SyncSender;
224+
use std::sync::mpsc::Receiver;
225+
use crate::export::trace::ExportResult;
224226

225227
/// Messages exchanged between the main thread and the background thread.
226228
#[allow(clippy::large_enum_variant)]
227229
#[derive(Debug)]
228230
enum BatchMessage {
229-
ExportSpan(SpanData),
231+
//ExportSpan(SpanData),
232+
ExportSpan(Arc<AtomicBool>),
230233
ForceFlush(SyncSender<TraceResult<()>>),
231234
Shutdown(SyncSender<TraceResult<()>>),
232235
SetResource(Arc<Resource>),
@@ -235,12 +238,16 @@ enum BatchMessage {
235238
/// A batch span processor with a dedicated background thread.
236239
#[derive(Debug)]
237240
pub struct BatchSpanProcessor {
238-
message_sender: SyncSender<BatchMessage>,
241+
span_sender: SyncSender<SpanData>, // Data channel to store spans
242+
message_sender: SyncSender<BatchMessage>, // Control channel to store control messages.
239243
handle: Mutex<Option<thread::JoinHandle<()>>>,
240244
forceflush_timeout: Duration,
241245
shutdown_timeout: Duration,
242246
is_shutdown: AtomicBool,
243247
dropped_span_count: Arc<AtomicUsize>,
248+
export_span_message_sent: Arc<AtomicBool>,
249+
current_batch_size: Arc<AtomicUsize>,
250+
max_export_batch_size: usize,
244251
}
245252

246253
impl BatchSpanProcessor {
@@ -255,7 +262,12 @@ impl BatchSpanProcessor {
255262
where
256263
E: SpanExporter + Send + 'static,
257264
{
258-
let (message_sender, message_receiver) = sync_channel(config.max_queue_size);
265+
let (message_sender, message_receiver) = sync_channel::<SpanData>(config.max_queue_size);
266+
let (message_sender, message_receiver) = sync_channel::<BatchMessage>(64); // Is this a reasonable bound?
267+
let max_queue_size = config.max_queue_size;
268+
let max_export_batch_size = config.max_export_batch_size;
269+
let current_batch_size = Arc::new(AtomicUsize::new(0));
270+
let current_batch_size_for_thread = current_batch_size.clone();
259271

260272
let handle = thread::Builder::new()
261273
.name("OpenTelemetry.Traces.BatchProcessor".to_string())
@@ -268,7 +280,41 @@ impl BatchSpanProcessor {
268280
);
269281
let mut spans = Vec::with_capacity(config.max_export_batch_size);
270282
let mut last_export_time = Instant::now();
283+
let current_batch_size = current_batch_size_for_thread;
284+
// This method gets upto `max_export_batch_size` amount of spans from the channel and exports them.
285+
// It returns the result of the export operation.
286+
// It expects the span vec to be empty when it's called.
287+
#[inline]
288+
fn get_spans_and_export<E>(
289+
spans_receiver: &Receiver<SpanData>,
290+
exporter: &E,
291+
spans: &mut Vec<SpanData>,
292+
last_export_time: &mut Instant,
293+
current_batch_size: &AtomicUsize,
294+
config: &BatchConfig,
295+
) -> ExportResult
296+
where
297+
E: SpanExporter + Send + Sync + 'static,
298+
{
299+
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
300+
while let Ok(log) = spans_receiver.try_recv() {
301+
spans.push(log);
302+
if spans.len() == config.max_export_batch_size {
303+
break;
304+
}
305+
}
306+
307+
let count_of_logs = spans.len(); // Count of logs that will be exported
308+
let result = export_with_timeout_sync(
309+
config.max_export_timeout,
310+
exporter,
311+
spans,
312+
last_export_time,
313+
); // This method clears the logs vec after exporting
271314

315+
current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
316+
result
317+
}
272318
loop {
273319
let remaining_time_option = config
274320
.scheduled_delay
@@ -280,6 +326,9 @@ impl BatchSpanProcessor {
280326
match message_receiver.recv_timeout(remaining_time) {
281327
Ok(message) => match message {
282328
BatchMessage::ExportSpan(span) => {
329+
otel_debug!(
330+
name: "BatchSpanProcessor.ExportingDueToBatchSize",
331+
);
283332
spans.push(span);
284333
if spans.len() >= config.max_queue_size
285334
|| last_export_time.elapsed() >= config.scheduled_delay

0 commit comments

Comments
 (0)