Skip to content

Commit ef542d5

Browse files
committed
Introduce more liveness with local spawning and no joining
Relates #1207 Allows enabled/disabled state of jobs to be reflected immediately when manipulated by the UX. Signed-off-by: Jim Crossley <[email protected]>
1 parent 33670c5 commit ef542d5

File tree

4 files changed

+87
-64
lines changed

4 files changed

+87
-64
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

modules/importer/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ chrono = { workspace = true }
1919
csaf = { workspace = true }
2020
csaf-walker = { workspace = true, features = ["crypto-openssl", "csaf"] }
2121
cve = { workspace = true }
22-
futures = { workspace = true }
2322
git2 = { workspace = true }
2423
humantime = { workspace = true }
2524
humantime-serde = { workspace = true }

modules/importer/src/runner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use trustify_common::db::Database;
2323
use trustify_module_analysis::service::AnalysisService;
2424
use trustify_module_storage::service::dispatch::DispatchBackend;
2525

26+
#[derive(Clone)]
2627
pub struct ImportRunner {
2728
pub db: Database,
2829
pub storage: DispatchBackend,

modules/importer/src/server/mod.rs

Lines changed: 86 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,12 @@ use crate::{
1010
server::context::ServiceRunContext,
1111
service::{Error, ImporterService},
1212
};
13-
use std::{path::PathBuf, time::Duration};
13+
use std::{collections::HashMap, path::PathBuf, time::Duration};
1414
use time::OffsetDateTime;
15-
use tokio::time::MissedTickBehavior;
15+
use tokio::{
16+
task::{spawn_local, JoinHandle, LocalSet},
17+
time::MissedTickBehavior,
18+
};
1619
use tracing::instrument;
1720
use trustify_common::db::Database;
1821
use trustify_module_analysis::service::AnalysisService;
@@ -63,26 +66,51 @@ struct Server {
6366

6467
impl Server {
6568
#[instrument(skip_all, ret)]
66-
async fn run(&self) -> anyhow::Result<()> {
67-
let service = ImporterService::new(self.db.clone());
69+
async fn run(self) -> anyhow::Result<()> {
70+
LocalSet::new().run_until(self.run_local()).await
71+
}
6872

73+
async fn run_local(self) -> anyhow::Result<()> {
74+
let service = ImporterService::new(self.db.clone());
75+
let runner = ImportRunner {
76+
db: self.db.clone(),
77+
storage: self.storage.clone(),
78+
working_dir: self.working_dir.clone(),
79+
analysis: self.analysis.clone(),
80+
};
6981
self.reset_all_jobs(&service).await?;
7082

7183
let mut interval = tokio::time::interval(Duration::from_secs(1));
7284
interval.set_missed_tick_behavior(MissedTickBehavior::Skip);
7385

86+
// Maintain a list of currently running jobs
87+
let mut runs = HashMap::<String, JoinHandle<_>>::default();
88+
7489
loop {
7590
interval.tick().await;
76-
futures::future::join_all(
77-
service
78-
.list()
79-
.await?
80-
.into_iter()
81-
.filter(|i| !(i.data.configuration.disabled || can_wait(i)))
82-
.take(self.concurrency)
83-
.map(|i| self.import(i, &service)),
84-
)
85-
.await;
91+
92+
// Remove jobs that are finished
93+
runs.retain(|_, job| !job.is_finished());
94+
95+
// Asynchronously fire off new jobs subject to max concurrency
96+
let todo: Vec<_> = service
97+
.list()
98+
.await?
99+
.into_iter()
100+
.filter(|i| {
101+
!(runs.contains_key(&i.name) || i.data.configuration.disabled || can_wait(i))
102+
})
103+
.take(self.concurrency - runs.len())
104+
.map(|i| {
105+
(
106+
i.name.clone(),
107+
spawn_local(import(runner.clone(), i, service.clone())),
108+
)
109+
})
110+
.collect();
111+
112+
// Add them to our list of currently running jobs
113+
runs.extend(todo.into_iter());
86114
}
87115
}
88116

@@ -116,65 +144,61 @@ impl Server {
116144

117145
Ok(())
118146
}
147+
}
119148

120-
async fn import(&self, importer: Importer, service: &ImporterService) -> Result<(), Error> {
121-
log::debug!(" {}: {:?}", importer.name, importer.data.configuration);
122-
123-
service.update_start(&importer.name, None).await?;
149+
async fn import(
150+
runner: ImportRunner,
151+
importer: Importer,
152+
service: ImporterService,
153+
) -> Result<(), Error> {
154+
log::debug!(" {}: {:?}", importer.name, importer.data.configuration);
124155

125-
// record timestamp before processing, so that we can use it as "since" marker
126-
let last_run = OffsetDateTime::now_utc();
156+
service.update_start(&importer.name, None).await?;
127157

128-
log::info!("Starting run: {}", importer.name);
158+
// record timestamp before processing, so that we can use it as "since" marker
159+
let last_run = OffsetDateTime::now_utc();
129160

130-
let context = ServiceRunContext::new(service.clone(), importer.name.clone());
161+
log::info!("Starting run: {}", importer.name);
131162

132-
let runner = ImportRunner {
133-
db: self.db.clone(),
134-
storage: self.storage.clone(),
135-
working_dir: self.working_dir.clone(),
136-
analysis: self.analysis.clone(),
137-
};
163+
let context = ServiceRunContext::new(service.clone(), importer.name.clone());
138164

139-
let (last_error, report, continuation) = match runner
140-
.run_once(
141-
context,
142-
importer.data.configuration,
143-
importer.data.last_success,
144-
importer.data.continuation,
145-
)
146-
.await
147-
{
148-
Ok(RunOutput {
165+
let (last_error, report, continuation) = match runner
166+
.run_once(
167+
context,
168+
importer.data.configuration,
169+
importer.data.last_success,
170+
importer.data.continuation,
171+
)
172+
.await
173+
{
174+
Ok(RunOutput {
175+
report,
176+
continuation,
177+
}) => (None, Some(report), continuation),
178+
Err(ScannerError::Normal {
179+
err,
180+
output: RunOutput {
149181
report,
150182
continuation,
151-
}) => (None, Some(report), continuation),
152-
Err(ScannerError::Normal {
153-
err,
154-
output:
155-
RunOutput {
156-
report,
157-
continuation,
158-
},
159-
}) => (Some(err.to_string()), Some(report), continuation),
160-
Err(ScannerError::Critical(err)) => (Some(err.to_string()), None, None),
161-
};
183+
},
184+
}) => (Some(err.to_string()), Some(report), continuation),
185+
Err(ScannerError::Critical(err)) => (Some(err.to_string()), None, None),
186+
};
162187

163-
log::info!("Import run complete: {last_error:?}");
188+
log::info!("Import run complete: {last_error:?}");
164189

165-
service
166-
.update_finish(
167-
&importer.name,
168-
None,
169-
last_run,
170-
last_error,
171-
continuation,
172-
report.and_then(|report| serde_json::to_value(report).ok()),
173-
)
174-
.await?;
190+
service
191+
.update_finish(
192+
&importer.name,
193+
None,
194+
last_run,
195+
last_error,
196+
continuation,
197+
report.and_then(|report| serde_json::to_value(report).ok()),
198+
)
199+
.await?;
175200

176-
Ok(())
177-
}
201+
Ok(())
178202
}
179203

180204
/// check if we need to run or skip the importer

0 commit comments

Comments
 (0)