Skip to content

Commit

Permalink
Merge pull request #1369 from jbesraa/2025-01-21/template-provider-cl…
Browse files Browse the repository at this point in the history
…eanup

`TemplateProvider` cleanup
  • Loading branch information
plebhash authored Feb 16, 2025
2 parents 461e419 + b60dc61 commit fb496dd
Show file tree
Hide file tree
Showing 7 changed files with 95 additions and 90 deletions.
4 changes: 2 additions & 2 deletions roles/tests-integration/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use utils::get_available_address;

pub mod sniffer;
pub mod template_provider;
mod utils;
pub(crate) mod utils;

static LOGGER: Once = Once::new();

Expand Down Expand Up @@ -97,7 +97,7 @@ pub async fn start_pool(template_provider_address: Option<SocketAddr>) -> (PoolS
(pool, listening_address)
}

pub async fn start_template_provider(sv2_interval: Option<u32>) -> (TemplateProvider, SocketAddr) {
pub fn start_template_provider(sv2_interval: Option<u32>) -> (TemplateProvider, SocketAddr) {
let address = get_available_address();
let sv2_interval = sv2_interval.unwrap_or(20);
let template_provider = TemplateProvider::start(address.port(), sv2_interval);
Expand Down
87 changes: 6 additions & 81 deletions roles/tests-integration/lib/template_provider.rs
Original file line number Diff line number Diff line change
@@ -1,84 +1,9 @@
use corepc_node::{Conf, Node};
use flate2::read::GzDecoder;
use std::{
env,
fs::{create_dir_all, File},
io::{BufReader, Read},
path::{Path, PathBuf},
};
use tar::Archive;
use std::{env, fs::create_dir_all, path::PathBuf};

const VERSION_TP: &str = "0.1.13";

fn download_bitcoind_tarball(download_url: &str, retries: usize) -> Vec<u8> {
for attempt in 1..=retries {
let response = minreq::get(download_url).send();
match response {
Ok(res) if res.status_code == 200 => {
return res.as_bytes().to_vec();
}
Ok(res) if res.status_code == 503 => {
// If the response is 503, log and prepare for retry
eprintln!(
"Attempt {}: URL {} returned status code 503 (Service Unavailable)",
attempt + 1,
download_url
);
}
Ok(res) => {
// For other status codes, log and stop retrying
panic!(
"URL {} returned unexpected status code {}. Aborting.",
download_url, res.status_code
);
}
Err(err) => {
eprintln!(
"Attempt {}: Failed to fetch URL {}: {:?}",
attempt + 1,
download_url,
err
);
}
}

if attempt < retries {
let delay = 1u64 << (attempt - 1);
eprintln!("Retrying in {} seconds (exponential backoff)...", delay);
std::thread::sleep(std::time::Duration::from_secs(delay));
}
}
// If all retries fail, panic with an error message
panic!(
"Cannot reach URL {} after {} attempts",
download_url, retries
);
}
use crate::utils::{http, tarball};

fn read_tarball_from_file(path: &str) -> Vec<u8> {
let file = File::open(path).unwrap_or_else(|_| {
panic!(
"Cannot find {:?} specified with env var BITCOIND_TARBALL_FILE",
path
)
});
let mut reader = BufReader::new(file);
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).unwrap();
buffer
}

fn unpack_tarball(tarball_bytes: &[u8], destination: &Path) {
let decoder = GzDecoder::new(tarball_bytes);
let mut archive = Archive::new(decoder);
for mut entry in archive.entries().unwrap().flatten() {
if let Ok(file) = entry.path() {
if file.ends_with("bitcoind") {
entry.unpack_in(destination).unwrap();
}
}
}
}
const VERSION_TP: &str = "0.1.13";

fn get_bitcoind_filename(os: &str, arch: &str) -> String {
match (os, arch) {
Expand Down Expand Up @@ -128,7 +53,7 @@ impl TemplateProvider {

if !bitcoin_exe_home.exists() {
let tarball_bytes = match env::var("BITCOIND_TARBALL_FILE") {
Ok(path) => read_tarball_from_file(&path),
Ok(path) => tarball::read_from_file(&path),
Err(_) => {
let download_endpoint =
env::var("BITCOIND_DOWNLOAD_ENDPOINT").unwrap_or_else(|_| {
Expand All @@ -138,15 +63,15 @@ impl TemplateProvider {
"{}/sv2-tp-{}/{}",
download_endpoint, VERSION_TP, download_filename
);
download_bitcoind_tarball(&url, 5)
http::make_get_request(&url, 5)
}
};

if let Some(parent) = bitcoin_exe_home.parent() {
create_dir_all(parent).unwrap();
}

unpack_tarball(&tarball_bytes, &tp_dir);
tarball::unpack(&tarball_bytes, &tp_dir);

if os == "macos" {
let bitcoind_binary = bitcoin_exe_home.join("bitcoind");
Expand Down
80 changes: 80 additions & 0 deletions roles/tests-integration/lib/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,83 @@ fn get_available_port() -> u16 {
}
}
}

pub mod http {
pub fn make_get_request(download_url: &str, retries: usize) -> Vec<u8> {
for attempt in 1..=retries {
let response = minreq::get(download_url).send();
match response {
Ok(res) => {
let status_code = res.status_code;
if (200..300).contains(&status_code) {
return res.as_bytes().to_vec();
} else if (500..600).contains(&status_code) {
eprintln!(
"Attempt {}: URL {} returned a server error code {}",
attempt, download_url, status_code
);
} else {
panic!(
"URL {} returned unexpected status code {}. Aborting.",
download_url, status_code
);
}
}
Err(err) => {
eprintln!(
"Attempt {}: Failed to fetch URL {}: {:?}",
attempt + 1,
download_url,
err
);
}
}

if attempt < retries {
let delay = 1u64 << (attempt - 1);
eprintln!("Retrying in {} seconds (exponential backoff)...", delay);
std::thread::sleep(std::time::Duration::from_secs(delay));
}
}
// If all retries fail, panic with an error message
panic!(
"Cannot reach URL {} after {} attempts",
download_url, retries
);
}
}

pub mod tarball {
use flate2::read::GzDecoder;
use std::{
fs::File,
io::{BufReader, Read},
path::Path,
};
use tar::Archive;

pub fn read_from_file(path: &str) -> Vec<u8> {
let file = File::open(path).unwrap_or_else(|_| {
panic!(
"Cannot find {:?} specified with env var BITCOIND_TARBALL_FILE",
path
)
});
let mut reader = BufReader::new(file);
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).unwrap();
buffer
}

pub fn unpack(tarball_bytes: &[u8], destination: &Path) {
let decoder = GzDecoder::new(tarball_bytes);
let mut archive = Archive::new(decoder);
for mut entry in archive.entries().unwrap().flatten() {
if let Ok(file) = entry.path() {
if file.ends_with("bitcoind") {
entry.unpack_in(destination).unwrap();
}
}
}
}
}
2 changes: 1 addition & 1 deletion roles/tests-integration/tests/jd_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use roles_logic_sv2::parsers::{CommonMessages, PoolMessages};
// internal cleanup or reconnection attempts.
#[tokio::test]
async fn jds_should_not_panic_if_jdc_shutsdown() {
let (_tp, tp_addr) = start_template_provider(None).await;
let (_tp, tp_addr) = start_template_provider(None);
let (_pool, pool_addr) = start_pool(Some(tp_addr)).await;
let (_jds, jds_addr) = start_jds(tp_addr).await;
let (jdc, jdc_addr) = start_jdc(pool_addr, tp_addr, jds_addr).await;
Expand Down
4 changes: 2 additions & 2 deletions roles/tests-integration/tests/pool_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use roles_logic_sv2::{
#[tokio::test]
async fn success_pool_template_provider_connection() {
start_tracing();
let (_tp, tp_addr) = start_template_provider(None).await;
let (_tp, tp_addr) = start_template_provider(None);
let (sniffer, sniffer_addr) = start_sniffer("".to_string(), tp_addr, true, None).await;
let _ = start_pool(Some(sniffer_addr)).await;
// here we assert that the downstream(pool in this case) have sent `SetupConnection` message
Expand Down Expand Up @@ -73,7 +73,7 @@ async fn success_pool_template_provider_connection() {
#[tokio::test]
async fn header_timestamp_value_assertion_in_new_extended_mining_job() {
let sv2_interval = Some(5);
let (_tp, tp_addr) = start_template_provider(sv2_interval).await;
let (_tp, tp_addr) = start_template_provider(sv2_interval);
let tp_pool_sniffer_identifier =
"header_timestamp_value_assertion_in_new_extended_mining_job tp_pool sniffer".to_string();
let (tp_pool_sniffer, tp_pool_sniffer_addr) =
Expand Down
6 changes: 3 additions & 3 deletions roles/tests-integration/tests/sniffer_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use std::convert::TryInto;
#[tokio::test]
async fn test_sniffer_intercept_to_downstream() {
start_tracing();
let (_tp, tp_addr) = start_template_provider(None).await;
let (_tp, tp_addr) = start_template_provider(None);
let message_replacement =
PoolMessages::Common(CommonMessages::SetupConnectionError(SetupConnectionError {
flags: 0,
Expand Down Expand Up @@ -61,7 +61,7 @@ async fn test_sniffer_intercept_to_downstream() {

#[tokio::test]
async fn test_sniffer_intercept_to_upstream() {
let (_tp, tp_addr) = start_template_provider(None).await;
let (_tp, tp_addr) = start_template_provider(None);
let setup_connection = SetupConnection {
protocol: Protocol::TemplateDistributionProtocol,
min_version: 2,
Expand Down Expand Up @@ -112,7 +112,7 @@ async fn test_sniffer_intercept_to_upstream() {

#[tokio::test]
async fn test_sniffer_wait_for_message_type_with_remove() {
let (_tp, tp_addr) = start_template_provider(None).await;
let (_tp, tp_addr) = start_template_provider(None);
let (sniffer, sniffer_addr) = start_sniffer("".to_string(), tp_addr, false, None).await;
let _ = start_pool(Some(sniffer_addr)).await;
assert!(
Expand Down
2 changes: 1 addition & 1 deletion roles/tests-integration/tests/translator_integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use roles_logic_sv2::parsers::{CommonMessages, Mining, PoolMessages};
#[tokio::test]
async fn translate_sv1_to_sv2_successfully() {
start_tracing();
let (_tp, tp_addr) = start_template_provider(None).await;
let (_tp, tp_addr) = start_template_provider(None);
let (_pool, pool_addr) = start_pool(Some(tp_addr)).await;
let (pool_translator_sniffer, pool_translator_sniffer_addr) =
start_sniffer("0".to_string(), pool_addr, false, None).await;
Expand Down

0 comments on commit fb496dd

Please sign in to comment.