Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

proxy improvements conn prometheus #207

Closed
Closed
29 changes: 15 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ prometheus = "0.13.3"
lazy_static = "1.4.0"
dotenv = "0.15.0"
async-channel = "1.8.0"
quinn = "0.9.3"
rustls = { version = "=0.20.8", default-features = false }
quinn = "0.9.4"
rustls = { version = "=0.20.9", default-features = false }
solana-lite-rpc-services = {path = "services", version="0.2.3"}
solana-lite-rpc-core = {path = "core", version="0.2.3"}
solana-lite-rpc-cluster-endpoints = {path = "cluster-endpoints", version="0.2.3"}
Expand Down
1 change: 1 addition & 0 deletions bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,3 +231,4 @@ async fn bench(
metric.finalize();
metric
}

2 changes: 1 addition & 1 deletion core/src/structures/proxy_request_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ impl Display for TpuForwardingRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"TpuForwardingRequest t9 {} tpu nodes",
"TpuForwardingRequest to {} tpu nodes",
&self.tpu_nodes.len(),
)
}
Expand Down
2 changes: 1 addition & 1 deletion lite-rpc/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ fn configure_tpu_connection_path(quic_proxy_addr: Option<String>) -> TpuConnecti
Some(prox_address) => {
let proxy_socket_addr = parse_host_port(prox_address.as_str()).unwrap();
TpuConnectionPath::QuicForwardProxyPath {
// e.g. "127.0.0.1:11111" or "localhost:11111"
// e.g. "127.0.0.1:11111"
forward_proxy_address: proxy_socket_addr,
}
}
Expand Down
2 changes: 2 additions & 0 deletions quic-forward-proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ publish = false

