Skip to content

Commit

Permalink
clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
grooviegermanikus committed Dec 22, 2023
1 parent 62bd200 commit 7837647
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 55 deletions.
54 changes: 22 additions & 32 deletions cluster-endpoints/src/grpc_multiplex.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
use std::collections::HashMap;
use crate::grpc_stream_utils::channelize_stream;
use crate::grpc_subscription::map_block_update;
use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig, Message};
use geyser_grpc_connector::grpc_subscription_autoreconnect::{
create_geyser_reconnecting_stream, GeyserFilter, GrpcConnectionTimeouts, GrpcSourceConfig,
};
use geyser_grpc_connector::grpcmultiplex_fastestwins::{
create_multiplexed_stream, FromYellowstoneExtractor,
};
use log::info;
use merge_streams::MergeStreams;
use solana_lite_rpc_core::structures::produced_block::ProducedBlock;
use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
use solana_lite_rpc_core::AnyhowJoinHandle;
use solana_sdk::clock::Slot;
use solana_sdk::commitment_config::{CommitmentConfig, CommitmentLevel};
use solana_sdk::commitment_config::CommitmentConfig;
use std::collections::HashMap;
use std::time::Duration;
use futures::Stream;
use itertools::Itertools;
use solana_sdk::commitment_config;
use tokio::sync::broadcast::Receiver;
use yellowstone_grpc_proto::geyser::subscribe_update::UpdateOneof;
use yellowstone_grpc_proto::geyser::{SubscribeRequest, SubscribeRequestFilterSlots, SubscribeUpdate};
use solana_lite_rpc_core::structures::slot_notification::SlotNotification;
use yellowstone_grpc_proto::geyser::{
SubscribeRequest, SubscribeRequestFilterSlots, SubscribeUpdate,
};

struct BlockExtractor(CommitmentConfig);

Expand Down Expand Up @@ -46,7 +47,7 @@ pub fn create_grpc_multiplex_blocks_subscription(
info!("- connection to {}", grpc_source);
}

