Skip to content

Commit

Permalink
add peer & address log, timeout on bad dials
Browse files Browse the repository at this point in the history
  • Loading branch information
erhant committed Feb 6, 2025
1 parent fce0863 commit 406185b
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 23 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ default-members = ["compute"]

[workspace.package]
edition = "2021"
version = "0.3.2"
version = "0.3.3"
license = "Apache-2.0"
readme = "README.md"

Expand Down
23 changes: 14 additions & 9 deletions compute/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use dkn_p2p::{libp2p::Multiaddr, DriaNetworkType};
use dkn_p2p::{
libp2p::{Multiaddr, PeerId},
DriaNetworkType,
};
use dkn_workflows::DriaWorkflowsConfig;
use eyre::{eyre, Result};
use libsecp256k1::{PublicKey, SecretKey};
Expand All @@ -15,8 +18,10 @@ pub struct DriaComputeNodeConfig {
pub secret_key: SecretKey,
/// Wallet public key, derived from the secret key.
pub public_key: PublicKey,
/// Wallet address, derived from the public key.
pub address: [u8; 20],
/// Wallet address in hex, derived from the public key.
pub address: String,
/// Peer ID of the node.
pub peer_id: PeerId,
/// P2P listen address, e.g. `/ip4/0.0.0.0/tcp/4001`.
pub p2p_listen_addr: Multiaddr,
/// Workflow configurations, e.g. models and providers.
Expand Down Expand Up @@ -64,14 +69,13 @@ impl DriaComputeNodeConfig {
hex::encode(public_key.serialize_compressed())
);

let address = public_key_to_address(&public_key);
log::info!("Node Address: 0x{}", hex::encode(address));
// print address
let address = hex::encode(public_key_to_address(&public_key));
log::info!("Node Address: 0x{}", address);

// to this here to log the peer id at start
log::info!(
"Node PeerID: {}",
secret_to_keypair(&secret_key).public().to_peer_id()
);
let peer_id = secret_to_keypair(&secret_key).public().to_peer_id();
log::info!("Node PeerID: {}", peer_id);

// parse listen address
let p2p_listen_addr_str = env::var("DKN_P2P_LISTEN_ADDR")
Expand All @@ -94,6 +98,7 @@ impl DriaComputeNodeConfig {
secret_key,
public_key,
address,
peer_id,
workflows,
p2p_listen_addr,
network_type,
Expand Down
10 changes: 5 additions & 5 deletions compute/src/node/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ impl DriaComputeNode {
/// Number of seconds between refreshing for diagnostic prints.
const DIAGNOSTIC_REFRESH_INTERVAL_SECS: u64 = 30;
/// Number of seconds between refreshing the available nodes.
const AVAILABLE_NODES_REFRESH_INTERVAL_SECS: u64 = 30 * 60; // 30 minutes
const AVAILABLE_NODES_REFRESH_INTERVAL_SECS: u64 = 10 * 60; // 30 minutes

// prepare durations for sleeps
let mut diagnostic_refresh_interval =
Expand Down Expand Up @@ -41,22 +41,22 @@ impl DriaComputeNode {
// a GossipSub message is received from the channel
// this is expected to be sent by the p2p client
gossipsub_msg_opt = self.gossip_message_rx.recv() => {
let (peer_id, message_id, message) = gossipsub_msg_opt.ok_or(eyre!("message_rx channel closed unexpectedly."))?;
let (propagation_peer_id, message_id, message) = gossipsub_msg_opt.ok_or(eyre!("message_rx channel closed unexpectedly"))?;

// handle the message, returning a message acceptance for the received one
let acceptance = self.handle_message((peer_id, &message_id, message)).await;
let acceptance = self.handle_message((propagation_peer_id, &message_id, message)).await;

// validate the message based on the acceptance
// cant do anything but log if this gives an error as well
if let Err(e) = self.p2p.validate_message(&message_id, &peer_id, acceptance).await {
if let Err(e) = self.p2p.validate_message(&message_id, &propagation_peer_id, acceptance).await {
log::error!("Error validating message {}: {:?}", message_id, e);
}

},

// a Request is received from the channel, sent by p2p client
request_msg_opt = self.request_rx.recv() => {
let request = request_msg_opt.ok_or(eyre!("request_rx channel closed unexpectedly."))?;
let request = request_msg_opt.ok_or(eyre!("request_rx channel closed unexpectedly"))?;
if let Err(e) = self.handle_request(request).await {
log::error!("Error handling request: {:?}", e);
}
Expand Down
20 changes: 18 additions & 2 deletions compute/src/node/diagnostic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ impl DriaComputeNode {
));
}

// print peer id and address
diagnostics.push(format!("Peer ID: {}", self.config.peer_id));
diagnostics.push(format!("Address: 0x{}", self.config.address));

// print models
diagnostics.push(format!(
"Models: {}",
Expand Down Expand Up @@ -84,8 +88,20 @@ impl DriaComputeNode {
// dial all rpc nodes
for rpc_addr in self.dria_nodes.rpc_nodes.iter() {
log::info!("Dialling RPC node: {}", rpc_addr);
if let Err(e) = self.p2p.dial(rpc_addr.clone()).await {
log::warn!("Error dialling RPC node: {:?}", e);

let fut = self.p2p.dial(rpc_addr.clone());
match tokio::time::timeout(Duration::from_secs(10), fut).await {
Err(timeout) => {
log::error!("Timeout dialling RPC node: {:?}", timeout);
}
Ok(res) => match res {
Err(e) => {
log::warn!("Error dialling RPC node: {:?}", e);
}
Ok(_) => {
log::info!("Successfully dialled RPC node: {}", rpc_addr);
}
},
};
}

Expand Down
2 changes: 1 addition & 1 deletion compute/src/node/gossipsub.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use dkn_p2p::libp2p::gossipsub::{Message, MessageAcceptance, MessageId, TopicSubscriptionFilter};
use dkn_p2p::libp2p::gossipsub::{Message, MessageAcceptance, MessageId};
use dkn_p2p::libp2p::PeerId;
use eyre::Result;

Expand Down
2 changes: 1 addition & 1 deletion p2p/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl DriaP2PClient {
}

// do a random-walk on the DHT with a random peer
log::info!("Searching for random peers.");
log::info!("Bootstrapping Kademlia DHT.");
let random_peer = PeerId::random();
swarm
.behaviour_mut()
Expand Down

0 comments on commit 406185b

Please sign in to comment.