[dependencies]
solana-lite-rpc-core = { workspace = true }
# required for prometheus-sync
solana-lite-rpc-services = { workspace = true }
solana-sdk = { workspace = true }
solana-streamer = { workspace = true }
solana-transaction-status = { workspace = true }
Expand Down
6 changes: 6 additions & 0 deletions quic-forward-proxy/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ TPU has complex logic to assign connection capacity to TPU clients (see Solana q
* it keeps connections on a per peer address / peer identity basis
* ...


Monitoring
---------------------------
The QUIC proxy exposes Prometheus metrics on the address configured via _prometheus_addr_ (e.g. localhost:9092).


## License & Copyright

Copyright (c) 2022 Blockworks Foundation
Expand Down
5 changes: 4 additions & 1 deletion quic-forward-proxy/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ use clap::Parser;
pub struct Args {
#[arg(short = 'k', long, default_value_t = String::new())]
pub identity_keypair: String,
// e.g. 0.0.0.0:11111 or "localhost:11111"
// e.g. 0.0.0.0:11111
#[arg(short = 'l', long, env)]
pub proxy_listen_addr: String,
/// enable metrics to prometheus at addr
#[arg(short = 'm', long, default_value_t = String::from("[::]:9092"))]
pub prometheus_addr: String,
}
17 changes: 11 additions & 6 deletions quic-forward-proxy/src/inbound/proxy_listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::shared::ForwardPacket;
use crate::tls_config_provider_server::ProxyTlsConfigProvider;
use crate::tls_self_signed_pair_generator::SelfSignedTlsConfigProvider;
use crate::util::FALLBACK_TIMEOUT;
use anyhow::{anyhow, bail, Context};
use anyhow::{bail, Context};
use log::{debug, error, info, trace, warn};
use quinn::{Connecting, Endpoint, ServerConfig, VarInt};
use solana_sdk::packet::PACKET_DATA_SIZE;
Expand Down Expand Up @@ -133,7 +133,8 @@ impl ProxyListener {

for tpu_node in proxy_request.get_tpu_nodes() {
let tpu_address = tpu_node.tpu_socket_addr;
forwarder_channel_copy

let send_result = forwarder_channel_copy
.send_timeout(
ForwardPacket::new(
txs.clone(),
Expand All @@ -143,8 +144,12 @@ impl ProxyListener {
FALLBACK_TIMEOUT,
)
.await
.context("sending internal packet from proxy to forwarder")
.unwrap();
.context("sending internal packet from proxy to forwarder");

if let Err(err) = send_result {
error!("send failed: {}", err);
return;
}
}
});

Expand All @@ -156,10 +161,10 @@ impl ProxyListener {
Err(quinn::ConnectionError::ApplicationClosed(reason)) => {
debug!("connection closed by client - reason: {:?}", reason);
if reason.error_code != VarInt::from_u32(0) {
return Err(anyhow!(
bail!(
"connection closed by client with unexpected reason: {:?}",
reason
));
);
}
debug!("connection gracefully closed by client");
return Ok(());
Expand Down
10 changes: 7 additions & 3 deletions quic-forward-proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use clap::Parser;
use dotenv::dotenv;
use log::info;
use solana_lite_rpc_core::keypair_loader::load_identity_keypair;
use solana_lite_rpc_services::prometheus_sync::PrometheusSync;
use std::sync::Arc;

use crate::validator_identity::ValidatorIdentity;
Expand All @@ -31,6 +32,7 @@ pub async fn main() -> anyhow::Result<()> {
let Args {
identity_keypair,
proxy_listen_addr,
prometheus_addr,
} = Args::parse();
dotenv().ok();

Expand All @@ -42,15 +44,17 @@ pub async fn main() -> anyhow::Result<()> {
.await?
.start_services();

let prometheus = PrometheusSync::sync(prometheus_addr);

let ctrl_c_signal = tokio::signal::ctrl_c();

tokio::select! {
res = main_services => {
bail!("Services quit unexpectedly {res:?}");
},
// res = test_client => {
// bail!("Test Client quit unexpectedly {res:?}");
// },
res = prometheus => {
bail!("Prometheus sync service exited unexpectedly {res:?}");
}
_ = ctrl_c_signal => {
info!("Received ctrl+c signal");

Expand Down
41 changes: 35 additions & 6 deletions quic-forward-proxy/src/outbound/tx_forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::validator_identity::ValidatorIdentity;
use anyhow::{bail, Context};
use futures::future::join_all;
use log::{debug, info, trace, warn};
use prometheus::{opts, register_int_counter, register_int_gauge, IntCounter, IntGauge};
use quinn::{
ClientConfig, Endpoint, EndpointConfig, IdleTimeout, TokioRuntime, TransportConfig, VarInt,
};
Expand All @@ -22,9 +23,21 @@ use std::time::{Duration, Instant};
use tokio::sync::mpsc::Receiver;
use tokio::sync::RwLock;

const MAX_PARALLEL_STREAMS: usize = 6;
const MAX_PARALLEL_STREAMS: usize = 50;
pub const PARALLEL_TPU_CONNECTION_COUNT: usize = 4;
const AGENT_SHUTDOWN_IDLE: Duration = Duration::from_millis(2500); // ms; should be 4x400ms+buffer
// changed from 2500 to 30000 to avoid the risk of early shutdown
const AGENT_SHUTDOWN_IDLE: Duration = Duration::from_millis(30000); // ms; should be 4x400ms+buffer

// Prometheus metrics for the outbound (proxy -> TPU) path.
// Each metric is registered via the `register_*!` macros; the `.unwrap()` means a
// failed registration panics when the static is first initialized.
lazy_static::lazy_static! {
// Counter bumped once for every send result that comes back `Ok`
// (one increment per batch send, not per transaction in the batch).
static ref OUTBOUND_SEND_TX: IntCounter =
register_int_counter!(opts!("literpcproxy_send_tx", "Proxy to TPU send transaction")).unwrap();
// Counter bumped once for every send result that comes back `Err`.
static ref OUTBOUND_SEND_ERRORS: IntCounter =
register_int_counter!(opts!("literpcproxy_send_errors", "Proxy to TPU send errors")).unwrap();
// Gauge set to the number of transactions in the most recently processed batch;
// it is updated before the send result is inspected, so it covers failed sends too.
static ref OUTBOUND_BATCH_SIZE: IntGauge =
register_int_gauge!(opts!("literpcproxy_batch_size", "Proxy to TPU tx batch size")).unwrap();
// Gauge set to the total byte count of the most recently processed batch
// (sum of the lengths of all transactions in it).
static ref OUTBOUND_PACKET_SIZE: IntGauge =
register_int_gauge!(opts!("literpcproxy_packet_size", "Proxy to TPU packet size")).unwrap();
}

struct AgentHandle {
pub tpu_address: SocketAddr,
Expand Down Expand Up @@ -63,7 +76,7 @@ pub async fn tx_forwarder(
transaction_channel
.recv()
.await
.expect("channel closed unexpectedly"),
.ok_or(anyhow::anyhow!("transaction_channel closed"))?,
);
let tpu_address = forward_packet.tpu_address;

Expand Down Expand Up @@ -166,15 +179,21 @@ pub async fn tx_forwarder(
auto_connection.target_address
));

OUTBOUND_BATCH_SIZE.set(transactions_batch.len() as i64);

OUTBOUND_PACKET_SIZE.set(count_bytes(&transactions_batch));

match result {
Ok(()) => {
OUTBOUND_SEND_TX.inc();
debug!("send_txs_to_tpu_static sent {}", transactions_batch.len());
debug!(
"Outbound connection stats: {}",
&auto_connection.connection_stats().await
);
}
Err(err) => {
OUTBOUND_SEND_ERRORS.inc();
warn!("got send_txs_to_tpu_static error {} - loop over errors", err);
}
}
Expand Down Expand Up @@ -208,14 +227,24 @@ pub async fn tx_forwarder(
debug!("tx-forward queue len: {}", broadcast_in.len())
}

broadcast_in
.send(forward_packet)
.expect("send must succeed");
let enqueue_result = broadcast_in.send(forward_packet);

if let Err(e) = enqueue_result {
warn!("broadcast channel send error: {}", e);
}
} // -- loop over transactions from upstream channels

// not reachable
}

/// Returns the combined length, in bytes, of every transaction in `tx_vec`.
///
/// Accepts a slice so callers can pass `&Vec<Vec<u8>>` (via deref coercion) as well
/// as any other contiguous view of serialized transactions. An empty batch yields `0`.
///
/// The result is an `i64` because it is fed straight into a Prometheus `IntGauge`.
fn count_bytes(tx_vec: &[Vec<u8>]) -> i64 {
    let total_bytes: usize = tx_vec.iter().map(Vec::len).sum();
    // Batches are small in-memory buffers, so the total always fits into an i64.
    total_bytes as i64
}

async fn cleanup_agents(
agents: &mut HashMap<SocketAddr, AgentHandle>,
current_tpu_address: &SocketAddr,
Expand Down
2 changes: 1 addition & 1 deletion quic-forward-proxy/src/proxy_request_format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ impl Display for TpuForwardingRequest {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"TpuForwardingRequest t9 {} tpu nodes",
"TpuForwardingRequest to {} tpu nodes",
&self.tpu_nodes.len(),
)
}
Expand Down
7 changes: 4 additions & 3 deletions quic-forward-proxy/src/quinn_auto_reconnect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,12 @@ impl AutoReconnect {
}

pub async fn send_uni(&self, payload: &Vec<u8>) -> anyhow::Result<()> {
let mut send_stream = timeout(SEND_TIMEOUT, self.refresh_and_get().await?.open_uni())
let connection = self.refresh_and_get().await?;
let mut send_stream = timeout(SEND_TIMEOUT, connection.open_uni())
.await
.context("open uni stream for sending")??;
send_stream.write_all(payload.as_slice()).await?;
send_stream.finish().await?;
timeout(SEND_TIMEOUT, send_stream.write_all(payload.as_slice())).await??;
timeout(SEND_TIMEOUT, send_stream.finish()).await??;
Ok(())
}

Expand Down
1 change: 1 addition & 0 deletions services/src/tpu_utils/quic_proxy_connection_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ impl QuicProxyConnectionManager {
const ALPN_TPU_FORWARDPROXY_PROTOCOL_ID: &[u8] = b"solana-tpu-forward-proxy";

let mut endpoint = {
// Binding on :: will also listen on IPv4 (dual-stack).
let client_socket = UdpSocket::bind("[::]:0").unwrap();
let config = EndpointConfig::default();
Endpoint::new(config, None, client_socket, TokioRuntime)
Expand Down
Loading