From 6b61a1417832672562b1bf99c93ded06c5b925a2 Mon Sep 17 00:00:00 2001 From: j-mendez Date: Mon, 30 Dec 2024 09:13:47 -0500 Subject: [PATCH] perf(runtime): add dedicated thread for request connect --- Cargo.lock | 14 ++- spider/Cargo.toml | 6 +- spider/src/utils/connect.rs | 187 ++++++++++++++++++++++++++++++ spider/src/utils/mod.rs | 2 + spider/src/website.rs | 15 +++ spider_chrome/Cargo.toml | 2 +- spider_cli/Cargo.toml | 2 +- spider_transformations/Cargo.toml | 2 +- spider_utils/Cargo.toml | 2 +- spider_worker/Cargo.toml | 2 +- 10 files changed, 222 insertions(+), 12 deletions(-) create mode 100644 spider/src/utils/connect.rs diff --git a/Cargo.lock b/Cargo.lock index a0e0f90133..d49c4445f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5367,7 +5367,7 @@ dependencies = [ [[package]] name = "spider" -version = "2.22.16" +version = "2.22.19" dependencies = [ "ahash", "aho-corasick", @@ -5392,6 +5392,7 @@ dependencies = [ "httpdate", "itertools 0.13.0", "lazy_static", + "libc", "log", "lol_html", "moka", @@ -5399,6 +5400,7 @@ dependencies = [ "percent-encoding", "phf 0.11.2", "phf_codegen 0.11.2", + "pin-project-lite", "quick-xml", "rand 0.8.5", "regex", @@ -5428,7 +5430,7 @@ dependencies = [ [[package]] name = "spider_chrome" -version = "2.22.16" +version = "2.22.19" dependencies = [ "adblock", "aho-corasick", @@ -5518,7 +5520,7 @@ dependencies = [ [[package]] name = "spider_cli" -version = "2.22.16" +version = "2.22.19" dependencies = [ "clap", "env_logger", @@ -5561,7 +5563,7 @@ dependencies = [ [[package]] name = "spider_transformations" -version = "2.22.16" +version = "2.22.19" dependencies = [ "aho-corasick", "fast_html2md", @@ -5584,7 +5586,7 @@ dependencies = [ [[package]] name = "spider_utils" -version = "2.22.16" +version = "2.22.19" dependencies = [ "indexmap 1.9.3", "serde", @@ -5597,7 +5599,7 @@ dependencies = [ [[package]] name = "spider_worker" -version = "2.22.16" +version = "2.22.19" dependencies = [ "env_logger", "lazy_static", diff --git a/spider/Cargo.toml b/spider/Cargo.toml index 55f6665577..289260e614 100644 --- a/spider/Cargo.toml +++ b/spider/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider" -version = "2.22.16" +version = "2.22.19" authors = [ "j-mendez " ] @@ -68,6 +68,7 @@ sysinfo = { version = "0.33", default-features = false, features = ["system"], o sqlx = { version = "0.8", features = [ "runtime-tokio", "sqlite" ], optional = true } h2 = "0.4" tower = { version = "0.5", features = ["limit"]} +pin-project-lite = "0.2" [dependencies.spider_chrome] version = "2" @@ -78,6 +79,9 @@ features = [ "stream" ] +[target.'cfg(target_os = "linux")'.dependencies] +libc = "0.2" + [target.'cfg(all(not(windows), not(target_os = "android"), not(target_env = "musl")))'.dependencies] tikv-jemallocator = { version = "0.6", optional = true } diff --git a/spider/src/utils/connect.rs b/spider/src/utils/connect.rs new file mode 100644 index 0000000000..d6c433af78 --- /dev/null +++ b/spider/src/utils/connect.rs @@ -0,0 +1,187 @@ +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + sync::atomic::AtomicBool, + task::{Context, Poll}, +}; +use tokio::{ + select, + sync::{mpsc::error::SendError, OnceCell}, +}; +use tower::{BoxError, Layer, Service}; + +/// A threadpool dedicated for connecting to services. +static CONNECT_THREAD_POOL: OnceCell< + tokio::sync::mpsc::UnboundedSender + Send + 'static>>>, +> = OnceCell::const_new(); + +/// Is the background thread connect enabled. +static BACKGROUND_THREAD_CONNECT_ENABLED: AtomicBool = AtomicBool::new(true); + +/// Is the background thread inited. +pub(crate) fn background_connect_threading() -> bool { + BACKGROUND_THREAD_CONNECT_ENABLED.load(std::sync::atomic::Ordering::Relaxed) +} + +/// Init a background thread for request connect handling. +pub(crate) fn init_background_runtime() { + let _ = CONNECT_THREAD_POOL.set({ + let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel(); + let builder = std::thread::Builder::new(); + + if let Err(_) = builder.spawn(move || { + match tokio::runtime::Builder::new_multi_thread() + .thread_name("connect-background-pool-thread") + .worker_threads(num_cpus::get() as usize) + .on_thread_start(move || { + #[cfg(target_os = "linux")] + unsafe { + if libc::nice(10) == -1 && *libc::__errno_location() != 0 { + let error = std::io::Error::last_os_error(); + log::error!("failed to set threadpool niceness: {}", error); + } + } + }) + .enable_all() + .build() + { + Ok(rt) => { + rt.block_on(async move { + while let Some(work) = rx.recv().await { + tokio::task::spawn(work); + } + }); + } + _ => { + BACKGROUND_THREAD_CONNECT_ENABLED + .store(false, std::sync::atomic::Ordering::Relaxed); + } + } + }) { + let _ = tx.downgrade(); + BACKGROUND_THREAD_CONNECT_ENABLED.store(false, std::sync::atomic::Ordering::Relaxed); + }; + + tx + }); +} + +/// This tower layer injects futures with a oneshot channel, and then sends them to the background runtime for processing. +#[derive(Copy, Clone)] +pub struct BackgroundProcessorLayer; + +impl BackgroundProcessorLayer { + /// A new background proccess layer shortcut. + pub fn new() -> Self { + Self + } +} +impl Layer for BackgroundProcessorLayer { + type Service = BackgroundProcessor; + fn layer(&self, service: S) -> Self::Service { + BackgroundProcessor::new(service) + } +} + +impl std::fmt::Debug for BackgroundProcessorLayer { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.debug_struct("BackgroundProcessorLayer").finish() + } +} + +/// Send to the background runtime. +fn send_to_background_runtime(future: impl Future + Send + 'static) { + let tx = CONNECT_THREAD_POOL.get().expect( + "background runtime should be initialized by calling init_background_runtime before use", + ); + + if let Err(SendError(_)) = tx.send(Box::pin(future)) { + log::error!("Failed to send future - background connect runtime channel is closed. Abandoning task."); + } +} + +/// This tower service injects futures with a oneshot channel, and then sends them to the background runtime for processing. +#[derive(Debug, Clone)] +pub struct BackgroundProcessor { + inner: S, +} + +impl BackgroundProcessor { + /// Setup a new connect background processor. + pub fn new(inner: S) -> Self { + BackgroundProcessor { inner } + } +} + +impl Service for BackgroundProcessor +where + S: Service, + S::Response: Send + 'static, + S::Error: Into + Send, + S::Future: Send + 'static, +{ + type Response = S::Response; + type Error = BoxError; + type Future = BackgroundResponseFuture; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + match self.inner.poll_ready(cx) { + Poll::Pending => Poll::Pending, + Poll::Ready(r) => Poll::Ready(r.map_err(Into::into)), + } + } + + fn call(&mut self, req: Request) -> Self::Future { + let response = self.inner.call(req); + let (mut tx, rx) = tokio::sync::oneshot::channel(); + + let future = async move { + select! { + _ = tx.closed() => (), + result = response => { + let _ = tx.send(result.map_err(Into::into)); + } + } + }; + + send_to_background_runtime(future); + BackgroundResponseFuture::new(rx) + } +} + +pin_project! { + #[derive(Debug)] + /// A new background response future. + pub struct BackgroundResponseFuture { + #[pin] + rx: tokio::sync::oneshot::Receiver>, + } +} + +impl BackgroundResponseFuture { + pub(crate) fn new(rx: tokio::sync::oneshot::Receiver>) -> Self { + BackgroundResponseFuture { rx } + } +} + +impl Future for BackgroundResponseFuture +where + S: Send + 'static, +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + match this.rx.poll(cx) { + Poll::Ready(v) => match v { + Ok(v) => Poll::Ready(v.map_err(Into::into)), + Err(err) => Poll::Ready(Err(Box::new(err) as BoxError)), + }, + Poll::Pending => Poll::Pending, + } + } +} diff --git a/spider/src/utils/mod.rs b/spider/src/utils/mod.rs index 1e0a8bbbca..2df392781b 100644 --- a/spider/src/utils/mod.rs +++ b/spider/src/utils/mod.rs @@ -1,5 +1,7 @@ /// Absolute path domain handling. pub mod abs; +/// Connect layer for reqwest. +pub mod connect; /// Utils to modify the HTTP header. pub mod header_utils; /// String interner. diff --git a/spider/src/website.rs b/spider/src/website.rs index 2b7d154696..178f3f0c53 100644 --- a/spider/src/website.rs +++ b/spider/src/website.rs @@ -1056,6 +1056,12 @@ impl Website { _ => client, }; + let client = if crate::utils::connect::background_connect_threading() { + client.connector_layer(crate::utils::connect::BackgroundProcessorLayer::new()) + } else { + client + }; + let client = match self.configuration.concurrency_limit { Some(limit) => { client.connector_layer(tower::limit::concurrency::ConcurrencyLimitLayer::new(limit)) @@ -1121,6 +1127,13 @@ impl Website { }; let client = self.configure_http_client_cookies(client); + + let client = if crate::utils::connect::background_connect_threading() { + client.connector_layer(crate::utils::connect::BackgroundProcessorLayer::new()) + } else { + client + }; + let client = match self.configuration.concurrency_limit { Some(limit) => { client.connector_layer(tower::limit::concurrency::ConcurrencyLimitLayer::new(limit)) @@ -1416,6 +1429,7 @@ impl Website { async fn setup(&mut self) -> (Client, Option<(Arc, tokio::task::JoinHandle<()>)>) { self.determine_limits(); self.setup_disk(); + crate::utils::connect::init_background_runtime(); if self.status != CrawlStatus::Active { self.clear_all().await; @@ -1437,6 +1451,7 @@ impl Website { async fn setup(&mut self) -> (Client, Option<(Arc, tokio::task::JoinHandle<()>)>) { self.determine_limits(); self.setup_disk(); + crate::utils::connect::init_background_runtime(); if self.status != CrawlStatus::Active { self.clear_all().await; diff --git a/spider_chrome/Cargo.toml b/spider_chrome/Cargo.toml index b177e2e27e..718401048a 100644 --- a/spider_chrome/Cargo.toml +++ b/spider_chrome/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider_chrome" -version = "2.22.16" +version = "2.22.19" rust-version = "1.70" authors = [ "j-mendez " diff --git a/spider_cli/Cargo.toml b/spider_cli/Cargo.toml index da3b1f1dec..7e1474765c 100644 --- a/spider_cli/Cargo.toml +++ b/spider_cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider_cli" -version = "2.22.16" +version = "2.22.19" authors = [ "j-mendez " ] diff --git a/spider_transformations/Cargo.toml b/spider_transformations/Cargo.toml index 7c9135adc9..fc47d3cd9d 100644 --- a/spider_transformations/Cargo.toml +++ b/spider_transformations/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider_transformations" -version = "2.22.16" +version = "2.22.19" authors = [ "j-mendez " ] diff --git a/spider_utils/Cargo.toml b/spider_utils/Cargo.toml index e6e1a78e71..98ac02ca95 100644 --- a/spider_utils/Cargo.toml +++ b/spider_utils/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider_utils" -version = "2.22.16" +version = "2.22.19" authors = [ "j-mendez " ] diff --git a/spider_worker/Cargo.toml b/spider_worker/Cargo.toml index 4715d9e524..fe626182c8 100644 --- a/spider_worker/Cargo.toml +++ b/spider_worker/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "spider_worker" -version = "2.22.16" +version = "2.22.19" authors = [ "j-mendez " ]