Skip to content

Commit 9ce58f2

Browse files
committed
feat: make max concurrent importers configurable
Signed-off-by: Jim Crossley <[email protected]>
1 parent 5c5816b commit 9ce58f2

File tree

4 files changed

+29
-14
lines changed

4 files changed

+29
-14
lines changed

modules/importer/src/server/mod.rs

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@ pub async fn importer(
2424
storage: DispatchBackend,
2525
working_dir: Option<PathBuf>,
2626
analysis: Option<AnalysisService>,
27+
concurrency: usize,
2728
) -> anyhow::Result<()> {
2829
Server {
2930
db,
3031
storage,
3132
working_dir,
3233
analysis,
34+
concurrency,
3335
}
3436
.run()
3537
.await
@@ -56,6 +58,7 @@ struct Server {
5658
storage: DispatchBackend,
5759
working_dir: Option<PathBuf>,
5860
analysis: Option<AnalysisService>,
61+
concurrency: usize,
5962
}
6063

6164
impl Server {
@@ -70,15 +73,16 @@ impl Server {
7073

7174
loop {
7275
interval.tick().await;
73-
tracing::debug!(jim = "", "checking importers");
74-
75-
let importers = service
76-
.list()
77-
.await?
78-
.into_iter()
79-
.filter(|i| !(i.data.configuration.disabled || can_wait(i)))
80-
.map(|i| self.import(i, &service));
81-
futures::future::join_all(importers).await;
76+
futures::future::join_all(
77+
service
78+
.list()
79+
.await?
80+
.into_iter()
81+
.filter(|i| !(i.data.configuration.disabled || can_wait(i)))
82+
.take(self.concurrency)
83+
.map(|i| self.import(i, &service)),
84+
)
85+
.await;
8286
}
8387
}
8488

modules/importer/src/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ impl ImporterService {
110110
.into_iter()
111111
.map(Importer::try_from)
112112
.collect::<Result<_, _>>()?;
113-
result.sort_unstable_by_key(|i| (i.data.configuration.disabled, i.name.clone()));
113+
result.sort_unstable_by_key(|i| (i.data.configuration.disabled, i.data.last_run));
114114
Ok(result)
115115
}
116116

modules/ingestor/src/graph/advisory/advisory_vulnerability.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ impl AdvisoryVulnerabilityContext<'_> {
4545
)
4646
}
4747

48-
#[instrument(skip(self, connection), ret)]
48+
#[instrument(skip(self, connection))]
4949
pub async fn ingest_package_status<C: ConnectionTrait>(
5050
&self,
5151
cpe_context: Option<Cpe>,

server/src/profile/importer.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,18 @@ use utoipa_redoc::{Redoc, Servable};
4949
#[derive(clap::Args, Debug)]
5050
pub struct Run {
5151
/// The importer working directory
52-
#[arg(long, env)]
52+
#[arg(long, id = "working_dir", env = "IMPORTER_WORKING_DIR")]
5353
pub working_dir: Option<PathBuf>,
5454

55+
/// The max number of concurrent importer runs
56+
#[arg(
57+
long,
58+
id = "concurrency",
59+
env = "IMPORTER_CONCURRENCY",
60+
default_value = "1"
61+
)]
62+
pub concurrency: usize,
63+
5564
// flattened commands must go last
5665
//
5766
/// Database configuration
@@ -73,6 +82,7 @@ struct InitData {
7382
storage: DispatchBackend,
7483
tracing: Tracing,
7584
working_dir: Option<PathBuf>,
85+
concurrency: usize,
7686
}
7787

7888
impl Run {
@@ -126,6 +136,7 @@ impl InitData {
126136
tracing: run.infra.tracing,
127137
storage,
128138
working_dir: run.working_dir,
139+
concurrency: run.concurrency,
129140
})
130141
}
131142

@@ -138,8 +149,8 @@ impl InitData {
138149
db,
139150
storage,
140151
self.working_dir,
141-
// Running the importer, we don't need an analysis graph update
142-
None,
152+
None, // Running the importer, we don't need an analysis graph update
153+
self.concurrency,
143154
)
144155
.await
145156
}

0 commit comments

Comments
 (0)