@@ -229,6 +229,7 @@ enum BatchMessage {
229
229
ExportSpan ( SpanData ) ,
230
230
ForceFlush ( SyncSender < TraceResult < ( ) > > ) ,
231
231
Shutdown ( SyncSender < TraceResult < ( ) > > ) ,
232
+ SetResource ( Arc < Resource > ) ,
232
233
}
233
234
234
235
/// A batch span processor with a dedicated background thread.
@@ -292,6 +293,9 @@ impl BatchSpanProcessor {
292
293
let _ = sender. send ( result) ;
293
294
break ;
294
295
}
296
+ BatchMessage :: SetResource ( resource) => {
297
+ exporter. set_resource ( & resource) ;
298
+ }
295
299
} ,
296
300
Err ( RecvTimeoutError :: Timeout ) => {
297
301
if last_export_time. elapsed ( ) >= config. scheduled_delay {
@@ -396,6 +400,14 @@ impl SpanProcessor for BatchSpanProcessor {
396
400
}
397
401
result
398
402
}
403
+
404
+ /// Set the resource for the processor.
405
+ fn set_resource ( & mut self , resource : & Resource ) {
406
+ let resource = Arc :: new ( resource. clone ( ) ) ;
407
+ let _ = self
408
+ . message_sender
409
+ . try_send ( BatchMessage :: SetResource ( resource) ) ;
410
+ }
399
411
}
400
412
401
413
/// Builder for `BatchSpanProcessorDedicatedThread`.
@@ -782,20 +794,24 @@ mod tests {
782
794
}
783
795
}
784
796
797
+ use crate :: Resource ;
785
798
use futures_util:: future:: BoxFuture ;
786
799
use futures_util:: FutureExt ;
800
+ use opentelemetry:: { Key , KeyValue , Value } ;
787
801
use std:: sync:: { atomic:: Ordering , Arc , Mutex } ;
788
802
789
803
// Mock exporter to test functionality
790
804
#[ derive( Debug ) ]
791
805
struct MockSpanExporter {
792
806
exported_spans : Arc < Mutex < Vec < SpanData > > > ,
807
+ exported_resource : Arc < Mutex < Option < Resource > > > ,
793
808
}
794
809
795
810
impl MockSpanExporter {
796
811
fn new ( ) -> Self {
797
812
Self {
798
813
exported_spans : Arc :: new ( Mutex :: new ( Vec :: new ( ) ) ) ,
814
+ exported_resource : Arc :: new ( Mutex :: new ( None ) ) ,
799
815
}
800
816
}
801
817
}
@@ -811,6 +827,10 @@ mod tests {
811
827
}
812
828
813
829
fn shutdown ( & mut self ) { }
830
+ fn set_resource ( & mut self , resource : & Resource ) {
831
+ let mut exported_resource = self . exported_resource . lock ( ) . unwrap ( ) ;
832
+ * exported_resource = Some ( resource. clone ( ) ) ;
833
+ }
814
834
}
815
835
816
836
#[ test]
@@ -947,4 +967,69 @@ mod tests {
947
967
let dropped_count = processor. dropped_span_count . load ( Ordering :: Relaxed ) ;
948
968
assert_eq ! ( dropped_count, 1 , "Unexpected number of dropped spans" ) ;
949
969
}
970
+
971
+ #[ test]
972
+ fn validate_span_attributes_exported_correctly ( ) {
973
+ let exporter = MockSpanExporter :: new ( ) ;
974
+ let exporter_shared = exporter. exported_spans . clone ( ) ;
975
+ let config = BatchConfigBuilder :: default ( ) . build ( ) ;
976
+ let processor = BatchSpanProcessor :: new ( exporter, config) ;
977
+
978
+ // Create a span with attributes
979
+ let mut span_data = create_test_span ( "attribute_validation" ) ;
980
+ span_data. attributes = vec ! [
981
+ KeyValue :: new( "key1" , "value1" ) ,
982
+ KeyValue :: new( "key2" , "value2" ) ,
983
+ ] ;
984
+ processor. on_end ( span_data. clone ( ) ) ;
985
+
986
+ // Force flush to export the span
987
+ let _ = processor. force_flush ( ) ;
988
+
989
+ // Validate the exported attributes
990
+ let exported_spans = exporter_shared. lock ( ) . unwrap ( ) ;
991
+ assert_eq ! ( exported_spans. len( ) , 1 ) ;
992
+ let exported_span = & exported_spans[ 0 ] ;
993
+ assert ! ( exported_span
994
+ . attributes
995
+ . contains( & KeyValue :: new( "key1" , "value1" ) ) ) ;
996
+ assert ! ( exported_span
997
+ . attributes
998
+ . contains( & KeyValue :: new( "key2" , "value2" ) ) ) ;
999
+ }
1000
+
1001
+ #[ test]
1002
+ fn batchspanprocessor_sets_and_exports_with_resource ( ) {
1003
+ let exporter = MockSpanExporter :: new ( ) ;
1004
+ let exporter_shared = exporter. exported_spans . clone ( ) ;
1005
+ let resource_shared = exporter. exported_resource . clone ( ) ;
1006
+ let config = BatchConfigBuilder :: default ( ) . build ( ) ;
1007
+ let mut processor = BatchSpanProcessor :: new ( exporter, config) ;
1008
+
1009
+ // Set a resource for the processor
1010
+ let resource = Resource :: new ( vec ! [ KeyValue :: new( "service.name" , "test_service" ) ] ) ;
1011
+ processor. set_resource ( & resource) ;
1012
+
1013
+ // Create a span and send it to the processor
1014
+ let test_span = create_test_span ( "resource_test" ) ;
1015
+ processor. on_end ( test_span. clone ( ) ) ;
1016
+
1017
+ // Force flush to ensure the span is exported
1018
+ let _ = processor. force_flush ( ) ;
1019
+
1020
+ // Validate spans are exported
1021
+ let exported_spans = exporter_shared. lock ( ) . unwrap ( ) ;
1022
+ assert_eq ! ( exported_spans. len( ) , 1 ) ;
1023
+
1024
+ // Validate the resource is correctly set in the exporter
1025
+ let exported_resource = resource_shared. lock ( ) . unwrap ( ) ;
1026
+ assert ! ( exported_resource. is_some( ) ) ;
1027
+ assert_eq ! (
1028
+ exported_resource
1029
+ . as_ref( )
1030
+ . unwrap( )
1031
+ . get( Key :: new( "service.name" ) ) ,
1032
+ Some ( Value :: from( "test_service" ) )
1033
+ ) ;
1034
+ }
950
1035
}
0 commit comments