Skip to content

Commit

Permalink
cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Dec 8, 2023
1 parent 43d20b6 commit 93b0d93
Showing 1 changed file with 8 additions and 21 deletions.
29 changes: 8 additions & 21 deletions cluster-endpoints/examples/grpc_using_streams.rs
Original file line number Diff line number Diff line change
@@ -1,32 +1,22 @@
use std::collections::{HashMap, HashSet};
use std::fmt::{Display, Formatter};
use std::ops::{Add, Deref, Sub};
use std::path::PathBuf;
use std::pin::{pin, Pin};
use std::sync::Arc;
use std::thread;
use anyhow::{bail, Context};
use async_stream::stream;
use futures::{pin_mut, Stream, StreamExt};
use futures::stream::FuturesUnordered;
use itertools::{ExactlyOneError, Itertools};
use futures::{Stream, StreamExt};
use itertools::{Itertools};

use log::{debug, error, info, warn};
use serde::Serializer;
use serde_json::de::Read;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::CommitmentConfig;
use tokio::{select};
use tokio::sync::broadcast::{Receiver, Sender};
use tokio::sync::broadcast::error::TryRecvError;
use tokio::sync::RwLock;
use tokio::task::{JoinHandle, JoinSet};
use tokio::time::{sleep, Duration, timeout, Instant, sleep_until};
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::{CommitmentLevel, SubscribeRequestFilterBlocks, SubscribeRequestFilterBlocksMeta, SubscribeUpdate, SubscribeUpdateBlock, SubscribeUpdateBlockMeta};
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::tonic::Status;
use yellowstone_grpc_proto::tonic::transport::ClientTlsConfig;

use solana_lite_rpc_cluster_endpoints::grpc_subscription::{create_block_processing_task, map_produced_block};
Expand All @@ -37,7 +27,6 @@ use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
pub async fn main() {
// RUST_LOG=info,grpc_using_streams=debug
tracing_subscriber::fmt::init();
// TODO remove
// console_subscriber::init();

// mango validator (mainnet)
Expand Down Expand Up @@ -81,7 +70,7 @@ fn create_multiplex(
grpc_sources: Vec<GrpcSourceConfig>,
commitment_config: CommitmentConfig,
block_sx: Sender<ProducedBlock>,
) -> JoinHandle<()> {
) -> AnyhowJoinHandle {

if grpc_sources.len() < 1 {
panic!("Must have at least one source");
Expand All @@ -94,6 +83,7 @@ fn create_multiplex(

let mut futures = futures::stream::SelectAll::new();
for grpc_source in grpc_sources {
// note: stream never terminates
let stream = create_geyser_reconnecting_stream(grpc_source.clone()).await;
futures.push(Box::pin(stream));
}
Expand All @@ -109,7 +99,7 @@ fn create_multiplex(
map_filter_block_message(current_slot, message, commitment_config)
}
None => {
panic!("must not close the stream");
panic!("source stream is not supposed to terminate");
}
}
}
Expand All @@ -118,13 +108,13 @@ fn create_multiplex(
match block_cmd {
BlockCmd::ForwardBlock(block) => {
current_slot = block.slot;
block_sx.send(block).unwrap();
block_sx.send(block).context("send block to downstream")?;
}
BlockCmd::DiscardBlockBehindTip(slot) => {
debug!("Discarding redundand block #{}", slot);
debug!(". discarding redundant block #{}", slot);
}
BlockCmd::SkipMessage => {
debug!("Skipping this message by type");
debug!(". skipping this message by type");
}
}

Expand Down Expand Up @@ -191,9 +181,6 @@ async fn create_geyser_reconnecting_stream(grpc_source: GrpcSourceConfig) -> imp
sleep_until(throttle_barrier).await;
throttle_barrier = Instant::now().add(Duration::from_millis(1000));

// throws e.g. InvalidUri(InvalidUri(InvalidAuthority))
// GeyserGrpcClientError
// TODO extract parameters
let connect_result = GeyserGrpcClient::connect_with_timeout(
grpc_source.grpc_addr.clone(), grpc_source.grpc_x_token.clone(), grpc_source.tls_config.clone(),
Some(Duration::from_secs(2)), Some(Duration::from_secs(2)), false).await;
Expand Down

0 comments on commit 93b0d93

Please sign in to comment.