Skip to content

Commit 05f5f80

Browse files
committed
22: Fix max-concurrent-stream calculation
1 parent f9cdf59 commit 05f5f80

13 files changed

+1295
-174
lines changed

CHANGELOG.md

+8
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,14 @@ The minor version will be incremented upon a breaking change and the patch versi
1616

1717
### Fixes
1818

19+
## [11.1.2]
20+
21+
### Fixes
22+
23+
- Added a stake-based `max_streams` formula when creating a QUIC connection in `QuicPool`, so the maximum number of streams per connection is
24+
derived from the current stake weight.
25+
- Removed the custom `max_streams` option from the configuration; it should stay dynamic and be computed from the validator's stake.
26+
1927
## [11.1.1]
2028

2129
### Fixes

Cargo.lock

+12-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "yellowstone-jet"
3-
version = "11.1.1"
3+
version = "11.1.2"
44
authors = ["Triton One"]
55
edition = "2021"
66
description = "Yellowstone Jet"
@@ -54,6 +54,7 @@ solana-streamer = "~2.1.11"
5454
solana-tpu-client = "~2.1.11"
5555
solana-transaction-status = "~2.1.11"
5656
solana-version = "~2.1.11"
57+
solana-rpc-client = "~2.1.11"
5758
solana-yellowstone-blocklist = { git = "https://github.com/rpcpool/solana-yellowstone-blocklist.git", tag = "1.0.0" }
5859
thiserror = "1.0.58"
5960
tokio = { version = "1.36.0", features = ["rt-multi-thread", "macros"] }
@@ -70,6 +71,7 @@ tracing-subscriber = { version = "0.3.1", features = [
7071
uuid = { version = "1.11.0", features = ["v4", "serde"] }
7172
yellowstone-grpc-client = "4.1.1"
7273
yellowstone-grpc-proto = "4.1.1"
74+
retry = "2.0.0"
7375

7476
[build-dependencies]
7577
anyhow = "1.0.69"

src/bin/jet.rs

+110-23
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ use {
33
clap::{Parser, Subcommand},
44
futures::future::{self, Either, FutureExt},
55
jsonrpsee::http_client::HttpClientBuilder,
6+
solana_client::rpc_client::RpcClientConfig,
7+
solana_rpc_client::http_sender::HttpSender,
68
solana_sdk::{
79
commitment_config::CommitmentConfig,
810
pubkey::Pubkey,
@@ -27,7 +29,9 @@ use {
2729
yellowstone_jet::{
2830
blockhash_queue::BlockhashQueue,
2931
cluster_tpu_info::{BlocklistUpdater, ClusterTpuInfo, LeadersSelector},
30-
config::{load_config, ConfigJet, ConfigJetGatewayClient, ConfigMetricsUpstream},
32+
config::{
33+
load_config, ConfigJet, ConfigJetGatewayClient, ConfigMetricsUpstream, RpcErrorStrategy,
34+
},
3135
feature_flags::FeatureSet,
3236
grpc_geyser::{GeyserStreams, GeyserSubscriber},
3337
grpc_jet::GrpcServer,
@@ -37,7 +41,8 @@ use {
3741
quic_solana::ConnectionCache,
3842
rpc::{rpc_admin::RpcClient, rpc_solana_like::RpcServerImpl, RpcServer, RpcServerType},
3943
setup_tracing,
40-
stake::StakeInfo,
44+
solana_rpc_utils::{RetryRpcSender, RetryRpcSenderStrategy},
45+
stake::{self, spawn_cache_stake_info_map, StakeInfoMap},
4146
task_group::TaskGroup,
4247
transactions::{GrpcRootedTxReceiver, SendTransactionsPool},
4348
util::{IdentityFlusherWaitGroup, PubkeySigner, ValueObserver, WaitShutdown},
@@ -153,6 +158,7 @@ async fn run_cmd_admin(config: ConfigJet, admin_cmd: ArgsCommandAdmin) -> anyhow
153158
}
154159

155160
async fn spawn_jet_gw_listener(
161+
stake_info: StakeInfoMap,
156162
jet_gw_config: ConfigJetGatewayClient,
157163
mut identity_observer: ValueObserver<PubkeySigner>,
158164
tx_sender: RpcServerImpl,
@@ -166,6 +172,7 @@ async fn spawn_jet_gw_listener(
166172
let features = features.clone();
167173
let mut identity_observer2 = identity_observer.clone();
168174
let (stop_tx2, stop_rx2) = tokio::sync::oneshot::channel();
175+
let stake_info2 = stake_info.clone();
169176
let fut = identity_observer.until_value_change(move |current_identity| {
170177
if let Some(expected_identity) = expected_identity {
171178
if current_identity.pubkey() != expected_identity {
@@ -176,6 +183,7 @@ async fn spawn_jet_gw_listener(
176183
} else {
177184
GrpcServer::run_with(
178185
Arc::new(current_identity),
186+
stake_info2,
179187
jet_gw_config2.clone(),
180188
tx_sender2.clone(),
181189
features,
@@ -185,6 +193,7 @@ async fn spawn_jet_gw_listener(
185193
} else {
186194
GrpcServer::run_with(
187195
Arc::new(current_identity),
196+
stake_info2,
188197
jet_gw_config2.clone(),
189198
tx_sender2.clone(),
190199
features,
@@ -252,12 +261,89 @@ fn spawn_lewis_metric_subscriber(
252261
})
253262
}
254263

264+
///
265+
/// This task keeps the stake metrics up to date for the current identity.
266+
///
267+
async fn keep_stake_metrics_up_to_date_task(
268+
mut stake_info_identity_observer: ValueObserver<Pubkey>,
269+
stake_info_map: StakeInfoMap,
270+
) {
271+
loop {
272+
let current_identy = stake_info_identity_observer.get_current();
273+
274+
let (stake, total_stake) = stake_info_map
275+
.get_stake_info_with_total_stake(current_identy)
276+
.unwrap_or((0, 0));
277+
278+
let max_pps = stake::stake_to_per100ms_limit(stake, total_stake);
279+
let max_streams = stake::stake_to_max_stream(stake, total_stake);
280+
281+
metrics::cluster_identity_stake_set(metrics::ClusterIdentityStakeKind::Jet, stake);
282+
metrics::cluster_identity_stake_set(metrics::ClusterIdentityStakeKind::Total, total_stake);
283+
metrics::cluster_identity_stake_set(
284+
metrics::ClusterIdentityStakeKind::MaxPermitPer100ms,
285+
max_pps,
286+
);
287+
metrics::cluster_identity_stake_set(
288+
metrics::ClusterIdentityStakeKind::MaxStreams,
289+
max_streams,
290+
);
291+
292+
tokio::select! {
293+
_ = tokio::time::sleep(std::time::Duration::from_secs(30)) => {}
294+
_ = stake_info_identity_observer.observe() => {}
295+
}
296+
}
297+
}
298+
255299
async fn run_jet(config: ConfigJet) -> anyhow::Result<()> {
256300
metrics::init();
257301
if let Some(identity) = config.identity.expected {
258302
metrics::quic_set_identity_expected(identity);
259303
}
260304

305+
let retry_strategy = match config.upstream.rpc_on_error.clone() {
306+
RpcErrorStrategy::Fixed { interval, retries } => Some(RetryRpcSenderStrategy::FixedDelay {
307+
delay: interval,
308+
max_retries: retries.get(),
309+
}),
310+
RpcErrorStrategy::Exponential {
311+
base,
312+
factor,
313+
retries,
314+
} => Some(RetryRpcSenderStrategy::ExponentialBackoff {
315+
base,
316+
exp: factor,
317+
max_retries: retries.get(),
318+
}),
319+
RpcErrorStrategy::Fail => None,
320+
};
321+
322+
// We build a special HttpSender that automatically retries on transient network failures.
323+
// This allows clients not to worry about retry logic.
324+
let rpc_sender = HttpSender::new(config.upstream.rpc.clone());
325+
let rpc_client_config = RpcClientConfig::with_commitment(CommitmentConfig::finalized());
326+
let rpc_client = match retry_strategy {
327+
Some(strategy) => {
328+
let rpc_sender = RetryRpcSender::new(rpc_sender, strategy);
329+
solana_client::nonblocking::rpc_client::RpcClient::new_sender(
330+
rpc_sender,
331+
rpc_client_config,
332+
)
333+
}
334+
None => solana_client::nonblocking::rpc_client::RpcClient::new_sender(
335+
rpc_sender,
336+
rpc_client_config,
337+
),
338+
};
339+
340+
let (stake_info_map, stake_info_bg_fut) = spawn_cache_stake_info_map(
341+
rpc_client,
342+
config.upstream.stake_update_interval,
343+
stake::SpawnMode::Detached,
344+
)
345+
.await;
346+
261347
let leaders_selector = Arc::new(
262348
LeadersSelector::new_from_blockchain(
263349
config.upstream.rpc.clone(),
@@ -303,6 +389,7 @@ async fn run_jet(config: ConfigJet) -> anyhow::Result<()> {
303389
let (quic_session, quic_identity_man) = ConnectionCache::new(
304390
config.quic.clone(),
305391
initial_identity,
392+
stake_info_map.clone(),
306393
identity_flusher_wg.clone(),
307394
);
308395

@@ -329,16 +416,7 @@ async fn run_jet(config: ConfigJet) -> anyhow::Result<()> {
329416
.add_flusher(Box::new(send_transactions.clone()))
330417
.await;
331418

332-
let rpc = solana_client::nonblocking::rpc_client::RpcClient::new_with_commitment(
333-
config.upstream.rpc.clone(),
334-
CommitmentConfig::finalized(),
335-
);
336-
let stake = StakeInfo::new(
337-
rpc,
338-
config.upstream.stake_update_interval,
339-
quic_identity_man.observe_identity_change(),
340-
);
341-
419+
let stake_info_identity_observer = quic_identity_man.observe_identity_change();
342420
let quic_identity_observer = quic_identity_man.observe_signer_change();
343421
// Run RPC admin
344422
let rpc_admin = RpcServer::new(
@@ -383,8 +461,10 @@ async fn run_jet(config: ConfigJet) -> anyhow::Result<()> {
383461
.expect("rpc server impl");
384462

385463
info!("starting jet-gateway listener");
464+
let stake_info = stake_info_map.clone();
386465
let h = tokio::spawn(async move {
387466
spawn_jet_gw_listener(
467+
stake_info,
388468
jet_gw_config,
389469
quic_identity_observer,
390470
tx_sender,
@@ -407,6 +487,13 @@ async fn run_jet(config: ConfigJet) -> anyhow::Result<()> {
407487

408488
let mut tg = TaskGroup::default();
409489

490+
tg.spawn_cancelable("stake_cache_refresh_task", stake_info_bg_fut);
491+
492+
tg.spawn_cancelable(
493+
"stake_info_metrics_update",
494+
keep_stake_metrics_up_to_date_task(stake_info_identity_observer, stake_info_map.clone()),
495+
);
496+
410497
tg.spawn_cancelable("lewis", async move {
411498
lewis.await.expect("lewis");
412499
});
@@ -445,17 +532,17 @@ async fn run_jet(config: ConfigJet) -> anyhow::Result<()> {
445532

446533
tg.spawn_cancelable("send_transactions_pool", send_tx_pool_fut);
447534

448-
tg.spawn_with_shutdown("stake", |mut stop| async move {
449-
tokio::select! {
450-
result = stake.clone().wait_shutdown() => {
451-
result.expect("stake");
452-
},
453-
_ = &mut stop => {
454-
stake.shutdown();
455-
stake.wait_shutdown().await.expect("stake shutdown");
456-
},
457-
}
458-
});
535+
// tg.spawn_with_shutdown("stake", |mut stop| async move {
536+
// tokio::select! {
537+
// result = stake.clone().wait_shutdown() => {
538+
// result.expect("stake");
539+
// },
540+
// _ = &mut stop => {
541+
// stake.shutdown();
542+
// stake.wait_shutdown().await.expect("stake shutdown");
543+
// },
544+
// }
545+
// });
459546

460547
if let Some(mut jet_gw_listener) = jet_gw_listener {
461548
tg.spawn_with_shutdown("jet_gw_listener", |mut stop| async move {

src/config.rs

+44-3
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,12 @@ pub struct ConfigUpstream {
135135
#[serde(default = "ConfigUpstream::default_rpc")]
136136
pub rpc: String,
137137

138+
///
139+
/// RPC retry strategy
140+
/// This strategy is used when an `rpc` call fails due to a transient error.
141+
#[serde(default = "ConfigUpstream::default_rpc_retry")]
142+
pub rpc_on_error: RpcErrorStrategy,
143+
138144
/// Cluster nodes information update interval in milliseconds
139145
#[serde(
140146
default = "ConfigUpstream::default_cluster_nodes_update_interval",
@@ -151,6 +157,13 @@ pub struct ConfigUpstream {
151157
}
152158

153159
impl ConfigUpstream {
160+
const fn default_rpc_retry() -> RpcErrorStrategy {
161+
RpcErrorStrategy::Fixed {
162+
interval: Duration::from_millis(100),
163+
retries: unsafe { NonZeroUsize::new_unchecked(3) },
164+
}
165+
}
166+
154167
fn default_rpc() -> String {
155168
"http://127.0.0.1:8899".to_owned()
156169
}
@@ -184,9 +197,6 @@ impl ConfigUpstreamGrpc {
184197
#[derive(Clone, Debug, Deserialize)]
185198
#[serde(deny_unknown_fields)]
186199
pub struct ConfigJetGatewayClient {
187-
/// Max transactions requested from proxy (every 100ms), by default calculated from stake value
188-
pub max_streams: Option<u64>,
189-
190200
/// gRPC service endpoints, only one connection would be used
191201
pub endpoints: Vec<String>,
192202

@@ -566,3 +576,34 @@ impl YellowstoneBlocklist {
566576
.map_err(de::Error::custom)
567577
}
568578
}
579+
580+
///
581+
/// THIS CODE HAS BEEN COPY-PASTED FROM THE `jet-gateway` repo
582+
/// TODO: Refactor this code into a shared common library.
583+
#[derive(Clone, Debug, Deserialize, PartialEq)]
584+
#[serde(tag = "strategy", rename_all = "lowercase")]
585+
pub enum RpcErrorStrategy {
586+
#[serde(rename = "fixed")]
587+
Fixed {
588+
#[serde(with = "humantime_serde")]
589+
interval: Duration,
590+
#[serde(default = "RpcErrorStrategy::default_retries")]
591+
retries: NonZeroUsize,
592+
},
593+
#[serde(rename = "exponential")]
594+
Exponential {
595+
#[serde(with = "humantime_serde")]
596+
base: Duration,
597+
factor: f64,
598+
#[serde(default = "RpcErrorStrategy::default_retries")]
599+
retries: NonZeroUsize,
600+
},
601+
#[serde(rename = "fail")]
602+
Fail,
603+
}
604+
605+
impl RpcErrorStrategy {
606+
const fn default_retries() -> NonZeroUsize {
607+
unsafe { NonZeroUsize::new_unchecked(3) }
608+
}
609+
}

0 commit comments

Comments
 (0)