Skip to content

Commit f4f5b45

Browse files
drain pattern
1 parent 3e57410 commit f4f5b45

File tree

1 file changed

+107
-0
lines changed

1 file changed

+107
-0
lines changed
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/// Experiments with the drain-to-tip pattern used for GRPC multiplexing.
2+
///
3+
use std::fmt::Display;
4+
use derive_more::Display;
5+
use futures::StreamExt;
6+
7+
use log::{debug, error, info, warn};
8+
use serde::Serializer;
9+
use tokio::select;
10+
use tokio::sync::broadcast::{Receiver, Sender};
11+
use tokio::time::{Duration, sleep};
12+
13+
#[derive(Debug, Clone, Display)]
14+
struct Message {
15+
slot: u64,
16+
}
17+
18+
impl Message {
19+
fn new(slot: u64) -> Self {
20+
Message { slot }
21+
}
22+
}
23+
24+
#[tokio::main]
25+
async fn main() {
26+
// RUST_LOG=info,drain_to_tip_pattern=debug
27+
tracing_subscriber::fmt::init();
28+
29+
let (tx, rx) = tokio::sync::broadcast::channel::<Message>(1000);
30+
let (tx_tip, rx_tip) = tokio::sync::watch::channel::<Message>(Message::new(0));
31+
32+
start_progressor(rx, rx_tip.clone()).await;
33+
34+
send_stream(tx.clone()).await;
35+
36+
37+
info!("Blocking main thread for some time to allow the system to operate...");
38+
sleep(tokio::time::Duration::from_secs(4)).await;
39+
40+
info!("Shutting down....");
41+
drop(tx_tip);
42+
sleep(tokio::time::Duration::from_secs(1)).await;
43+
info!("Shutdown completed.");
44+
}
45+
46+
47+
48+
async fn start_progressor(blocks_notifier: Receiver<Message>, mut rx_tip: tokio::sync::watch::Receiver<Message>) {
49+
info!("Started progressor");
50+
tokio::spawn(async move {
51+
let mut blocks_notifier = blocks_notifier.resubscribe();
52+
let mut local_tip = Message::new(3);
53+
// block after tip offered by this stream
54+
let mut block_after_tip = Message::new(0);
55+
56+
'main_loop: loop {
57+
select! {
58+
result = rx_tip.changed() => {
59+
if result.is_err() {
60+
debug!("Tip variable closed");
61+
break 'main_loop;
62+
}
63+
local_tip = rx_tip.borrow_and_update().clone();
64+
info!("++> tip changed to {}", local_tip);
65+
// slow down in case of loop
66+
// sleep(Duration::from_millis(100)).await;
67+
}
68+
recv_result = blocks_notifier.recv(), if !(block_after_tip.slot > local_tip.slot) => {
69+
match recv_result {
70+
Ok(msg) => {
71+
info!("=> recv on: {}", msg);
72+
if msg.slot > local_tip.slot {
73+
info!("==> beyond tip ({} > {})", msg.slot, local_tip);
74+
block_after_tip = msg;
75+
// offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), block_after_tip.clone())).await.unwrap();
76+
// this thread will sleep and not issue any recvs until we get tip.changed signal
77+
continue 'main_loop;
78+
}
79+
}
80+
Err(e) => {
81+
// TODO what to do?
82+
error!("Error receiving block: {}", e);
83+
break 'main_loop;
84+
}
85+
}
86+
}
87+
}
88+
} // -- main loop
89+
90+
info!("Shutting down progressor.");
91+
});
92+
}
93+
94+
95+
async fn send_stream(message_channel: Sender<Message>) {
96+
97+
for i in 0..10 {
98+
message_channel.send(Message::new(i)).unwrap();
99+
sleep(Duration::from_millis(300)).await;
100+
}
101+
102+
}
103+
104+
105+
106+
107+

0 commit comments

Comments
 (0)