Skip to content

Commit 817a9ca

Browse files
authored
Wait for target to be running in capture manager (#998)
* Add target running signal, wait on it in capture manager
* Support no-target mode
1 parent a907335 commit 817a9ca

File tree

4 files changed

+29
-11
lines changed

4 files changed

+29
-11
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
1010

1111
### Fixed
1212
- Warmup period is now respected when container targeting is in use.
13+
- Capture manager waits for the target to start running before recording data.
1314

1415
## [0.23.1]
1516
### Fixed

lading/src/bin/lading.rs

+7-7
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ async fn inner_main(
347347
) -> Result<(), Error> {
348348
let (shutdown_watcher, shutdown_broadcast) = lading_signal::signal();
349349
let (experiment_started_watcher, experiment_started_broadcast) = lading_signal::signal();
350+
let (target_running_watcher, target_running_broadcast) = lading_signal::signal();
350351

351352
// Set up the telemetry sub-system.
352353
//
@@ -390,6 +391,7 @@ async fn inner_main(
390391
path,
391392
shutdown_watcher.register()?,
392393
experiment_started_watcher.clone(),
394+
target_running_watcher.clone(),
393395
)
394396
.await?;
395397
for (k, v) in global_labels {
@@ -474,8 +476,6 @@ async fn inner_main(
474476
}
475477
}
476478

477-
let mut sequence_tgt_recv = tgt_snd.subscribe();
478-
479479
let mut tsrv_joinset = tokio::task::JoinSet::new();
480480
let mut osrv_joinset = tokio::task::JoinSet::new();
481481
//
@@ -491,20 +491,20 @@ async fn inner_main(
491491
// TARGET
492492
//
493493
let target_server = target::Server::new(target, shutdown_watcher.clone());
494-
tsrv_joinset.spawn(target_server.run(tgt_snd));
494+
tsrv_joinset.spawn(target_server.run(tgt_snd, target_running_broadcast));
495495
} else {
496-
// Many lading servers synchronize on target startup.
496+
// Many lading servers synchronize on target startup using the PID sender: some because they need the PID, others for legacy reasons.
497497
tgt_snd.send(None)?;
498+
// Newer usage prefers the `target_running` signal where the PID isn't needed.
499+
target_running_broadcast.signal();
498500
};
499501

500502
let (timer_watcher, timer_broadcast) = lading_signal::signal();
501503

502504
tokio::spawn(
503505
async move {
504506
info!("waiting for target startup");
505-
if let Err(e) = sequence_tgt_recv.recv().await {
506-
warn!("failed to wait: {:?}", e);
507-
}
507+
target_running_watcher.recv().await;
508508
info!("target is running, now sleeping for warmup");
509509
sleep(warmup_duration).await;
510510
experiment_started_broadcast.signal();

lading/src/captures.rs

+6
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ pub struct CaptureManager {
5959
capture_path: PathBuf,
6060
shutdown: lading_signal::Watcher,
6161
_experiment_started: lading_signal::Watcher,
62+
target_running: lading_signal::Watcher,
6263
inner: Arc<Inner>,
6364
global_labels: FxHashMap<String, String>,
6465
}
@@ -73,6 +74,7 @@ impl CaptureManager {
7374
capture_path: PathBuf,
7475
shutdown: lading_signal::Watcher,
7576
experiment_started: lading_signal::Watcher,
77+
target_running: lading_signal::Watcher,
7678
) -> Result<Self, io::Error> {
7779
let fp = tokio::fs::File::create(&capture_path).await?;
7880
let fp = fp.into_std().await;
@@ -83,6 +85,7 @@ impl CaptureManager {
8385
capture_path,
8486
shutdown,
8587
_experiment_started: experiment_started,
88+
target_running,
8689
inner: Arc::new(Inner {
8790
registry: Registry::atomic(),
8891
}),
@@ -190,6 +193,9 @@ impl CaptureManager {
190193
std::thread::Builder::new()
191194
.name("capture-manager".into())
192195
.spawn(move || {
196+
while let Ok(false) = self.target_running.try_recv() {
197+
std::thread::sleep(Duration::from_millis(100));
198+
}
193199
loop {
194200
if self.shutdown.try_recv().expect("polled after signal") {
195201
info!("shutdown signal received");

lading/src/target.rs

+15-4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ use std::{
2525
};
2626

2727
use bollard::Docker;
28+
use lading_signal::Broadcaster;
2829
use metrics::gauge;
2930
use nix::{
3031
errno::Errno,
@@ -221,7 +222,11 @@ impl Server {
221222
/// # Panics
222223
///
223224
/// None are known.
224-
pub async fn run(self, pid_snd: TargetPidSender) -> Result<(), Error> {
225+
pub async fn run(
226+
self,
227+
pid_snd: TargetPidSender,
228+
target_running: Broadcaster,
229+
) -> Result<(), Error> {
225230
let config = self.config;
226231

227232
// Note that each target mode has different expectations around target
@@ -230,13 +235,13 @@ impl Server {
230235
// signalled to exit.
231236
match config {
232237
Config::Pid(config) => {
233-
Self::watch(config, pid_snd, self.shutdown).await?;
238+
Self::watch(config, pid_snd, target_running, self.shutdown).await?;
234239
}
235240
Config::Docker(config) => {
236-
Self::watch_container(config, pid_snd, self.shutdown).await?;
241+
Self::watch_container(config, pid_snd, target_running, self.shutdown).await?;
237242
}
238243
Config::Binary(config) => {
239-
Self::execute_binary(config, pid_snd, self.shutdown).await?;
244+
Self::execute_binary(config, pid_snd, target_running, self.shutdown).await?;
240245
}
241246
}
242247

@@ -248,6 +253,7 @@ impl Server {
248253
async fn watch_container(
249254
config: DockerConfig,
250255
pid_snd: TargetPidSender,
256+
target_running: Broadcaster,
251257
shutdown: lading_signal::Watcher,
252258
) -> Result<(), Error> {
253259
let docker = Docker::connect_with_socket_defaults()?;
@@ -269,6 +275,7 @@ impl Server {
269275
pid.try_into().expect("cannot convert pid to 32 bit type"),
270276
))?;
271277
drop(pid_snd);
278+
target_running.signal();
272279

273280
// Use PIDfd to watch the target process (linux kernel 5.3 and up)
274281
#[cfg(target_os = "linux")]
@@ -329,6 +336,7 @@ impl Server {
329336
async fn watch(
330337
config: PidConfig,
331338
pid_snd: TargetPidSender,
339+
target_running: Broadcaster,
332340
shutdown: lading_signal::Watcher,
333341
) -> Result<(), Error> {
334342
// Convert pid config value to a plain i32 (no truncation concerns;
@@ -348,6 +356,7 @@ impl Server {
348356

349357
pid_snd.send(Some(config.pid.get()))?;
350358
drop(pid_snd);
359+
target_running.signal();
351360

352361
// Use PIDfd to watch the target process (linux kernel 5.3 and up)
353362
#[cfg(target_os = "linux")]
@@ -406,6 +415,7 @@ impl Server {
406415
async fn execute_binary(
407416
config: BinaryConfig,
408417
pid_snd: TargetPidSender,
418+
target_running: Broadcaster,
409419
shutdown: lading_signal::Watcher,
410420
) -> Result<ExitStatus, Error> {
411421
let mut target_cmd = Command::new(config.command);
@@ -424,6 +434,7 @@ impl Server {
424434
let target_id = target_child.id().ok_or(Error::ProcessFinished)?;
425435
pid_snd.send(Some(target_id))?;
426436
drop(pid_snd);
437+
target_running.signal();
427438

428439
let mut interval = time::interval(Duration::from_secs(400));
429440
let shutdown_wait = shutdown.recv();

0 commit comments

Comments
 (0)