Skip to content

Commit 3d123d0

Browse files
clarify the highest block
1 parent f4f5b45 commit 3d123d0

File tree

3 files changed

+53
-25
lines changed

3 files changed

+53
-25
lines changed

cluster-endpoints/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ futures = { workspace = true }
2727
bytes = { workspace = true }
2828
anyhow = { workspace = true }
2929
log = { workspace = true }
30+
tracing = { workspace = true }
3031
dashmap = { workspace = true }
3132
quinn = { workspace = true }
3233
chrono = { workspace = true }

cluster-endpoints/examples/drain_to_tip_pattern.rs

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,31 +27,35 @@ async fn main() {
2727
tracing_subscriber::fmt::init();
2828

2929
let (tx, rx) = tokio::sync::broadcast::channel::<Message>(1000);
30-
let (tx_tip, rx_tip) = tokio::sync::watch::channel::<Message>(Message::new(0));
30+
let (tx_tip, _) = tokio::sync::watch::channel::<Message>(Message::new(0));
3131

32-
start_progressor(rx, rx_tip.clone()).await;
32+
start_progressor(rx, tx_tip.subscribe()).await;
3333

3434
send_stream(tx.clone()).await;
3535

36+
// move tip; current tip is 3; next offered slot is 4
37+
info!("Force tip to 6");
38+
tx_tip.send(Message::new(6)).unwrap();
3639

3740
info!("Blocking main thread for some time to allow the system to operate...");
3841
sleep(tokio::time::Duration::from_secs(4)).await;
42+
info!("Num broadcast subscribers: {}", tx_tip.receiver_count());
3943

4044
info!("Shutting down....");
4145
drop(tx_tip);
42-
sleep(tokio::time::Duration::from_secs(1)).await;
46+
sleep(Duration::from_secs(1)).await;
4347
info!("Shutdown completed.");
4448
}
4549

4650

4751

