Skip to content

Commit 0d79b33

Browse files
committed
Execute enabled importers concurrently
Signed-off-by: Jim Crossley <[email protected]>
1 parent c999828 commit 0d79b33

File tree

3 files changed

+71
-65
lines changed

3 files changed

+71
-65
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

modules/importer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ chrono = { workspace = true }
1919
csaf = { workspace = true }
2020
csaf-walker = { workspace = true, features = ["crypto-openssl", "csaf"] }
2121
cve = { workspace = true }
22+
futures = { workspace = true }
2223
git2 = { workspace = true }
2324
humantime = { workspace = true }
2425
humantime-serde = { workspace = true }

modules/importer/src/server/mod.rs

Lines changed: 69 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::{
88
ImportRunner,
99
},
1010
server::context::ServiceRunContext,
11-
service::ImporterService,
11+
service::{Error, ImporterService},
1212
};
1313
use std::{path::PathBuf, time::Duration};
1414
use time::OffsetDateTime;
@@ -70,70 +70,15 @@ impl Server {
7070

7171
loop {
7272
interval.tick().await;
73-
log::debug!("checking importers");
74-
75-
let importers = service.list().await?;
76-
for importer in importers {
77-
// FIXME: could add that to the query/list operation
78-
if importer.data.configuration.disabled || can_wait(&importer) {
79-
continue;
80-
}
81-
82-
log::debug!(" {}: {:?}", importer.name, importer.data.configuration);
83-
84-
service.update_start(&importer.name, None).await?;
85-
86-
// record timestamp before processing, so that we can use it as "since" marker
87-
let last_run = OffsetDateTime::now_utc();
88-
89-
log::info!("Starting run: {}", importer.name);
90-
91-
let context = ServiceRunContext::new(service.clone(), importer.name.clone());
92-
93-
let runner = ImportRunner {
94-
db: self.db.clone(),
95-
storage: self.storage.clone(),
96-
working_dir: self.working_dir.clone(),
97-
analysis: self.analysis.clone(),
98-
};
99-
100-
let (last_error, report, continuation) = match runner
101-
.run_once(
102-
context,
103-
importer.data.configuration,
104-
importer.data.last_success,
105-
importer.data.continuation,
106-
)
107-
.await
108-
{
109-
Ok(RunOutput {
110-
report,
111-
continuation,
112-
}) => (None, Some(report), continuation),
113-
Err(ScannerError::Normal {
114-
err,
115-
output:
116-
RunOutput {
117-
report,
118-
continuation,
119-
},
120-
}) => (Some(err.to_string()), Some(report), continuation),
121-
Err(ScannerError::Critical(err)) => (Some(err.to_string()), None, None),
122-
};
123-
124-
log::info!("Import run complete: {last_error:?}");
125-
126-
service
127-
.update_finish(
128-
&importer.name,
129-
None,
130-
last_run,
131-
last_error,
132-
continuation,
133-
report.and_then(|report| serde_json::to_value(report).ok()),
134-
)
135-
.await?;
136-
}
73+
tracing::debug!(jim = "", "checking importers");
74+
75+
let importers = service
76+
.list()
77+
.await?
78+
.into_iter()
79+
.filter(|i| !(i.data.configuration.disabled || can_wait(i)))
80+
.map(|i| self.import(i, &service));
81+
futures::future::join_all(importers).await;
13782
}
13883
}
13984

@@ -167,6 +112,65 @@ impl Server {
167112

168113
Ok(())
169114
}
115+
116+
async fn import(&self, importer: Importer, service: &ImporterService) -> Result<(), Error> {
117+
log::debug!(" {}: {:?}", importer.name, importer.data.configuration);
118+
119+
service.update_start(&importer.name, None).await?;
120+
121+
// record timestamp before processing, so that we can use it as "since" marker
122+
let last_run = OffsetDateTime::now_utc();
123+
124+
log::info!("Starting run: {}", importer.name);
125+
126+
let context = ServiceRunContext::new(service.clone(), importer.name.clone());
127+
128+
let runner = ImportRunner {
129+
db: self.db.clone(),
130+
storage: self.storage.clone(),
131+
working_dir: self.working_dir.clone(),
132+
analysis: self.analysis.clone(),
133+
};
134+
135+
let (last_error, report, continuation) = match runner
136+
.run_once(
137+
context,
138+
importer.data.configuration,
139+
importer.data.last_success,
140+
importer.data.continuation,
141+
)
142+
.await
143+
{
144+
Ok(RunOutput {
145+
report,
146+
continuation,
147+
}) => (None, Some(report), continuation),
148+
Err(ScannerError::Normal {
149+
err,
150+
output:
151+
RunOutput {
152+
report,
153+
continuation,
154+
},
155+
}) => (Some(err.to_string()), Some(report), continuation),
156+
Err(ScannerError::Critical(err)) => (Some(err.to_string()), None, None),
157+
};
158+
159+
log::info!("Import run complete: {last_error:?}");
160+
161+
service
162+
.update_finish(
163+
&importer.name,
164+
None,
165+
last_run,
166+
last_error,
167+
continuation,
168+
report.and_then(|report| serde_json::to_value(report).ok()),
169+
)
170+
.await?;
171+
172+
Ok(())
173+
}
170174
}
171175

172176
/// check if we need to run or skip the importer

0 commit comments

Comments
 (0)