From 3d123d02cbad9e614d1e384dc61ce8cdc75d8557 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Thu, 7 Dec 2023 15:09:56 +0100 Subject: [PATCH] clarify the highest block --- cluster-endpoints/Cargo.toml | 1 + .../examples/drain_to_tip_pattern.rs | 35 +++++++++++----- cluster-endpoints/examples/stream_via_grpc.rs | 42 ++++++++++++------- 3 files changed, 53 insertions(+), 25 deletions(-) diff --git a/cluster-endpoints/Cargo.toml b/cluster-endpoints/Cargo.toml index 4b44b4ca..20b57f4a 100644 --- a/cluster-endpoints/Cargo.toml +++ b/cluster-endpoints/Cargo.toml @@ -27,6 +27,7 @@ futures = { workspace = true } bytes = { workspace = true } anyhow = { workspace = true } log = { workspace = true } +tracing = { workspace = true } dashmap = { workspace = true } quinn = { workspace = true } chrono = { workspace = true } diff --git a/cluster-endpoints/examples/drain_to_tip_pattern.rs b/cluster-endpoints/examples/drain_to_tip_pattern.rs index 0b8ecb67..0471fb31 100644 --- a/cluster-endpoints/examples/drain_to_tip_pattern.rs +++ b/cluster-endpoints/examples/drain_to_tip_pattern.rs @@ -27,31 +27,35 @@ async fn main() { tracing_subscriber::fmt::init(); let (tx, rx) = tokio::sync::broadcast::channel::(1000); - let (tx_tip, rx_tip) = tokio::sync::watch::channel::(Message::new(0)); + let (tx_tip, _) = tokio::sync::watch::channel::(Message::new(0)); - start_progressor(rx, rx_tip.clone()).await; + start_progressor(rx, tx_tip.subscribe()).await; send_stream(tx.clone()).await; + // move tip; current tip is 3; next offered slot is 4 + info!("Force tip to 6"); + tx_tip.send(Message::new(6)).unwrap(); info!("Blocking main thread for some time to allow the system to operate..."); sleep(tokio::time::Duration::from_secs(4)).await; + info!("Num broadcast subscribers: {}", tx_tip.receiver_count()); info!("Shutting down...."); drop(tx_tip); - sleep(tokio::time::Duration::from_secs(1)).await; + sleep(Duration::from_secs(1)).await; info!("Shutdown completed."); } -async fn start_progressor(blocks_notifier: Receiver, mut rx_tip: tokio::sync::watch::Receiver) { +async fn start_progressor(mut blocks_notifier: Receiver, mut rx_tip: tokio::sync::watch::Receiver) { info!("Started progressor"); tokio::spawn(async move { - let mut blocks_notifier = blocks_notifier.resubscribe(); let mut local_tip = Message::new(3); // block after tip offered by this stream - let mut block_after_tip = Message::new(0); + // TODO: block_after_tip is only valid/useful if greater than tip + let mut highest_block = Message::new(0); 'main_loop: loop { select! { @@ -62,16 +66,20 @@ async fn start_progressor(blocks_notifier: Receiver, mut rx_tip: tokio: } local_tip = rx_tip.borrow_and_update().clone(); info!("++> tip changed to {}", local_tip); + if local_tip.slot >= highest_block.slot { + info!("!! next offered slot is invalid: {} >= {}", local_tip, highest_block.slot); + } // slow down in case of loop // sleep(Duration::from_millis(100)).await; } - recv_result = blocks_notifier.recv(), if !(block_after_tip.slot > local_tip.slot) => { + recv_result = blocks_notifier.recv(), if !(highest_block.slot > local_tip.slot) => { + debug!("block_after_tip.slot > local_tip.slot: {} > {}", highest_block.slot, local_tip.slot); match recv_result { Ok(msg) => { - info!("=> recv on: {}", msg); + info!("=> recv: {}", msg); if msg.slot > local_tip.slot { - info!("==> beyond tip ({} > {})", msg.slot, local_tip); - block_after_tip = msg; + info!("==> offer next slot ({} -> {})", local_tip, msg.slot); + highest_block = msg; // offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), block_after_tip.clone())).await.unwrap(); // this thread will sleep and not issue any recvs until we get tip.changed signal continue 'main_loop; @@ -94,11 +102,18 @@ async fn start_progressor(blocks_notifier: Receiver, mut rx_tip: tokio: async fn send_stream(message_channel: Sender) { + // tip is 3 + + // drain 0 to 3; offer 4, then block for i in 0..10 { + info!("sending {}", i); message_channel.send(Message::new(i)).unwrap(); + info!("queue size: {}", message_channel.len()); sleep(Duration::from_millis(300)).await; } + assert_eq!(message_channel.len(), 5); + } diff --git a/cluster-endpoints/examples/stream_via_grpc.rs b/cluster-endpoints/examples/stream_via_grpc.rs index 846188fb..c183ae85 100644 --- a/cluster-endpoints/examples/stream_via_grpc.rs +++ b/cluster-endpoints/examples/stream_via_grpc.rs @@ -4,7 +4,6 @@ use std::ops::Deref; use std::path::PathBuf; use std::sync::Arc; use std::thread; -use std::time::Duration; use futures::StreamExt; use itertools::{ExactlyOneError, Itertools}; @@ -16,7 +15,7 @@ use tokio::select; use tokio::sync::broadcast::{Receiver, Sender}; use tokio::sync::broadcast::error::TryRecvError; use tokio::sync::RwLock; -use tokio::time::{sleep, timeout}; +use tokio::time::{sleep, Duration, timeout}; use yellowstone_grpc_proto::geyser::CommitmentLevel; use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_block_processing_task; @@ -25,7 +24,8 @@ use solana_lite_rpc_core::structures::produced_block::ProducedBlock; pub const GRPC_VERSION: &str = "1.16.1"; -#[tokio::main(flavor = "multi_thread", worker_threads = 16)] +#[tokio::main] +// #[tokio::main(flavor = "multi_thread", worker_threads = 16)] pub async fn main() { // info,solana_lite_rpc_cluster_endpoints=debug,stream_via_grpc=trace tracing_subscriber::fmt::init(); @@ -40,8 +40,8 @@ pub async fn main() { // testnet - NOTE: this connection has terrible lags (almost 5 minutes) // let grpc_addr = "http://147.28.169.13:10000".to_string(); - let (block_sx_green, blocks_notifier_green) = tokio::sync::broadcast::channel(1000); - // let (block_sx_green, blocks_notifier_green) = start_monkey_broadcast::(1000); + // let (block_sx_green, blocks_notifier_green) = tokio::sync::broadcast::channel(1000); + let (block_sx_green, blocks_notifier_green) = start_monkey_broadcast::(1000); let (block_sx_blue, blocks_notifier_blue) = tokio::sync::broadcast::channel(1000); let grpc_x_token = None; @@ -71,8 +71,8 @@ pub async fn main() { let (offer_block_sender, mut offer_block_notifier) = tokio::sync::mpsc::channel::(100); - start_progressor("green".to_string(), blocks_notifier_green, rx_tip.clone(), offer_block_sender.clone()); - start_progressor("blue".to_string(), blocks_notifier_blue, rx_tip.clone(), offer_block_sender.clone()); + start_progressor("green".to_string(), blocks_notifier_green, rx_tip.clone(), offer_block_sender.clone()).await; + start_progressor("blue".to_string(), blocks_notifier_blue, rx_tip.clone(), offer_block_sender.clone()).await; // test @@ -167,7 +167,13 @@ pub async fn main() { }); // "infinite" sleep - sleep(Duration::from_secs(1800)).await + sleep(Duration::from_secs(1800)).await; + + info!("Shutting down..."); + info!("...tip variable"); + drop(tx_tip); + info!("Shutdown completed."); + } #[derive(Clone, Debug)] @@ -190,9 +196,10 @@ enum OfferBlockMsg { NextSlot(String, BlockRef), } -fn start_progressor(label: String, blocks_notifier: Receiver, mut rx_tip: tokio::sync::watch::Receiver, +async fn start_progressor(label: String, blocks_notifier: Receiver, mut rx_tip: tokio::sync::watch::Receiver, offer_block_sender: tokio::sync::mpsc::Sender) { tokio::spawn(async move { + // TODO is .resubscribe what we want? let mut blocks_notifier = blocks_notifier.resubscribe(); // for test only // let start_slot = blocks_notifier.recv().await.unwrap().slot; @@ -201,25 +208,30 @@ fn start_progressor(label: String, blocks_notifier: Receiver, mut let mut local_tip = 0; // block after tip offered by this stream - let mut block_after_tip: BlockRef = BlockRef { + // TODO: block_after_tip is only valid/useful if greater than tip + let mut highest_block: BlockRef = BlockRef { slot: 0, parent_slot: 0, }; 'main_loop: loop { select! { - _ = rx_tip.changed() => { + result = rx_tip.changed() => { + if result.is_err() { + debug!("Tip variable closed for {}", label); + break 'main_loop; + } local_tip = rx_tip.borrow_and_update().clone(); info!("++> {} tip changed to {}", label, local_tip); // TODO update local tip } - recv_result = blocks_notifier.recv(), if !(block_after_tip.slot > local_tip) => { + recv_result = blocks_notifier.recv(), if !(highest_block.slot > local_tip) => { match recv_result { Ok(block) => { info!("=> recv on {}: {}",label, format_block(&block)); if block.slot > local_tip { info!("==> {}: beyond tip ({} > {})", label, block.slot, local_tip); - block_after_tip = BlockRef::from(block); - offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), block_after_tip.clone())).await.unwrap(); + highest_block = BlockRef::from(block); + offer_block_sender.send(OfferBlockMsg::NextSlot(label.clone(), highest_block.clone())).await.unwrap(); // this thread will sleep and not issue any recvs until we get tip.changed signal continue 'main_loop; } @@ -227,7 +239,7 @@ fn start_progressor(label: String, blocks_notifier: Receiver, mut Err(e) => { // TODO what to do? error!("Error receiving block: {}", e); - continue 'main_loop; + break 'main_loop; } } }