Skip to content

Commit f5883d5

Browse files
Merge pull request #706 from Mark-Simulacrum/better-status
Progress reporting data collection
2 parents 9e8ac65 + 0f2f247 commit f5883d5

File tree

6 files changed

+147
-412
lines changed

6 files changed

+147
-412
lines changed

collector/src/main.rs

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
extern crate clap;
55

66
use anyhow::{bail, Context};
7-
use database::{pool::Connection, ArtifactId, Commit};
7+
use database::{ArtifactId, Commit};
88
use log::debug;
99
use std::collections::HashSet;
1010
use std::fs;
@@ -185,7 +185,7 @@ impl BenchmarkErrors {
185185

186186
fn bench(
187187
rt: &mut Runtime,
188-
mut conn: Box<dyn Connection>,
188+
pool: database::Pool,
189189
cid: &ArtifactId,
190190
build_kinds: &[BuildKind],
191191
run_kinds: &[RunKind],
@@ -194,6 +194,15 @@ fn bench(
194194
iterations: usize,
195195
self_profile: bool,
196196
) -> BenchmarkErrors {
197+
let mut conn = rt.block_on(pool.connection());
198+
let status_conn;
199+
let status_conn: Option<&dyn database::Connection> =
200+
if conn.separate_transaction_for_collector() {
201+
status_conn = rt.block_on(pool.connection());
202+
Some(&*status_conn)
203+
} else {
204+
None
205+
};
197206
let mut errors = BenchmarkErrors::new();
198207
eprintln!("Benchmarking {} for triple {}", cid, compiler.triple);
199208

@@ -223,7 +232,21 @@ fn bench(
223232
let interned_cid = rt.block_on(tx.conn().artifact_id(&cid));
224233

225234
let start = Instant::now();
235+
let steps = benchmarks
236+
.iter()
237+
.map(|b| b.name.to_string())
238+
.collect::<Vec<_>>();
239+
rt.block_on(
240+
status_conn
241+
.unwrap_or_else(|| &*tx.conn())
242+
.collector_start(interned_cid, &steps),
243+
);
226244
for (nth_benchmark, benchmark) in benchmarks.iter().enumerate() {
245+
rt.block_on(
246+
status_conn
247+
.unwrap_or_else(|| &*tx.conn())
248+
.collector_start_step(interned_cid, &benchmark.name.to_string()),
249+
);
227250
rt.block_on(
228251
tx.conn()
229252
.record_benchmark(benchmark.name.0.as_str(), benchmark.supports_stable()),
@@ -254,6 +277,11 @@ fn bench(
254277
&format!("{:?}", s),
255278
));
256279
};
280+
rt.block_on(
281+
status_conn
282+
.unwrap_or_else(|| &*tx.conn())
283+
.collector_end_step(interned_cid, &benchmark.name.to_string()),
284+
);
257285
}
258286
let end = start.elapsed();
259287

@@ -516,15 +544,14 @@ fn main_result() -> anyhow::Result<i32> {
516544
let self_profile = sub_m.is_present("SELF_PROFILE");
517545

518546
let pool = database::Pool::open(db);
519-
let conn = rt.block_on(pool.connection());
520547

521548
let (rustc, rustdoc, cargo) = get_local_toolchain(&build_kinds, rustc, rustdoc, cargo)?;
522549

523550
let benchmarks = get_benchmarks(&benchmark_dir, include, exclude)?;
524551

525552
let res = bench(
526553
&mut rt,
527-
conn,
554+
pool,
528555
&ArtifactId::Artifact(id.to_string()),
529556
&build_kinds,
530557
&run_kinds,
@@ -567,7 +594,6 @@ fn main_result() -> anyhow::Result<i32> {
567594
let commit = get_commit_or_fake_it(&commit)?;
568595

569596
let pool = database::Pool::open(db);
570-
let conn = rt.block_on(pool.connection());
571597

572598
let sysroot = Sysroot::install(commit.sha.to_string(), "x86_64-unknown-linux-gnu")
573599
.with_context(|| format!("failed to install sysroot for {:?}", commit))?;
@@ -576,7 +602,7 @@ fn main_result() -> anyhow::Result<i32> {
576602

577603
let res = bench(
578604
&mut rt,
579-
conn,
605+
pool,
580606
&ArtifactId::Commit(commit),
581607
&BuildKind::all(),
582608
&RunKind::all(),
@@ -608,7 +634,6 @@ fn main_result() -> anyhow::Result<i32> {
608634
}
609635

610636
let pool = database::Pool::open(db);
611-
let conn = rt.block_on(pool.connection());
612637

613638
let run_kinds = if collector::version_supports_incremental(toolchain) {
614639
RunKind::all()
@@ -647,7 +672,7 @@ fn main_result() -> anyhow::Result<i32> {
647672

648673
let res = bench(
649674
&mut rt,
650-
conn,
675+
pool,
651676
&ArtifactId::Artifact(toolchain.to_string()),
652677
&build_kinds,
653678
&run_kinds,

database/src/bin/ingest-json.rs

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -600,13 +600,6 @@ async fn main() {
600600
Pool::Postgres(mut p) => {
601601
postgres = Some(p.raw().open().await.into());
602602
}
603-
Pool::Both {
604-
sqlite: mut s,
605-
postgres: mut p,
606-
} => {
607-
sqlite = Some(s.raw().open().await.into_inner().unwrap());
608-
postgres = Some(p.raw().open().await.into());
609-
}
610603
}
611604

612605
if let Some(s) = &mut sqlite {

database/src/pool.rs

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use std::sync::{Arc, Mutex};
55
use std::time::Duration;
66
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
77

8-
pub mod both;
98
pub mod postgres;
109
pub mod sqlite;
1110

@@ -62,6 +61,24 @@ pub trait Connection: Send + Sync {
6261
async fn pr_attach_commit(&self, pr: u32, sha: &str, parent_sha: &str) -> bool;
6362
async fn queued_commits(&self) -> Vec<QueuedCommit>;
6463
async fn mark_complete(&self, sha: &str) -> Option<QueuedCommit>;
64+
65+
// Collector status API
66+
67+
async fn collector_start(&self, aid: ArtifactIdNumber, steps: &[String]);
68+
async fn collector_start_step(&self, aid: ArtifactIdNumber, step: &str);
69+
async fn collector_end_step(&self, aid: ArtifactIdNumber, step: &str);
70+
71+
// This returns `true` if the collector commands can be placed in a separate
72+
// transaction.
73+
//
74+
// Currently, the sqlite backend does not support "regular" usage where they
75+
// are used for genuine progress reporting. sqlite does not support
76+
// concurrent writers -- it will return an error (or wait, if a busy timeout
77+
// is configured).
78+
//
79+
// For now we don't care much as sqlite is not used in production and in
80+
// local usage you can just look at the logs.
81+
fn separate_transaction_for_collector(&self) -> bool;
6582
}
6683

6784
#[async_trait::async_trait]
@@ -160,43 +177,19 @@ where
160177
pub enum Pool {
161178
Sqlite(ConnectionPool<sqlite::Sqlite>),
162179
Postgres(ConnectionPool<postgres::Postgres>),
163-
Both {
164-
sqlite: ConnectionPool<sqlite::Sqlite>,
165-
postgres: ConnectionPool<postgres::Postgres>,
166-
},
167180
}
168181

169182
impl Pool {
170183
pub async fn connection(&self) -> Box<dyn Connection> {
171184
match self {
172185
Pool::Sqlite(p) => Box::new(sqlite::SqliteConnection::new(p.get().await)),
173186
Pool::Postgres(p) => Box::new(p.get().await),
174-
Pool::Both { sqlite, postgres } => Box::new(both::BothConnection::new(
175-
sqlite::SqliteConnection::new(sqlite.get().await),
176-
postgres.get().await,
177-
)),
178187
}
179188
}
180189

181190
pub fn open(uri: &str) -> Pool {
182191
if uri.starts_with("postgres") {
183192
Pool::Postgres(ConnectionPool::new(postgres::Postgres::new(uri.into())))
184-
} else if uri.starts_with("both://") {
185-
let mut parts = uri["both://".len()..].rsplitn(2, ';');
186-
let p1 = parts.next().unwrap();
187-
let p2 = parts.next().unwrap();
188-
match (Pool::open(p1), Pool::open(p2)) {
189-
(Pool::Sqlite(s), Pool::Postgres(p)) | (Pool::Postgres(p), Pool::Sqlite(s)) => {
190-
Pool::Both {
191-
sqlite: s,
192-
postgres: p,
193-
}
194-
}
195-
_ => panic!(
196-
"unsupported inputs, must be sqlite and postgres: {} and {}",
197-
p1, p2
198-
),
199-
}
200193
} else {
201194
Pool::Sqlite(ConnectionPool::new(sqlite::Sqlite::new(uri.into())))
202195
}

0 commit comments

Comments
 (0)