From e068c975c5b8944642126bbab4a4da809d255e11 Mon Sep 17 00:00:00 2001 From: GroovieGermanikus Date: Wed, 22 Nov 2023 09:58:15 +0100 Subject: [PATCH] log timings to CSV --- lite-rpc/Cargo.toml | 1 + lite-rpc/tests/storage_integration_tests.rs | 18 ++++++++++++++++++ 2 files changed, 19 insertions(+) diff --git a/lite-rpc/Cargo.toml b/lite-rpc/Cargo.toml index e90a6b34..51c0e807 100644 --- a/lite-rpc/Cargo.toml +++ b/lite-rpc/Cargo.toml @@ -42,6 +42,7 @@ tokio = { version = "1.28.2", features = ["full", "fs"]} tokio-util = "0.7" tokio-postgres = { version = "0.7.8", features = ["with-chrono-0_4"] } chrono = { workspace = true } +csv = "1.3.0" solana-lite-rpc-core = { workspace = true } solana-lite-rpc-services = { workspace = true } diff --git a/lite-rpc/tests/storage_integration_tests.rs b/lite-rpc/tests/storage_integration_tests.rs index a6ca49a8..d78d131d 100644 --- a/lite-rpc/tests/storage_integration_tests.rs +++ b/lite-rpc/tests/storage_integration_tests.rs @@ -23,7 +23,9 @@ use std::process; use std::str::FromStr; use std::sync::Arc; use std::time::{Duration, Instant}; +use csv::WriterBuilder; use futures::channel::oneshot::Cancellation; +use serde::__private::ser::CannotSerializeVariant; use tokio::join; use tokio::sync::broadcast::error::RecvError; use tokio::task::JoinHandle; @@ -121,11 +123,22 @@ fn storage_prepare_epoch_schema( (join_handle, building_epoch_schema) } +#[derive(serde::Serialize)] +struct CsvRow { + slot: Slot, + tx_count: u32, + write_time_ms: u32, +} + // note: the consumer lags far behind the ingress of blocks and transactions fn storage_listen( block_notifier: BlockStream, block_storage: Arc, ) -> JoinHandle<()> { + + let mut csv_writer = WriterBuilder::new().from_path(format!("block_tx_ingress-postgres.csv")).unwrap(); + + tokio::spawn(async move { let mut block_notifier = block_notifier; // this is the critical write loop @@ -161,6 +174,11 @@ fn storage_listen( if elapsed > Duration::from_millis(150) { warn!("(soft_realtime) Write operation was slow!"); } + csv_writer.serialize(CsvRow { + slot: block.slot, + tx_count: block.transactions.len() as u32, + write_time_ms: elapsed.as_millis() as u32, + }).unwrap(); } // -- Ok Err(RecvError::Lagged(missed_blocks)) => { warn!(