Skip to content

Commit ef49833

Browse files
authored
Minor followups to LogProcessor (#2464)
1 parent 1f35467 commit ef49833

File tree

5 files changed

+30
-14
lines changed

5 files changed

+30
-14
lines changed

opentelemetry-otlp/examples/basic-otlp/src/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use opentelemetry_sdk::logs::LogError;
88
use opentelemetry_sdk::logs::LoggerProvider;
99
use opentelemetry_sdk::metrics::MetricError;
1010
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
11-
use opentelemetry_sdk::{runtime, trace as sdktrace, Resource};
11+
use opentelemetry_sdk::{trace as sdktrace, Resource};
1212
use std::error::Error;
1313
use tracing::info;
1414
use tracing_subscriber::prelude::*;

opentelemetry-otlp/src/logs.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
//!
33
//! Defines a [LogExporter] to send logs via the OpenTelemetry Protocol (OTLP)
44
5+
#[cfg(feature = "grpc-tonic")]
56
use opentelemetry::otel_debug;
67
use std::fmt::Debug;
78

opentelemetry-sdk/src/logs/log_processor.rs

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::sync::mpsc::{self, RecvTimeoutError, SyncSender};
77

88
#[cfg(feature = "spec_unstable_logs_enabled")]
99
use opentelemetry::logs::Severity;
10-
use opentelemetry::{otel_debug, otel_error, otel_warn, InstrumentationScope};
10+
use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn, InstrumentationScope};
1111

1212
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
1313
use std::{cmp::min, env, sync::Mutex};
@@ -207,7 +207,6 @@ impl LogProcessor for BatchLogProcessor {
207207
instrumentation.clone(),
208208
))));
209209

210-
// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
211210
if result.is_err() {
212211
// Increment dropped logs count. The first time we have to drop a log,
213212
// emit a warning.
@@ -317,9 +316,14 @@ impl BatchLogProcessor {
317316
let handle = thread::Builder::new()
318317
.name("OpenTelemetry.Logs.BatchProcessor".to_string())
319318
.spawn(move || {
319+
otel_info!(
320+
name: "BatchLogProcessor.ThreadStarted",
321+
interval_in_millisecs = config.scheduled_delay.as_millis(),
322+
max_export_batch_size = config.max_export_batch_size,
323+
max_queue_size = max_queue_size,
324+
);
320325
let mut last_export_time = Instant::now();
321-
let mut logs = Vec::new();
322-
logs.reserve(config.max_export_batch_size);
326+
let mut logs = Vec::with_capacity(config.max_export_batch_size);
323327

324328
loop {
325329
let remaining_time_option = config
@@ -387,6 +391,9 @@ impl BatchLogProcessor {
387391
}
388392
}
389393
}
394+
otel_info!(
395+
name: "BatchLogProcessor.ThreadStopped"
396+
);
390397
})
391398
.expect("Thread spawn failed."); //TODO: Handle thread spawn failure
392399

opentelemetry-sdk/src/trace/span_processor.rs

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@
3737
use crate::export::trace::{SpanData, SpanExporter};
3838
use crate::resource::Resource;
3939
use crate::trace::Span;
40-
use opentelemetry::otel_error;
4140
use opentelemetry::{otel_debug, otel_warn};
41+
use opentelemetry::{otel_error, otel_info};
4242
use opentelemetry::{
4343
trace::{TraceError, TraceResult},
4444
Context,
@@ -258,8 +258,14 @@ impl BatchSpanProcessor {
258258
let (message_sender, message_receiver) = sync_channel(config.max_queue_size);
259259

260260
let handle = thread::Builder::new()
261-
.name("BatchSpanProcessorThread".to_string())
261+
.name("OpenTelemetry.Traces.BatchProcessor".to_string())
262262
.spawn(move || {
263+
otel_info!(
264+
name: "BatchSpanProcessor.ThreadStarted",
265+
interval_in_millisecs = config.scheduled_delay.as_millis(),
266+
max_export_batch_size = config.max_export_batch_size,
267+
max_queue_size = config.max_queue_size,
268+
);
263269
let mut spans = Vec::with_capacity(config.max_export_batch_size);
264270
let mut last_export_time = Instant::now();
265271

@@ -321,6 +327,9 @@ impl BatchSpanProcessor {
321327
}
322328
}
323329
}
330+
otel_info!(
331+
name: "BatchSpanProcessor.ThreadStopped"
332+
);
324333
})
325334
.expect("Failed to spawn thread"); //TODO: Handle thread spawn failure
326335

@@ -363,13 +372,12 @@ impl SpanProcessor for BatchSpanProcessor {
363372
}
364373
let result = self.message_sender.try_send(BatchMessage::ExportSpan(span));
365374

366-
// TODO - Implement throttling to prevent error flooding when the queue is full or closed.
367375
if result.is_err() {
368376
// Increment dropped span count. The first time we have to drop a span,
369377
// emit a warning.
370378
if self.dropped_span_count.fetch_add(1, Ordering::Relaxed) == 0 {
371-
otel_warn!(name: "BatchSpanProcessorDedicatedThread.SpanDroppingStarted",
372-
message = "BatchSpanProcessorDedicatedThread dropped a Span due to queue full/internal errors. No further span will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total logs dropped.");
379+
otel_warn!(name: "BatchSpanProcessor.SpanDroppingStarted",
380+
message = "BatchSpanProcessor dropped a Span due to queue full/internal errors. No further internal log will be emitted for further drops until Shutdown. During Shutdown time, a log will be emitted with exact count of total Spans dropped.");
373381
}
374382
}
375383
}

scripts/integration_tests.sh

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,13 @@ if [ -d "$TEST_DIR" ]; then
1616
# Run tests with the reqwest-client feature
1717
echo
1818
echo ####
19-
echo Integration Tests: Reqwest Client
19+
echo "Integration Tests: Reqwest Client (Disabled now)"
2020
echo ####
2121
echo
2222
# TODO: reqwest client is not supported with thread based processor and reader. Enable this test once it is supported.
2323
#cargo test --no-default-features --features "reqwest-client","internal-logs"
2424

25-
# Run tests with the reqwest-client feature
25+
# Run tests with the reqwest-blocking-client feature
2626
echo
2727
echo ####
2828
echo Integration Tests: Reqwest Blocking Client
@@ -33,10 +33,10 @@ if [ -d "$TEST_DIR" ]; then
3333
# Run tests with the hyper-client feature
3434
echo
3535
echo ####
36-
echo Integration Tests: Hyper Client
36+
echo "Integration Tests: Hyper Client (Disabled now)"
3737
echo ####
3838
echo
39-
# TODO: reqwest client is not supported with thread based processor and reader. Enable this test once it is supported.
39+
# TODO: hyper client is not supported with thread based processor and reader. Enable this test once it is supported.
4040
#cargo test --no-default-features --features "hyper-client","internal-logs"
4141
else
4242
echo "Directory $TEST_DIR does not exist. Skipping tests."

0 commit comments

Comments
 (0)