Skip to content

Commit 6f6fced

Browse files
authored
feat: implement Barrier using Condvar
1 parent 10f7abb commit 6f6fced

File tree

2 files changed

+16
-46
lines changed

2 files changed

+16
-46
lines changed

Cargo.toml

+1-2
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ default = [
3131
"pin-project-lite",
3232
]
3333
docs = ["attributes", "unstable", "default"]
34-
unstable = ["std", "broadcaster"]
34+
unstable = ["std"]
3535
attributes = ["async-attributes"]
3636
std = [
3737
"alloc",
@@ -55,7 +55,6 @@ alloc = [
5555
[dependencies]
5656
async-attributes = { version = "1.1.1", optional = true }
5757
async-task = { version = "3.0.0", optional = true }
58-
broadcaster = { version = "1.0.0", optional = true }
5958
crossbeam-utils = { version = "0.7.2", optional = true }
6059
futures-core = { version = "0.3.4", optional = true, default-features = false }
6160
futures-io = { version = "0.3.4", optional = true }

src/sync/barrier.rs

+15-44
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
1-
use broadcaster::BroadcastChannel;
2-
3-
use crate::sync::Mutex;
1+
use crate::sync::{Condvar,Mutex};
42

53
/// A barrier enables multiple tasks to synchronize the beginning
64
/// of some computation.
@@ -36,14 +34,13 @@ use crate::sync::Mutex;
3634
#[derive(Debug)]
3735
pub struct Barrier {
3836
state: Mutex<BarrierState>,
39-
wait: BroadcastChannel<(usize, usize)>,
40-
n: usize,
37+
cvar: Condvar,
38+
num_tasks: usize,
4139
}
4240

4341
// The inner state of a double barrier
4442
#[derive(Debug)]
4543
struct BarrierState {
46-
waker: BroadcastChannel<(usize, usize)>,
4744
count: usize,
4845
generation_id: usize,
4946
}
@@ -81,25 +78,14 @@ impl Barrier {
8178
///
8279
/// let barrier = Barrier::new(10);
8380
/// ```
84-
pub fn new(mut n: usize) -> Barrier {
85-
let waker = BroadcastChannel::new();
86-
let wait = waker.clone();
87-
88-
if n == 0 {
89-
// if n is 0, it's not clear what behavior the user wants.
90-
// in std::sync::Barrier, an n of 0 exhibits the same behavior as n == 1, where every
91-
// .wait() immediately unblocks, so we adopt that here as well.
92-
n = 1;
93-
}
94-
81+
pub fn new(n: usize) -> Barrier {
9582
Barrier {
9683
state: Mutex::new(BarrierState {
97-
waker,
9884
count: 0,
9985
generation_id: 1,
10086
}),
101-
n,
102-
wait,
87+
cvar: Condvar::new(),
88+
num_tasks: n,
10389
}
10490
}
10591

@@ -143,35 +129,20 @@ impl Barrier {
143129
/// # });
144130
/// ```
145131
pub async fn wait(&self) -> BarrierWaitResult {
146-
let mut lock = self.state.lock().await;
147-
let local_gen = lock.generation_id;
148-
149-
lock.count += 1;
132+
let mut state = self.state.lock().await;
133+
let local_gen = state.generation_id;
134+
state.count += 1;
150135

151-
if lock.count < self.n {
152-
let mut wait = self.wait.clone();
153-
154-
let mut generation_id = lock.generation_id;
155-
let mut count = lock.count;
156-
157-
drop(lock);
158-
159-
while local_gen == generation_id && count < self.n {
160-
let (g, c) = wait.recv().await.expect("sender has not been closed");
161-
generation_id = g;
162-
count = c;
136+
if state.count < self.num_tasks {
137+
while local_gen == state.generation_id && state.count < self.num_tasks {
138+
state = self.cvar.wait(state).await;
163139
}
164140

165141
BarrierWaitResult(false)
166142
} else {
167-
lock.count = 0;
168-
lock.generation_id = lock.generation_id.wrapping_add(1);
169-
170-
lock.waker
171-
.send(&(lock.generation_id, lock.count))
172-
.await
173-
.expect("there should be at least one receiver");
174-
143+
state.count = 0;
144+
state.generation_id = state.generation_id.wrapping_add(1);
145+
self.cvar.notify_all();
175146
BarrierWaitResult(true)
176147
}
177148
}

0 commit comments

Comments
 (0)