Skip to content

Commit

Permalink
Merge pull request #1429 from jbesraa/2025-01-31/add-template-provide…
Browse files Browse the repository at this point in the history
…r-rpc-fetch

Add TemplateProvider::rpc_info
  • Loading branch information
plebhash authored Feb 17, 2025
2 parents fb496dd + fb9a683 commit 410cbdc
Show file tree
Hide file tree
Showing 13 changed files with 228 additions and 113 deletions.
1 change: 1 addition & 0 deletions common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
#[cfg(feature = "bitcoin")]
pub use bitcoin;
pub use secp256k1;
pub mod url;
73 changes: 73 additions & 0 deletions common/src/url.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
/// This file contains utility functions for working with URLs.
/// Checks if a given string is a valid URL.
///
/// Very naive and simple check, only checks if the URL starts with http:// or https:// and if the
/// port is a valid number.
pub fn is_valid_url(url_str: &str) -> bool {
extract_host_and_port_from_url(url_str).is_ok()
}

fn is_valid_url_scheme(url_str: &str) -> bool {
url_str.starts_with("http://") || url_str.starts_with("https://") && url_str.len() > 8
}

fn extract_host_and_port_from_url(url_str: &str) -> Result<(String, u16), &'static str> {
if !is_valid_url_scheme(url_str) {
return Err("Invalid URL format: must start with http:// or https://");
}
let mut parts = url_str.split("://");
let scheme = parts.next().unwrap();
let rest = parts.next().unwrap();
let mut host_port = rest.split('/');
let host_and_port = host_port.next().unwrap();
let mut host_port_parts = host_and_port.split(':');
let host = host_port_parts.next().unwrap().to_string();
let port_str = host_port_parts.next();
let port: u16 = match port_str {
Some(p) => match p.parse() {
Ok(port) => port,
Err(_) => return Err("Invalid port number"),
},
None => {
if scheme == "https" {
443
} else {
80
}
}
};

Ok((host, port))
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_is_valid_url_basic() {
assert_eq!(is_valid_url_scheme("https://www.example.com"), true);
assert_eq!(is_valid_url_scheme("http://127.0.0.1:8080"), true);
assert_eq!(is_valid_url_scheme("invalid-url"), false);
}

