@@ -296,23 +296,27 @@ async fn run(
296
296
297
297
// construct Metric Attributes and initialize Metrics
298
298
let metric_attributes = vec ! [
299
- ( "version" . to_string ( ) , version. to_string( ) ) ,
300
- ( "role" . to_string ( ) , "lightnode" . to_string( ) ) ,
301
- ( "origin" . to_string ( ) , cfg. origin. to_string( ) ) ,
302
- ( "peerID" . to_string ( ) , peer_id. to_string( ) ) ,
303
- ( "avail_address" . to_string ( ) , identity_cfg. avail_public_key) ,
304
- ( "network" . to_string ( ) , Network :: name( & cfg. genesis_hash) ) ,
305
- ( "client_id" . to_string ( ) , client_id. to_string( ) ) ,
306
- ( "execution_id" . to_string ( ) , execution_id. to_string( ) ) ,
299
+ ( "version" , version. to_string( ) ) ,
300
+ ( "role" , "lightnode" . to_string( ) ) ,
301
+ ( "origin" , cfg. origin. to_string( ) ) ,
302
+ ( "peerID" , peer_id. to_string( ) ) ,
303
+ ( "avail_address" , identity_cfg. avail_public_key) ,
304
+ ( "network" , Network :: name( & cfg. genesis_hash) ) ,
305
+ ( "client_id" , client_id. to_string( ) ) ,
306
+ ( "execution_id" , execution_id. to_string( ) ) ,
307
307
(
308
- "client_alias" . to_string ( ) ,
308
+ "client_alias" ,
309
309
cfg. client_alias. clone( ) . unwrap_or( "" . to_string( ) ) ,
310
310
) ,
311
311
] ;
312
312
313
- let metrics =
314
- telemetry:: otlp:: initialize ( cfg. project_name . clone ( ) , & cfg. origin , cfg. otel . clone ( ) )
315
- . wrap_err ( "Unable to initialize OpenTelemetry service" ) ?;
313
+ let metrics = telemetry:: otlp:: initialize (
314
+ cfg. project_name . clone ( ) ,
315
+ & cfg. origin ,
316
+ cfg. otel . clone ( ) ,
317
+ metric_attributes,
318
+ )
319
+ . wrap_err ( "Unable to initialize OpenTelemetry service" ) ?;
316
320
317
321
let rpc_host = db
318
322
. get ( RpcNodeKey )
@@ -324,7 +328,6 @@ async fn run(
324
328
cfg. libp2p . kademlia . operation_mode . into ( ) ,
325
329
rpc_host,
326
330
Multiaddr :: empty ( ) ,
327
- metric_attributes,
328
331
) ;
329
332
330
333
spawn_in_span ( shutdown. with_cancel ( async move {
@@ -446,52 +449,33 @@ impl BlockStat {
446
449
447
450
struct ClientState {
448
451
metrics : Metrics ,
449
- kad_mode : Mode ,
450
- multiaddress : Multiaddr ,
451
- rpc_host : String ,
452
- metric_attributes : Vec < ( String , String ) > ,
453
452
active_blocks : HashMap < u32 , BlockStat > ,
454
453
}
455
454
456
455
impl ClientState {
457
- fn new (
458
- metrics : Metrics ,
459
- kad_mode : Mode ,
460
- rpc_host : String ,
461
- multiaddress : Multiaddr ,
462
- metric_attributes : Vec < ( String , String ) > ,
463
- ) -> Self {
464
- ClientState {
456
+ fn new ( metrics : Metrics , kad_mode : Mode , rpc_host : String , multiaddress : Multiaddr ) -> Self {
457
+ let mut state = ClientState {
465
458
metrics,
466
- kad_mode,
467
- multiaddress,
468
- rpc_host,
469
- metric_attributes,
470
459
active_blocks : Default :: default ( ) ,
471
- }
460
+ } ;
461
+ state. update_operating_mode ( kad_mode) ;
462
+ state. update_rpc_host ( rpc_host) ;
463
+ state. update_multiaddress ( multiaddress) ;
464
+ state
472
465
}
473
466
474
467
fn update_multiaddress ( & mut self , value : Multiaddr ) {
475
- self . multiaddress = value;
468
+ self . metrics
469
+ . set_attribute ( "multiaddress" , value. to_string ( ) ) ;
476
470
}
477
471
478
472
fn update_operating_mode ( & mut self , value : Mode ) {
479
- self . kad_mode = value;
473
+ self . metrics
474
+ . set_attribute ( "operating_mode" , value. to_string ( ) ) ;
480
475
}
481
476
482
477
fn update_rpc_host ( & mut self , value : String ) {
483
- self . rpc_host = value;
484
- }
485
-
486
- fn attributes ( & self ) -> Vec < ( String , String ) > {
487
- let mut attrs = vec ! [
488
- ( "operating_mode" . to_string( ) , self . kad_mode. to_string( ) ) ,
489
- ( "multiaddress" . to_string( ) , self . multiaddress. to_string( ) ) ,
490
- ( "rpc_host" . to_string( ) , self . rpc_host. to_string( ) ) ,
491
- ] ;
492
-
493
- attrs. extend ( self . metric_attributes . clone ( ) ) ;
494
- attrs
478
+ self . metrics . set_attribute ( "rpc_host" , value) ;
495
479
}
496
480
497
481
fn get_block_stat ( & mut self , block_num : u32 ) -> Result < & mut BlockStat > {
@@ -576,40 +560,41 @@ impl ClientState {
576
560
mut lc_receiver : UnboundedReceiver < LcEvent > ,
577
561
mut rpc_receiver : broadcast:: Receiver < RpcEvent > ,
578
562
) {
579
- self . metrics . count ( MetricCounter :: Starts , self . attributes ( ) ) ;
563
+ self . metrics . count ( MetricCounter :: Starts ) ;
580
564
loop {
581
565
select ! {
582
566
Some ( p2p_event) = p2p_receiver. recv( ) => {
583
567
match p2p_event {
584
568
P2pEvent :: Count => {
585
- self . metrics. count( MetricCounter :: EventLoopEvent , self . attributes ( ) ) ;
569
+ self . metrics. count( MetricCounter :: EventLoopEvent ) ;
586
570
} ,
587
571
P2pEvent :: IncomingGetRecord => {
588
- self . metrics. count( MetricCounter :: IncomingGetRecord , self . attributes ( ) ) ;
572
+ self . metrics. count( MetricCounter :: IncomingGetRecord ) ;
589
573
} ,
590
574
P2pEvent :: IncomingPutRecord => {
591
- self . metrics. count( MetricCounter :: IncomingPutRecord , self . attributes ( ) ) ;
575
+ self . metrics. count( MetricCounter :: IncomingPutRecord ) ;
592
576
} ,
593
577
P2pEvent :: KadModeChange ( mode) => {
578
+
594
579
self . update_operating_mode( mode) ;
595
580
} ,
596
581
P2pEvent :: Ping ( rtt) => {
597
582
self . metrics. record( MetricValue :: DHTPingLatency ( rtt. as_millis( ) as f64 ) ) ;
598
583
} ,
599
584
P2pEvent :: IncomingConnection => {
600
- self . metrics. count( MetricCounter :: IncomingConnections , self . attributes ( ) ) ;
585
+ self . metrics. count( MetricCounter :: IncomingConnections ) ;
601
586
} ,
602
587
P2pEvent :: IncomingConnectionError => {
603
- self . metrics. count( MetricCounter :: IncomingConnectionErrors , self . attributes ( ) ) ;
588
+ self . metrics. count( MetricCounter :: IncomingConnectionErrors ) ;
604
589
} ,
605
590
P2pEvent :: MultiaddressUpdate ( address) => {
606
591
self . update_multiaddress( address) ;
607
592
} ,
608
593
P2pEvent :: EstablishedConnection => {
609
- self . metrics. count( MetricCounter :: EstablishedConnections , self . attributes ( ) ) ;
594
+ self . metrics. count( MetricCounter :: EstablishedConnections ) ;
610
595
} ,
611
596
P2pEvent :: OutgoingConnectionError => {
612
- self . metrics. count( MetricCounter :: OutgoingConnectionErrors , self . attributes ( ) ) ;
597
+ self . metrics. count( MetricCounter :: OutgoingConnectionErrors ) ;
613
598
} ,
614
599
P2pEvent :: PutRecord { block_num, records } => {
615
600
self . handle_new_put_record( block_num, records) ;
@@ -634,16 +619,6 @@ impl ClientState {
634
619
}
635
620
Some ( maintenance_event) = maintenance_receiver. recv( ) => {
636
621
match maintenance_event {
637
- MaintenanceEvent :: FlushMetrics ( block_num) => {
638
- if let Err ( error) = self . metrics. flush( self . attributes( ) ) {
639
- error!(
640
- block_num,
641
- "Could not handle Flush Maintenance event properly: {error}"
642
- ) ;
643
- } else {
644
- info!( block_num, "Flushing metrics finished" ) ;
645
- } ;
646
- } ,
647
622
MaintenanceEvent :: RecordStats {
648
623
connected_peers,
649
624
block_confidence_treshold,
@@ -656,7 +631,7 @@ impl ClientState {
656
631
self . metrics. record( MetricValue :: DHTQueryTimeout ( query_timeout) ) ;
657
632
} ,
658
633
MaintenanceEvent :: CountUps => {
659
- self . metrics. count( MetricCounter :: Up , self . attributes ( ) ) ;
634
+ self . metrics. count( MetricCounter :: Up ) ;
660
635
} ,
661
636
}
662
637
}
@@ -666,7 +641,7 @@ impl ClientState {
666
641
self . metrics. record( MetricValue :: BlockProcessingDelay ( delay) ) ;
667
642
} ,
668
643
LcEvent :: CountSessionBlocks => {
669
- self . metrics. count( MetricCounter :: SessionBlocks , self . attributes ( ) ) ;
644
+ self . metrics. count( MetricCounter :: SessionBlocks ) ;
670
645
} ,
671
646
LcEvent :: RecordBlockHeight ( block_num) => {
672
647
self . metrics. record( MetricValue :: BlockHeight ( block_num) ) ;
0 commit comments