Skip to content

Commit

Permalink
Merge pull request #1360 from Shourya742/2025-01-20-change-shutdown-a…
Browse files Browse the repository at this point in the history
…pi-and-add-unit-test

Add jds-do-not-fail-on-jdc-shutdown integration test
  • Loading branch information
plebhash authored Feb 1, 2025
2 parents 4637a82 + 1fe2395 commit 1ee8977
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 182 deletions.
10 changes: 0 additions & 10 deletions .github/workflows/mg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,6 @@ jobs:
- name: Run jds-do-not-fail-on-wrong-tsdatasucc
run: sh ./test/message-generator/test/jds-do-not-fail-on-wrong-tsdatasucc/jds-do-not-fail-on-wrong-tsdatasucc.sh

jds-do-not-panic-if-jdc-close-connection:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Run jds-do-not-panic-if-jdc-close-connection
run: sh ./test/message-generator/test/jds-do-not-panic-if-jdc-close-connection/jds-do-not-panic-if-jdc-close-connection.sh

jds-do-not-stackoverflow-when-no-token:
runs-on: ubuntu-latest
steps:
Expand Down Expand Up @@ -148,7 +140,6 @@ jobs:
interop-proxy-with-multi-ups,
interop-proxy-with-multi-ups-extended,
jds-do-not-fail-on-wrong-tsdatasucc,
jds-do-not-panic-if-jdc-close-connection,
jds-do-not-stackoverflow-when-no-token,
jds-receive-solution-while-processing-declared-job,
pool-sri-test-1-standard,
Expand All @@ -167,7 +158,6 @@ jobs:
[ "${{ needs.interop-proxy-with-multi-ups.result }}" != "success" ] ||
[ "${{ needs.interop-proxy-with-multi-ups-extended.result }}" != "success" ] ||
[ "${{ needs.jds-do-not-fail-on-wrong-tsdatasucc.result }}" != "success" ] ||
[ "${{ needs.jds-do-not-panic-if-jdc-close-connection.result }}" != "success" ] ||
[ "${{ needs.jds-do-not-stackoverflow-when-no-token.result }}" != "success" ] ||
[ "${{ needs.jds-receive-solution-while-processing-declared-job.result }}" != "success" ] ||
[ "${{ needs.pool-sri-test-1-standard.result }}" != "success" ] ||
Expand Down
73 changes: 67 additions & 6 deletions roles/jd-client/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ impl JobDeclaratorClient {
let task_collector = Arc::new(Mutex::new(vec![]));

tokio::spawn({
let shutdown_signal = self.shutdown();
let shutdown_signal = self.shutdown.clone();
async move {
if tokio::signal::ctrl_c().await.is_ok() {
info!("Interrupt received");
Expand All @@ -93,17 +93,18 @@ impl JobDeclaratorClient {
let task_collector = task_collector.clone();
let tx_status = tx_status.clone();
let proxy_config = proxy_config.clone();
let root_handler;
if let Some(upstream) = proxy_config.upstreams.get(upstream_index) {
let tx_status = tx_status.clone();
let task_collector = task_collector.clone();
let upstream = upstream.clone();
tokio::spawn(async move {
root_handler = tokio::spawn(async move {
Self::initialize_jd(proxy_config, tx_status, task_collector, upstream).await;
});
} else {
let tx_status = tx_status.clone();
let task_collector = task_collector.clone();
tokio::spawn(async move {
root_handler = tokio::spawn(async move {
Self::initialize_jd_as_solo_miner(
proxy_config,
tx_status.clone(),
Expand Down Expand Up @@ -165,12 +166,28 @@ impl JobDeclaratorClient {
}
} else {
info!("Received unknown task. Shutting down.");
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
root_handler.abort();
break 'outer;
}
},
_ = self.shutdown.notified().fuse() => {
info!("Shutting down gracefully...");
std::process::exit(0);
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
root_handler.abort();
break 'outer;
}
};
}
Expand Down Expand Up @@ -371,8 +388,14 @@ impl JobDeclaratorClient {
.await;
}

pub fn shutdown(&self) -> Arc<Notify> {
self.shutdown.clone()
/// Closes JDC role and any open connection associated with it.
///
/// Note that this method will result in a full exit of the running
/// jd-client and any open connection most be re-initiated upon new
/// start.
#[allow(dead_code)]
pub fn shutdown(&self) {
self.shutdown.notify_one();
}
}

Expand Down Expand Up @@ -409,3 +432,41 @@ impl PoolChangerTrigger {
}
}
}

#[cfg(test)]
mod tests {
use ext_config::{Config, File, FileFormat};

use crate::*;

#[tokio::test]
async fn test_shutdown() {
let config_path = "config-examples/jdc-config-hosted-example.toml";
let config: ProxyConfig = match Config::builder()
.add_source(File::new(config_path, FileFormat::Toml))
.build()
{
Ok(settings) => match settings.try_deserialize::<ProxyConfig>() {
Ok(c) => c,
Err(e) => {
dbg!(&e);
return;
}
},
Err(e) => {
dbg!(&e);
return;
}
};
let jdc = JobDeclaratorClient::new(config.clone());
let cloned = jdc.clone();
tokio::spawn(async move {
cloned.start().await;
});
jdc.shutdown();
let ip = config.downstream_address.clone();
let port = config.downstream_port;
let jdc_addr = format!("{}:{}", ip, port);
assert!(std::net::TcpListener::bind(jdc_addr).is_ok());
}
}
37 changes: 37 additions & 0 deletions roles/tests-integration/tests/jd_integration.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
use integration_tests_sv2::*;

use roles_logic_sv2::parsers::{CommonMessages, PoolMessages};

// This test verifies that the `jds` (Job Distributor Server) does not panic when the `jdc`
// (Job Distributor Client) shuts down.
//
// The test follows these steps:
// 1. Start a Template Provider (`tp`) and a Pool.
// 2. Start the `jds` and the `jdc`, ensuring the `jdc` connects to the `jds`.
// 3. Shut down the `jdc` and ensure the `jds` remains operational without panicking.
// 4. Verify that the `jdc`'s address can be reused by asserting that a new `TcpListener` can bind
// to the same address.
// 5. Start a Sniffer as a proxy for observing messages exchanged between the `jds` and other
// components.
// 6. Reconnect a new `jdc` to the Sniffer and ensure the expected `SetupConnection` message is
// received from the `jdc`.
//
// This ensures that the shutdown of the `jdc` is handled gracefully by the `jds`, and subsequent
// connections or operations continue without issues.
//
// # Notes
// - The test waits for a brief duration (`1 second`) after shutting down the `jdc` to allow for any
// internal cleanup or reconnection attempts.
#[tokio::test]
async fn jds_should_not_panic_if_jdc_shutsdown() {
let (_tp, tp_addr) = start_template_provider(None).await;
let (_pool, pool_addr) = start_pool(Some(tp_addr)).await;
let (_jds, jds_addr) = start_jds(tp_addr).await;
let (jdc, jdc_addr) = start_jdc(pool_addr, tp_addr, jds_addr).await;
jdc.shutdown();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
assert!(tokio::net::TcpListener::bind(jdc_addr).await.is_ok());
let (sniffer, sniffer_addr) = start_sniffer("0".to_string(), jds_addr, false, None).await;
let (_jdc, _jdc_addr) = start_jdc(pool_addr, tp_addr, sniffer_addr).await;
assert_common_message!(sniffer.next_message_from_downstream(), SetupConnection);
}
56 changes: 51 additions & 5 deletions roles/translator/src/lib/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ impl TranslatorSv2 {
// Check all tasks if is_finished() is true, if so exit

tokio::spawn({
let shutdown_signal = self.shutdown();
let shutdown_signal = self.shutdown.clone();
async move {
if tokio::signal::ctrl_c().await.is_ok() {
info!("Interrupt received");
Expand All @@ -94,7 +94,7 @@ impl TranslatorSv2 {
match task_status_.state {
State::DownstreamShutdown(err) | State::BridgeShutdown(err) | State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
self.shutdown().notify_one();
self.shutdown();
}
State::UpstreamTryReconnect(err) => {
error!("Trying to reconnect the Upstream because of: {}", err);
Expand Down Expand Up @@ -126,16 +126,18 @@ impl TranslatorSv2 {
}
State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
self.shutdown().notify_one();
self.shutdown();
}
}
} else {
info!("Channel closed");
kill_tasks(task_collector.clone());
break; // Channel closed
}
}
_ = self.shutdown.notified() => {
info!("Shutting down gracefully...");
kill_tasks(task_collector.clone());
break;
}
}
Expand Down Expand Up @@ -288,8 +290,13 @@ impl TranslatorSv2 {
task_collector.safe_lock(|t| t.push((task.abort_handle(), "init task".to_string())));
}

pub fn shutdown(&self) -> Arc<Notify> {
self.shutdown.clone()
/// Closes Translator role and any open connection associated with it.
///
/// Note that this method will result in a full exit of the running
/// Translator and any open connection most be re-initiated upon new
/// start.
pub fn shutdown(&self) {
self.shutdown.notify_one();
}
}

Expand All @@ -301,3 +308,42 @@ fn kill_tasks(task_collector: Arc<Mutex<Vec<(AbortHandle, String)>>>) {
}
});
}

#[cfg(test)]
mod tests {
use super::TranslatorSv2;
use ext_config::{Config, File, FileFormat};

use crate::*;

#[tokio::test]
async fn test_shutdown() {
let config_path = "config-examples/tproxy-config-hosted-pool-example.toml";
let config: ProxyConfig = match Config::builder()
.add_source(File::new(config_path, FileFormat::Toml))
.build()
{
Ok(settings) => match settings.try_deserialize::<ProxyConfig>() {
Ok(c) => c,
Err(e) => {
dbg!(&e);
return;
}
},
Err(e) => {
dbg!(&e);
return;
}
};
let translator = TranslatorSv2::new(config.clone());
let cloned = translator.clone();
tokio::spawn(async move {
cloned.start().await;
});
translator.shutdown();
let ip = config.downstream_address.clone();
let port = config.downstream_port;
let translator_addr = format!("{}:{}", ip, port);
assert!(std::net::TcpListener::bind(translator_addr).is_ok());
}
}

This file was deleted.

This file was deleted.

Loading

0 comments on commit 1ee8977

Please sign in to comment.