#[test]
fn test_extract_host_and_port_from_url() {
let (host, port) = extract_host_and_port_from_url("http://127.0.0.1:8080").unwrap();
assert_eq!(host, "127.0.0.1");
assert_eq!(port, 8080);

let (host, port) = extract_host_and_port_from_url("http://example.com").unwrap(); // No explicit port
assert_eq!(host, "example.com");
assert_eq!(port, 80); // Default HTTP port

let (host, port) = extract_host_and_port_from_url("https://example.com").unwrap();
assert_eq!(host, "example.com");
assert_eq!(port, 443); // Default HTTPS port

let (host, port) = extract_host_and_port_from_url("https://example.com:8443").unwrap();
assert_eq!(host, "example.com");
assert_eq!(port, 8443);
}
}
1 change: 0 additions & 1 deletion protocols/v2/roles-logic-sv2/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ pub fn merkle_root_from_path<T: AsRef<[u8]>>(
coinbase.extend_from_slice(coinbase_tx_prefix);
coinbase.extend_from_slice(extranonce);
coinbase.extend_from_slice(coinbase_tx_suffix);
dbg!(&coinbase.len());
let coinbase = match Transaction::deserialize(&coinbase[..]) {
Ok(trans) => trans,
Err(e) => {
Expand Down
2 changes: 2 additions & 0 deletions roles/jd-server/src/lib/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub enum JdsError {
MempoolError(JdsMempoolError),
ImpossibleToReconstructBlock(String),
NoLastDeclaredJob,
InvalidRPCUrl,
}

impl std::fmt::Display for JdsError {
Expand All @@ -48,6 +49,7 @@ impl std::fmt::Display for JdsError {
write!(f, "Error in reconstructing the block: {:?}", e)
}
NoLastDeclaredJob => write!(f, "Last declared job not found"),
InvalidRPCUrl => write!(f, "Invalid Template Provider RPC URL"),
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions roles/jd-server/src/lib/mempool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ impl JDsMempool {
}
}

/// Checks if the rpc client is accessible.
pub async fn health(self_: Arc<Mutex<Self>>) -> Result<(), JdsMempoolError> {
let client = self_
.safe_lock(|a| a.get_client())?
.ok_or(JdsMempoolError::NoClient)?;
client.health().await.map_err(JdsMempoolError::Rpc)
}

// this functions fill in the mempool the transactions with the given txid and insert the given
// transactions. The ids are for the transactions that are already known to the node, the
// unknown transactions are provided directly as a vector
Expand Down
143 changes: 83 additions & 60 deletions roles/jd-server/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ pub mod mempool;
pub mod status;

use async_channel::{bounded, unbounded, Receiver, Sender};
use error::JdsError;
use error_handling::handle_result;
use job_declarator::JobDeclarator;
use mempool::error::JdsMempoolError;
Expand All @@ -22,7 +23,10 @@ use std::{
convert::{TryFrom, TryInto},
time::Duration,
};
use stratum_common::bitcoin::{Script, TxOut};
use stratum_common::{
bitcoin::{Script, TxOut},
url::is_valid_url,
};

pub type Message = JdsMessages<'static>;
pub type StdFrame = StandardSv2Frame<Message>;
Expand All @@ -34,10 +38,14 @@ pub struct JobDeclaratorServer {
}

impl JobDeclaratorServer {
pub fn new(config: Configuration) -> Self {
Self { config }
pub fn new(config: Configuration) -> Result<Self, Box<JdsError>> {
let url = config.core_rpc_url.clone() + ":" + &config.core_rpc_port.clone().to_string();
if !is_valid_url(&url) {
return Err(Box::new(JdsError::InvalidRPCUrl));
}
Ok(Self { config })
}
pub async fn start(&self) {
pub async fn start(&self) -> Result<(), JdsError> {
let config = self.config.clone();
let url = config.core_rpc_url.clone() + ":" + &config.core_rpc_port.clone().to_string();
let username = config.core_rpc_user.clone();
Expand All @@ -53,75 +61,74 @@ impl JobDeclaratorServer {
)));
let mempool_update_interval = config.mempool_update_interval;
let mempool_cloned_ = mempool.clone();
let mempool_cloned_1 = mempool.clone();
if let Err(e) = mempool::JDsMempool::health(mempool_cloned_1.clone()).await {
error!("{:?}", e);
return Err(JdsError::MempoolError(e));
}
let (status_tx, status_rx) = unbounded();
let sender = status::Sender::Downstream(status_tx.clone());
let mut last_empty_mempool_warning =
std::time::Instant::now().sub(std::time::Duration::from_secs(60));

// TODO if the jd-server is launched with core_rpc_url empty, the following flow is never
// taken. Consequentally new_block_receiver in JDsMempool::on_submit is never read, possibly
// reaching the channel bound. The new_block_sender is given as input to
// JobDeclarator::start()
if url.contains("http") {
let sender_update_mempool = sender.clone();
task::spawn(async move {
loop {
let update_mempool_result: Result<(), mempool::error::JdsMempoolError> =
mempool::JDsMempool::update_mempool(mempool_cloned_.clone()).await;
if let Err(err) = update_mempool_result {
match err {
JdsMempoolError::EmptyMempool => {
if last_empty_mempool_warning.elapsed().as_secs() >= 60 {
warn!("{:?}", err);
warn!("Template Provider is running, but its mempool is empty (possible reasons: you're testing in testnet, signet, or regtest)");
last_empty_mempool_warning = std::time::Instant::now();
}
}
JdsMempoolError::NoClient => {
mempool::error::handle_error(&err);
handle_result!(sender_update_mempool, Err(err));
}
JdsMempoolError::Rpc(_) => {
mempool::error::handle_error(&err);
handle_result!(sender_update_mempool, Err(err));
}
JdsMempoolError::PoisonLock(_) => {
mempool::error::handle_error(&err);
handle_result!(sender_update_mempool, Err(err));
let sender_update_mempool = sender.clone();
task::spawn(async move {
loop {
let update_mempool_result: Result<(), mempool::error::JdsMempoolError> =
mempool::JDsMempool::update_mempool(mempool_cloned_.clone()).await;
if let Err(err) = update_mempool_result {
match err {
JdsMempoolError::EmptyMempool => {
if last_empty_mempool_warning.elapsed().as_secs() >= 60 {
warn!("{:?}", err);
warn!("Template Provider is running, but its mempool is empty (possible reasons: you're testing in testnet, signet, or regtest)");
last_empty_mempool_warning = std::time::Instant::now();
}
}
JdsMempoolError::NoClient => {
mempool::error::handle_error(&err);
handle_result!(sender_update_mempool, Err(err));
}
JdsMempoolError::Rpc(_) => {
mempool::error::handle_error(&err);
handle_result!(sender_update_mempool, Err(err));
}
JdsMempoolError::PoisonLock(_) => {
mempool::error::handle_error(&err);
handle_result!(sender_update_mempool, Err(err));
}
}
tokio::time::sleep(mempool_update_interval).await;
// DO NOT REMOVE THIS LINE
//let _transactions =
// mempool::JDsMempool::_get_transaction_list(mempool_cloned_.clone());
}
});

let mempool_cloned = mempool.clone();
let sender_submit_solution = sender.clone();
task::spawn(async move {
loop {
let result = mempool::JDsMempool::on_submit(mempool_cloned.clone()).await;
if let Err(err) = result {
match err {
JdsMempoolError::EmptyMempool => {
if last_empty_mempool_warning.elapsed().as_secs() >= 60 {
warn!("{:?}", err);
warn!("Template Provider is running, but its mempool is empty (possible reasons: you're testing in testnet, signet, or regtest)");
last_empty_mempool_warning = std::time::Instant::now();
}
}
_ => {
// TODO here there should be a better error managmenet
mempool::error::handle_error(&err);
handle_result!(sender_submit_solution, Err(err));
tokio::time::sleep(mempool_update_interval).await;
// DO NOT REMOVE THIS LINE
//let _transactions =
// mempool::JDsMempool::_get_transaction_list(mempool_cloned_.clone());
}
});

let mempool_cloned = mempool.clone();
let sender_submit_solution = sender.clone();
task::spawn(async move {
loop {
let result = mempool::JDsMempool::on_submit(mempool_cloned.clone()).await;
if let Err(err) = result {
match err {
JdsMempoolError::EmptyMempool => {
if last_empty_mempool_warning.elapsed().as_secs() >= 60 {
warn!("{:?}", err);
warn!("Template Provider is running, but its mempool is empty (possible reasons: you're testing in testnet, signet, or regtest)");
last_empty_mempool_warning = std::time::Instant::now();
}
}
_ => {
// TODO here there should be a better error managmenet
mempool::error::handle_error(&err);
handle_result!(sender_submit_solution, Err(err));
}
}
}
});
};
}
});

let cloned = config.clone();
let mempool_cloned = mempool.clone();
Expand Down Expand Up @@ -199,6 +206,7 @@ impl JobDeclaratorServer {
}
}
}
Ok(())
}
}

Expand Down Expand Up @@ -366,6 +374,21 @@ mod tests {
settings.try_deserialize().expect("Failed to parse config")
}

#[tokio::test]
async fn test_invalid_rpc_url() {
let mut config = load_config("config-examples/jds-config-hosted-example.toml");
config.core_rpc_url = "invalid".to_string();
assert!(JobDeclaratorServer::new(config).is_err());
}

#[tokio::test]
async fn test_offline_rpc_url() {
let mut config = load_config("config-examples/jds-config-hosted-example.toml");
config.core_rpc_url = "http://127.0.0.1".to_string();
let jd = JobDeclaratorServer::new(config).unwrap();
assert!(jd.start().await.is_err());
}

#[test]
fn test_get_coinbase_output_non_empty() {
let config = load_config("config-examples/jds-config-hosted-example.toml");
Expand Down
1 change: 1 addition & 0 deletions roles/jd-server/src/lib/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub async fn handle_error(sender: &Sender, e: JdsError) -> error_handling::Error
JdsError::NoLastDeclaredJob => {
send_status(sender, e, error_handling::ErrorBranch::Continue).await
}
JdsError::InvalidRPCUrl => send_status(sender, e, error_handling::ErrorBranch::Break).await,
}
}

Expand Down
5 changes: 4 additions & 1 deletion roles/jd-server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,5 +102,8 @@ async fn main() {
}
};

lib::JobDeclaratorServer::new(config).start().await;
let _ = lib::JobDeclaratorServer::new(config)
.expect("Failed to start JDS")
.start()
.await;
}
12 changes: 12 additions & 0 deletions roles/roles-utils/rpc/src/mini_rpc_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,18 @@ impl MiniRpcClient {
}
}

/// Checks the health of the RPC connection by sending a request to the blockchain info
/// endpoint
pub async fn health(&self) -> Result<(), RpcError> {
let response = self
.send_json_rpc_request("getblockchaininfo", json!([]))
.await;
match response {
Ok(_) => Ok(()),
Err(error) => Err(error),
}
}

async fn send_json_rpc_request(
&self,
method: &str,
Expand Down
Loading

0 comments on commit 410cbdc

Please sign in to comment.