diff --git a/Cargo.lock b/Cargo.lock index febf3f9..e44710d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -312,16 +312,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" -[[package]] -name = "colored" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cbf2150cce219b664a8a70df7a1f933836724b503f8a413af9365b4dcc4d90b8" -dependencies = [ - "lazy_static", - "windows-sys 0.48.0", -] - [[package]] name = "compact_str" version = "0.7.1" @@ -375,22 +365,6 @@ version = "0.8.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "248e3bacc7dc6baa3b21e405ee045c3047101a49145e7e9eca583ab4c2ca5345" -[[package]] -name = "crossterm" -version = "0.25.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e64e6c0fbe2c17357405f7c758c1ef960fce08bdfb2c03d88d2a18d7e09c4b67" -dependencies = [ - "bitflags 1.3.2", - "crossterm_winapi", - "libc", - "mio", - "parking_lot", - "signal-hook", - "signal-hook-mio", - "winapi", -] - [[package]] name = "crossterm" version = "0.27.0" @@ -448,15 +422,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "deranged" -version = "0.3.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b42b6fa04a440b495c8b04d0e71b707c585f83cb9cb28cf8cd0d976c315e31b4" -dependencies = [ - "powerfmt", -] - [[package]] name = "digest" version = "0.10.7" @@ -970,12 +935,6 @@ dependencies = [ "tempfile", ] -[[package]] -name = "num-conv" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" - [[package]] name = "num-traits" version = "0.2.18" @@ -995,15 +954,6 @@ dependencies = [ "libc", ] -[[package]] -name = "num_threads" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9" -dependencies = [ - "libc", -] - [[package]] name = "object" version = "0.32.2" @@ -1023,7 +973,7 @@ dependencies = [ "chrono", "clap", "color-eyre", - "crossterm 0.27.0", + "crossterm", "csv", "futures", "hmac", @@ -1038,12 +988,9 @@ dependencies = [ "sha2", "signal-hook", "simple-logging", - "simple_logger", - "simplelog", "tokio", "tokio-stream", "tokio-util", - "tui", "tui-scrollview", ] @@ -1173,12 +1120,6 @@ dependencies = [ "uuid", ] -[[package]] -name = "powerfmt" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" - [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1242,7 +1183,7 @@ dependencies = [ "bitflags 2.4.2", "cassowary", "compact_str", - "crossterm 0.27.0", + "crossterm", "indoc", "itertools", "lru", @@ -1525,29 +1466,6 @@ dependencies = [ "thread-id", ] -[[package]] -name = "simple_logger" -version = "4.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8e7e46c8c90251d47d08b28b8a419ffb4aede0f87c2eea95e17d1d5bacbf3ef1" -dependencies = [ - "colored", - "log", - "time", - "windows-sys 0.48.0", -] - -[[package]] -name = "simplelog" -version = "0.12.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "16257adbfaef1ee58b1363bdc0664c9b8e1e30aed86049635fb5f147d065a9c0" -dependencies = [ - "log", - "termcolor", - "time", -] - [[package]] name = "slab" version = "0.4.9" @@ -1684,15 +1602,6 
@@ dependencies = [ "windows-sys 0.52.0", ] -[[package]] -name = "termcolor" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" -dependencies = [ - "winapi-util", -] - [[package]] name = "thread-id" version = "3.3.0" @@ -1714,39 +1623,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "time" -version = "0.3.34" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c8248b6521bb14bc45b4067159b9b6ad792e2d6d754d6c41fb50e29fefe38749" -dependencies = [ - "deranged", - "itoa", - "libc", - "num-conv", - "num_threads", - "powerfmt", - "serde", - "time-core", - "time-macros", -] - -[[package]] -name = "time-core" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ef927ca75afb808a4d64dd374f00a2adf8d0fcff8e7b184af886c3c87ec4a3f3" - -[[package]] -name = "time-macros" -version = "0.2.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ba3a3ef41e6672a2f0f001392bb5dcd3ff0a9992d618ca761a11c3121547774" -dependencies = [ - "num-conv", - "time-core", -] - [[package]] name = "tinyvec" version = "1.6.0" @@ -1880,19 +1756,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" -[[package]] -name = "tui" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccdd26cbd674007e649a272da4475fb666d3aa0ad0531da7136db6fab0e5bad1" -dependencies = [ - "bitflags 1.3.2", - "cassowary", - "crossterm 0.25.0", - "unicode-segmentation", - "unicode-width", -] - [[package]] name = "tui-scrollview" version = "0.3.2" @@ -2099,15 +1962,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" -[[package]] -name = "winapi-util" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" -dependencies = [ - "winapi", -] - [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 7a2d7a8..2d64eeb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,14 +6,10 @@ edition = "2021" [dependencies] anyhow = "1.0.81" -simplelog = "0.12.2" tui-scrollview = "0.3.2" ratatui = { version = "0.26.1", features = [] } crossterm = { version = "0.27.0", features = ["event-stream"] } -tui = "0.19.0" color-eyre = "0.6.3" -log = "0.4.16" -simple_logger = "4.3.3" chrono = "0.4.19" futures = "0.3.21" reqwest = {version = "0.11.10", features = ["blocking", "json"]} @@ -25,6 +21,7 @@ serde_json="1.0.79" serde_derive = "1.0.136" clap = { version = "4.5.2", features = ["derive"] } csv = "1.3.0" +log = { version = "0.4.21", features = ["std"] } poston = "0.7.8" base64 = "0.22.0" hmac = "0.12.1" diff --git a/README.md b/README.md index 5ca63c2..6fd0b90 100644 --- a/README.md +++ b/README.md @@ -18,10 +18,19 @@ rewrite, I'm hoping I'll be able to maintain the smaller codebase in my limited - Csv file - Graylog - Fluentd +- Azure Log Analytics If you were using an interface that was dropped, keep using the previous version and raise an issue asking for the interface to be included. I don't mind writing an interface for one person, I only mind writing it for no one. 
+ +#### Interactive interface + +An interactive terminal interface was added, which allows testing the API connection, retrieving logs, and load testing +by downloading each log an arbitrary number of times. This should allow live troubleshooting and testing, which might +make solving issues easier. You can use it by running the collector as normal, only adding the '--interactive' command +line parameter. + #### Add container releases While binaries will still be available, the primary method of release should be containers. This will hopefully @@ -32,6 +41,8 @@ be necessary. # Office365 audit log collector +![Screenshot.jpg](Screenshot.jpg) + Collect/retrieve Office365, Azure and DLP audit logs, optionally filter them, then send them to one or more outputs (see full list below). Onboarding is easy and takes only a few minutes (see 'Onboarding' section). There are Windows and Linux executables. diff --git a/Release/Linux/OfficeAuditLogCollector b/Release/Linux/OfficeAuditLogCollector index 0f27a41..df2685f 100644 --- a/Release/Linux/OfficeAuditLogCollector +++ b/Release/Linux/OfficeAuditLogCollector @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:6cf5a102e87b8841728aa9b39152934b116c1591fc25a2de82e0343313573ad5 -size 6292760 +oid sha256:9b4d3320c7e109dbe6ed1f380ae16230f6ca8b9b005d815bdb44aeffc0f4e454 +size 7349496 diff --git a/Release/Windows/OfficeAuditLogCollector.exe b/Release/Windows/OfficeAuditLogCollector.exe index 0dcbfdf..a6f8c96 100644 --- a/Release/Windows/OfficeAuditLogCollector.exe +++ b/Release/Windows/OfficeAuditLogCollector.exe @@ -1,3 +1,3 @@ version https://git-lfs.github.com/spec/v1 -oid sha256:78791d7113b976128bcf90294aa74b5a09cc3a8eebdcc79640468e979012fd7b -size 4945920 +oid sha256:69580b148a6b7ee23881a8562998028bdb113c6ec85c19f2cd9911ba7a3da4fc +size 6075904 diff --git a/Screenshot.jpg b/Screenshot.jpg new file mode 100644 index 0000000..89ddf0e Binary files /dev/null and b/Screenshot.jpg differ diff --git a/src/api_connection.rs b/src/api_connection.rs index f7dee08..bc248b9 100644 --- a/src/api_connection.rs +++ b/src/api_connection.rs @@ -1,4 +1,5 @@ use std::collections::HashMap; +use std::time::Duration; use reqwest; use log::{debug, warn, error, info}; use reqwest::header::{AUTHORIZATION, CONTENT_TYPE, HeaderMap}; @@ -202,7 +203,7 @@ impl ApiConnection { /// next page of content. This is sent over the blobs_tx channel to retrieve as well. If no /// additional pages exist, a status message is sent to indicate all content blobs for this /// content type have been retrieved. -#[tokio::main(flavor="multi_thread", worker_threads=200)] +#[tokio::main(flavor="multi_thread", worker_threads=20)] pub async fn get_content_blobs(config: GetBlobConfig, blobs_rx: Receiver<(String, String)>, known_blobs: HashMap) { @@ -210,7 +211,7 @@ pub async fn get_content_blobs(config: GetBlobConfig, blobs_rx: Receiver<(String let blobs_tx = config.blobs_tx.clone(); let blob_error_tx = config.blob_error_tx.clone(); - let status_tx = config.status_tx.clone(); + let mut status_tx = config.status_tx.clone(); let content_tx = config.content_tx.clone(); let client = config.client.clone(); let headers = config.headers.clone(); @@ -219,11 +220,24 @@ pub async fn get_content_blobs(config: GetBlobConfig, blobs_rx: Receiver<(String let known_blobs = known_blobs.clone(); let duplicate = config.duplicate; async move { - match client.get(url.clone()).timeout(std::time::Duration::from_secs(5)). 
- headers(headers.clone()).send().await { + match client + .get(url.clone()) + .timeout(Duration::from_secs(5)) + .headers(headers.clone()).send().await { Ok(resp) => { - handle_blob_response(resp, blobs_tx, status_tx, content_tx, blob_error_tx, - content_type, url, &known_blobs, duplicate).await; + if resp.status().is_success() { + handle_blob_response(resp, blobs_tx, status_tx, content_tx, blob_error_tx, + content_type, url, &known_blobs, duplicate).await; + } else { + if let Ok(text) = resp.text().await { + if text.to_lowercase().contains("too many request") { + status_tx.send(StatusMessage::BeingThrottled).await.unwrap(); + } else { + error!("Err getting blob response {}", text); + } + handle_blob_response_error(status_tx, blob_error_tx, content_type, url).await; + } + } }, Err(e) => { error!("Err getting blob response {}", e); @@ -247,14 +261,14 @@ async fn handle_blob_response( handle_blob_response_paging(&resp, blobs_tx, status_tx.clone(), content_type.clone()).await; - match resp.json::>>().await { + match resp.json::>>().await { Ok(i) => { handle_blob_response_content_uris(status_tx, content_tx, content_type, i, known_blobs, duplicate) .await; }, Err(e) => { - warn!("Err getting blob JSON {}", e); + warn!("Error getting blob JSON {}", e); match blob_error_tx.send((content_type, url)).await { Err(e) => { error!("Could not resend failed blob, dropping it: {}", e); @@ -357,8 +371,9 @@ async fn handle_blob_response_error( /// Retrieve the actual ContentUris found in the JSON body of content blobs. -#[tokio::main(flavor="multi_thread", worker_threads=20)] +#[tokio::main(flavor="multi_thread", worker_threads=50)] pub async fn get_content(config: GetContentConfig, content_rx: Receiver) { + content_rx.for_each_concurrent(config.threads, |content_to_retrieve| { let client = config.client.clone(); let headers = config.headers.clone(); @@ -367,7 +382,10 @@ pub async fn get_content(config: GetContentConfig, content_rx: Receiver { handle_content_response(resp, result_tx, status_tx, content_error_tx, content_to_retrieve).await; @@ -379,7 +397,7 @@ pub async fn get_content(config: GetContentConfig, content_rx: Receiver { - error!("Could not resend failed content, dropping it: {}", e); + Err(_) => { status_tx.send(StatusMessage::ErrorContentBlob).await.unwrap_or_else( |e| panic!("Could not send status update, channel closed?: {}", e) ); @@ -422,8 +439,7 @@ async fn handle_content_response( Err(e) => { warn!("Error interpreting JSON: {}", e); match content_error_tx.send(content_to_retrieve).await { - Err(e) => { - error!("Could not resend failed content, dropping it: {}", e); + Err(_) => { status_tx.send(StatusMessage::ErrorContentBlob).await.unwrap_or_else( |e| panic!("Could not send status update, channel closed?: {}", e) ); diff --git a/src/collector.rs b/src/collector.rs index da14f4a..4774fe8 100644 --- a/src/collector.rs +++ b/src/collector.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use std::mem::swap; use std::ops::Div; use std::sync::Arc; -use std::time::Instant; +use std::time::{Duration, Instant}; use anyhow::Result; use log::{warn, error, info}; use futures::{SinkExt}; @@ -12,6 +12,7 @@ use futures::channel::mpsc::{Sender, Receiver}; use serde_json::Value; use tokio::sync::mpsc::UnboundedSender; use tokio::sync::Mutex; +use tokio::time::sleep; use crate::data_structures; use crate::api_connection; use crate::api_connection::ApiConnection; @@ -54,6 +55,7 @@ impl Collector { interactive_sender: Option>> ) -> Result { + info!("Initializing collector."); // Initialize interfaces let 
mut interfaces: Vec> = Vec::new(); if args.interactive { @@ -380,16 +382,29 @@ fn spawn_blob_collector( /// awaiting_content_blobs is incremented; every time content is retrieved or could not be /// retrieved awaiting_content_blobs is decremented. When it reaches 0 we know we are done. #[tokio::main] -pub async fn message_loop(mut config: data_structures::MessageLoopConfig, mut state: Arc>) { +pub async fn message_loop(mut config: data_structures::MessageLoopConfig, + mut state: Arc>) { // Send base URLS for content blob retrieval then keep track of when they've all come in for (content_type, base_url) in config.urls.into_iter() { config.blobs_tx.clone().send((content_type, base_url)).await.unwrap(); state.lock().await.awaiting_content_types += 1; } + + let mut rate_limit_backoff_started: Option = None; + let mut retry_map = HashMap::new(); // Loop ends with the run itself, signalling the program is done. loop { + if let Some(t) = rate_limit_backoff_started { + if t.elapsed().as_secs() >= 30 { + rate_limit_backoff_started = None; + state.lock().await.rate_limited = false; + info!("Release rate limit"); + } + } + + if let Ok(msg) = config.kill_rx.try_recv() { if msg { info!("Stopping collector."); @@ -440,28 +455,35 @@ pub async fn message_loop(mut config: data_structures::MessageLoopConfig, mut st break; } } - data_structures::StatusMessage::BeingThrottled => warn!("Throttled!"), // TODO: handle being throttled + data_structures::StatusMessage::BeingThrottled => { + if rate_limit_backoff_started.is_none() { + warn!("Being rate limited, backing off 30 seconds."); + state.lock().await.rate_limited = true; + rate_limit_backoff_started = Some(Instant::now()); + } + } } } // Check channel for content pages that could not be retrieved and retry them the user // defined amount of times. If we can't in that amount of times then give up. if let Ok(Some((content_type, url))) = config.blob_error_rx.try_next() { - if state.lock().await.retry_map.contains_key(&url) { - let retries = &mut state.lock().await.retry_map; - let retries_left = retries.get_mut(&url).unwrap(); + if retry_map.contains_key(&url) { + let retries_left = retry_map.get_mut(&url).unwrap(); if retries_left == &mut 0 { error!("Gave up on blob {}", url); state.lock().await.awaiting_content_types -= 1; state.lock().await.stats.blobs_error += 1; } else { - *retries_left -= 1; + if rate_limit_backoff_started.is_none() { + *retries_left -= 1; + } state.lock().await.stats.blobs_retried += 1; warn!("Retry blob {} {}", retries_left, url); config.blobs_tx.send((content_type, url)).await.unwrap(); } } else { - state.lock().await.retry_map.insert(url.clone(), config.retries - 1); + retry_map.insert(url.clone(), config.retries - 1); state.lock().await.stats.blobs_retried += 1; warn!("Retry blob {} {}", config.retries - 1, url); config.blobs_tx.send((content_type, url)).await.unwrap(); @@ -470,22 +492,22 @@ pub async fn message_loop(mut config: data_structures::MessageLoopConfig, mut st // Check channel for content blobs that could not be retrieved and retry them the user // defined amount of times. If we can't in that amount of times then give up. 
if let Ok(Some(content)) = config.content_error_rx.try_next() { - if state.lock().await.retry_map.contains_key(&content.url) { - let retries = &mut state.lock().await.retry_map; - let retries_left = retries.get_mut(&content.url).unwrap(); + state.lock().await.stats.blobs_retried += 1; + if retry_map.contains_key(&content.url) { + let retries_left = retry_map.get_mut(&content.url).unwrap(); if retries_left == &mut 0 { error!("Gave up on content {}", content.url); state.lock().await.awaiting_content_blobs -= 1; state.lock().await.stats.blobs_error += 1; } else { - *retries_left -= 1; - state.lock().await.stats.blobs_retried += 1; + if rate_limit_backoff_started.is_none() { + *retries_left -= 1; + } warn!("Retry content {} {}", retries_left, content.url); config.content_tx.send(content).await.unwrap(); - } } else { - state.lock().await.retry_map.insert(content.url.to_string(), config.retries - 1); + retry_map.insert(content.url.to_string(), config.retries - 1); state.lock().await.stats.blobs_retried += 1; warn!("Retry content {} {}", config.retries - 1, content.url); config.content_tx.send(content).await.unwrap(); @@ -494,6 +516,7 @@ pub async fn message_loop(mut config: data_structures::MessageLoopConfig, mut st } // We send back stats after exiting the loop, signalling the end of the run. let stats = state.lock().await.stats.clone(); + sleep(Duration::from_secs(3)).await; config.stats_tx.send(( stats.blobs_found, stats.blobs_successful, diff --git a/src/data_structures.rs b/src/data_structures.rs index 058a349..1e6958b 100644 --- a/src/data_structures.rs +++ b/src/data_structures.rs @@ -6,7 +6,6 @@ use clap::Parser; use log::warn; use serde_json::Value; use crate::config::ContentTypesSubConfig; -use crate::data_structures; /// List of JSON responses (used to represent content blobs) pub type ArbitraryJson = HashMap; @@ -149,24 +148,14 @@ pub struct RunStatistics { pub blobs_error: usize, pub blobs_retried: usize, } -impl RunStatistics { - pub fn new() -> RunStatistics { - RunStatistics { - blobs_found: 0, - blobs_successful: 0, - blobs_error: 0, - blobs_retried: 0 - } - } -} -#[derive(Default)] +#[derive(Default, Clone)] pub struct RunState { pub awaiting_content_types: usize, pub awaiting_content_blobs: usize, - pub retry_map: HashMap, pub stats: RunStatistics, + pub rate_limited: bool, } #[derive(Parser, Debug, Clone)] @@ -177,33 +166,24 @@ pub struct RunState { /// collection options (check the examples folder in the repo). Then run the tool with below options. 
pub struct CliArgs { - #[arg(long)] + #[arg(long, help = "ID of tenant to retrieve logs for.")] pub tenant_id: String, - #[arg(long)] + #[arg(long, help = "Client ID of app registration used to retrieve logs.")] pub client_id: String, - #[arg(long)] + #[arg(long, help = "Secret key of app registration used to retrieve logs")] pub secret_key: String, - #[arg(short, long, default_value = "12345678-1234-1234-1234-123456789123")] + #[arg(short, long, default_value = "12345678-1234-1234-1234-123456789123", help = "Publisher ID, set to tenant-id if left empty.")] pub publisher_id: String, - #[arg(long)] + #[arg(long, help = "Path to mandatory config file.")] pub config: String, - #[arg(short, long, default_value = "")] - pub table_string: String, - - #[arg(short, long, default_value = "")] - pub blob_string: String, - - #[arg(short, long, default_value = "")] - pub sql_string: String, - - #[arg(short, long, default_value = "")] + #[arg(short, long, default_value = "", help = "Shared key for Azure Log Analytics Workspace.")] pub oms_key: String, - #[arg(short, long, required = false)] + #[arg(short, long, required = false, help = "Interactive interface for (load) testing.")] pub interactive: bool, } diff --git a/src/interactive_mode/interactive.rs b/src/interactive_mode/interactive.rs index ed43ce2..62d0e0f 100644 --- a/src/interactive_mode/interactive.rs +++ b/src/interactive_mode/interactive.rs @@ -1,7 +1,4 @@ use std::cmp::max; -use std::collections::HashMap; -use std::mem::swap; -use std::ops::Sub; use std::sync::Arc; use color_eyre::eyre::Result; use crossterm::event::KeyCode::Char; @@ -14,7 +11,6 @@ use ratatui::style::Color; use ratatui::widgets::ListItem; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use anyhow::Result as AnyHowResult; -use futures::StreamExt; use ratatui::style::palette::tailwind; use reqwest::header::HeaderMap; use tokio::sync::Mutex; @@ -57,8 +53,10 @@ struct State { selected_list_max: usize, scroll_log: ScrollViewState, table_result: TableState, + table_result_colum_start: usize, found_blobs: usize, successful_blobs: usize, + awaiting_blobs: usize, error_blobs: usize, retry_blobs: usize, logs_retrieved: usize, @@ -66,6 +64,7 @@ struct State { run_started: Option, run_ended: Option, run_progress: u16, + rate_limit: bool, } impl State { pub fn new(args: CliArgs, @@ -92,15 +91,18 @@ impl State { selected_list_max: 2, scroll_log: ScrollViewState::default(), table_result: TableState::default(), + table_result_colum_start: 0, found_blobs: 0, error_blobs: 0, successful_blobs: 0, + awaiting_blobs: 0, retry_blobs: 0, logs_retrieved: 0, logs_retrieval_speeds: Vec::new(), run_started: None, run_ended: None, run_progress: 0, + rate_limit: false, } } } @@ -118,7 +120,6 @@ pub async fn run(args: CliArgs, config: Config, mut log_rx: UnboundedReceiver<(S loop { let e = tui.next().await.unwrap(); match e { - tui::Event::Quit => action_tx.send(Action::Quit)?, tui::Event::Tick => action_tx.send(Action::Tick)?, tui::Event::Render => action_tx.send(Action::Render)?, tui::Event::Key(_) => { @@ -235,9 +236,9 @@ fn update(state: &mut State, action: Action, api: Arc>) { if let Some(index) = state.table_result.selected() { if index < 1000 { state.table_result.select(Some(index + 1)); - } else { - state.table_result.select(Some(0)) } + } else { + state.table_result.select(Some(0)) } } else { state.selected_list = if state.selected_list >= state.selected_list_max { @@ -250,9 +251,13 @@ fn update(state: &mut State, action: Action, api: Arc>) { Action::HandleLeft 
=> { if state.selected_block == SelectedBlock::Commands && state.selected_list == 2 { let mut current = state.config.collect.duplicate.unwrap_or(1); - current = current.saturating_sub(current / 10); + current = current.saturating_sub(max(1, current / 10)); current = max(1, current); state.config.collect.duplicate = Some(current); + } else if state.selected_block == SelectedBlock::Results { + if state.table_result_colum_start > 0 { + state.table_result_colum_start -= 1; + } } }, Action::HandleRight => { @@ -260,6 +265,14 @@ fn update(state: &mut State, action: Action, api: Arc>) { let current = state.config.collect.duplicate.unwrap_or(1); let increase = max(1, current / 10); state.config.collect.duplicate = Some(current + increase); + } else if state.selected_block == SelectedBlock::Results { + let mut max_col = state.results.first().unwrap_or(&Vec::new()).len(); + if max_col > 10 { + max_col -= 10; + } + if state.table_result_colum_start < max_col { + state.table_result_colum_start += 1; + } } }, Action::ScrollPageUp => { @@ -284,6 +297,9 @@ fn update(state: &mut State, action: Action, api: Arc>) { Action::UpdateErrorBlobs(found) => { state.error_blobs = found; } + Action::UpdateAwaitingBlobs(found) => { + state.awaiting_blobs = found; + } Action::LogsRetrieved(found) => { state.logs_retrieved = found; } @@ -300,6 +316,12 @@ fn update(state: &mut State, action: Action, api: Arc>) { Action::RunEnded => { state.run_ended = Some(Instant::now()); } + Action::RateLimited => { + state.rate_limit = true; + } + Action::NotRateLimited => { + state.rate_limit = false; + } Action::ConnectApi => { state.api_connected = true; }, @@ -455,14 +477,14 @@ fn ui(frame: &mut Frame, state: &mut State) { let mut commands_list_items = Vec::::new(); commands_list_items.push(ListItem::new(Line::from(Span::styled( - "Connect API", Style::default().fg(Color::Magenta), + "Test API connection", Style::default().fg(Color::Magenta), )))); commands_list_items.push(ListItem::new(Line::from(Span::styled( - "Run Collector", Style::default().fg(Color::Magenta), + "Run Collector (using specified config)", Style::default().fg(Color::Magenta), )))); let duplicate = state.config.collect.duplicate.unwrap_or(1); commands_list_items.push(ListItem::new(Line::from(Span::styled( - format!("< Load test ({}x) >", duplicate), Style::default().fg(Color::Magenta), + format!("< Load test ({}x) > (use arrow keys to increase load)", duplicate), Style::default().fg(Color::Magenta), )))); let mut command_state = ListState::default().with_selected(Some(state.selected_list)); @@ -490,7 +512,6 @@ fn ui(frame: &mut Frame, state: &mut State) { } // Speed chart - let chart_block = Block::default() .title(block::Title::from("Performance").alignment(Alignment::Center)) .borders(Borders::ALL); @@ -502,20 +523,11 @@ fn ui(frame: &mut Frame, state: &mut State) { .style(Style::default().magenta()) .data(state.logs_retrieval_speeds.as_slice()), ]; - let x_label_amount = state - .logs_retrieval_speeds - .last() - .unwrap_or(&(10.0, 0.0)) - .0 as usize / 5; - let mut x_labels: Vec = Vec::new(); - for x in 0..x_label_amount { - x_labels.push(Span::from((x * 5).to_string())); - } + let x_axis = Axis::default() - .title("Time (seconds)".red()) .style(Style::default().white()) .bounds([0.0, state.logs_retrieval_speeds.last().unwrap_or(&(10.0, 0.0)).0]) - .labels(x_labels.into()); + .labels(vec![]); let top_speed = state.logs_retrieval_speeds .iter() @@ -528,12 +540,11 @@ fn ui(frame: &mut Frame, state: &mut State) { Span::from(top_speed.to_string()) ); let y_axis 
= Axis::default() - .title("Y Axis".red()) + .title("Logs per second".red()) .style(Style::default().white()) - .bounds([0.0, state.logs_retrieval_speeds.last().unwrap_or(&(0.0, 10.0)).1 + 100.0]) + .bounds([0.0, top_speed as f64]) .labels(y_labels); - // Create the chart and link all the parts together let chart = Chart::new(datasets) .block(chart_block) .x_axis(x_axis) @@ -609,37 +620,23 @@ fn ui(frame: &mut Frame, state: &mut State) { .title_style(Style::new()) .borders(Borders::ALL); - let mut status_list_items = Vec::::new(); - - let (connect_string, color) = if state.api_connected { - (" API Connection: Connected".to_string(), Color::Green,) - } else { - (" API Connection: Disconnected".to_string(), Color::Red,) - }; - status_list_items.push(ListItem::new(Line::from(Span::styled( - connect_string, Style::default().fg(color), - )))); - - status_list_items.push(ListItem::new(Line::from(Span::styled( - format!(" Blobs discovered:: {}", state.found_blobs), Style::default().fg(Color::LightBlue), - )))); - status_list_items.push(ListItem::new(Line::from(Span::styled( - format!(" Blobs Retrieved:: {}", state.successful_blobs), Style::default().fg(Color::Green), - )))); - status_list_items.push(ListItem::new(Line::from(Span::styled( - format!(" Blobs Retried:: {}", state.retry_blobs), Style::default().fg(Color::Yellow), - )))); - status_list_items.push(ListItem::new(Line::from(Span::styled( - format!(" Blobs Failed:: {}", state.error_blobs), Style::default().fg(Color::Red), - )))); - - let list_wid = List::new(status_list_items) - .style(Style::new()) - .highlight_symbol(" ") + let highest = *[state.found_blobs, state.successful_blobs, state.retry_blobs, state.error_blobs] + .iter() + .max() + .unwrap(); + let bar = BarChart::default() + .block(Block::default().title("Run stats").borders(Borders::ALL)) + .bar_width(10) + .data(BarGroup::default().bars(&[Bar::default().value(state.found_blobs as u64).style(Style::default().fg(Color::Blue)).label(Line::from("Found"))])) + .data(BarGroup::default().bars(&[Bar::default().value(state.successful_blobs as u64).style(Style::default().fg(Color::Green)).label(Line::from("Retrieved"))])) + .data(BarGroup::default().bars(&[Bar::default().value(state.retry_blobs as u64).style(Style::default().fg(Color::Yellow)).label(Line::from("Retried"))])) + .data(BarGroup::default().bars(&[Bar::default().value(state.error_blobs as u64).style(Style::default().fg(Color::Red)).label(Line::from("Error"))])) + .max(max(highest as u64, 10)) .block(status_block); - frame.render_widget(list_wid, horizontal_1[1]); + frame.render_widget(bar, horizontal_1[1]); + // Progress let progress_block = Block::new() .title("Progress") .title_alignment(Alignment::Center) @@ -647,7 +644,25 @@ fn ui(frame: &mut Frame, state: &mut State) { .borders(Borders::ALL); let mut progress_list_items = Vec::::new(); - + let (connect_string, color) = if state.api_connected { + (" API Connection: Connected".to_string(), Color::Green,) + } else { + (" API Connection: Disconnected".to_string(), Color::Red,) + }; + progress_list_items.push(ListItem::new(Line::from(Span::styled( + connect_string, Style::default().fg(color), + )))); + + if state.rate_limit { + progress_list_items.push(ListItem::new(Line::from(Span::styled( + " Being rate limited!", Style::default().fg(Color::Red).rapid_blink(), + )))); + } else { + progress_list_items.push(ListItem::new(Line::from(Span::styled( + " Not rate limited", Style::default().fg(Color::Green), + )))); + } + let elapsed = if let Some(elapsed) = state.run_started { 
let end = state.run_ended.unwrap_or(Instant::now()); @@ -661,11 +676,16 @@ fn ui(frame: &mut Frame, state: &mut State) { seconds, ) } else { - "Not started".to_string() + " Not started".to_string() }; progress_list_items.push(ListItem::new(Line::from(Span::styled( - format!("Time elapsed: {}", elapsed), Style::default().fg(color), + format!(" Time elapsed: {}", elapsed), Style::default().fg(Color::LightBlue), )))); + + progress_list_items.push(ListItem::new(Line::from(Span::styled( + format!(" Blobs remaining: {}", state.awaiting_blobs), Style::default().fg(Color::LightBlue), + )))); + let progress_list = List::new(progress_list_items) .style(Style::new()) .highlight_symbol(" ") @@ -682,9 +702,9 @@ fn ui(frame: &mut Frame, state: &mut State) { let list_wid = List::new(logs_list_items) .style(Style::new()) .highlight_symbol(" "); - let size = Size::new(frame.size().width - 2, 7); + let size = Size::new(1000, 1000); let mut scroll_view = ScrollView::new(size); - let area = Rect::new(0, 0, frame.size().width - 2, 7); + let area = Rect::new(0, 0, 1000 , 1000); scroll_view.render_widget(list_wid, area); let palette = tailwind::SLATE; @@ -697,7 +717,7 @@ fn ui(frame: &mut Frame, state: &mut State) { let keys_bg = palette.c600; let title = Line::from(vec![ " Logs ".into(), - " ↓ | ↑ | PageDown | PageUp | Home | End " + "| ↓ | ↑ | PageDown | PageUp | " .fg(keys_fg) .bg(keys_bg), ]) @@ -709,14 +729,13 @@ fn ui(frame: &mut Frame, state: &mut State) { let mut results = state.results.clone(); let mut header = if !results.is_empty() { results.remove(0) } else { Vec::new() }; if header.len() > 10 { - header = header[0..10].to_vec(); + header = header[state.table_result_colum_start..state.table_result_colum_start + 10].to_vec(); } let rows: Vec = results .clone() .into_iter() .map(|mut x|{ - let end = if x.len() > 10 { 10 } else { x.len() }; - x = x[0..end].to_vec(); + x = x[state.table_result_colum_start..state.table_result_colum_start + 10].to_vec(); Row::new(x) }) .collect(); @@ -724,9 +743,11 @@ fn ui(frame: &mut Frame, state: &mut State) { .rows(rows) .highlight_style(Style::new().add_modifier(Modifier::REVERSED)) .highlight_symbol(">>") - .header(Row::new(header)); - - + .header(Row::new(header) + .style(Style::new().bold().underlined()) + .bottom_margin(1), + ); + let palette = tailwind::SLATE; let (fg, bg) = if state.selected_block == SelectedBlock::Results { @@ -738,7 +759,7 @@ fn ui(frame: &mut Frame, state: &mut State) { let keys_bg = palette.c600; let title = Line::from(vec![ " Results ".into(), - " ↓ | ↑ | PageDown | PageUp | Home | End " + " ↓ | ↑ | ← | → | " .fg(keys_fg) .bg(keys_bg), ]) @@ -784,8 +805,9 @@ async fn handle_enter_command(state: State, api: Arc>) -> A Ok(()) } -async fn handle_enter_command_connect(mut state: State, api: Arc>) -> AnyHowResult<()> { +async fn handle_enter_command_connect(state: State, api: Arc>) -> AnyHowResult<()> { + state.action_tx.send(Action::DisconnectApi).unwrap(); if api.lock().await.headers.is_empty() { api.lock().await.login().await?; } @@ -803,6 +825,8 @@ async fn handle_enter_command_run(state: State, let mut config = state.config.clone(); if !load_test { config.collect.duplicate = Some(1); + } else { + config.collect.skip_known_logs = Some(false); } let runs = config.get_needed_runs(); let run_state = Arc::new(Mutex::new(RunState::default())); @@ -817,13 +841,24 @@ async fn handle_enter_command_run(state: State, let mut elapsed_since_data_point = Instant::now(); let run_start = elapsed_since_data_point.clone(); let mut logs_retrieved: usize = 0; 
+ let mut rate_limited = false; loop { let stats = run_state.lock().await.stats; + state.action_tx.send(Action::UpdateAwaitingBlobs(run_state.lock().await.awaiting_content_blobs)).unwrap(); + state.action_tx.send(Action::UpdateFoundBlobs(stats.blobs_found)).unwrap(); state.action_tx.send(Action::UpdateFoundBlobs(stats.blobs_found)).unwrap(); state.action_tx.send(Action::UpdateSuccessfulBlobs(stats.blobs_successful)).unwrap(); state.action_tx.send(Action::UpdateErrorBlobs(stats.blobs_error)).unwrap(); state.action_tx.send(Action::UpdateRetryBlobs(stats.blobs_retried)).unwrap(); + if !rate_limited && run_state.lock().await.rate_limited { + rate_limited = true; + state.action_tx.send(Action::RateLimited).unwrap(); + } else if rate_limited && !run_state.lock().await.rate_limited { + rate_limited = false; + state.action_tx.send(Action::NotRateLimited).unwrap(); + } + let progress = if stats.blobs_found > 0 { ((stats.blobs_found - stats.blobs_successful) / stats.blobs_found) * 100 } else { @@ -831,13 +866,17 @@ async fn handle_enter_command_run(state: State, }; state.action_tx.send(Action::RunProgress(progress as u16)).unwrap(); - let done = collector.check_stats().await; logs_retrieved += collector.check_results().await; + let done = collector.check_stats().await; if done { logs_retrieved += collector.check_all_results().await; state.action_tx.send(Action::RunEnded).unwrap(); state.action_tx.send(Action::RunProgress(100)).unwrap(); state.action_tx.send(Action::LogsRetrieved(logs_retrieved)).unwrap(); + state.action_tx.send(Action::UpdateFoundBlobs(stats.blobs_found)).unwrap(); + state.action_tx.send(Action::UpdateSuccessfulBlobs(stats.blobs_successful)).unwrap(); + state.action_tx.send(Action::UpdateErrorBlobs(stats.blobs_error)).unwrap(); + state.action_tx.send(Action::UpdateRetryBlobs(stats.blobs_retried)).unwrap(); break } state.action_tx.send(Action::LogsRetrieved(logs_retrieved)).unwrap(); diff --git a/src/interactive_mode/tui.rs b/src/interactive_mode/tui.rs index c930036..d31254a 100644 --- a/src/interactive_mode/tui.rs +++ b/src/interactive_mode/tui.rs @@ -23,9 +23,7 @@ use tokio_util::sync::CancellationToken; #[derive(Clone, Debug)] pub enum Event { Init, - Quit, Error, - Closed, Tick, Render, FocusGained, @@ -63,6 +61,7 @@ pub enum Action { ScrollPageUp, ScrollPageDown, UpdateFoundBlobs(usize), + UpdateAwaitingBlobs(usize), UpdateSuccessfulBlobs(usize), UpdateErrorBlobs(usize), UpdateRetryBlobs(usize), @@ -71,6 +70,8 @@ pub enum Action { RunProgress(u16), RunStarted, RunEnded, + RateLimited, + NotRateLimited, Quit, Render, None, @@ -111,16 +112,6 @@ impl Tui { self } - pub fn mouse(mut self, mouse: bool) -> Self { - self.mouse = mouse; - self - } - - pub fn paste(mut self, paste: bool) -> Self { - self.paste = paste; - self - } - pub fn start(&mut self) { let tick_delay = Duration::from_secs_f64(1.0 / self.tick_rate); let render_delay = Duration::from_secs_f64(1.0 / self.frame_rate); @@ -234,18 +225,6 @@ impl Tui { self.cancellation_token.cancel(); } - pub fn suspend(&mut self) -> Result<()> { - self.exit()?; - #[cfg(not(windows))] - signal_hook::low_level::raise(signal_hook::consts::signal::SIGTSTP)?; - Ok(()) - } - - pub fn resume(&mut self) -> Result<()> { - self.enter()?; - Ok(()) - } - pub async fn next(&mut self) -> Option { self.event_rx.recv().await } diff --git a/src/interfaces/file_interface.rs b/src/interfaces/file_interface.rs index 756a8f6..4966b79 100644 --- a/src/interfaces/file_interface.rs +++ b/src/interfaces/file_interface.rs @@ -70,8 +70,11 @@ impl FileInterface { 
columns.append(&mut get_all_columns(content_type)); } + let path = &self.config.output.file.as_ref().unwrap().path; let mut wrt = - Writer::from_path(&self.config.output.file.as_ref().unwrap().path).unwrap(); + Writer::from_path(path).unwrap_or_else( + |e| panic!("Error in CSV interface: Could not write to path '{}': {}", path, e) + ); wrt.write_record(&columns).unwrap(); for logs in all_logs.iter_mut() { for log in logs.iter_mut() { @@ -90,7 +93,9 @@ impl FileInterface { } let columns = get_all_columns(logs); let path = self.paths.get(&content_type).unwrap(); - let mut wrt = Writer::from_path(path).unwrap(); + let mut wrt = Writer::from_path(path).unwrap_or_else( + |e| panic!("Error in CSV interface: Could not write to path '{}': {}", path, e) + ); wrt.write_record(&columns).unwrap(); for log in logs { diff --git a/src/interfaces/interactive_interface.rs b/src/interfaces/interactive_interface.rs index f2e4e54..c65dc0c 100644 --- a/src/interfaces/interactive_interface.rs +++ b/src/interfaces/interactive_interface.rs @@ -1,6 +1,4 @@ -use std::collections::HashMap; use async_trait::async_trait; -use csv::Writer; use tokio::sync::mpsc::UnboundedSender; use crate::data_structures::{Caches}; use crate::interfaces::interface::Interface; diff --git a/src/main.rs b/src/main.rs index 3984c66..bc9e7bd 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,18 +1,11 @@ -use std::io; -use std::io::{IoSlice, Sink, Write}; -use std::mem::swap; -use std::ops::Add; use std::sync::Arc; use clap::Parser; -use crate::collector::{Collector, message_loop}; +use crate::collector::Collector; use crate::config::Config; -use log::{error, info, Level, LevelFilter, log, Log, Metadata, Record}; -use simple_logger::SimpleLogger; -use simple_logging::log_to; -use simplelog::{CombinedLogger, SharedLogger}; +use log::{error, Level, LevelFilter, Log, Metadata, Record}; use tokio::sync::mpsc::{unbounded_channel, UnboundedSender}; use tokio::sync::Mutex; -use crate::data_structures::{CliArgs, RunState}; +use crate::data_structures::RunState; use crate::interactive_mode::interactive; mod collector; @@ -71,30 +64,23 @@ fn init_interactive_logging(config: &Config, log_tx: UnboundedSender<(String, Le } else { LevelFilter::Info }; - let _ = CombinedLogger::init( - vec![ - InteractiveLogger::new(log_tx, level), - ] - ); - + log::set_max_level(level); + log::set_boxed_logger(InteractiveLogger::new(log_tx)).unwrap(); } pub struct InteractiveLogger { log_tx: UnboundedSender<(String, Level)>, - level: LevelFilter, - config: simplelog::Config, - messages: Vec, } impl InteractiveLogger { - pub fn new(log_tx: UnboundedSender<(String, Level)>, level: LevelFilter) -> Box { - Box::new(InteractiveLogger { log_tx, messages: Vec::new(), level, config: simplelog::Config::default() }) + pub fn new(log_tx: UnboundedSender<(String, Level)>) -> Box { + Box::new(InteractiveLogger { log_tx }) } } impl Log for InteractiveLogger { fn enabled(&self, metadata: &Metadata) -> bool { - true + metadata.level() <= Level::Info } fn log(&self, record: &Record) { @@ -109,16 +95,3 @@ impl Log for InteractiveLogger { fn flush(&self) {} } -impl SharedLogger for InteractiveLogger { - fn level(&self) -> LevelFilter { - self.level - } - - fn config(&self) -> Option<&simplelog::Config> { - Some(&self.config) - } - - fn as_log(self: Box) -> Box { - Box::new(*self) - } -}
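
Note on the throttling changes in `src/collector.rs` above: the reworked message loop starts a fixed 30-second back-off when a "too many requests" response is detected, flags the run state as rate limited, and stops consuming retry budgets until the window expires. The sketch below restates that pattern in isolation as a minimal, self-contained example; the `Backoff` type and its method names are illustrative only and are not part of the collector's code.

```rust
use std::time::{Duration, Instant};

/// Illustrative stand-in for the back-off bookkeeping done inline in the
/// collector's message loop (hypothetical type, not from the diff).
struct Backoff {
    started: Option<Instant>,
    window: Duration,
}

impl Backoff {
    fn new() -> Self {
        Self { started: None, window: Duration::from_secs(30) }
    }

    /// Called when the API signals throttling; only the first signal starts
    /// the window, later signals are ignored until it expires.
    fn on_throttled(&mut self) {
        if self.started.is_none() {
            self.started = Some(Instant::now());
        }
    }

    /// Called once per loop iteration; releases the back-off after the
    /// 30-second window has elapsed.
    fn tick(&mut self) {
        if let Some(t) = self.started {
            if t.elapsed() >= self.window {
                self.started = None;
            }
        }
    }

    /// While active, failed blobs are re-queued without decrementing their
    /// remaining retries, mirroring the `rate_limit_backoff_started` checks
    /// in the message loop.
    fn is_active(&self) -> bool {
        self.started.is_some()
    }
}

fn main() {
    let mut backoff = Backoff::new();
    // The first throttling signal opens the window; the second is a no-op.
    backoff.on_throttled();
    backoff.on_throttled();
    assert!(backoff.is_active());
    // In the real loop this runs every iteration; immediately after the
    // signal the window has not expired yet, so the back-off stays active.
    backoff.tick();
    assert!(backoff.is_active());
}
```

The same hunk also moves `retry_map` out of the shared `RunState` and into a loop-local `HashMap`, so retry bookkeeping no longer has to go through the `Mutex`-guarded state on every failed blob.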