Skip to content

Commit

Permalink
external task tracker
Browse files Browse the repository at this point in the history
  • Loading branch information
bragov4ik committed Feb 20, 2025
1 parent 76aeaac commit 0a960d1
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 16 deletions.
2 changes: 2 additions & 0 deletions libs/blockscout-service-launcher/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ actix-cors = { version = "0.7", optional = true }
actix-web-prom = { version = "0.6", optional = true }
anyhow = { version = "1.0", optional = true }
config = { version = "0.13", optional = true }
either = { version = "1.13", optional = true }
futures = { version = "0.3", optional = true }
cfg-if = { version = "1.0.0", optional = true }
keccak-hash = { version = "0.11.0", optional = true }
Expand Down Expand Up @@ -65,6 +66,7 @@ launcher = [
"dep:actix-cors",
"dep:anyhow",
"dep:config",
"dep:either",
"dep:futures",
"dep:once_cell",
"dep:prometheus",
Expand Down
103 changes: 88 additions & 15 deletions libs/blockscout-service-launcher/src/launcher/launch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use super::{
};
use actix_web::{middleware::Condition, App, HttpServer};
use actix_web_prom::PrometheusMetrics;
use std::{net::SocketAddr, time::Duration};
use std::{future::Future, net::SocketAddr, time::Duration};
use tokio::{task::JoinSet, time::timeout};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing_actix_web::TracingLogger;
Expand All @@ -20,11 +20,71 @@ pub struct LaunchSettings {
pub metrics: MetricsSettings,
}

#[derive(Clone)]
pub(crate) struct TaskTrackers {
pub local: TaskTracker,
pub external: Option<TaskTracker>,
}

impl TaskTrackers {
pub fn new(external: Option<TaskTracker>) -> Self {
Self {
local: TaskTracker::new(),
external,
}
}

pub fn close(&self) {
self.local.close();
if let Some(t) = &self.external {
t.close();
}
}

/// Should be cancel-safe, just like `TaskTracker::wait()`
pub async fn wait(&self) {
self.local.wait().await;
if let Some(t) = &self.external {
t.wait().await;
}
}

pub fn track_future<F>(&self, future: F) -> impl Future<Output = F::Output>
where
F: Future,
{
let future = self.local.track_future(future);
if let Some(t) = &self.external {
either::Left(t.track_future(future))
} else {
either::Right(future)
}
}
}

async fn spawn_and_track<F>(
futures: &mut JoinSet<F::Output>,
trackers: &TaskTrackers,
future: F,
) -> tokio::task::AbortHandle
where
F: Future,
F: Send + 'static,
F::Output: Send,
{
if let Some(t) = &trackers.external {
futures.spawn(trackers.local.track_future(t.track_future(future)))
} else {
futures.spawn(trackers.local.track_future(future))
}
}

pub async fn launch<R>(
settings: &LaunchSettings,
http: R,
grpc: tonic::transport::server::Router,
shutdown: Option<CancellationToken>,
task_tracker: Option<TaskTracker>,
) -> Result<(), anyhow::Error>
where
R: HttpRouter + Send + Sync + Clone + 'static,
Expand All @@ -35,7 +95,7 @@ where
.then(|| Metrics::new(&settings.service_name, &settings.metrics.route));

let mut futures = JoinSet::new();
let tracker = TaskTracker::new();
let trackers = TaskTrackers::new(task_tracker);

if settings.server.http.enabled {
let http_server = http_serve(
Expand All @@ -45,38 +105,46 @@ where
.map(|metrics| metrics.http_middleware().clone()),
&settings.server.http,
shutdown.clone(),
&trackers,
);
futures.spawn(async move { http_server.await.map_err(anyhow::Error::msg) });
spawn_and_track(&mut futures, &trackers, async move {
http_server.await.map_err(anyhow::Error::msg)
})
.await;
}

if settings.server.grpc.enabled {
let grpc_server = grpc_serve(grpc, settings.server.grpc.addr, shutdown.clone());
futures.spawn(
tracker.track_future(async move { grpc_server.await.map_err(anyhow::Error::msg) }),
);
spawn_and_track(&mut futures, &trackers, async move {
grpc_server.await.map_err(anyhow::Error::msg)
})
.await;
}

if let Some(metrics) = metrics {
let addr = settings.metrics.addr;
let shutdown = shutdown.clone();
futures.spawn(async move {
metrics.run_server(addr, shutdown).await?;
let trackers_ = trackers.clone();
spawn_and_track(&mut futures, &trackers, async move {
metrics.run_server(addr, shutdown, &trackers_).await?;
Ok(())
});
})
.await;
}
if let Some(ref shutdown) = shutdown {
let shutdown = shutdown.clone();
futures.spawn(tracker.track_future(async move {
spawn_and_track(&mut futures, &trackers, async move {
shutdown.cancelled().await;
Ok(())
}));
})
.await;
}

let res = futures.join_next().await.expect("future set is not empty");
tracker.close();
trackers.close();
if let Some(shutdown) = shutdown {
shutdown.cancel();
if timeout(Duration::from_secs(SHUTDOWN_TIMEOUT_SEC), tracker.wait())
if timeout(Duration::from_secs(SHUTDOWN_TIMEOUT_SEC), trackers.wait())
.await
.is_err()
{
Expand All @@ -86,7 +154,7 @@ where
} else {
futures.abort_all();
}
tracker.wait().await;
trackers.wait().await;
futures.join_all().await;
res?
}
Expand Down Expand Up @@ -114,6 +182,7 @@ fn http_serve<R>(
metrics: Option<PrometheusMetrics>,
settings: &HttpServerSettings,
shutdown: Option<CancellationToken>,
task_trackers: &TaskTrackers,
) -> actix_web::dev::Server
where
R: HttpRouter + Send + Sync + Clone + 'static,
Expand Down Expand Up @@ -155,7 +224,11 @@ where
.run()
};
if let Some(shutdown) = shutdown {
tokio::spawn(stop_actix_server_on_cancel(server.handle(), shutdown, true));
tokio::spawn(task_trackers.track_future(stop_actix_server_on_cancel(
server.handle(),
shutdown,
true,
)));
}
server
}
Expand Down
9 changes: 8 additions & 1 deletion libs/blockscout-service-launcher/src/launcher/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ use tokio_util::sync::CancellationToken;

use crate::launcher::launch::{stop_actix_server_on_cancel, SHUTDOWN_TIMEOUT_SEC};

use super::launch::TaskTrackers;

#[derive(Clone)]
pub struct Metrics {
metrics_middleware: PrometheusMetrics,
Expand Down Expand Up @@ -40,6 +42,7 @@ impl Metrics {
self,
addr: SocketAddr,
shutdown: Option<CancellationToken>,
task_trackers: &TaskTrackers,
) -> actix_web::dev::Server {
tracing::info!(addr = ?addr, "starting metrics server");
let server = HttpServer::new(move || App::new().wrap(self.metrics_middleware.clone()))
Expand All @@ -48,7 +51,11 @@ impl Metrics {
.unwrap()
.run();
if let Some(shutdown) = shutdown {
tokio::spawn(stop_actix_server_on_cancel(server.handle(), shutdown, true));
tokio::spawn(task_trackers.track_future(stop_actix_server_on_cancel(
server.handle(),
shutdown,
true,
)));
}
server
}
Expand Down

0 comments on commit 0a960d1

Please sign in to comment.