Skip to content

Commit

Permalink
Merge pull request #1321 from Shourya742/2024-12-23-fix-sigterm-jdc
Browse files Browse the repository at this point in the history
fix jdc sigterm signalling issue
  • Loading branch information
plebhash authored Jan 15, 2025
2 parents 4ad657a + 6d1632b commit 69a5398
Showing 1 changed file with 96 additions and 72 deletions.
168 changes: 96 additions & 72 deletions roles/jd-client/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::{
str::FromStr,
sync::Arc,
};
use tokio::task::AbortHandle;
use tokio::{sync::Notify, task::AbortHandle};

use tracing::{error, info};

Expand Down Expand Up @@ -58,111 +58,132 @@ pub static IS_NEW_TEMPLATE_HANDLED: AtomicBool = AtomicBool::new(true);
pub struct JobDeclaratorClient {
/// Configuration of the proxy server [`JobDeclaratorClient`] is connected to.
config: ProxyConfig,
// Used for notifying the [`JobDeclaratorClient`] to shutdown gracefully.
shutdown: Arc<Notify>,
}

impl JobDeclaratorClient {
pub fn new(config: ProxyConfig) -> Self {
Self { config }
Self {
config,
shutdown: Arc::new(Notify::new()),
}
}

pub async fn start(self) {
let mut upstream_index = 0;
let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse());

// Channel used to manage failed tasks
let (tx_status, rx_status) = unbounded();

let task_collector = Arc::new(Mutex::new(vec![]));

let proxy_config = &self.config;
tokio::spawn({
let shutdown_signal = self.shutdown();
async move {
if tokio::signal::ctrl_c().await.is_ok() {
info!("Interrupt received");
shutdown_signal.notify_one();
}
}
});

loop {
let proxy_config = self.config;
'outer: loop {
let task_collector = task_collector.clone();
let tx_status = tx_status.clone();
let proxy_config = proxy_config.clone();
if let Some(upstream) = proxy_config.upstreams.get(upstream_index) {
self.initialize_jd(tx_status.clone(), task_collector.clone(), upstream.clone())
.await;
let tx_status = tx_status.clone();
let task_collector = task_collector.clone();
let upstream = upstream.clone();
tokio::spawn(async move {
Self::initialize_jd(proxy_config, tx_status, task_collector, upstream).await;
});
} else {
self.initialize_jd_as_solo_miner(tx_status.clone(), task_collector.clone())
let tx_status = tx_status.clone();
let task_collector = task_collector.clone();
tokio::spawn(async move {
Self::initialize_jd_as_solo_miner(
proxy_config,
tx_status.clone(),
task_collector.clone(),
)
.await;
});
}
// Check all tasks if is_finished() is true, if so exit
loop {
let task_status = select! {
task_status = rx_status.recv().fuse() => task_status,
interrupt_signal = interrupt_signal_future => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
// we also shut down in case of error
},
}
std::process::exit(0);
}
};
let task_status: status::Status = task_status.unwrap();

match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
select! {
task_status = rx_status.recv().fuse() => {
if let Ok(task_status) = task_status {
match task_status.state {
// Should only be sent by the downstream listener
status::State::DownstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
status::State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::UpstreamRogue => {
error!("Changin Pool");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
status::State::UpstreamRogue => {
error!("Changing Pool");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
upstream_index += 1;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
})
.unwrap();
upstream_index += 1;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
}
} else {
info!("Received unknown task. Shutting down.");
break 'outer;
}
},
_ = self.shutdown.notified().fuse() => {
info!("Shutting down gracefully...");
std::process::exit(0);
}
}
};
}
}
}

async fn initialize_jd_as_solo_miner(
&self,
proxy_config: ProxyConfig,
tx_status: async_channel::Sender<status::Status<'static>>,
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
) {
let proxy_config = &self.config;
let timeout = proxy_config.timeout;
let miner_tx_out = proxy_config::get_coinbase_output(proxy_config).unwrap();
let miner_tx_out = proxy_config::get_coinbase_output(&proxy_config).unwrap();

// When Downstream receives a share that meets the bitcoin target, it transforms it into a
// SubmitSolution and sends it to the TemplateReceiver
Expand Down Expand Up @@ -212,12 +233,11 @@ impl JobDeclaratorClient {
}

async fn initialize_jd(
&self,
proxy_config: ProxyConfig,
tx_status: async_channel::Sender<status::Status<'static>>,
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
upstream_config: proxy_config::Upstream,
) {
let proxy_config = &self.config;
let timeout = proxy_config.timeout;
let test_only_do_not_send_solution_to_tp = proxy_config
.test_only_do_not_send_solution_to_tp
Expand Down Expand Up @@ -347,6 +367,10 @@ impl JobDeclaratorClient {
)
.await;
}

pub fn shutdown(&self) -> Arc<Notify> {
self.shutdown.clone()
}
}

#[derive(Debug)]
Expand Down

0 comments on commit 69a5398

Please sign in to comment.