Skip to content

Commit

Permalink
Merge pull request #215 from hatoo/fix-canceling
Browse files Browse the repository at this point in the history
Fix work canceling issue
  • Loading branch information
hatoo authored Feb 19, 2023
2 parents dc05708 + cf6078c commit b6cba31
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 39 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# Unreleased

- Fix `-z` behaviour to cancel workers at the dead line.
- Fix align of histogram #210

# 0.5.6 (2023-02-02)
Expand Down
71 changes: 32 additions & 39 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@ pub async fn work_until(
.map(|_| {
let report_tx = report_tx.clone();
let mut w = client_builder.build();
tokio::spawn(tokio::time::timeout_at(dead_line.into(), async move {
tokio::spawn(async move {
loop {
let res = w.work().await;
let is_cancel = is_too_many_open_files(&res);
Expand All @@ -687,12 +687,13 @@ pub async fn work_until(
break;
}
}
}))
})
})
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
for f in futures {
let _ = f.await;
f.abort();
}
}

Expand All @@ -707,7 +708,7 @@ pub async fn work_until_with_qps(
) {
let (tx, rx) = flume::bounded(qps);

let gen = tokio::spawn(async move {
tokio::spawn(async move {
for i in 0.. {
if std::time::Instant::now() > dead_line {
break;
Expand All @@ -728,27 +729,23 @@ pub async fn work_until_with_qps(
let mut w = client_builder.build();
let report_tx = report_tx.clone();
let rx = rx.clone();
tokio::time::timeout_at(
dead_line.into(),
tokio::spawn(async move {
while let Ok(()) = rx.recv_async().await {
let res = w.work().await;
let is_cancel = is_too_many_open_files(&res);
report_tx.send_async(res).await.unwrap();
if is_cancel {
break;
}
tokio::spawn(async move {
while let Ok(()) = rx.recv_async().await {
let res = w.work().await;
let is_cancel = is_too_many_open_files(&res);
report_tx.send_async(res).await.unwrap();
if is_cancel {
break;
}
}),
)
}
})
})
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
for f in futures {
let _ = f.await;
f.abort();
}

let _ = gen.await;
}

/// Run until dead_line by n workers limit to qps works in a second with latency correction
Expand All @@ -762,7 +759,7 @@ pub async fn work_until_with_qps_latency_correction(
) {
let (tx, rx) = flume::unbounded();

let gen = tokio::spawn(async move {
tokio::spawn(async move {
for i in 0.. {
let now = std::time::Instant::now();
if now > dead_line {
Expand All @@ -784,30 +781,26 @@ pub async fn work_until_with_qps_latency_correction(
let mut w = client_builder.build();
let report_tx = report_tx.clone();
let rx = rx.clone();
tokio::time::timeout_at(
dead_line.into(),
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
let mut res = w.work().await;

if let Ok(request_result) = &mut res {
request_result.start = start;
}
tokio::spawn(async move {
while let Ok(start) = rx.recv_async().await {
let mut res = w.work().await;

let is_cancel = is_too_many_open_files(&res);
report_tx.send_async(res).await.unwrap();
if is_cancel {
break;
}
if let Ok(request_result) = &mut res {
request_result.start = start;
}
}),
)

let is_cancel = is_too_many_open_files(&res);
report_tx.send_async(res).await.unwrap();
if is_cancel {
break;
}
}
})
})
.collect::<Vec<_>>();

tokio::time::sleep_until(dead_line.into()).await;
for f in futures {
let _ = f.await;
f.abort();
}

let _ = gen.await;
}

0 comments on commit b6cba31

Please sign in to comment.