Skip to content

Commit 5aa9120

Browse files
authored
Fix BatchLogProcessor (open-telemetry#2510)
1 parent 888d5a3 commit 5aa9120

File tree

1 file changed

+26
-18
lines changed

1 file changed

+26
-18
lines changed

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 26 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl<T: LogExporter> LogProcessor for SimpleLogProcessor<T> {
199199
#[allow(clippy::large_enum_variant)]
200200
#[derive(Debug)]
201201
enum BatchMessage {
202-
/// This ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
202+
/// This is ONLY sent when the number of logs records in the data channel has reached `max_export_batch_size`.
203203
ExportLog(Arc<AtomicBool>),
204204
/// ForceFlush flushes the current buffer to the exporter.
205205
ForceFlush(mpsc::SyncSender<ExportResult>),
@@ -457,23 +457,31 @@ impl BatchLogProcessor {
457457
where
458458
E: LogExporter + Send + Sync + 'static,
459459
{
460-
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
461-
while let Ok(log) = logs_receiver.try_recv() {
462-
logs.push(log);
463-
if logs.len() == config.max_export_batch_size {
464-
break;
460+
let target = current_batch_size.load(Ordering::Relaxed); // `target` is used to determine the stopping criteria for exporting logs.
461+
let mut result = LogResult::Ok(());
462+
let mut total_exported_logs: usize = 0;
463+
464+
while target > 0 && total_exported_logs < target {
465+
// Get upto `max_export_batch_size` amount of logs log records from the channel and push them to the logs vec
466+
while let Ok(log) = logs_receiver.try_recv() {
467+
logs.push(log);
468+
if logs.len() == config.max_export_batch_size {
469+
break;
470+
}
465471
}
466-
}
467472

468-
let count_of_logs = logs.len(); // Count of logs that will be exported
469-
let result = export_with_timeout_sync(
470-
config.max_export_timeout,
471-
exporter,
472-
logs,
473-
last_export_time,
474-
); // This method clears the logs vec after exporting
473+
let count_of_logs = logs.len(); // Count of logs that will be exported
474+
total_exported_logs += count_of_logs;
475+
476+
result = export_with_timeout_sync(
477+
config.max_export_timeout,
478+
exporter,
479+
logs,
480+
last_export_time,
481+
); // This method clears the logs vec after exporting
475482

476-
current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
483+
current_batch_size.fetch_sub(count_of_logs, Ordering::Relaxed);
484+
}
477485
result
478486
}
479487

@@ -485,6 +493,9 @@ impl BatchLogProcessor {
485493

486494
match message_receiver.recv_timeout(remaining_time) {
487495
Ok(BatchMessage::ExportLog(export_log_message_sent)) => {
496+
// Reset the export log message sent flag now it has has been processed.
497+
export_log_message_sent.store(false, Ordering::Relaxed);
498+
488499
otel_debug!(
489500
name: "BatchLogProcessor.ExportingDueToBatchSize",
490501
);
@@ -497,9 +508,6 @@ impl BatchLogProcessor {
497508
&current_batch_size,
498509
&config,
499510
);
500-
501-
// Reset the export log message sent flag now it has has been processed.
502-
export_log_message_sent.store(false, Ordering::Relaxed);
503511
}
504512
Ok(BatchMessage::ForceFlush(sender)) => {
505513
otel_debug!(name: "BatchLogProcessor.ExportingDueToForceFlush");

0 commit comments

Comments
 (0)