Skip to content

Commit eaa5c2c

Browse files
committed
Introduce more liveness with local spawning and no joining
Relates #1207 Allows enabled/disabled state of jobs to be reflected immediately when manipulated by the UX. Signed-off-by: Jim Crossley <[email protected]>
1 parent 33670c5 commit eaa5c2c

File tree

4 files changed

+82
-63
lines changed

4 files changed

+82
-63
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

modules/importer/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ chrono = { workspace = true }
1919
csaf = { workspace = true }
2020
csaf-walker = { workspace = true, features = ["crypto-openssl", "csaf"] }
2121
cve = { workspace = true }
22-
futures = { workspace = true }
2322
git2 = { workspace = true }
2423
humantime = { workspace = true }
2524
humantime-serde = { workspace = true }

modules/importer/src/runner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use trustify_common::db::Database;
2323
use trustify_module_analysis::service::AnalysisService;
2424
use trustify_module_storage::service::dispatch::DispatchBackend;
2525

26+
#[derive(Clone)]
2627
pub struct ImportRunner {
2728
pub db: Database,
2829
pub storage: DispatchBackend,

modules/importer/src/server/mod.rs

Lines changed: 81 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ use crate::{
1010
server::context::ServiceRunContext,
1111
service::{Error, ImporterService},
1212
};
13-
use std::{path::PathBuf, time::Duration};
13+
use std::{collections::HashMap, path::PathBuf, time::Duration};
1414
use time::OffsetDateTime;
15-
use tokio::time::MissedTickBehavior;
15+
use tokio::{
16+
task::{spawn_local, JoinHandle},
17+
time::MissedTickBehavior,
18+
};
1619
use tracing::instrument;
1720
use trustify_common::db::Database;
1821
use trustify_module_analysis::service::AnalysisService;
@@ -65,24 +68,45 @@ impl Server {
6568
#[instrument(skip_all, ret)]
6669
async fn run(&self) -> anyhow::Result<()> {
6770
let service = ImporterService::new(self.db.clone());
68-
71+
let runner = ImportRunner {
72+
db: self.db.clone(),
73+
storage: self.storage.clone(),
74+
working_dir: self.working_dir.clone(),
75+
analysis: self.analysis.clone(),
76+
};
6977
self.reset_all_jobs(&service).await?;
7078

7179
let mut interval = tokio::time::interval(Duration::from_secs(1));
7280
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
7381

82+
// Maintain a list of currently running jobs
83+
let mut runs = HashMap::<String, JoinHandle<_>>::default();
84+
7485
loop {
7586
interval.tick().await;
76-
futures::future::join_all(
77-
service
78-
.list()
79-
.await?
80-
.into_iter()
81-
.filter(|i| !(i.data.configuration.disabled || can_wait(i)))
82-
.take(self.concurrency)
83-
.map(|i| self.import(i, &service)),
84-
)
85-
.await;
87+
88+
// Remove jobs that are finished
89+
runs.retain(|_, job| !job.is_finished());
90+
91+
// Asynchronously fire off new jobs subject to max concurrency
92+
let todo: Vec<_> = service
93+
.list()
94+
.await?
95+
.into_iter()
96+
.filter(|i| {
97+
!(runs.contains_key(&i.name) || i.data.configuration.disabled || can_wait(i))
98+
})
99+
.take(self.concurrency - runs.len())
100+
.map(|i| {
101+
(
102+
i.name.clone(),
103+
spawn_local(import(runner.clone(), i, service.clone())),
104+
)
105+
})
106+
.collect();
107+
108+
// Add them to our list of currently running jobs
109+
runs.extend(todo.into_iter());
86110
}
87111
}
88112

@@ -116,65 +140,61 @@ impl Server {
116140

117141
Ok(())
118142
}
143+
}
119144

120-
async fn import(&self, importer: Importer, service: &ImporterService) -> Result<(), Error> {
121-
log::debug!(" {}: {:?}", importer.name, importer.data.configuration);
122-
123-
service.update_start(&importer.name, None).await?;
145+
async fn import(
146+
runner: ImportRunner,
147+
importer: Importer,
148+
service: ImporterService,
149+
) -> Result<(), Error> {
150+
log::debug!(" {}: {:?}", importer.name, importer.data.configuration);
124151

125-
// record timestamp before processing, so that we can use it as "since" marker
126-
let last_run = OffsetDateTime::now_utc();
152+
service.update_start(&importer.name, None).await?;
127153

128-
log::info!("Starting run: {}", importer.name);
154+
// record timestamp before processing, so that we can use it as "since" marker
155+
let last_run = OffsetDateTime::now_utc();
129156

130-
let context = ServiceRunContext::new(service.clone(), importer.name.clone());
157+
log::info!("Starting run: {}", importer.name);
131158

132-
let runner = ImportRunner {
133-
db: self.db.clone(),
134-
storage: self.storage.clone(),
135-
working_dir: self.working_dir.clone(),
136-
analysis: self.analysis.clone(),
137-
};
159+
let context = ServiceRunContext::new(service.clone(), importer.name.clone());
138160

139-
let (last_error, report, continuation) = match runner
140-
.run_once(
141-
context,
142-
importer.data.configuration,
143-
importer.data.last_success,
144-
importer.data.continuation,
145-
)
146-
.await
147-
{
148-
Ok(RunOutput {
161+
let (last_error, report, continuation) = match runner
162+
.run_once(
163+
context,
164+
importer.data.configuration,
165+
importer.data.last_success,
166+
importer.data.continuation,
167+
)
168+
.await
169+
{
170+
Ok(RunOutput {
171+
report,
172+
continuation,
173+
}) => (None, Some(report), continuation),
174+
Err(ScannerError::Normal {
175+
err,
176+
output: RunOutput {
149177
report,
150178
continuation,
151-
}) => (None, Some(report), continuation),
152-
Err(ScannerError::Normal {
153-
err,
154-
output:
155-
RunOutput {
156-
report,
157-
continuation,
158-
},
159-
}) => (Some(err.to_string()), Some(report), continuation),
160-
Err(ScannerError::Critical(err)) => (Some(err.to_string()), None, None),
161-
};
179+
},
180+
}) => (Some(err.to_string()), Some(report), continuation),
181+
Err(ScannerError::Critical(err)) => (Some(err.to_string()), None, None),
182+
};
162183

163-
log::info!("Import run complete: {last_error:?}");
184+
log::info!("Import run complete: {last_error:?}");
164185

165-
service
166-
.update_finish(
167-
&importer.name,
168-
None,
169-
last_run,
170-
last_error,
171-
continuation,
172-
report.and_then(|report| serde_json::to_value(report).ok()),
173-
)
174-
.await?;
186+
service
187+
.update_finish(
188+
&importer.name,
189+
None,
190+
last_run,
191+
last_error,
192+
continuation,
193+
report.and_then(|report| serde_json::to_value(report).ok()),
194+
)
195+
.await?;
175196

176-
Ok(())
177-
}
197+
Ok(())
178198
}
179199

180200
/// check if we need to run or skip the importer

0 commit comments

Comments
 (0)