48-
async fn start_progressor(blocks_notifier: Receiver<Message>, mut rx_tip: tokio::sync::watch::Receiver<Message>) {
52+
async fn start_progressor(mut blocks_notifier: Receiver<Message>, mut rx_tip: tokio::sync::watch::Receiver<Message>) {
4953
info!("Started progressor");
5054
tokio::spawn(async move {
51-
let mut blocks_notifier = blocks_notifier.resubscribe();
5255
let mut local_tip = Message::new(3);
5356
// block after tip offered by this stream
54-
let mut block_after_tip = Message::new(0);
57+
// TODO: block_after_tip is only valid/useful if greater than tip
58+
let mut highest_block = Message::new(0);
5559

5660
'main_loop: loop {
5761
select! {
@@ -62,16 +66,20 @@ async fn start_progressor(blocks_notifier: Receiver<Message>, mut rx_tip: tokio:
6266
}
6367
local_tip = rx_tip.borrow_and_update().clone();
6468
info!("++> tip changed to {}", local_tip);
69+
if local_tip.slot >= highest_block.slot {
70+
info!("!! next offered slot is invalid: {} >= {}", local_tip, highest_block.slot);
71+
}
6572
// slow down in case of loop
6673
// sleep(Duration::from_millis(100)).await;
6774
}
68-
recv_result = blocks_notifier.recv(), if !(block_after_tip.slot > local_tip.slot) => {
75+
recv_result = blocks_notifier.recv(), if !(highest_block.slot > local_tip.slot) => {
76+
debug!("block_after_tip.slot > local_tip.slot: {} > {}", highest_block.slot, local_tip.slot);
6977
match recv_result {
7078
Ok(msg) => {
71-
info!("=> recv on: {}", msg);
79+
info!("=> recv: {}", msg);
7280
if msg.slot > local_tip.slot {
73-
info!("==> beyond tip ({} > {})", msg.slot, local_tip);
74-
block_after_tip = msg;
81+
info!("==> offer next slot ({} -> {})", local_tip, msg.slot);
82+
highest_block = msg;
7583
// offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), block_after_tip.clone())).await.unwrap();
7684
// this thread will sleep and not issue any recvs until we get tip.changed signal
7785
continue 'main_loop;
@@ -94,11 +102,18 @@ async fn start_progressor(blocks_notifier: Receiver<Message>, mut rx_tip: tokio:
94102

95103
async fn send_stream(message_channel: Sender<Message>) {
96104

105+
// tip is 3
106+
107+
// drain 0 to 3; offer 4, then block
97108
for i in 0..10 {
109+
info!("sending {}", i);
98110
message_channel.send(Message::new(i)).unwrap();
111+
info!("queue size: {}", message_channel.len());
99112
sleep(Duration::from_millis(300)).await;
100113
}
101114

115+
assert_eq!(message_channel.len(), 5);
116+
102117
}
103118

104119

cluster-endpoints/examples/stream_via_grpc.rs

Lines changed: 27 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ use std::ops::Deref;
44
use std::path::PathBuf;
55
use std::sync::Arc;
66
use std::thread;
7-
use std::time::Duration;
87
use futures::StreamExt;
98
use itertools::{ExactlyOneError, Itertools};
109

@@ -16,7 +15,7 @@ use tokio::select;
1615
use tokio::sync::broadcast::{Receiver, Sender};
1716
use tokio::sync::broadcast::error::TryRecvError;
1817
use tokio::sync::RwLock;
19-
use tokio::time::{sleep, timeout};
18+
use tokio::time::{sleep, Duration, timeout};
2019
use yellowstone_grpc_proto::geyser::CommitmentLevel;
2120

2221
use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_block_processing_task;
@@ -25,7 +24,8 @@ use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
2524

2625
pub const GRPC_VERSION: &str = "1.16.1";
2726

28-
#[tokio::main(flavor = "multi_thread", worker_threads = 16)]
27+
#[tokio::main]
28+
// #[tokio::main(flavor = "multi_thread", worker_threads = 16)]
2929
pub async fn main() {
3030
// info,solana_lite_rpc_cluster_endpoints=debug,stream_via_grpc=trace
3131
tracing_subscriber::fmt::init();
@@ -40,8 +40,8 @@ pub async fn main() {
4040
// testnet - NOTE: this connection has terrible lags (almost 5 minutes)
4141
// let grpc_addr = "http://147.28.169.13:10000".to_string();
4242

43-
let (block_sx_green, blocks_notifier_green) = tokio::sync::broadcast::channel(1000);
44-
// let (block_sx_green, blocks_notifier_green) = start_monkey_broadcast::<ProducedBlock>(1000);
43+
// let (block_sx_green, blocks_notifier_green) = tokio::sync::broadcast::channel(1000);
44+
let (block_sx_green, blocks_notifier_green) = start_monkey_broadcast::<ProducedBlock>(1000);
4545
let (block_sx_blue, blocks_notifier_blue) = tokio::sync::broadcast::channel(1000);
4646

4747
let grpc_x_token = None;
@@ -71,8 +71,8 @@ pub async fn main() {
7171

7272
let (offer_block_sender, mut offer_block_notifier) = tokio::sync::mpsc::channel::<OfferBlockMsg>(100);
7373

74-
start_progressor("green".to_string(), blocks_notifier_green, rx_tip.clone(), offer_block_sender.clone());
75-
start_progressor("blue".to_string(), blocks_notifier_blue, rx_tip.clone(), offer_block_sender.clone());
74+
start_progressor("green".to_string(), blocks_notifier_green, rx_tip.clone(), offer_block_sender.clone()).await;
75+
start_progressor("blue".to_string(), blocks_notifier_blue, rx_tip.clone(), offer_block_sender.clone()).await;
7676

7777

7878
// test
@@ -167,7 +167,13 @@ pub async fn main() {
167167
});
168168

169169
// "infinite" sleep
170-
sleep(Duration::from_secs(1800)).await
170+
sleep(Duration::from_secs(1800)).await;
171+
172+
info!("Shutting down...");
173+
info!("...tip variable");
174+
drop(tx_tip);
175+
info!("Shutdown completed.");
176+
171177
}
172178

173179
#[derive(Clone, Debug)]
@@ -190,9 +196,10 @@ enum OfferBlockMsg {
190196
NextSlot(String, BlockRef),
191197
}
192198

193-
fn start_progressor(label: String, blocks_notifier: Receiver<ProducedBlock>, mut rx_tip: tokio::sync::watch::Receiver<Slot>,
199+
async fn start_progressor(label: String, blocks_notifier: Receiver<ProducedBlock>, mut rx_tip: tokio::sync::watch::Receiver<Slot>,
194200
offer_block_sender: tokio::sync::mpsc::Sender<OfferBlockMsg>) {
195201
tokio::spawn(async move {
202+
// TODO is .resubscribe what we want?
196203
let mut blocks_notifier = blocks_notifier.resubscribe();
197204
// for test only
198205
// let start_slot = blocks_notifier.recv().await.unwrap().slot;
@@ -201,33 +208,38 @@ fn start_progressor(label: String, blocks_notifier: Receiver<ProducedBlock>, mut
201208
let mut local_tip = 0;
202209

203210
// block after tip offered by this stream
204-
let mut block_after_tip: BlockRef = BlockRef {
211+
// TODO: block_after_tip is only valid/useful if greater than tip
212+
let mut highest_block: BlockRef = BlockRef {
205213
slot: 0,
206214
parent_slot: 0,
207215
};
208216
'main_loop: loop {
209217
select! {
210-
_ = rx_tip.changed() => {
218+
result = rx_tip.changed() => {
219+
if result.is_err() {
220+
debug!("Tip variable closed for {}", label);
221+
break 'main_loop;
222+
}
211223
local_tip = rx_tip.borrow_and_update().clone();
212224
info!("++> {} tip changed to {}", label, local_tip);
213225
// TODO update local tip
214226
}
215-
recv_result = blocks_notifier.recv(), if !(block_after_tip.slot > local_tip) => {
227+
recv_result = blocks_notifier.recv(), if !(highest_block.slot > local_tip) => {
216228
match recv_result {
217229
Ok(block) => {
218230
info!("=> recv on {}: {}",label, format_block(&block));
219231
if block.slot > local_tip {
220232
info!("==> {}: beyond tip ({} > {})", label, block.slot, local_tip);
221-
block_after_tip = BlockRef::from(block);
222-
offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), block_after_tip.clone())).await.unwrap();
233+
highest_block = BlockRef::from(block);
234+
offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), highest_block.clone())).await.unwrap();
223235
// this thread will sleep and not issue any recvs until we get tip.changed signal
224236
continue 'main_loop;
225237
}
226238
}
227239
Err(e) => {
228240
// TODO what to do?
229241
error!("Error receiving block: {}", e);
230-
continue 'main_loop;
242+
break 'main_loop;
231243
}
232244
}
233245
}

0 commit comments

Comments
 (0)