let timeouts = GrpcConnectionTimeouts {
let _timeouts = GrpcConnectionTimeouts {
connect_timeout: Duration::from_secs(5),
request_timeout: Duration::from_secs(5),
subscribe_timeout: Duration::from_secs(5),
Expand All @@ -57,11 +58,10 @@ pub fn create_grpc_multiplex_blocks_subscription(

let mut streams = Vec::new();
for grpc_source in &grpc_sources {
let stream =
create_geyser_reconnecting_stream(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_and_txs(),
);
let stream = create_geyser_reconnecting_stream(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_and_txs(),
);
streams.push(stream);
}

Expand All @@ -73,11 +73,10 @@ pub fn create_grpc_multiplex_blocks_subscription(

let mut streams = Vec::new();
for grpc_source in &grpc_sources {
let stream =
create_geyser_reconnecting_stream(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_and_txs(),
);
let stream = create_geyser_reconnecting_stream(
grpc_source.clone(),
GeyserFilter(commitment_config).blocks_and_txs(),
);
streams.push(stream);
}

Expand All @@ -93,8 +92,6 @@ pub fn create_grpc_multiplex_blocks_subscription(
(multiplexed_finalized_blocks, jh_channelizer)
}



struct SlotExtractor {}

impl FromYellowstoneExtractor for crate::grpc_multiplex::SlotExtractor {
Expand Down Expand Up @@ -127,7 +124,6 @@ pub fn create_grpc_multiplex_slots_subscription(
let multiplex_stream = {
let mut streams = Vec::new();
for grpc_source in &grpc_sources {

let mut slots = HashMap::new();
slots.insert(
"client".to_string(),
Expand All @@ -137,7 +133,7 @@ pub fn create_grpc_multiplex_slots_subscription(
);

let filter = SubscribeRequest {
slots: slots,
slots,
accounts: Default::default(),
transactions: HashMap::new(),
entry: Default::default(),
Expand All @@ -148,20 +144,14 @@ pub fn create_grpc_multiplex_slots_subscription(
ping: None,
};

let stream =
create_geyser_reconnecting_stream(
grpc_source.clone(),
filter,
);
let stream = create_geyser_reconnecting_stream(grpc_source.clone(), filter);
streams.push(stream);
}

create_multiplexed_stream(streams, SlotExtractor{})
create_multiplexed_stream(streams, SlotExtractor {})
};

let (multiplexed_stream, jh_channelizer) =
channelize_stream(multiplex_stream);
let (multiplexed_stream, jh_channelizer) = channelize_stream(multiplex_stream);

(multiplexed_stream, jh_channelizer)
}

25 changes: 14 additions & 11 deletions cluster-endpoints/src/grpc_subscription.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
use crate::grpc_multiplex::{create_grpc_multiplex_blocks_subscription, create_grpc_multiplex_slots_subscription};
use crate::grpc_multiplex::{
create_grpc_multiplex_blocks_subscription, create_grpc_multiplex_slots_subscription,
};
use crate::{
endpoint_stremers::EndpointStreaming, grpc_inspect,
rpc_polling::vote_accounts_and_cluster_info_polling::poll_vote_accounts_and_cluster_info,
};
use anyhow::{bail, Context};
use anyhow::Context;
use futures::StreamExt;
use geyser_grpc_connector::grpc_subscription_autoreconnect::GrpcSourceConfig;
use itertools::Itertools;
use solana_client::nonblocking::rpc_client::RpcClient;
use solana_lite_rpc_core::{
encoding::BASE64,
structures::{
produced_block::{ProducedBlock, TransactionInfo},
slot_notification::SlotNotification,
},
structures::produced_block::{ProducedBlock, TransactionInfo},
AnyhowJoinHandle,
};
use solana_sdk::{
Expand All @@ -31,13 +31,12 @@ use solana_sdk::{
};
use solana_transaction_status::{Reward, RewardType};
use std::{collections::HashMap, sync::Arc};
use geyser_grpc_connector::grpc_subscription_autoreconnect::{create_geyser_reconnecting_stream, GrpcSourceConfig};
use tokio::sync::broadcast::Sender;
use yellowstone_grpc_client::GeyserGrpcClient;
use yellowstone_grpc_proto::geyser::SubscribeRequest;

use yellowstone_grpc_proto::prelude::{
subscribe_update::UpdateOneof, CommitmentLevel, SubscribeRequestFilterBlocks,
SubscribeRequestFilterSlots, SubscribeUpdateBlock,
SubscribeUpdateBlock,
};

pub fn map_block_update(
Expand Down Expand Up @@ -332,7 +331,7 @@ pub fn create_grpc_subscription(
create_grpc_multiplex_slots_subscription(grpc_sources.clone());

let (block_multiplex_channel, jh_multiplex_blockstream) =
create_grpc_multiplex_blocks_subscription(grpc_sources.clone());
create_grpc_multiplex_blocks_subscription(grpc_sources);

grpc_inspect::block_debug_listen(
block_multiplex_channel.resubscribe(),
Expand All @@ -353,6 +352,10 @@ pub fn create_grpc_subscription(
vote_account_notifier,
};

let endpoint_tasks = vec![jh_multiplex_slotstream, jh_multiplex_blockstream, cluster_info_polling];
let endpoint_tasks = vec![
jh_multiplex_slotstream,
jh_multiplex_blockstream,
cluster_info_polling,
];
Ok((streamers, endpoint_tasks))
}
1 change: 0 additions & 1 deletion cluster-endpoints/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,3 @@ pub mod rpc_polling;

pub use geyser_grpc_connector::grpc_subscription_autoreconnect;
pub use yellowstone_grpc_proto::geyser::CommitmentLevel;

14 changes: 9 additions & 5 deletions lite-rpc/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,8 +139,14 @@ impl Config {
.map(Some)
.unwrap_or(config.grpc_x_token);

assert!(env::var("GRPC_ADDR1").is_err(), "use GRPC_ADDR instead of GRPC_ADDR1");
assert!(env::var("GRPC_X_TOKEN1").is_err(), "use GRPC_X_TOKEN instead of GRPC_X_TOKEN1");
assert!(
env::var("GRPC_ADDR1").is_err(),
"use GRPC_ADDR instead of GRPC_ADDR1"
);
assert!(
env::var("GRPC_X_TOKEN1").is_err(),
"use GRPC_X_TOKEN instead of GRPC_X_TOKEN1"
);

// source 2
config.grpc_addr2 = env::var("GRPC_ADDR2")
Expand All @@ -165,7 +171,7 @@ impl Config {
config.grpc_x_token4 = env::var("GRPC_X_TOKEN4")
.map(Some)
.unwrap_or(config.grpc_x_token4);

config.postgres = PostgresSessionConfig::new_from_env()?.or(config.postgres);

Ok(config)
Expand Down Expand Up @@ -212,7 +218,6 @@ impl Config {
}

pub fn get_grpc_sources(&self) -> Vec<GrpcSource> {

let mut sources: Vec<GrpcSource> = vec![];

sources.push(GrpcSource {
Expand Down Expand Up @@ -243,7 +248,6 @@ impl Config {

sources
}

}

#[derive(Debug, Clone)]
Expand Down
17 changes: 11 additions & 6 deletions lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,19 @@ use std::time::Duration;
use anyhow::bail;
use dashmap::DashMap;
use lite_rpc::bridge::LiteBridge;
use lite_rpc::cli::{Config, GrpcSource};
use lite_rpc::cli::Config;
use lite_rpc::postgres_logger::PostgresLogger;
use lite_rpc::service_spawner::ServiceSpawner;
use lite_rpc::{DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE, GRPC_VERSION};
use lite_rpc::DEFAULT_MAX_NUMBER_OF_TXS_IN_QUEUE;
use solana_lite_rpc_history::postgres::postgres_config::PostgresSessionConfig;

use crate::rpc_tester::RpcTester;
use log::info;
use solana_lite_rpc_cluster_endpoints::endpoint_stremers::EndpointStreaming;
use solana_lite_rpc_cluster_endpoints::grpc_subscription::create_grpc_subscription;
use solana_lite_rpc_cluster_endpoints::grpc_subscription_autoreconnect::{
GrpcConnectionTimeouts, GrpcSourceConfig,
};
use solana_lite_rpc_cluster_endpoints::json_rpc_leaders_getter::JsonRpcLeaderGetter;
use solana_lite_rpc_cluster_endpoints::json_rpc_subscription::create_json_rpc_polling_subscription;
use solana_lite_rpc_core::keypair_loader::load_identity_keypair;
Expand Down Expand Up @@ -47,7 +50,6 @@ use solana_sdk::signer::Signer;
use std::net::{SocketAddr, ToSocketAddrs};
use std::sync::Arc;
use tokio::sync::mpsc;
use solana_lite_rpc_cluster_endpoints::grpc_subscription_autoreconnect::{GrpcConnectionTimeouts, GrpcSourceConfig};

async fn get_latest_block(
mut block_stream: BlockStream,
Expand Down Expand Up @@ -119,9 +121,12 @@ pub async fn start_lite_rpc(args: Config, rpc_client: Arc<RpcClient>) -> anyhow:

create_grpc_subscription(
rpc_client.clone(),
grpc_sources.iter().map(
|s| GrpcSourceConfig::new(s.addr.clone(), s.x_token.clone(), None, timeouts.clone())
).collect(),
grpc_sources
.iter()
.map(|s| {
GrpcSourceConfig::new(s.addr.clone(), s.x_token.clone(), None, timeouts.clone())
})
.collect(),
)?
} else {
info!("Creating RPC poll subscription...");
Expand Down

0 comments on commit 7837647

Please sign in to comment.