Skip to content

Commit

Permalink
metrics: add a histogram for peer rtt
Browse files Browse the repository at this point in the history
This histogram shows how long peers take to reply to our
messages, in seconds.
  • Loading branch information
Davidson-Souza committed Feb 12, 2025
1 parent 53b0d88 commit a462f7f
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 25 deletions.
53 changes: 29 additions & 24 deletions crates/floresta-wire/src/p2p_wire/chain_selector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -809,37 +809,42 @@ where
&mut self,
notification: Option<NodeNotification>,
) -> Result<(), WireError> {
if let Some(NodeNotification::FromPeer(peer, message)) = notification {
match message {
PeerMessages::Headers(headers) => {
self.inflight.remove(&InflightRequests::Headers);
return self.handle_headers(peer, headers).await;
}
let Some(notification) = notification else {
return Ok(());
};

PeerMessages::Ready(version) => {
self.handle_peer_ready(peer, &version).await?;
if matches!(self.context.state, ChainSelectorState::LookingForForks(_)) {
let locator = self.chain.get_block_locator().unwrap();
self.send_to_peer(peer, NodeRequest::GetHeaders(locator))
.await?;
}
}
#[cfg(feature = "metrics")]
self.register_message_time(&notification);

PeerMessages::Disconnected(idx) => {
if peer == self.context.sync_peer {
self.context.state = ChainSelectorState::CreatingConnections;
}
self.handle_disconnection(peer, idx).await?;
let NodeNotification::FromPeer(peer, message) = notification;
match message {
PeerMessages::Headers(headers) => {
self.inflight.remove(&InflightRequests::Headers);
return self.handle_headers(peer, headers).await;
}

PeerMessages::Ready(version) => {
self.handle_peer_ready(peer, &version).await?;
if matches!(self.context.state, ChainSelectorState::LookingForForks(_)) {
let locator = self.chain.get_block_locator().unwrap();
self.send_to_peer(peer, NodeRequest::GetHeaders(locator))
.await?;
}
}

PeerMessages::Addr(addresses) => {
let addresses: Vec<_> =
addresses.iter().cloned().map(|addr| addr.into()).collect();
self.address_man.push_addresses(&addresses);
PeerMessages::Disconnected(idx) => {
if peer == self.context.sync_peer {
self.context.state = ChainSelectorState::CreatingConnections;
}
self.handle_disconnection(peer, idx).await?;
}

_ => {}
PeerMessages::Addr(addresses) => {
let addresses: Vec<_> = addresses.iter().cloned().map(|addr| addr.into()).collect();
self.address_man.push_addresses(&addresses);
}

_ => {}
}
Ok(())
}
Expand Down
44 changes: 44 additions & 0 deletions crates/floresta-wire/src/p2p_wire/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,50 @@ where
}
}

/// Records, in the `message_times` histogram, how long it took to receive a reply to one of
/// our inflight requests.
///
/// The incoming message is mapped to the [`InflightRequests`] key it would answer. If that key
/// is present in `self.inflight`, the time elapsed since the instant stored with the entry
/// (presumably when the request was sent -- confirm against where entries are inserted) is
/// observed in seconds.
///
/// Returns `Some(())` when a sample was recorded, and `None` when the message kind is not
/// timed or no matching inflight entry exists. Callers may ignore the return value; the
/// `Option` only exists so `?` can be used for the lookup.
#[cfg(feature = "metrics")]
pub(crate) fn register_message_time(&self, notification: &NodeNotification) -> Option<()> {
    use metrics::get_metrics;

    // Take the timestamp first so the lookup below is not counted as part of the sample.
    let now = Instant::now();
    let NodeNotification::FromPeer(peer, message) = notification;

    // Work out which inflight request this message would be answering.
    let request = match message {
        PeerMessages::Block(block) => InflightRequests::Blocks(block.block.block_hash()),
        PeerMessages::Ready(_) => InflightRequests::Connect(*peer),
        PeerMessages::Headers(_) => InflightRequests::Headers,
        // Filters are looked up under a single key, so the payload is not inspected here.
        PeerMessages::BlockFilter(_) => InflightRequests::GetFilters,
        PeerMessages::UtreexoState(_) => InflightRequests::UtreexoState(*peer),
        // Every other message kind is not timed.
        _ => return None,
    };

    // The second tuple field of an inflight entry is the `Instant` we measure from.
    let sent_at = self.inflight.get(&request)?.1;

    let elapsed = now.duration_since(sent_at).as_secs_f64();
    get_metrics().message_times.observe(elapsed);
    Some(())
}

/// Resolves a string address into a LocalAddress
///
/// This function should get an address in the format `<address>[<:port>]` and return a
Expand Down
3 changes: 3 additions & 0 deletions crates/floresta-wire/src/p2p_wire/running_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -790,6 +790,9 @@ where
&mut self,
notification: NodeNotification,
) -> Result<(), WireError> {
#[cfg(feature = "metrics")]
self.register_message_time(&notification);

match notification {
NodeNotification::FromPeer(peer, message) => match message {
PeerMessages::NewBlock(block) => {
Expand Down
3 changes: 3 additions & 0 deletions crates/floresta-wire/src/p2p_wire/sync_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,9 @@ where
}
/// Process a message from a peer and handle it accordingly between the variants of [`PeerMessages`].
async fn handle_message(&mut self, msg: NodeNotification) {
#[cfg(feature = "metrics")]
self.register_message_time(&msg);

match msg {
NodeNotification::FromPeer(peer, notification) => match notification {
PeerMessages::Block(block) => {
Expand Down
2 changes: 1 addition & 1 deletion florestad/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ json-rpc = [
"compact-filters"
]
default = ["experimental-p2p", "json-rpc"]
metrics = ["dep:metrics"]
metrics = ["dep:metrics", "floresta-wire/metrics"]

[build-dependencies]
toml = "0.5.10"
10 changes: 10 additions & 0 deletions metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use axum::routing::get;
use axum::Router;
use prometheus_client::encoding::text::encode;
use prometheus_client::metrics::gauge::Gauge;
use prometheus_client::metrics::histogram::Histogram;
use prometheus_client::registry::Registry;
use sysinfo::System;

Expand All @@ -15,6 +16,7 @@ pub struct AppMetrics {
pub block_height: Gauge,
pub peer_count: Gauge<f64, AtomicU64>,
pub avg_block_processing_time: Gauge<f64, AtomicU64>,
pub message_times: Histogram,
}

impl AppMetrics {
Expand All @@ -24,6 +26,7 @@ impl AppMetrics {
let block_height = Gauge::default();
let peer_count = Gauge::<f64, AtomicU64>::default();
let avg_block_processing_time = Gauge::<f64, AtomicU64>::default();
let message_times = Histogram::new([0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0].into_iter());

registry.register("block_height", "Current block height", block_height.clone());
registry.register(
Expand All @@ -44,12 +47,19 @@ impl AppMetrics {
memory_usage.clone(),
);

registry.register(
"message_times",
"A time-series of how long our peers take to respond to our requests. Timed out requests are not included.",
message_times.clone(),
);

Self {
registry,
block_height,
memory_usage,
peer_count,
avg_block_processing_time,
message_times,
}
}

Expand Down

0 comments on commit a462f7f

Please sign in to comment.