Skip to content

Commit 9824c45

Browse files
authored
P2P monitoring tests (#751)
1 parent 9f05d0b commit 9824c45

File tree

14 files changed

+309
-20
lines changed

14 files changed

+309
-20
lines changed

Cargo.lock

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ members = [
66
"core",
77
"crawler",
88
"fat",
9+
"monitor-client",
910
"relay",
1011
]
1112
default-members = ["client"]

client/src/main.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,7 @@ use avail_light_core::{
2020
sync_finality::SyncFinality,
2121
telemetry::{self, otlp::Metrics, MetricCounter, MetricValue},
2222
types::{
23-
load_or_init_suri, Delay, IdentityConfig, MaintenanceConfig, MultiaddrConfig, SecretKey,
24-
Uuid,
23+
load_or_init_suri, Delay, IdentityConfig, MaintenanceConfig, PeerAddress, SecretKey, Uuid,
2524
},
2625
utils::{default_subscriber, install_panic_hooks, json_subscriber, spawn_in_span},
2726
};
@@ -60,7 +59,6 @@ mod cli;
6059
mod config;
6160

6261
/// Light Client for Avail Blockchain
63-
6462
async fn run(
6563
cfg: RuntimeConfig,
6664
identity_cfg: IdentityConfig,
@@ -365,7 +363,7 @@ pub fn load_runtime_config(opts: &CliOpts) -> Result<RuntimeConfig> {
365363
if let Some(network) = &opts.network {
366364
let bootstrap = (network.bootstrap_peer_id(), network.bootstrap_multiaddr());
367365
cfg.rpc.full_node_ws = network.full_node_ws();
368-
cfg.libp2p.bootstraps = vec![MultiaddrConfig::PeerIdAndMultiaddr(bootstrap)];
366+
cfg.libp2p.bootstraps = vec![PeerAddress::PeerIdAndMultiaddr(bootstrap)];
369367
cfg.otel.ot_collector_endpoint = network.ot_collector_endpoint().to_string();
370368
cfg.genesis_hash = network.genesis_hash().to_string();
371369
}

core/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## 1.0.5
44

5+
- Rename `MultiaddrConfig` type to `PeerAddress` for better clarity
56
- Enable WASM compilation on proof mod
67
- Enable WASM compilation on utils and shutdown mods
78
- Allocate new port on each new dial attempt

core/src/network/p2p/client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use super::{
2525
event_loop::ConnectionEstablishedInfo, is_global, is_multiaddr_global, Command, EventLoop,
2626
MultiAddressInfo, OutputEvent, PeerInfo, QueryChannel,
2727
};
28-
use crate::types::MultiaddrConfig;
28+
use crate::types::PeerAddress;
2929

3030
#[derive(Clone)]
3131
pub struct Client {
@@ -137,8 +137,8 @@ impl Client {
137137
Box::new(move |context: &mut EventLoop| {
138138
let opts = DialOpts::peer_id(peer_id)
139139
.addresses(peer_address)
140-
.allocate_new_port()
141140
.condition(dial_condition)
141+
.allocate_new_port()
142142
.build();
143143
context.swarm.dial(opts)?;
144144

@@ -166,7 +166,7 @@ impl Client {
166166

167167
// Bootstrap is triggered automatically on add_address call
168168
// Bootstrap nodes are also used as autonat servers
169-
pub async fn bootstrap_on_startup(&self, bootstraps: &[MultiaddrConfig]) -> Result<()> {
169+
pub async fn bootstrap_on_startup(&self, bootstraps: &[PeerAddress]) -> Result<()> {
170170
for (peer, addr) in bootstraps.iter().map(Into::into) {
171171
self.dial_peer(peer, vec![addr.clone()], PeerCondition::Always)
172172
.await

core/src/network/p2p/configuration.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use super::{protocol_name, ProvidersConfig};
22
use crate::network::p2p::MemoryStoreConfig;
3-
use crate::types::{duration_seconds_format, KademliaMode, MultiaddrConfig, SecretKey};
3+
use crate::types::{duration_seconds_format, KademliaMode, PeerAddress, SecretKey};
44
use libp2p::{kad, multiaddr::Protocol, Multiaddr};
55
use serde::{Deserialize, Serialize};
66
use std::{
@@ -139,7 +139,7 @@ pub struct LibP2PConfig {
139139
#[serde(flatten)]
140140
pub kademlia: KademliaConfig,
141141
/// Vector of Relay nodes, which are used for hole punching
142-
pub relays: Vec<MultiaddrConfig>,
142+
pub relays: Vec<PeerAddress>,
143143
/// Sets the amount of time to keep connections alive when they're idle. (default: 10s).
144144
#[serde(with = "duration_seconds_format")]
145145
pub connection_idle_timeout: Duration,
@@ -148,7 +148,7 @@ pub struct LibP2PConfig {
148148
pub per_connection_event_buffer_size: usize,
149149
pub dial_concurrency_factor: NonZeroU8,
150150
/// Vector of Light Client bootstrap nodes, used to bootstrap DHT. If not set, light client acts as a bootstrap node, waiting for first peer to connect for DHT bootstrap (default: empty).
151-
pub bootstraps: Vec<MultiaddrConfig>,
151+
pub bootstraps: Vec<PeerAddress>,
152152
/// Maximum number of parallel tasks spawned for GET and PUT operations on DHT (default: 20).
153153
pub dht_parallelization_limit: usize,
154154
}

core/src/shutdown/signal.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ impl<T: Clone> Signal<T> {
2121
/// If the shutdown initiates before the wrapped future completes, the resulting future yields
2222
/// `Err(reason)` containing the shutdown reason. Upon successful completion of the wrapped future
2323
/// before a shutdown, it yields `Ok(val)`.
24-
2524
pub fn with_cancel<F: Future>(&self, future: F) -> WithCancel<T, F> {
2625
WithCancel {
2726
signal: self.clone(),

core/src/types.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -333,16 +333,16 @@ impl TryFrom<String> for CompactMultiaddress {
333333
untagged,
334334
expecting = "Valid multiaddress/peer_id string or a tuple (peer_id, multiaddress) expected"
335335
)]
336-
pub enum MultiaddrConfig {
336+
pub enum PeerAddress {
337337
Compact(CompactMultiaddress),
338338
PeerIdAndMultiaddr((PeerId, Multiaddr)),
339339
}
340340

341-
impl From<&MultiaddrConfig> for (PeerId, Multiaddr) {
342-
fn from(value: &MultiaddrConfig) -> Self {
341+
impl From<&PeerAddress> for (PeerId, Multiaddr) {
342+
fn from(value: &PeerAddress) -> Self {
343343
match value {
344-
MultiaddrConfig::Compact(CompactMultiaddress(value)) => value.clone(),
345-
MultiaddrConfig::PeerIdAndMultiaddr(value) => value.clone(),
344+
PeerAddress::Compact(CompactMultiaddress(value)) => value.clone(),
345+
PeerAddress::PeerIdAndMultiaddr(value) => value.clone(),
346346
}
347347
}
348348
}

crawler/src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use avail_light_core::{
88
Network,
99
},
1010
telemetry::otlp::OtelConfig,
11-
types::{block_matrix_partition_format, tracing_level_format, MultiaddrConfig, Origin},
11+
types::{block_matrix_partition_format, tracing_level_format, Origin, PeerAddress},
1212
};
1313
use avail_rust::kate_recovery::matrix::Partition;
1414
use clap::{command, Parser};
@@ -109,7 +109,7 @@ pub fn load(opts: &CliOpts) -> Result<Config> {
109109
if let Some(network) = &opts.network {
110110
let bootstrap = (network.bootstrap_peer_id(), network.bootstrap_multiaddr());
111111
config.rpc.full_node_ws = network.full_node_ws();
112-
config.libp2p.bootstraps = vec![MultiaddrConfig::PeerIdAndMultiaddr(bootstrap)];
112+
config.libp2p.bootstraps = vec![PeerAddress::PeerIdAndMultiaddr(bootstrap)];
113113
config.otel.ot_collector_endpoint = network.ot_collector_endpoint().to_string();
114114
config.genesis_hash = network.genesis_hash().to_string();
115115
}

fat/src/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use avail_light_core::{
1111
telemetry::otlp::OtelConfig,
1212
types::{
1313
block_matrix_partition_format, option_duration_seconds_format, tracing_level_format,
14-
MultiaddrConfig, SecretKey,
14+
PeerAddress, SecretKey,
1515
},
1616
};
1717
use avail_rust::kate_recovery::matrix::Partition;
@@ -121,7 +121,7 @@ pub fn load(opts: &CliOpts) -> Result<Config> {
121121
if let Some(network) = &opts.network {
122122
let bootstrap = (network.bootstrap_peer_id(), network.bootstrap_multiaddr());
123123
config.rpc.full_node_ws = network.full_node_ws();
124-
config.libp2p.bootstraps = vec![MultiaddrConfig::PeerIdAndMultiaddr(bootstrap)];
124+
config.libp2p.bootstraps = vec![PeerAddress::PeerIdAndMultiaddr(bootstrap)];
125125
config.otel.ot_collector_endpoint = network.ot_collector_endpoint().to_string();
126126
config.genesis_hash = network.genesis_hash().to_string();
127127
}

monitor-client/Cargo.toml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "avail-light-monitor"
3+
version = "0.1.0"
4+
authors.workspace = true
5+
build = "../build.rs"
6+
edition = "2021"
7+
repository.workspace = true
8+
9+
[dependencies]
10+
avail-light-core = { workspace = true }
11+
clap = { workspace = true }
12+
color-eyre = { workspace = true }
13+
libp2p = { workspace = true }
14+
serde = { workspace = true }
15+
tokio = { workspace = true }
16+
tracing = { workspace = true }
17+
tracing-subscriber = { workspace = true }
18+
19+
[features]
20+
rocksdb = ["avail-light-core/rocksdb"]
21+
default = ["rocksdb"]
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use avail_light_core::{network::p2p::Client, types::PeerAddress};
2+
use color_eyre::Result;
3+
use libp2p::swarm::dial_opts::PeerCondition;
4+
use tokio::time::Interval;
5+
use tracing::{error, info};
6+
7+
pub struct BootstrapMonitor {
8+
bootstraps: Vec<PeerAddress>,
9+
interval: Interval,
10+
p2p_client: Client,
11+
}
12+
13+
impl BootstrapMonitor {
14+
pub fn new(bootstraps: Vec<PeerAddress>, interval: Interval, p2p_client: Client) -> Self {
15+
Self {
16+
bootstraps,
17+
interval,
18+
p2p_client,
19+
}
20+
}
21+
22+
pub async fn start_monitoring(&mut self) -> Result<()> {
23+
info!("Bootstrap monitor started.");
24+
loop {
25+
self.interval.tick().await;
26+
27+
for (peer, addr) in self.bootstraps.iter().map(Into::into) {
28+
match self
29+
.p2p_client
30+
.dial_peer(peer, vec![addr.clone()], PeerCondition::Always)
31+
.await
32+
{
33+
Ok(_) => {
34+
info!("Bootstrap {peer} dialed successfully!");
35+
},
36+
Err(e) => {
37+
error!("Error dialing bootstrap: {e}");
38+
},
39+
}
40+
}
41+
}
42+
}
43+
}

monitor-client/src/config.rs

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
use std::num::NonZeroUsize;
2+
use std::time::Duration;
3+
4+
use avail_light_core::network::{p2p::configuration::LibP2PConfig, Network};
5+
use avail_light_core::types::{tracing_level_format, PeerAddress, SecretKey};
6+
use clap::Parser;
7+
use color_eyre::{eyre::eyre, Result};
8+
use serde::{Deserialize, Serialize};
9+
use tracing::Level;
10+
11+
#[derive(Parser)]
12+
#[command(version)]
13+
pub struct CliOpts {
14+
/// Sets verbosity level.
15+
#[arg(long)]
16+
pub verbosity: Option<Level>,
17+
/// Sets logs format to JSON.
18+
#[arg(long)]
19+
pub logs_json: bool,
20+
/// Cleans DB state.
21+
#[arg(long)]
22+
pub clean: bool,
23+
/// Testnet or devnet selection.
24+
#[arg(short, long, value_name = "network", default_value = "hex")]
25+
pub network: Network,
26+
/// Time interval for monitoring actions
27+
#[arg(long, default_value = "10")]
28+
pub interval: u64,
29+
/// Seed string for libp2p keypair generation
30+
#[arg(long)]
31+
pub seed: Option<String>,
32+
/// P2P port
33+
#[arg(short, long)]
34+
pub port: Option<u16>,
35+
/// RocksDB store location
36+
#[arg(long, default_value = "./db")]
37+
pub db_path: String,
38+
#[arg(long, default_value = "5")]
39+
pub connection_idle_timeout: Option<u64>,
40+
#[arg(long, default_value = "10000")]
41+
pub max_negotiating_inbound_streams: Option<usize>,
42+
#[arg(long, default_value = "30000")]
43+
pub task_command_buffer_size: Option<usize>,
44+
#[arg(long, default_value = "10000")]
45+
pub per_connection_event_buffer_size: Option<usize>,
46+
}
47+
48+
#[derive(Debug, Serialize, Deserialize)]
49+
#[serde(default)]
50+
pub struct Config {
51+
/// Genesis hash of the network to be connected to.
52+
/// Set to "DEV" to connect to any network.
53+
pub genesis_hash: String,
54+
/// Time interval for monitoring actions.
55+
pub interval: u64,
56+
/// Log level.
57+
#[serde(with = "tracing_level_format")]
58+
pub log_level: Level,
59+
/// Log format: JSON for `true`, plain text for `false`.
60+
pub log_format_json: bool,
61+
/// Database file system path.
62+
pub db_path: String,
63+
#[serde(flatten)]
64+
pub libp2p: LibP2PConfig,
65+
}
66+
67+
impl Default for Config {
68+
fn default() -> Self {
69+
Self {
70+
genesis_hash: "DEV".to_owned(),
71+
log_level: Level::INFO,
72+
log_format_json: false,
73+
db_path: "./db".to_string(),
74+
libp2p: Default::default(),
75+
interval: 10,
76+
}
77+
}
78+
}
79+
80+
pub fn load(opts: &CliOpts) -> Result<Config> {
81+
let mut config = Config::default();
82+
83+
config.log_level = opts.verbosity.unwrap_or(config.log_level);
84+
config.log_format_json = opts.logs_json || config.log_format_json;
85+
86+
let bootstrap = (
87+
opts.network.bootstrap_peer_id(),
88+
opts.network.bootstrap_multiaddr(),
89+
);
90+
config.libp2p.bootstraps = vec![PeerAddress::PeerIdAndMultiaddr(bootstrap)];
91+
config.genesis_hash = opts.network.genesis_hash().to_string();
92+
93+
if let Some(seed) = &opts.seed {
94+
config.libp2p.secret_key = Some(SecretKey::Seed {
95+
seed: seed.to_string(),
96+
})
97+
}
98+
99+
if let Some(port) = opts.port {
100+
config.libp2p.port = port;
101+
}
102+
103+
if let Some(connection_idle_timeout) = opts.connection_idle_timeout {
104+
config.libp2p.connection_idle_timeout = Duration::from_secs(connection_idle_timeout);
105+
}
106+
107+
if let Some(max_negotiating_inbound_streams) = opts.max_negotiating_inbound_streams {
108+
config.libp2p.max_negotiating_inbound_streams = max_negotiating_inbound_streams;
109+
}
110+
if let Some(task_command_buffer_size) = opts.task_command_buffer_size {
111+
config.libp2p.task_command_buffer_size =
112+
NonZeroUsize::new(task_command_buffer_size).unwrap();
113+
}
114+
115+
if let Some(per_connection_event_buffer_size) = opts.per_connection_event_buffer_size {
116+
config.libp2p.per_connection_event_buffer_size = per_connection_event_buffer_size;
117+
}
118+
119+
config.db_path = opts.db_path.to_string();
120+
config.interval = opts.interval;
121+
122+
if config.libp2p.bootstraps.is_empty() {
123+
return Err(eyre!("List of bootstraps must not be empty!"));
124+
}
125+
Ok(config)
126+
}

0 commit comments

Comments
 (0)