37
37
use crate :: export:: trace:: { SpanData , SpanExporter } ;
38
38
use crate :: resource:: Resource ;
39
39
use crate :: trace:: Span ;
40
+ use opentelemetry:: otel_error;
40
41
use opentelemetry:: { otel_debug, otel_warn} ;
41
42
use opentelemetry:: {
42
43
trace:: { TraceError , TraceResult } ,
@@ -162,6 +163,60 @@ impl SpanProcessor for SimpleSpanProcessor {
162
163
}
163
164
}
164
165
166
+ /// The `BatchSpanProcessor` collects finished spans in a buffer and exports them
167
+ /// in batches to the configured `SpanExporter`. This processor is ideal for
168
+ /// high-throughput environments, as it minimizes the overhead of exporting spans
169
+ /// individually. It uses a **dedicated background thread** to manage and export spans
170
+ /// asynchronously, ensuring that the application's main execution flow is not blocked.
171
+ ///
172
+ /// /// # Example
173
+ ///
174
+ /// This example demonstrates how to configure and use the `BatchSpanProcessor`
175
+ /// with a custom configuration. Note that a dedicated thread is used internally
176
+ /// to manage the export process.
177
+ ///
178
+ /// ```rust
179
+ /// use opentelemetry::global;
180
+ /// use opentelemetry_sdk::{
181
+ /// trace::{BatchSpanProcessor, BatchConfigBuilder, TracerProvider},
182
+ /// runtime,
183
+ /// testing::trace::NoopSpanExporter,
184
+ /// };
185
+ /// use opentelemetry::trace::Tracer as _;
186
+ /// use opentelemetry::trace::Span;
187
+ /// use std::time::Duration;
188
+ ///
189
+ /// fn main() {
190
+ /// // Step 1: Create an exporter (e.g., a No-Op Exporter for demonstration).
191
+ /// let exporter = NoopSpanExporter::new();
192
+ ///
193
+ /// // Step 2: Configure the BatchSpanProcessor.
194
+ /// let batch_processor = BatchSpanProcessor::builder(exporter)
195
+ /// .with_batch_config(
196
+ /// BatchConfigBuilder::default()
197
+ /// .with_max_queue_size(1024) // Buffer up to 1024 spans.
198
+ /// .with_max_export_batch_size(256) // Export in batches of up to 256 spans.
199
+ /// .with_scheduled_delay(Duration::from_secs(5)) // Export every 5 seconds.
200
+ /// .with_max_export_timeout(Duration::from_secs(10)) // Timeout after 10 seconds.
201
+ /// .build(),
202
+ /// )
203
+ /// .build();
204
+ ///
205
+ /// // Step 3: Set up a TracerProvider with the configured processor.
206
+ /// let provider = TracerProvider::builder()
207
+ /// .with_span_processor(batch_processor)
208
+ /// .build();
209
+ /// global::set_tracer_provider(provider.clone());
210
+ ///
211
+ /// // Step 4: Create spans and record operations.
212
+ /// let tracer = global::tracer("example-tracer");
213
+ /// let mut span = tracer.start("example-span");
214
+ /// span.end(); // Mark the span as completed.
215
+ ///
216
+ /// // Step 5: Ensure all spans are flushed before exiting.
217
+ /// provider.shutdown();
218
+ /// }
219
+ /// ```
165
220
use futures_executor:: block_on;
166
221
use std:: sync:: mpsc:: sync_channel;
167
222
use std:: sync:: mpsc:: RecvTimeoutError ;
@@ -220,7 +275,10 @@ impl BatchSpanProcessor {
220
275
{
221
276
if let Err ( err) = block_on ( exporter. export ( spans. split_off ( 0 ) ) )
222
277
{
223
- eprintln ! ( "Export error: {:?}" , err) ;
278
+ otel_error ! (
279
+ name: "BatchSpanProcessor.ExportError" ,
280
+ error = format!( "{}" , err)
281
+ ) ;
224
282
}
225
283
last_export_time = Instant :: now ( ) ;
226
284
}
@@ -238,19 +296,25 @@ impl BatchSpanProcessor {
238
296
Err ( RecvTimeoutError :: Timeout ) => {
239
297
if last_export_time. elapsed ( ) >= config. scheduled_delay {
240
298
if let Err ( err) = block_on ( exporter. export ( spans. split_off ( 0 ) ) ) {
241
- eprintln ! ( "Export error: {:?}" , err) ;
299
+ otel_error ! (
300
+ name: "BatchSpanProcessor.ExportError" ,
301
+ error = format!( "{}" , err)
302
+ ) ;
242
303
}
243
304
last_export_time = Instant :: now ( ) ;
244
305
}
245
306
}
246
307
Err ( RecvTimeoutError :: Disconnected ) => {
247
- eprintln ! ( "Channel disconnected, shutting down processor thread." ) ;
308
+ otel_error ! (
309
+ name: "BatchLogProcessor.InternalError.ChannelDisconnected" ,
310
+ message = "Channel disconnected, shutting down processor thread."
311
+ ) ;
248
312
break ;
249
313
}
250
314
}
251
315
}
252
316
} )
253
- . expect ( "Failed to spawn thread" ) ;
317
+ . expect ( "Failed to spawn thread" ) ; //TODO: Handle thread spawn failure
254
318
255
319
Self {
256
320
message_sender,
@@ -720,7 +784,7 @@ mod tests {
720
784
721
785
use futures_util:: future:: BoxFuture ;
722
786
use futures_util:: FutureExt ;
723
- use std:: sync:: { Arc , Mutex } ;
787
+ use std:: sync:: { atomic :: Ordering , Arc , Mutex } ;
724
788
725
789
// Mock exporter to test functionality
726
790
#[ derive( Debug ) ]
@@ -838,4 +902,49 @@ mod tests {
838
902
"Shutdown should fail when called a second time"
839
903
) ;
840
904
}
905
+
906
+ #[ test]
907
+ fn batchspanprocessor_handles_dropped_spans ( ) {
908
+ let exporter = MockSpanExporter :: new ( ) ;
909
+ let exporter_shared = exporter. exported_spans . clone ( ) ; // Shared access to verify exported spans
910
+ let config = BatchConfigBuilder :: default ( )
911
+ . with_max_queue_size ( 2 ) // Small queue size to test span dropping
912
+ . with_scheduled_delay ( Duration :: from_secs ( 5 ) )
913
+ . with_max_export_timeout ( Duration :: from_secs ( 2 ) )
914
+ . build ( ) ;
915
+ let processor = BatchSpanProcessor :: new ( exporter, config) ;
916
+
917
+ // Create test spans and send them to the processor
918
+ let span1 = create_test_span ( "span1" ) ;
919
+ let span2 = create_test_span ( "span2" ) ;
920
+ let span3 = create_test_span ( "span3" ) ; // This span should be dropped
921
+
922
+ processor. on_end ( span1. clone ( ) ) ;
923
+ processor. on_end ( span2. clone ( ) ) ;
924
+ processor. on_end ( span3. clone ( ) ) ; // This span exceeds the queue size
925
+
926
+ // Wait for the scheduled delay to expire
927
+ std:: thread:: sleep ( Duration :: from_secs ( 3 ) ) ;
928
+
929
+ let exported_spans = exporter_shared. lock ( ) . unwrap ( ) ;
930
+
931
+ // Verify that only the first two spans are exported
932
+ assert_eq ! (
933
+ exported_spans. len( ) ,
934
+ 2 ,
935
+ "Unexpected number of exported spans"
936
+ ) ;
937
+ assert ! ( exported_spans. iter( ) . any( |s| s. name == "span1" ) ) ;
938
+ assert ! ( exported_spans. iter( ) . any( |s| s. name == "span2" ) ) ;
939
+
940
+ // Ensure the third span is dropped
941
+ assert ! (
942
+ !exported_spans. iter( ) . any( |s| s. name == "span3" ) ,
943
+ "Span3 should have been dropped"
944
+ ) ;
945
+
946
+ // Verify dropped spans count (if accessible in your implementation)
947
+ let dropped_count = processor. dropped_span_count . load ( Ordering :: Relaxed ) ;
948
+ assert_eq ! ( dropped_count, 1 , "Unexpected number of dropped spans" ) ;
949
+ }
841
950
}
0 commit comments