diff --git a/Cargo.lock b/Cargo.lock index 60728433..d46b09dd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -582,8 +582,10 @@ dependencies = [ "log", "rand 0.8.5", "rand_chacha 0.3.1", + "reqwest", "serde", "serde_json", + "solana-lite-rpc-util", "solana-rpc-client", "solana-rpc-client-api", "solana-sdk", @@ -592,6 +594,7 @@ dependencies = [ "tokio", "tracing", "tracing-subscriber", + "url", ] [[package]] @@ -2087,6 +2090,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "iana-time-zone" version = "0.1.60" @@ -3722,10 +3738,12 @@ dependencies = [ "http-body", "hyper", "hyper-rustls", + "hyper-tls", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -3737,6 +3755,7 @@ dependencies = [ "sync_wrapper", "system-configuration", "tokio", + "tokio-native-tls", "tokio-rustls", "tokio-util", "tower-service", diff --git a/bench/Cargo.toml b/bench/Cargo.toml index 4d7f98b1..9dc526af 100644 --- a/bench/Cargo.toml +++ b/bench/Cargo.toml @@ -10,12 +10,13 @@ path = "src/main.rs" [[bin]] # WIP name = "benchnew" -path = "src/cli.rs" +path = "src/benchnew.rs" [dependencies] clap = { workspace = true } csv = "1.2.1" dirs = "5.0.0" +solana-lite-rpc-util = { workspace = true } solana-sdk = { workspace = true } solana-rpc-client = { workspace = true } solana-transaction-status = { workspace = true } @@ -34,6 +35,8 @@ dashmap = { workspace = true } bincode = { workspace = true } itertools = "0.10.5" spl-memo = "4.0.0" +url = "*" +reqwest = "0.11.26" lazy_static = "1.4.0" [dev-dependencies] diff --git a/bench/src/benches/api_load.rs b/bench/src/benches/api_load.rs index bc244c55..ee2e957d 100644 --- a/bench/src/benches/api_load.rs +++ b/bench/src/benches/api_load.rs @@ -13,7 +13,12 @@ use solana_sdk::signature::{read_keypair_file, Keypair, Signer}; use crate::create_memo_tx_small; // TC3 measure how much load the API endpoint can take -pub async fn api_load(payer_path: &Path, rpc_url: String, time_ms: u64) -> anyhow::Result<()> { +pub async fn api_load( + payer_path: &Path, + rpc_url: String, + test_duration_ms: u64, + cu_price_micro_lamports: u64, +) -> anyhow::Result<()> { warn!("THIS IS WORK IN PROGRESS"); let rpc = Arc::new(RpcClient::new(rpc_url)); @@ -29,7 +34,7 @@ pub async fn api_load(payer_path: &Path, rpc_url: String, time_ms: u64) -> anyho let hash = rpc.get_latest_blockhash().await?; let time = tokio::time::Instant::now(); - while time.elapsed().as_millis() < time_ms.into() { + while time.elapsed().as_millis() < test_duration_ms.into() { let rpc = rpc.clone(); let payer = payer.clone(); @@ -40,7 +45,7 @@ pub async fn api_load(payer_path: &Path, rpc_url: String, time_ms: u64) -> anyho tokio::spawn(async move { let msg = msg.as_bytes(); - let tx = create_memo_tx_small(msg, &payer, hash); + let tx = create_memo_tx_small(msg, &payer, hash, cu_price_micro_lamports); match rpc.send_transaction(&tx).await { Ok(_) => success.fetch_add(1, Ordering::Relaxed), Err(_) => failed.fetch_add(1, Ordering::Relaxed), @@ -50,7 +55,7 @@ pub async fn api_load(payer_path: &Path, rpc_url: String, time_ms: u64) -> anyho txs += 1; } - let calls_per_second = txs as f64 / (time_ms as f64 * 1000.0); + let calls_per_second = txs as f64 / (test_duration_ms as f64 * 1000.0); info!("calls_per_second: {}", calls_per_second); info!("failed: {}", failed.load(Ordering::Relaxed)); info!("success: {}", success.load(Ordering::Relaxed)); diff --git a/bench/src/benches/confirmation_rate.rs b/bench/src/benches/confirmation_rate.rs index 48ced54b..98263feb 100644 --- a/bench/src/benches/confirmation_rate.rs +++ b/bench/src/benches/confirmation_rate.rs @@ -1,89 +1,129 @@ -use crate::tx_size::TxSize; -use crate::{create_rng, generate_txs}; -use anyhow::{bail, Error}; -use futures::future::join_all; -use futures::TryFutureExt; -use itertools::Itertools; +use crate::{create_rng, generate_txs, BenchmarkTransactionParams}; +use anyhow::Context; use log::{debug, info, trace, warn}; -use std::collections::{HashMap, HashSet}; -use std::iter::zip; +use std::ops::Add; use std::path::Path; use std::sync::Arc; use std::time::Duration; +use crate::benches::rpc_interface::{ + send_and_confirm_bulk_transactions, ConfirmationResponseFromRpc, +}; use solana_rpc_client::nonblocking::rpc_client::RpcClient; -use solana_rpc_client::rpc_client::SerializableTransaction; -use solana_rpc_client_api::client_error::ErrorKind; -use solana_sdk::signature::{read_keypair_file, Signature, Signer}; -use solana_sdk::slot_history::Slot; -use solana_sdk::transaction::Transaction; -use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair}; -use solana_transaction_status::TransactionConfirmationStatus; -use tokio::time::Instant; +use solana_sdk::signature::{read_keypair_file, Keypair, Signature, Signer}; #[derive(Debug, serde::Serialize)] pub struct RpcStat { - confirmation_time: f32, - mode_slot: u64, - confirmed: u64, - unconfirmed: u64, - failed: u64, + tx_sent: u64, + tx_confirmed: u64, + // in ms + average_confirmation_time: f32, + // in slots + average_slot_confirmation_time: f32, + tx_send_errors: u64, + tx_unconfirmed: u64, } -/// TC2 send multiple runs of num_txns, measure the confirmation rate +/// TC2 send multiple runs of num_txs, measure the confirmation rate pub async fn confirmation_rate( payer_path: &Path, rpc_url: String, - tx_size: TxSize, - txns_per_round: usize, - num_rounds: usize, + tx_params: BenchmarkTransactionParams, + max_timeout: Duration, + txs_per_run: usize, + num_of_runs: usize, ) -> anyhow::Result<()> { warn!("THIS IS WORK IN PROGRESS"); + assert!(num_of_runs > 0, "num_of_runs must be greater than 0"); + let rpc = Arc::new(RpcClient::new(rpc_url)); info!("RPC: {}", rpc.as_ref().url()); let payer: Arc = Arc::new(read_keypair_file(payer_path).unwrap()); info!("Payer: {}", payer.pubkey().to_string()); - let mut rpc_results = Vec::with_capacity(num_rounds); + let mut rpc_results = Vec::with_capacity(num_of_runs); - for _ in 0..num_rounds { - let stat: RpcStat = send_bulk_txs_and_wait(&rpc, &payer, txns_per_round, tx_size).await?; - rpc_results.push(stat); + for _ in 0..num_of_runs { + match send_bulk_txs_and_wait(&rpc, &payer, txs_per_run, &tx_params, max_timeout) + .await + .context("send bulk tx and wait") + { + Ok(stat) => { + rpc_results.push(stat); + } + Err(err) => { + warn!( + "Failed to send bulk txs and wait - no rpc stats available: {}", + err + ); + } + } } - info!("avg_rpc: {:?}", calc_stats_avg(&rpc_results)); + if !rpc_results.is_empty() { + info!("avg_rpc: {:?}", calc_stats_avg(&rpc_results)); + } else { + info!("avg_rpc: n/a"); + } Ok(()) } pub async fn send_bulk_txs_and_wait( rpc: &RpcClient, payer: &Keypair, - num_txns: usize, - tx_size: TxSize, + num_txs: usize, + tx_params: &BenchmarkTransactionParams, + max_timeout: Duration, ) -> anyhow::Result { - let hash = rpc.get_latest_blockhash().await?; + trace!("Get latest blockhash and generate transactions"); + let hash = rpc.get_latest_blockhash().await.map_err(|err| { + log::error!("Error get latest blockhash : {err:?}"); + err + })?; let mut rng = create_rng(None); - let txs = generate_txs(num_txns, payer, hash, &mut rng, tx_size); - - let started_at = tokio::time::Instant::now(); + let txs = generate_txs(num_txs, payer, hash, &mut rng, tx_params); + trace!("Sending {} transactions in bulk ..", txs.len()); let tx_and_confirmations_from_rpc: Vec<(Signature, ConfirmationResponseFromRpc)> = - send_and_confirm_bulk_transactions(rpc, &txs).await?; - - let elapsed_total = started_at.elapsed(); - - for (tx_sig, confirmation) in &tx_and_confirmations_from_rpc { - match confirmation { - ConfirmationResponseFromRpc::Success(slots_elapsed, level, elapsed) => { + send_and_confirm_bulk_transactions(rpc, &txs, max_timeout) + .await + .context("send and confirm bulk tx")?; + trace!("Done sending {} transaction.", txs.len()); + + let mut tx_sent = 0; + let mut tx_send_errors = 0; + let mut tx_confirmed = 0; + let mut tx_unconfirmed = 0; + let mut sum_confirmation_time = Duration::default(); + let mut sum_slot_confirmation_time = 0; + for (tx_sig, confirmation_response) in tx_and_confirmations_from_rpc { + match confirmation_response { + ConfirmationResponseFromRpc::Success( + slot_sent, + slot_confirmed, + commitment_status, + confirmation_time, + ) => { debug!( "Signature {} confirmed with level {:?} after {:.02}ms, {} slots", tx_sig, - level, - elapsed.as_secs_f32() * 1000.0, - slots_elapsed + commitment_status, + confirmation_time.as_secs_f64() * 1000.0, + slot_confirmed - slot_sent + ); + tx_sent += 1; + tx_confirmed += 1; + sum_confirmation_time = sum_confirmation_time.add(confirmation_time); + sum_slot_confirmation_time += slot_confirmed - slot_sent; + } + ConfirmationResponseFromRpc::SendError(error_kind) => { + debug!( + "Signature {} failed to get send via RPC: {:?}", + tx_sig, error_kind ); + tx_send_errors += 1; } ConfirmationResponseFromRpc::Timeout(elapsed) => { debug!( @@ -91,60 +131,30 @@ pub async fn send_bulk_txs_and_wait( tx_sig, elapsed.as_secs_f32() * 1000.0 ); - } - ConfirmationResponseFromRpc::SendError(_) => { - unreachable!() - } - } - } - - let (mut confirmed, mut unconfirmed, mut failed) = (0, 0, 0); - let mut slot_hz: HashMap = Default::default(); - - for (_, result_from_rpc) in tx_and_confirmations_from_rpc { - match result_from_rpc { - ConfirmationResponseFromRpc::Success(slot, _, _) => { - confirmed += 1; - *slot_hz.entry(slot).or_default() += 1; - } - ConfirmationResponseFromRpc::Timeout(_) => { - unconfirmed += 1; - } - ConfirmationResponseFromRpc::SendError(_) => { - failed += 1; + tx_sent += 1; + tx_unconfirmed += 1; } } - // - // match tx { - // Ok(Some(status)) => { - // if status.satisfies_commitment(CommitmentConfig::confirmed()) { - // confirmed += 1; - // *slot_hz.entry(status.slot).or_default() += 1; - // } else { - // unconfirmed += 1; - // } - // } - // Ok(None) => { - // unconfirmed += 1; - // } - // Err(_) => { - // failed += 1; - // } - // } } - let mode_slot = slot_hz - .into_iter() - .max_by_key(|(_, v)| *v) - .map(|(k, _)| k) - .unwrap_or_default(); + let average_confirmation_time_ms = if tx_confirmed > 0 { + sum_confirmation_time.as_secs_f32() * 1000.0 / tx_confirmed as f32 + } else { + 0.0 + }; + let average_slot_confirmation_time = if tx_confirmed > 0 { + sum_slot_confirmation_time as f32 / tx_confirmed as f32 + } else { + 0.0 + }; Ok(RpcStat { - confirmation_time: elapsed_total.as_secs_f32(), - mode_slot, - confirmed, - unconfirmed, - failed, + tx_sent, + tx_send_errors, + tx_confirmed, + tx_unconfirmed, + average_confirmation_time: average_confirmation_time_ms, + average_slot_confirmation_time, }) } @@ -152,235 +162,29 @@ fn calc_stats_avg(stats: &[RpcStat]) -> RpcStat { let len = stats.len(); let mut avg = RpcStat { - confirmation_time: 0.0, - mode_slot: 0, - confirmed: 0, - unconfirmed: 0, - failed: 0, + tx_sent: 0, + tx_send_errors: 0, + tx_confirmed: 0, + tx_unconfirmed: 0, + average_confirmation_time: 0.0, + average_slot_confirmation_time: 0.0, }; for stat in stats { - avg.confirmation_time += stat.confirmation_time; - avg.confirmed += stat.confirmed; - avg.unconfirmed += stat.unconfirmed; - avg.failed += stat.failed; + avg.tx_sent += stat.tx_sent; + avg.tx_send_errors += stat.tx_send_errors; + avg.tx_confirmed += stat.tx_confirmed; + avg.tx_unconfirmed += stat.tx_unconfirmed; + avg.average_confirmation_time += stat.average_confirmation_time; + avg.average_slot_confirmation_time += stat.average_slot_confirmation_time; } - avg.confirmation_time /= len as f32; - avg.confirmed /= len as u64; - avg.unconfirmed /= len as u64; - avg.failed /= len as u64; + avg.tx_sent /= len as u64; + avg.tx_send_errors /= len as u64; + avg.tx_confirmed /= len as u64; + avg.tx_unconfirmed /= len as u64; + avg.average_confirmation_time /= len as f32; + avg.average_slot_confirmation_time /= len as f32; avg } - -#[derive(Clone)] -enum ConfirmationResponseFromRpc { - SendError(Arc), - // elapsed slot: current slot (confirmed) at beginning til the slot where transaction showed up with status CONFIRMED - Success(Slot, TransactionConfirmationStatus, Duration), - Timeout(Duration), -} - -async fn send_and_confirm_bulk_transactions( - rpc_client: &RpcClient, - txs: &[Transaction], -) -> anyhow::Result> { - let send_slot = poll_next_slot_start(rpc_client).await?; - - let started_at = Instant::now(); - let batch_sigs_or_fails = join_all( - txs.iter() - .map(|tx| rpc_client.send_transaction(tx).map_err(|e| e.kind)), - ) - .await; - - let after_send_slot = rpc_client - .get_slot_with_commitment(CommitmentConfig::confirmed()) - .await?; - - // optimal value is "0" - info!( - "slots passed while sending: {}", - after_send_slot - send_slot - ); - - let num_sent_ok = batch_sigs_or_fails - .iter() - .filter(|sig_or_fail| sig_or_fail.is_ok()) - .count(); - let num_sent_failed = batch_sigs_or_fails - .iter() - .filter(|sig_or_fail| sig_or_fail.is_err()) - .count(); - - for (i, tx_sig) in txs.iter().enumerate() { - let tx_sent = batch_sigs_or_fails[i].is_ok(); - if tx_sent { - debug!("- tx_sent {}", tx_sig.get_signature()); - } else { - debug!("- tx_fail {}", tx_sig.get_signature()); - } - } - debug!( - "{} transactions sent successfully in {:.02}ms", - num_sent_ok, - started_at.elapsed().as_secs_f32() * 1000.0 - ); - debug!( - "{} transactions failed to send in {:.02}ms", - num_sent_failed, - started_at.elapsed().as_secs_f32() * 1000.0 - ); - - if num_sent_failed > 0 { - warn!( - "Some transactions failed to send: {} out of {}", - num_sent_failed, - txs.len() - ); - bail!("Failed to send all transactions"); - } - - let mut pending_status_set: HashSet = HashSet::new(); - batch_sigs_or_fails - .iter() - .filter(|sig_or_fail| sig_or_fail.is_ok()) - .for_each(|sig_or_fail| { - pending_status_set.insert(sig_or_fail.as_ref().unwrap().to_owned()); - }); - let mut result_status_map: HashMap = HashMap::new(); - - // items get moved from pending_status_set to result_status_map - - let started_at = Instant::now(); - let mut iteration = 1; - 'pooling_loop: loop { - let iteration_ends_at = started_at + Duration::from_millis(iteration * 200); - assert_eq!( - pending_status_set.len() + result_status_map.len(), - num_sent_ok, - "Items must move between pending+result" - ); - let tx_batch = pending_status_set.iter().cloned().collect_vec(); - debug!( - "Request status for batch of remaining {} transactions in iteration {}", - tx_batch.len(), - iteration - ); - // TODO warn if get_status api calles are slow - let batch_responses = rpc_client - .get_signature_statuses(tx_batch.as_slice()) - .await?; - let elapsed = started_at.elapsed(); - - for (tx_sig, status_response) in zip(tx_batch, batch_responses.value) { - match status_response { - Some(tx_status) => { - trace!( - "Some signature status {:?} received for {} after {:.02}ms", - tx_status.confirmation_status, - tx_sig, - elapsed.as_secs_f32() * 1000.0 - ); - if !tx_status.satisfies_commitment(CommitmentConfig::confirmed()) { - continue 'pooling_loop; - } - // status is confirmed or finalized - pending_status_set.remove(&tx_sig); - let prev_value = result_status_map.insert( - tx_sig, - ConfirmationResponseFromRpc::Success( - tx_status.slot - send_slot, - tx_status.confirmation_status(), - elapsed, - ), - ); - assert!(prev_value.is_none(), "Must not override existing value"); - } - None => { - // None: not yet processed by the cluster - trace!( - "No signature status was received for {} after {:.02}ms - continue waiting", - tx_sig, - elapsed.as_secs_f32() * 1000.0 - ); - } - } - } - - if pending_status_set.is_empty() { - debug!("All transactions confirmed after {} iterations", iteration); - break 'pooling_loop; - } - - if iteration == 100 { - debug!("Timeout waiting for transactions to confirmed after {} iterations - giving up on {}", iteration, pending_status_set.len()); - break 'pooling_loop; - } - iteration += 1; - - // avg 2 samples per slot - tokio::time::sleep_until(iteration_ends_at).await; - } // -- END polling loop - - let total_time_elapsed_polling = started_at.elapsed(); - - // all transactions which remain in pending list are considered timed out - for tx_sig in pending_status_set.clone() { - pending_status_set.remove(&tx_sig); - result_status_map.insert( - tx_sig, - ConfirmationResponseFromRpc::Timeout(total_time_elapsed_polling), - ); - } - - let result_as_vec = batch_sigs_or_fails - .into_iter() - .enumerate() - .map(|(i, sig_or_fail)| match sig_or_fail { - Ok(tx_sig) => { - let confirmation = result_status_map - .get(&tx_sig) - .expect("consistent map with all tx") - .clone() - .to_owned(); - (tx_sig, confirmation) - } - Err(send_error) => { - let tx_sig = txs[i].get_signature(); - let confirmation = ConfirmationResponseFromRpc::SendError(Arc::new(send_error)); - (*tx_sig, confirmation) - } - }) - .collect_vec(); - - Ok(result_as_vec) -} - -async fn poll_next_slot_start(rpc_client: &RpcClient) -> Result { - let started_at = Instant::now(); - let mut last_slot: Option = None; - let mut i = 1; - // try to catch slot start - let send_slot = loop { - if i > 500 { - bail!("Timeout waiting for slot change"); - } - - let iteration_ends_at = started_at + Duration::from_millis(i * 30); - let slot = rpc_client - .get_slot_with_commitment(CommitmentConfig::confirmed()) - .await?; - trace!("polling slot {}", slot); - if let Some(last_slot) = last_slot { - if last_slot + 1 == slot { - break slot; - } - } - last_slot = Some(slot); - tokio::time::sleep_until(iteration_ends_at).await; - i += 1; - }; - Ok(send_slot) -} diff --git a/bench/src/benches/confirmation_slot.rs b/bench/src/benches/confirmation_slot.rs index dd298b99..4496901d 100644 --- a/bench/src/benches/confirmation_slot.rs +++ b/bench/src/benches/confirmation_slot.rs @@ -1,43 +1,142 @@ use std::path::Path; +use std::time::Duration; -use crate::tx_size::TxSize; -use crate::{create_memo_tx, create_rng, send_and_confirm_transactions, Rng8}; -use anyhow::Context; -use log::{info, warn}; +use crate::benches::rpc_interface::{ + create_rpc_client, send_and_confirm_bulk_transactions, ConfirmationResponseFromRpc, +}; +use crate::metrics::PingThing; +use crate::{create_memo_tx, create_rng, BenchmarkTransactionParams, Rng8}; +use anyhow::anyhow; +use log::{debug, info, warn}; +use solana_lite_rpc_util::obfuscate_rpcurl; use solana_rpc_client::nonblocking::rpc_client::RpcClient; -use solana_sdk::signature::{read_keypair_file, Signer}; +use solana_sdk::signature::{read_keypair_file, Signature, Signer}; use solana_sdk::transaction::Transaction; use solana_sdk::{commitment_config::CommitmentConfig, signature::Keypair}; +use tokio::time::{sleep, Instant}; +use url::Url; -/// TC1 send 2 txs (one via LiteRPC, one via Solana RPC) and compare confirmation slot (=slot distance) +#[derive(Clone, Copy, Debug, Default)] +pub struct Metric { + pub txs_sent: u64, + pub txs_confirmed: u64, + pub txs_un_confirmed: u64, + pub average_confirmation_time_ms: f64, + pub average_time_to_send_txs: f64, +} + +#[derive(Clone)] +pub enum ConfirmationSlotResult { + Success(ConfirmationSlotSuccess), +} + +#[derive(Clone)] +pub struct ConfirmationSlotSuccess { + pub slot_sent: u64, + pub slot_confirmed: u64, + pub confirmation_time: Duration, +} + +#[allow(clippy::too_many_arguments)] +/// TC1 -- Send 2 txs to separate RPCs and compare confirmation slot. +/// The benchmark attempts to minimize the effect of real-world distance and synchronize the time that each transaction reaches the RPC. +/// This is achieved by delaying submission of the transaction to the "nearer" RPC. +/// Delay time is calculated as half of the difference in duration of [getHealth](https://solana.com/docs/rpc/http/gethealth) calls to both RPCs. pub async fn confirmation_slot( payer_path: &Path, rpc_a_url: String, rpc_b_url: String, - tx_size: TxSize, + tx_params: BenchmarkTransactionParams, + max_timeout: Duration, + num_of_runs: usize, + _maybe_ping_thing: Option, ) -> anyhow::Result<()> { + info!( + "START BENCHMARK: confirmation_slot (prio_fees={})", + tx_params.cu_price_micro_lamports + ); warn!("THIS IS WORK IN PROGRESS"); + info!("RPC A: {}", obfuscate_rpcurl(&rpc_a_url)); + info!("RPC B: {}", obfuscate_rpcurl(&rpc_b_url)); - let rpc_a = RpcClient::new(rpc_a_url); - info!("RPC A: {}", rpc_a.url()); - - let rpc_b = RpcClient::new(rpc_b_url); - info!("RPC B: {}", rpc_b.url()); + let rpc_a_url = + Url::parse(&rpc_a_url).map_err(|e| anyhow!("Failed to parse RPC A URL: {}", e))?; + let rpc_b_url = + Url::parse(&rpc_b_url).map_err(|e| anyhow!("Failed to parse RPC B URL: {}", e))?; let mut rng = create_rng(None); let payer = read_keypair_file(payer_path).expect("payer file"); info!("Payer: {}", payer.pubkey().to_string()); + // let mut ping_thing_tasks = vec![]; - let rpc_a_tx = create_tx(&rpc_a, &payer, &mut rng, tx_size).await?; - let rpc_b_tx = create_tx(&rpc_b, &payer, &mut rng, tx_size).await?; + for _ in 0..num_of_runs { + let rpc_a = create_rpc_client(&rpc_a_url); + let rpc_b = create_rpc_client(&rpc_b_url); + // measure network time to reach the respective RPC endpoints, + // used to mitigate the difference in distance by delaying the txn sending + let time_a = rpc_roundtrip_duration(&rpc_a).await?.as_secs_f64(); + let time_b = rpc_roundtrip_duration(&rpc_b).await?.as_secs_f64(); - let (rpc_slot, lite_rpc_slot) = tokio::join!( - send_transaction_and_get_slot(&rpc_a, rpc_a_tx), - send_transaction_and_get_slot(&rpc_b, rpc_b_tx) - ); + debug!("(A) rpc network latency: {}", time_a); + debug!("(B) rpc network latency: {}", time_b); + + let rpc_a_tx = create_tx(&rpc_a, &payer, &mut rng, &tx_params).await?; + let rpc_b_tx = create_tx(&rpc_b, &payer, &mut rng, &tx_params).await?; + + let one_way_delay = (time_a - time_b).abs() / 2.0; + let (a_delay, b_delay) = if time_a > time_b { + (0f64, one_way_delay) + } else { + (one_way_delay, 0f64) + }; - info!("rpc_slot: {}", rpc_slot?); - info!("lite_rpc_slot: {}", lite_rpc_slot?); + debug!("A delay: {}s, B delay: {}s", a_delay, b_delay); + + let a_task = tokio::spawn(async move { + sleep(Duration::from_secs_f64(a_delay)).await; + debug!("(A) sending tx {}", rpc_a_tx.signatures[0]); + send_and_confirm_transaction(&rpc_a, rpc_a_tx, max_timeout).await + }); + + let b_task = tokio::spawn(async move { + sleep(Duration::from_secs_f64(b_delay)).await; + debug!("(B) sending tx {}", rpc_b_tx.signatures[0]); + send_and_confirm_transaction(&rpc_b, rpc_b_tx, max_timeout).await + }); + + let (a, b) = tokio::join!(a_task, b_task); + // only continue if both paths suceed + let a_result: ConfirmationResponseFromRpc = a??; + let b_result: ConfirmationResponseFromRpc = b??; + + if let ( + ConfirmationResponseFromRpc::Success(a_slot_sent, a_slot_confirmed, _, _), + ConfirmationResponseFromRpc::Success(b_slot_sent, b_slot_confirmed, _, _), + ) = (a_result, b_result) + { + info!( + "txn A landed after {} slots", + a_slot_confirmed - a_slot_sent + ); + info!( + "txn B landed after {} slots", + b_slot_confirmed - b_slot_sent + ); + } + + // if let Some(ping_thing) = maybe_ping_thing.clone() { + // ping_thing_tasks.push(tokio::spawn(async move { + // submit_ping_thing_stats(&a_result, &ping_thing) + // .await + // .unwrap(); + // submit_ping_thing_stats(&b_result, &ping_thing) + // .await + // .unwrap(); + // })); + // }; + } + + // futures::future::join_all(ping_thing_tasks).await; Ok(()) } @@ -46,20 +145,52 @@ async fn create_tx( rpc: &RpcClient, payer: &Keypair, rng: &mut Rng8, - tx_size: TxSize, + tx_params: &BenchmarkTransactionParams, ) -> anyhow::Result { - let hash = rpc.get_latest_blockhash().await?; + let (blockhash, _) = rpc + .get_latest_blockhash_with_commitment(CommitmentConfig::confirmed()) + .await?; - Ok(create_memo_tx(payer, hash, rng, tx_size)) + Ok(create_memo_tx(payer, blockhash, rng, tx_params)) } -async fn send_transaction_and_get_slot(client: &RpcClient, tx: Transaction) -> anyhow::Result { - let status = send_and_confirm_transactions(client, &[tx], CommitmentConfig::confirmed(), None) - .await? - .into_iter() - .next() - .unwrap()? - .context("unable to confirm tx")?; +async fn send_and_confirm_transaction( + rpc: &RpcClient, + tx: Transaction, + max_timeout: Duration, +) -> anyhow::Result { + let result_vec: Vec<(Signature, ConfirmationResponseFromRpc)> = + send_and_confirm_bulk_transactions(rpc, &[tx], max_timeout).await?; + assert_eq!(result_vec.len(), 1, "expected 1 result"); + let (_sig, confirmation_response) = result_vec.into_iter().next().unwrap(); - Ok(status.slot) + Ok(confirmation_response) } + +pub async fn rpc_roundtrip_duration(rpc: &RpcClient) -> anyhow::Result { + let started_at = Instant::now(); + rpc.get_health().await?; + let duration = started_at.elapsed(); + Ok(duration) +} + +// async fn submit_ping_thing_stats( +// confirmation_info: &ConfirmationSlotResult, +// ping_thing: &PingThing, +// ) -> anyhow::Result<()> { +// match confirmation_info.result { +// ConfirmationSlotResult::Timeout(_) => Ok(()), +// ConfirmationSlotResult::Success(slot_landed) => { +// ping_thing +// .submit_confirmed_stats( +// confirmation_info.confirmation_time, +// confirmation_info.signature, +// PingThingTxType::Memo, +// true, +// confirmation_info.slot_sent, +// slot_landed, +// ) +// .await +// } +// } +// } diff --git a/bench/src/benches/mod.rs b/bench/src/benches/mod.rs index 361d0851..c560a93c 100644 --- a/bench/src/benches/mod.rs +++ b/bench/src/benches/mod.rs @@ -1,3 +1,4 @@ pub mod api_load; pub mod confirmation_rate; pub mod confirmation_slot; +pub mod rpc_interface; diff --git a/bench/src/benches/rpc_interface.rs b/bench/src/benches/rpc_interface.rs new file mode 100644 index 00000000..0ddce4b5 --- /dev/null +++ b/bench/src/benches/rpc_interface.rs @@ -0,0 +1,285 @@ +use anyhow::{bail, Context, Error}; +use futures::future::join_all; +use futures::TryFutureExt; +use itertools::Itertools; +use log::{debug, trace, warn}; +use solana_rpc_client::nonblocking::rpc_client::RpcClient; +use solana_rpc_client::rpc_client::SerializableTransaction; +use solana_rpc_client_api::client_error::ErrorKind; +use solana_rpc_client_api::config::RpcSendTransactionConfig; +use solana_sdk::clock::Slot; +use solana_sdk::commitment_config::CommitmentConfig; +use solana_sdk::signature::Signature; +use solana_sdk::transaction::Transaction; +use solana_transaction_status::TransactionConfirmationStatus; +use std::collections::{HashMap, HashSet}; +use std::iter::zip; +use std::sync::Arc; +use std::time::Duration; +use tokio::time::Instant; +use url::Url; + +pub fn create_rpc_client(rpc_url: &Url) -> RpcClient { + RpcClient::new_with_commitment(rpc_url.to_string(), CommitmentConfig::confirmed()) +} + +#[derive(Clone)] +pub enum ConfirmationResponseFromRpc { + // RPC error on send_transaction + SendError(Arc), + // (sent slot at confirmed commitment, confirmed slot, ..., ...) + // transaction_confirmation_status is "confirmed" or "finalized" + Success(Slot, Slot, TransactionConfirmationStatus, Duration), + // timout waiting for confirmation status + Timeout(Duration), +} + +pub async fn send_and_confirm_bulk_transactions( + rpc_client: &RpcClient, + txs: &[Transaction], + max_timeout: Duration, +) -> anyhow::Result> { + trace!("Polling for next slot .."); + let send_slot = poll_next_slot_start(rpc_client) + .await + .context("poll for next start slot")?; + trace!("Send slot: {}", send_slot); + + let send_config = RpcSendTransactionConfig { + skip_preflight: true, + preflight_commitment: None, + encoding: None, + max_retries: None, + min_context_slot: None, + }; + + let started_at = Instant::now(); + trace!( + "Sending {} transactions via RPC (retries=off) ..", + txs.len() + ); + let batch_sigs_or_fails = join_all(txs.iter().map(|tx| { + rpc_client + .send_transaction_with_config(tx, send_config) + .map_err(|e| e.kind) + })) + .await; + + let after_send_slot = rpc_client + .get_slot_with_commitment(CommitmentConfig::confirmed()) + .await + .context("get slot afterwards")?; + + if after_send_slot - send_slot > 0 { + warn!( + "Slot advanced during sending transactions: {} -> {}", + send_slot, after_send_slot + ); + } else { + debug!( + "Slot did not advance during sending transactions: {} -> {}", + send_slot, after_send_slot + ); + } + + let num_sent_ok = batch_sigs_or_fails + .iter() + .filter(|sig_or_fail| sig_or_fail.is_ok()) + .count(); + let num_sent_failed = batch_sigs_or_fails + .iter() + .filter(|sig_or_fail| sig_or_fail.is_err()) + .count(); + + for (i, tx_sig) in txs.iter().enumerate() { + let tx_sent = batch_sigs_or_fails[i].is_ok(); + if tx_sent { + trace!("- tx_sent {}", tx_sig.get_signature()); + } else { + trace!("- tx_fail {}", tx_sig.get_signature()); + } + } + debug!( + "{} transactions sent successfully in {:.02}ms", + num_sent_ok, + started_at.elapsed().as_secs_f32() * 1000.0 + ); + debug!( + "{} transactions failed to send in {:.02}ms", + num_sent_failed, + started_at.elapsed().as_secs_f32() * 1000.0 + ); + + if num_sent_failed > 0 { + warn!( + "Some transactions failed to send: {} out of {}", + num_sent_failed, + txs.len() + ); + bail!("Failed to send all transactions"); + } + + let mut pending_status_set: HashSet = HashSet::new(); + batch_sigs_or_fails + .iter() + .filter(|sig_or_fail| sig_or_fail.is_ok()) + .for_each(|sig_or_fail| { + pending_status_set.insert(sig_or_fail.as_ref().unwrap().to_owned()); + }); + let mut result_status_map: HashMap = HashMap::new(); + + // items get moved from pending_status_set to result_status_map + + let started_at = Instant::now(); + let timeout_at = started_at + max_timeout; + 'polling_loop: for iteration in 1.. { + let iteration_ends_at = started_at + Duration::from_millis(iteration * 400); + assert_eq!( + pending_status_set.len() + result_status_map.len(), + num_sent_ok, + "Items must move between pending+result" + ); + let tx_batch = pending_status_set.iter().cloned().collect_vec(); + debug!( + "Request status for batch of remaining {} transactions in iteration {}", + tx_batch.len(), + iteration + ); + + let status_started_at = Instant::now(); + let mut batch_status = Vec::new(); + // "Too many inputs provided; max 256" + for chunk in tx_batch.chunks(256) { + // fail hard if not possible to poll status + let chunk_responses = rpc_client + .get_signature_statuses(chunk) + .await + .expect("get signature statuses"); + batch_status.extend(chunk_responses.value); + } + if status_started_at.elapsed() > Duration::from_millis(500) { + warn!( + "SLOW get_signature_statuses for {} transactions took {:?}", + tx_batch.len(), + status_started_at.elapsed() + ); + } + let elapsed = started_at.elapsed(); + + for (tx_sig, status_response) in zip(tx_batch, batch_status) { + match status_response { + Some(tx_status) => { + trace!( + "Some signature status {:?} received for {} after {:.02}ms", + tx_status.confirmation_status, + tx_sig, + elapsed.as_secs_f32() * 1000.0 + ); + if !tx_status.satisfies_commitment(CommitmentConfig::confirmed()) { + continue 'polling_loop; + } + // status is confirmed or finalized + pending_status_set.remove(&tx_sig); + let prev_value = result_status_map.insert( + tx_sig, + ConfirmationResponseFromRpc::Success( + send_slot, + tx_status.slot, + tx_status.confirmation_status(), + elapsed, + ), + ); + assert!(prev_value.is_none(), "Must not override existing value"); + } + None => { + // None: not yet processed by the cluster + trace!( + "No signature status was received for {} after {:.02}ms - continue waiting", + tx_sig, + elapsed.as_secs_f32() * 1000.0 + ); + } + } + } + + if pending_status_set.is_empty() { + debug!( + "All transactions confirmed after {} iterations / {:?}", + iteration, + started_at.elapsed() + ); + break 'polling_loop; + } + + if Instant::now() > timeout_at { + warn!( + "Timeout waiting for transactions to confirm after {} iterations", + iteration + ); + break 'polling_loop; + } + + // avg 2 samples per slot + tokio::time::sleep_until(iteration_ends_at).await; + } // -- END polling loop + + let total_time_elapsed_polling = started_at.elapsed(); + + // all transactions which remain in pending list are considered timed out + for tx_sig in pending_status_set.clone() { + pending_status_set.remove(&tx_sig); + result_status_map.insert( + tx_sig, + ConfirmationResponseFromRpc::Timeout(total_time_elapsed_polling), + ); + } + + let result_as_vec = batch_sigs_or_fails + .into_iter() + .enumerate() + .map(|(i, sig_or_fail)| match sig_or_fail { + Ok(tx_sig) => { + let confirmation = result_status_map + .get(&tx_sig) + .expect("consistent map with all tx") + .clone() + .to_owned(); + (tx_sig, confirmation) + } + Err(send_error) => { + let tx_sig = txs[i].get_signature(); + let confirmation = ConfirmationResponseFromRpc::SendError(Arc::new(send_error)); + (*tx_sig, confirmation) + } + }) + .collect_vec(); + + Ok(result_as_vec) +} + +pub async fn poll_next_slot_start(rpc_client: &RpcClient) -> Result { + let started_at = Instant::now(); + let mut last_slot: Option = None; + let mut i = 1; + // try to catch slot start + let send_slot = loop { + if i > 500 { + bail!("Timeout waiting for slot change"); + } + + let iteration_ends_at = started_at + Duration::from_millis(i * 30); + let slot = rpc_client + .get_slot_with_commitment(CommitmentConfig::confirmed()) + .await?; + trace!("polling slot {}", slot); + if let Some(last_slot) = last_slot { + if last_slot + 1 == slot { + break slot; + } + } + last_slot = Some(slot); + tokio::time::sleep_until(iteration_ends_at).await; + i += 1; + }; + Ok(send_slot) +} diff --git a/bench/src/benchnew.rs b/bench/src/benchnew.rs new file mode 100644 index 00000000..5d5ebf70 --- /dev/null +++ b/bench/src/benchnew.rs @@ -0,0 +1,156 @@ +use std::path::PathBuf; +use std::time::Duration; + +use bench::{ + benches::{ + api_load::api_load, confirmation_rate::confirmation_rate, + confirmation_slot::confirmation_slot, + }, + metrics::{PingThing, PingThingCluster}, + tx_size::TxSize, + BenchmarkTransactionParams, +}; +use clap::{Parser, Subcommand}; + +#[derive(Parser, Debug)] +#[clap(version, about)] + +struct Arguments { + #[clap(subcommand)] + subcommand: SubCommand, +} + +#[derive(Subcommand, Debug)] +enum SubCommand { + ApiLoad { + #[clap(short, long)] + payer_path: PathBuf, + #[clap(short, long)] + rpc_url: String, + #[clap(short, long)] + test_duration_ms: u64, + /// The CU price in micro lamports + #[clap(short, long, default_value_t = 3)] + #[arg(short = 'f')] + cu_price: u64, + }, + ConfirmationRate { + #[clap(short, long)] + payer_path: PathBuf, + #[clap(short, long)] + rpc_url: String, + #[clap(short, long)] + size_tx: TxSize, + /// Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed + #[clap(short, long, default_value_t = 15_000)] + max_timeout_ms: u64, + #[clap(short, long)] + txs_per_run: usize, + #[clap(short, long)] + num_of_runs: usize, + /// The CU price in micro lamports + #[clap(short, long, default_value_t = 300)] + #[arg(short = 'f')] + cu_price: u64, + }, + /// Compares the confirmation slot of txs sent to 2 different RPCs + ConfirmationSlot { + #[clap(short, long)] + payer_path: PathBuf, + /// URL of the 1st RPC + #[clap(short, long)] + #[arg(short = 'a')] + rpc_a: String, + /// URL of the 2nd RPC + #[clap(short, long)] + #[arg(short = 'b')] + rpc_b: String, + #[clap(short, long)] + size_tx: TxSize, + /// Maximum confirmation time in milliseconds. After this, the txn is considered unconfirmed + #[clap(short, long, default_value_t = 15_000)] + max_timeout_ms: u64, + #[clap(short, long)] + num_of_runs: usize, + /// The CU price in micro lamports + #[clap(short, long, default_value_t = 300)] + #[arg(short = 'f')] + cu_price: u64, + #[clap(long)] + ping_thing_token: Option, + }, +} + +pub fn initialize_logger() { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .with_thread_ids(true) + .with_line_number(true) + .init(); +} + +#[tokio::main(flavor = "multi_thread", worker_threads = 16)] +async fn main() { + let args = Arguments::parse(); + initialize_logger(); + + match args.subcommand { + SubCommand::ApiLoad { + payer_path, + rpc_url, + test_duration_ms, + cu_price, + } => { + api_load(&payer_path, rpc_url, test_duration_ms, cu_price) + .await + .unwrap(); + } + SubCommand::ConfirmationRate { + payer_path, + rpc_url, + size_tx, + max_timeout_ms, + txs_per_run, + num_of_runs, + cu_price, + } => confirmation_rate( + &payer_path, + rpc_url, + BenchmarkTransactionParams { + tx_size: size_tx, + cu_price_micro_lamports: cu_price, + }, + Duration::from_millis(max_timeout_ms), + txs_per_run, + num_of_runs, + ) + .await + .unwrap(), + SubCommand::ConfirmationSlot { + payer_path, + rpc_a, + rpc_b, + size_tx, + max_timeout_ms, + num_of_runs, + cu_price, + ping_thing_token, + } => confirmation_slot( + &payer_path, + rpc_a, + rpc_b, + BenchmarkTransactionParams { + tx_size: size_tx, + cu_price_micro_lamports: cu_price, + }, + Duration::from_millis(max_timeout_ms), + num_of_runs, + ping_thing_token.map(|t| PingThing { + cluster: PingThingCluster::Mainnet, + va_api_key: t, + }), + ) + .await + .unwrap(), + } +} diff --git a/bench/src/cli.rs b/bench/src/cli.rs deleted file mode 100644 index 94a655e1..00000000 --- a/bench/src/cli.rs +++ /dev/null @@ -1,95 +0,0 @@ -use std::path::PathBuf; - -use bench::{ - benches::{ - api_load::api_load, confirmation_rate::confirmation_rate, - confirmation_slot::confirmation_slot, - }, - tx_size::TxSize, -}; -use clap::{Parser, Subcommand}; - -#[derive(Parser, Debug)] -#[clap(version, about)] - -struct Arguments { - #[clap(subcommand)] - subcommand: SubCommand, -} - -#[derive(Subcommand, Debug)] -enum SubCommand { - ApiLoad { - #[clap(short, long)] - payer_path: PathBuf, - #[clap(short, long)] - rpc_url: String, - #[clap(short, long)] - time_ms: u64, - }, - ConfirmationRate { - #[clap(short, long)] - payer_path: PathBuf, - #[clap(short, long)] - rpc_url: String, - #[clap(short, long)] - size_tx: TxSize, - #[clap(short, long)] - txns_per_round: usize, - #[clap(short, long)] - num_rounds: usize, - }, - ConfirmationSlot { - #[clap(short, long)] - payer_path: PathBuf, - #[clap(short, long)] - #[arg(short = 'a')] - rpc_a: String, - #[clap(short, long)] - #[arg(short = 'b')] - rpc_b: String, - #[clap(short, long)] - size_tx: TxSize, - }, -} - -pub fn initialize_logger() { - tracing_subscriber::fmt() - .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) - .with_thread_ids(true) - .with_line_number(true) - .init(); -} - -#[tokio::main(flavor = "multi_thread", worker_threads = 16)] -async fn main() { - let args = Arguments::parse(); - initialize_logger(); - - match args.subcommand { - SubCommand::ApiLoad { - payer_path, - rpc_url, - time_ms, - } => { - api_load(&payer_path, rpc_url, time_ms).await.unwrap(); - } - SubCommand::ConfirmationRate { - payer_path, - rpc_url, - size_tx, - txns_per_round, - num_rounds, - } => confirmation_rate(&payer_path, rpc_url, size_tx, txns_per_round, num_rounds) - .await - .unwrap(), - SubCommand::ConfirmationSlot { - payer_path, - rpc_a, - rpc_b, - size_tx, - } => confirmation_slot(&payer_path, rpc_a, rpc_b, size_tx) - .await - .unwrap(), - } -} diff --git a/bench/src/lib.rs b/bench/src/lib.rs index 1bde2acf..83587d7c 100644 --- a/bench/src/lib.rs +++ b/bench/src/lib.rs @@ -20,6 +20,7 @@ use solana_sdk::{ use solana_transaction_status::TransactionStatus; use std::{str::FromStr, time::Duration}; use tokio::time::Instant; +use tx_size::TxSize; pub mod bench1; pub mod benches; @@ -53,6 +54,11 @@ pub struct Args { pub large_transactions: bool, } +pub struct BenchmarkTransactionParams { + pub tx_size: TxSize, + pub cu_price_micro_lamports: u64, +} + const MEMO_PROGRAM_ID: &str = "MemoSq4gqABAXKb96qnH8TysNcWxMyWCqXgDLGmfcHr"; const WAIT_LIMIT_IN_SECONDS: u64 = 60; @@ -189,10 +195,10 @@ pub fn generate_txs( payer: &Keypair, blockhash: Hash, rng: &mut Rng8, - size: tx_size::TxSize, + tx_params: &BenchmarkTransactionParams, ) -> Vec { (0..num_of_txs) - .map(|_| create_memo_tx(payer, blockhash, rng, size)) + .map(|_| create_memo_tx(payer, blockhash, rng, tx_params)) .collect() } @@ -200,38 +206,58 @@ pub fn create_memo_tx( payer: &Keypair, blockhash: Hash, rng: &mut Rng8, - size: tx_size::TxSize, + tx_params: &BenchmarkTransactionParams, ) -> Transaction { - let rand_str = generate_random_string(rng, size.memo_size()); - - match size { - tx_size::TxSize::Small => create_memo_tx_small(&rand_str, payer, blockhash), - tx_size::TxSize::Large => create_memo_tx_large(&rand_str, payer, blockhash), + let rand_str = generate_random_string(rng, tx_params.tx_size.memo_size()); + + match tx_params.tx_size { + tx_size::TxSize::Small => create_memo_tx_small( + &rand_str, + payer, + blockhash, + tx_params.cu_price_micro_lamports, + ), + tx_size::TxSize::Large => create_memo_tx_large( + &rand_str, + payer, + blockhash, + tx_params.cu_price_micro_lamports, + ), } } -// note: there is another version of this -pub fn create_memo_tx_small(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction { +pub fn create_memo_tx_small( + msg: &[u8], + payer: &Keypair, + blockhash: Hash, + cu_price_micro_lamports: u64, +) -> Transaction { let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap(); - // TODO make configurable - // 3 -> 6 slots - // 1 -> 31 slots - let cu_budget: Instruction = ComputeBudgetInstruction::set_compute_unit_price(3); + let cu_budget_ix: Instruction = + ComputeBudgetInstruction::set_compute_unit_price(cu_price_micro_lamports); // Program consumed: 12775 of 13700 compute units - let priority_fees: Instruction = ComputeBudgetInstruction::set_compute_unit_limit(14000); + let cu_limit_ix: Instruction = ComputeBudgetInstruction::set_compute_unit_limit(14000); let instruction = Instruction::new_with_bytes(memo, msg, vec![]); let message = Message::new( - &[cu_budget, priority_fees, instruction], + &[cu_budget_ix, cu_limit_ix, instruction], Some(&payer.pubkey()), ); Transaction::new(&[payer], message, blockhash) } -pub fn create_memo_tx_large(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Transaction { +pub fn create_memo_tx_large( + msg: &[u8], + payer: &Keypair, + blockhash: Hash, + cu_price_micro_lamports: u64, +) -> Transaction { let accounts = (0..8).map(|_| Keypair::new()).collect_vec(); let memo = Pubkey::from_str(MEMO_PROGRAM_ID).unwrap(); + let cu_budget_ix: Instruction = + ComputeBudgetInstruction::set_compute_unit_price(cu_price_micro_lamports); + let cu_limit_ix: Instruction = ComputeBudgetInstruction::set_compute_unit_limit(14000); let instruction = Instruction::new_with_bytes( memo, @@ -241,7 +267,10 @@ pub fn create_memo_tx_large(msg: &[u8], payer: &Keypair, blockhash: Hash) -> Tra .map(|keypair| AccountMeta::new_readonly(keypair.pubkey(), true)) .collect_vec(), ); - let message = Message::new(&[instruction], Some(&payer.pubkey())); + let message = Message::new( + &[cu_budget_ix, cu_limit_ix, instruction], + Some(&payer.pubkey()), + ); let mut signers = vec![payer]; signers.extend(accounts.iter()); @@ -257,8 +286,9 @@ fn transaction_size_small() { ); let mut rng = create_rng(Some(42)); let rand_string = generate_random_string(&mut rng, 10); + let priority_fee = 100; - let tx = create_memo_tx_small(&rand_string, &payer_keypair, blockhash); + let tx = create_memo_tx_small(&rand_string, &payer_keypair, blockhash, priority_fee); assert_eq!(bincode::serialized_size(&tx).unwrap(), 231); } @@ -270,7 +300,8 @@ fn transaction_size_large() { ); let mut rng = create_rng(Some(42)); let rand_string = generate_random_string(&mut rng, 240); + let priority_fee = 100; - let tx = create_memo_tx_large(&rand_string, &payer_keypair, blockhash); - assert_eq!(bincode::serialized_size(&tx).unwrap(), 1186); + let tx = create_memo_tx_large(&rand_string, &payer_keypair, blockhash, priority_fee); + assert_eq!(bincode::serialized_size(&tx).unwrap(), 1238); } diff --git a/bench/src/metrics.rs b/bench/src/metrics.rs index a5675faf..2f0cb165 100644 --- a/bench/src/metrics.rs +++ b/bench/src/metrics.rs @@ -1,9 +1,13 @@ use std::{ + fmt::{self, Display}, ops::{AddAssign, DivAssign}, time::Duration, }; -use solana_sdk::slot_history::Slot; +use reqwest::StatusCode; +use serde::{Deserialize, Serialize}; +use solana_sdk::{signature::Signature, slot_history::Slot}; +use tracing::debug; #[derive(Clone, Copy, Debug, Default, serde::Serialize)] pub struct Metric { @@ -143,3 +147,158 @@ pub struct TxMetricData { pub time_to_send_in_millis: u64, pub time_to_confirm_in_millis: u64, } + +#[derive(Clone, Debug)] +pub enum PingThingCluster { + Mainnet, + Testnet, + Devnet, +} + +impl PingThingCluster { + pub fn from_arg(cluster: String) -> Self { + match cluster.to_lowercase().as_str() { + "mainnet" => PingThingCluster::Mainnet, + "testnet" => PingThingCluster::Testnet, + "devnet" => PingThingCluster::Devnet, + _ => panic!("incorrect cluster name"), + } + } +} + +impl PingThingCluster { + pub fn to_url_part(&self) -> String { + match self { + PingThingCluster::Mainnet => "mainnet", + PingThingCluster::Testnet => "testnet", + PingThingCluster::Devnet => "devnet", + } + .to_string() + } +} + +#[derive(Clone, Debug)] +pub enum PingThingTxType { + Transfer, + Memo, +} + +impl Display for PingThingTxType { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + match *self { + PingThingTxType::Transfer => write!(f, "transfer"), + PingThingTxType::Memo => write!(f, "memo"), + } + } +} + +#[derive(Clone)] +pub struct PingThing { + pub cluster: PingThingCluster, + pub va_api_key: String, +} + +/// request format see https://github.com/Block-Logic/ping-thing-client/blob/4c008c741164702a639c282f1503a237f7d95e64/ping-thing-client.mjs#L160 +#[derive(Debug, Serialize, Deserialize)] +struct PingThingData { + pub time: u128, + pub signature: String, // Tx sig + pub transaction_type: String, // 'transfer', + pub success: bool, // txSuccess + pub application: String, // e.g. 'web3' + pub commitment_level: String, // e.g. 'confirmed' + pub slot_sent: Slot, + pub slot_landed: Slot, +} + +impl PingThing { + pub async fn submit_confirmed_stats( + &self, + tx_elapsed: Duration, + tx_sig: Signature, + tx_type: PingThingTxType, + tx_success: bool, + slot_sent: Slot, + slot_landed: Slot, + ) -> anyhow::Result<()> { + submit_stats_to_ping_thing( + self.cluster.clone(), + self.va_api_key.clone(), + tx_elapsed, + tx_sig, + tx_type, + tx_success, + slot_sent, + slot_landed, + ) + .await + } +} + +/// submits to https://www.validators.app/ping-thing?network=mainnet +/// Assumes that the txn was sent on Mainnet and had the "confirmed" commitment level +#[allow(clippy::too_many_arguments)] +async fn submit_stats_to_ping_thing( + cluster: PingThingCluster, + va_api_key: String, + tx_elapsed: Duration, + tx_sig: Signature, + tx_type: PingThingTxType, + tx_success: bool, + slot_sent: Slot, + slot_landed: Slot, +) -> anyhow::Result<()> { + let submit_data_request = PingThingData { + time: tx_elapsed.as_millis(), + signature: tx_sig.to_string(), + transaction_type: tx_type.to_string(), + success: tx_success, + application: "LiteRPC.bench".to_string(), + commitment_level: "confirmed".to_string(), + slot_sent, + slot_landed, + }; + + let client = reqwest::Client::new(); + // cluster: 'mainnet' + let response = client + .post(format!( + "https://www.validators.app/api/v1/ping-thing/{}", + cluster.to_url_part() + )) + .header("Content-Type", "application/json") + .header("Token", va_api_key) + .json(&submit_data_request) + .send() + .await? + .error_for_status()?; + + assert_eq!(response.status(), StatusCode::CREATED); + + debug!("Sent data for tx {} to ping-thing server", tx_sig); + Ok(()) +} + +#[ignore] +#[tokio::test] +async fn test_ping_thing() { + let token = "".to_string(); + assert!(token.is_empty(), "Empty token for ping thing test"); + + let ping_thing = PingThing { + cluster: PingThingCluster::Mainnet, + va_api_key: token, + }; + + ping_thing + .submit_confirmed_stats( + Duration::from_secs(2), + Signature::new_unique(), + PingThingTxType::Transfer, + true, + 123, + 124, + ) + .await + .unwrap(); +} diff --git a/benchrunner-service/src/postgres/confirmation_slot.rs b/benchrunner-service/src/postgres/confirmation_slot.rs new file mode 100644 index 00000000..f1cf6897 --- /dev/null +++ b/benchrunner-service/src/postgres/confirmation_slot.rs @@ -0,0 +1,27 @@ +use std::time::SystemTime; + +#[derive(Debug)] +pub struct PostgresConfirmationSlot { + pub signature: String, + pub bench_datetime: SystemTime, + pub slot_sent: u64, + pub slot_confirmed: u64, + pub endpoint: String, + pub confirmed: bool, + pub confirmation_time_ms: f32, +} + +// impl PostgresConfirmationSlot { +// pub fn to_values() -> &[&(dyn ToSql + Sync)] { +// let values: &[&(dyn ToSql + Sync)] = &[ +// &self.signature, +// &self.bench_datetime, +// &(self.slot_sent as i64), +// &(self.slot_confirmed as i64), +// &self.endpoint, +// &self.confirmed, +// &self.confirmation_time_ms, +// ]; +// values +// } +// } diff --git a/benchrunner-service/src/postgres/mod.rs b/benchrunner-service/src/postgres/mod.rs index 3131825c..e0750dc5 100644 --- a/benchrunner-service/src/postgres/mod.rs +++ b/benchrunner-service/src/postgres/mod.rs @@ -1,3 +1,4 @@ +pub mod confirmation_slot; pub mod metrics_dbstore; pub mod postgres_session; pub mod postgres_session_cache; diff --git a/lite-rpc/src/main.rs b/lite-rpc/src/main.rs index c29a9d85..5266fe62 100644 --- a/lite-rpc/src/main.rs +++ b/lite-rpc/src/main.rs @@ -56,6 +56,7 @@ use solana_lite_rpc_services::tx_sender::TxSender; use lite_rpc::postgres_logger; use solana_lite_rpc_core::structures::block_info::BlockInfo; use solana_lite_rpc_prioritization_fees::start_block_priofees_task; +use solana_lite_rpc_util::obfuscate_rpcurl; use solana_rpc_client::nonblocking::rpc_client::RpcClient; use solana_sdk::commitment_config::CommitmentConfig; use solana_sdk::signature::Keypair; @@ -500,14 +501,6 @@ fn parse_host_port(host_port: &str) -> Result { } } -// http://mango.rpcpool.com/c232ab232ba2323 -fn obfuscate_rpcurl(rpc_addr: &str) -> String { - if rpc_addr.contains("rpcpool.com") { - return rpc_addr.replacen(char::is_numeric, "X", 99); - } - rpc_addr.to_string() -} - fn setup_tracing_subscriber() { let enable_instrument_tracing = std::env::var("ENABLE_INSTRUMENT_TRACING") .unwrap_or("false".to_string()) diff --git a/util/src/lib.rs b/util/src/lib.rs index 713e510d..5c4bafd8 100644 --- a/util/src/lib.rs +++ b/util/src/lib.rs @@ -1,3 +1,11 @@ pub mod encoding; pub mod secrets; pub mod statistics; + +// http://mango.rpcpool.com/c232ab232ba2323 +pub fn obfuscate_rpcurl(rpc_addr: &str) -> String { + if rpc_addr.contains("rpcpool.com") { + return rpc_addr.replacen(char::is_numeric, "X", 99); + } + rpc_addr.to_string() +}