Skip to content

Commit d61db23

Browse files
committed
feat(common): bytes::Buf wrapper that notifies subscribers on EOS
1 parent 9214294 commit d61db23

File tree

2 files changed

+66
-0
lines changed

2 files changed

+66
-0
lines changed

src/common/buf.rs

+65
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
use hyper::body::Buf;
2+
use std::sync::atomic::{AtomicBool, Ordering};
3+
use std::sync::Arc;
4+
use tokio::sync::Notify;
5+
6+
#[derive(Clone)]
7+
pub struct EosSignaler {
8+
notifier: Arc<Notify>,
9+
}
10+
11+
impl EosSignaler {
12+
fn notify_eos(&self) {
13+
self.notifier.notify_waiters();
14+
}
15+
16+
pub async fn wait_till_eos(self) {
17+
self.notifier.notified().await;
18+
}
19+
}
20+
21+
pub struct AlertOnEos<B> {
22+
inner: B,
23+
signaler: EosSignaler,
24+
// I'd rather we consumed the signaler, but it would require something like AtomicOption,
25+
// arc_swap::ArcSwapOption was tried, but it only returns an Arc, and the value cannot be consumed (swapped).
26+
// One could write an AtomicOption type (like this https://docs.rs/atomic-option/0.1.2/atomic_option/),
27+
// but it requires both unsafe and additional heap allocation, which is not worth it.
28+
has_already_signaled: AtomicBool,
29+
}
30+
31+
impl<B> AlertOnEos<B> {
32+
pub fn new(inner: B) -> (Self, EosSignaler) {
33+
let signal = EosSignaler {
34+
notifier: Arc::new(Notify::new()),
35+
};
36+
let this = Self {
37+
inner,
38+
signaler: signal.clone(),
39+
has_already_signaled: AtomicBool::new(false),
40+
};
41+
(this, signal)
42+
}
43+
}
44+
45+
impl<B: Buf> Buf for AlertOnEos<B> {
46+
fn remaining(&self) -> usize {
47+
self.inner.remaining()
48+
}
49+
50+
fn chunk(&self) -> &[u8] {
51+
self.inner.chunk()
52+
}
53+
54+
fn advance(&mut self, cnt: usize) {
55+
self.inner.advance(cnt);
56+
if !self.inner.has_remaining() && !self.has_already_signaled.swap(true, Ordering::AcqRel) {
57+
self.signaler.notify_eos();
58+
}
59+
}
60+
}
61+
62+
#[cfg(test)]
63+
mod tests {
64+
65+
}

src/common/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,5 @@ macro_rules! ready {
1010
}
1111

1212
pub(crate) use ready;
13+
pub mod buf;
1314
pub(crate) mod exec;

0 commit comments

Comments
 (0)