Skip to content

Commit

Permalink
use task tracker + join set instead of vec of futures for more contro…
Browse files Browse the repository at this point in the history
…l over shutdown process
  • Loading branch information
bragov4ik committed Nov 19, 2024
1 parent eb3461a commit 44c0d06
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 26 deletions.
2 changes: 1 addition & 1 deletion libs/blockscout-service-launcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ reqwest = { version = "0.11", features = ["json"], optional = true }
serde = { version = "1.0", features = ["derive"], optional = true }
serde_json = {version = "1", optional = true }
tokio = { version = "1", optional = true }
tokio-util = { version = "0.7.12", optional = true }
tokio-util = { version = "0.7.12", features = ["rt"], optional = true }
tonic = { version = "0.8", optional = true }
tracing = { version = "0.1", optional = true }
tracing-actix-web = { package = "blockscout-tracing-actix-web", version = "0.8.0", optional = true }
Expand Down
54 changes: 29 additions & 25 deletions libs/blockscout-service-launcher/src/launcher/launch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,9 @@ use super::{
};
use actix_web::{middleware::Condition, App, HttpServer};
use actix_web_prom::PrometheusMetrics;
use std::net::SocketAddr;
use tokio_util::sync::CancellationToken;
use std::{net::SocketAddr, time::Duration};
use tokio::task::JoinSet;
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing_actix_web::TracingLogger;

pub(crate) const SHUTDOWN_TIMEOUT_SEC: u64 = 10;
Expand All @@ -33,43 +34,46 @@ where
.enabled
.then(|| Metrics::new(&settings.service_name, &settings.metrics.route));

let mut futures = vec![];
let mut futures = JoinSet::new();
let tracker = TaskTracker::new();

if settings.server.http.enabled {
let http_server = {
let http_server_future = http_serve(
http,
metrics
.as_ref()
.map(|metrics| metrics.http_middleware().clone()),
&settings.server.http,
shutdown.clone(),
);
tokio::spawn(async move { http_server_future.await.map_err(anyhow::Error::msg) })
};
futures.push(http_server)
let http_server = http_serve(
http,
metrics
.as_ref()
.map(|metrics| metrics.http_middleware().clone()),
&settings.server.http,
shutdown.clone(),
);
futures.spawn(async move { http_server.await.map_err(anyhow::Error::msg) });
}

if settings.server.grpc.enabled {
let grpc_server = {
let grpc_server_future = grpc_serve(grpc, settings.server.grpc.addr, shutdown.clone());
tokio::spawn(async move { grpc_server_future.await.map_err(anyhow::Error::msg) })
};
futures.push(grpc_server)
let grpc_server = grpc_serve(grpc, settings.server.grpc.addr, shutdown.clone());
futures.spawn(
tracker.track_future(async move { grpc_server.await.map_err(anyhow::Error::msg) }),
);
}

if let Some(metrics) = metrics {
let addr = settings.metrics.addr;
futures.push(tokio::spawn(async move {
let shutdown = shutdown.clone();
futures.spawn(async move {
metrics.run_server(addr, shutdown).await?;
Ok(())
}));
});
}

let (res, _, others) = futures::future::select_all(futures).await;
for future in others.into_iter() {
future.abort()
let res = futures.join_next().await.expect("future set is not empty");
tracker.close();
shutdown.map(|s| s.cancel());
match tokio::time::timeout(Duration::from_secs(SHUTDOWN_TIMEOUT_SEC), tracker.wait()).await {
Ok(_) => (),
Err(_) => futures.abort_all(),
}
tracker.wait().await;
futures.join_all().await;
res?
}

Expand Down

0 comments on commit 44c0d06

Please sign in to comment.