Skip to content

Commit

Permalink
tui
Browse files Browse the repository at this point in the history
  • Loading branch information
www committed Mar 24, 2024
1 parent 25068d8 commit 238f0e3
Show file tree
Hide file tree
Showing 15 changed files with 1,963 additions and 158 deletions.
423 changes: 415 additions & 8 deletions Cargo.lock

Large diffs are not rendered by default.

12 changes: 10 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,20 @@ edition = "2021"

[dependencies]
anyhow = "1.0.81"
simplelog = "0.12.2"
tui-scrollview = "0.3.2"
ratatui = { version = "0.26.1", features = [] }
crossterm = { version = "0.27.0", features = ["event-stream"] }
tui = "0.19.0"
color-eyre = "0.6.3"
log = "0.4.16"
simple_logger = "4.3.3"
chrono = "0.4.19"
futures = "0.3.21"
reqwest = {version = "0.11.10", features = ["blocking", "json"]}
tokio = {version="1.17.0", features=["full"]}
tokio = { version = "1.17.0", features = ["full"] }
tokio-stream = "0.1.8"
serde="1.0.136"
serde = "1.0.136"
serde_yaml = "0.9.32"
serde_json="1.0.79"
serde_derive = "1.0.136"
Expand All @@ -25,3 +31,5 @@ hmac = "0.12.1"
sha2 = "0.10.8"
async-trait = "0.1.77"
simple-logging = "2.0.2"
tokio-util = "0.7.10"
signal-hook = "0.3.17"
1 change: 0 additions & 1 deletion Release/ConfigExamples/CsvOutput.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
collect:
skipKnownLogs: True
workingDir: /app
contentTypes:
Audit.General: True
Expand Down
1 change: 1 addition & 0 deletions Release/ConfigExamples/fullConfig.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ collect: # Settings determining which audit logs to collect and how to do it
retries: 3 # Times to retry retrieving a content blob if it fails
skipKnownLogs: True # Remember retrieved log blobs, don't collect them twice
hoursToCollect: 24 # Look back this many hours for audit logs (max supported by Office API is 168)
duplicate: 1 # Amount of times to download each log, can be used for performance testing by inflating the number of logs to download. Default is 1
filter: # Only logs that match ALL filters for a content type are collected. Leave empty to collect all
Audit.General:
Audit.AzureActiveDirectory:
Expand Down
163 changes: 101 additions & 62 deletions src/api_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,25 +9,26 @@ use futures::channel::mpsc::{Receiver, Sender};
use crate::config::Config;
use crate::data_structures::{JsonList, StatusMessage, GetBlobConfig, GetContentConfig, AuthResult,
ContentToRetrieve, CliArgs};
use anyhow::Result;
use anyhow::{anyhow, Result};
use serde_json::Value;


/// Return a logged in API connection object. Use the Headers value to make API requests.
pub async fn get_api_connection(args: CliArgs, config: Config) -> ApiConnection {
pub async fn get_api_connection(args: CliArgs, config: Config) -> Result<ApiConnection> {

let mut api = ApiConnection {
args,
config,
headers: HeaderMap::new(),
};
api.login().await;
api
api.login().await?;
Ok(api)
}


/// Abstraction of an API connection to Azure Management APIs. Can be used to login to the API
/// which sets the headers. These headers can then be used to make authenticated requests.
#[derive(Clone)]
pub struct ApiConnection {
pub args: CliArgs,
pub config: Config,
Expand All @@ -36,7 +37,7 @@ pub struct ApiConnection {
impl ApiConnection {
/// Use tenant_id, client_id and secret_key to request a bearer token and store it in
/// our headers. Must be called once before requesting any content.
async fn login(&mut self) {
pub async fn login(&mut self) -> Result<()> {
info!("Logging in to Office Management API.");
let auth_url = format!("https://login.microsoftonline.com/{}/oauth2/token",
self.args.tenant_id.to_string());
Expand All @@ -52,51 +53,72 @@ impl ApiConnection {
self.headers.insert(CONTENT_TYPE, "application/x-www-form-urlencoded".parse().unwrap());

let login_client = reqwest::Client::new();
let result = login_client
let response = login_client
.post(auth_url)
.headers(self.headers.clone())
.form(&params)
.send()
.await;
let response = match result {
Ok(response) => response,
Err(e) => {
let msg = format!("Could not send API login request: {}", e);
error!("{}", msg);
panic!("{}", msg);
}
};
.await?;
if !response.status().is_success() {
let text = match response.text().await {
Ok(text) => text,
Err(e) => {
let msg = format!("Received error response to API login, but could not parse response: {}", e);
error!("{}", msg);
panic!("{}", msg);
}
};
let text = response.text().await?;
let msg = format!("Received error response to API login: {}", text);
error!("{}", msg);
panic!("{}", msg);
return Err(anyhow!("{}", msg));
}
let json = match response.json::<AuthResult>().await {
Ok(json) => json,
Err(e) => {
let msg = format!("Could not parse API login reply: {}", e);
error!("{}", msg);
panic!("{}", msg);
}
};

let json = response.json::<AuthResult>().await?;
let token = format!("bearer {}", json.access_token);
self.headers.insert(AUTHORIZATION, token.parse().unwrap());
info!("Successfully logged in to Office Management API.")
info!("Successfully logged in to Office Management API.");
Ok(())
}

fn get_base_url(&self) -> String {
format!("https://manage.office.com/api/v1.0/{}/activity/feed", self.args.tenant_id)
}

pub async fn get_feeds(&self) -> Result<Vec<String>> {

let url = format!("{}/subscriptions/list", self.get_base_url());
let client = reqwest::Client::new();
let result: Vec<HashMap<String, Value>> = client
.get(url)
.headers(self.headers.clone())
.header("content-length", 0)
.send()
.await?
.json()
.await?;
Ok(result.iter()
.filter(|x| x.get("status").unwrap() == "enabled")
.map(|x|x.get("contentType").unwrap().as_str().unwrap().to_string())
.collect())
}

pub async fn set_subscription(&self, content_type: String, enable: bool) -> Result<()> {

let action = if enable { "start" } else { "stop" };
let url = format!("{}/subscriptions/{}?contentType={}",
self.get_base_url(),
action,
content_type
);
debug!("Subscribing to {} feed.", content_type);
let client = reqwest::Client::new();
let response = client
.post(url)
.headers(self.headers.clone())
.header("content-length", 0)
.send()
.await?;
if !response.status().is_success() {
let text = response.text().await?;
let msg = format!("Received error response subscribing to audit feed {}: {}", content_type, text);
error!("{}", msg);
return Err(anyhow!("{}", msg))
}
Ok(())
}

pub async fn subscribe_to_feeds(&self) -> Result<()> {

info!("Subscribing to audit feeds.");
Expand Down Expand Up @@ -138,23 +160,7 @@ impl ApiConnection {
}
}
for content_type in content_types {
let url = format!("{}/subscriptions/start?contentType={}",
self.get_base_url(),
content_type
);
debug!("Subscribing to {} feed.", content_type);
let response = client
.post(url)
.headers(self.headers.clone())
.header("content-length", 0)
.send()
.await?;
if !response.status().is_success() {
let text = response.text().await?;
let msg = format!("Received error response subscribing to audit feed {}: {}", content_type, text);
error!("{}", msg);
panic!("{}", msg);
}
self.set_subscription(content_type, true).await?;
}
info!("All audit feeds subscriptions exist.");
Ok(())
Expand Down Expand Up @@ -211,12 +217,13 @@ pub async fn get_content_blobs(config: GetBlobConfig, blobs_rx: Receiver<(String
let content_type = content_type.clone();
let url = url.clone();
let known_blobs = known_blobs.clone();
let duplicate = config.duplicate;
async move {
match client.get(url.clone()).timeout(std::time::Duration::from_secs(5)).
headers(headers.clone()).send().await {
Ok(resp) => {
handle_blob_response(resp, blobs_tx, status_tx, content_tx, blob_error_tx,
content_type, url, &known_blobs).await;
content_type, url, &known_blobs, duplicate).await;
},
Err(e) => {
error!("Err getting blob response {}", e);
Expand All @@ -236,13 +243,14 @@ async fn handle_blob_response(
resp: reqwest::Response, blobs_tx: Sender<(String, String)>,
mut status_tx: Sender<StatusMessage>, content_tx: Sender<ContentToRetrieve>,
mut blob_error_tx: Sender<(String, String)>, content_type: String, url: String,
known_blobs: &HashMap<String, String>) {
known_blobs: &HashMap<String, String>, duplicate: usize) {

handle_blob_response_paging(&resp, blobs_tx, status_tx.clone(),
content_type.clone()).await;
match resp.json::<Vec<HashMap<String, serde_json::Value>>>().await {
Ok(i) => {
handle_blob_response_content_uris(status_tx, content_tx, content_type, i, known_blobs)
handle_blob_response_content_uris(status_tx, content_tx, content_type, i, known_blobs,
duplicate)
.await;
},
Err(e) => {
Expand Down Expand Up @@ -288,7 +296,8 @@ async fn handle_blob_response_paging(
/// over the content_tx channel for the content thread to retrieve.
async fn handle_blob_response_content_uris(
mut status_tx: Sender<StatusMessage>, mut content_tx: Sender<ContentToRetrieve>,
content_type: String, content_json: JsonList, known_blobs: &HashMap<String, String>) {
content_type: String, content_json: JsonList, known_blobs: &HashMap<String, String>,
duplicate: usize) {

for json_dict in content_json.into_iter() {
if json_dict.contains_key("contentUri") == false {
Expand All @@ -313,12 +322,19 @@ async fn handle_blob_response_content_uris(
let content_to_retrieve = ContentToRetrieve {
expiration, content_type: content_type.clone(), content_id, url};

content_tx.send(content_to_retrieve).await.unwrap_or_else(
|e| panic!("Could not send found content, channel closed?: {}", e)
);
status_tx.send(StatusMessage::FoundNewContentBlob).await.unwrap_or_else(
|e| panic!("Could not send status update, channel closed?: {}", e)
);
if duplicate <= 1 {
content_tx.send(content_to_retrieve).await.unwrap_or_else(
|e| panic!("Could not send found content, channel closed?: {}", e));
status_tx.send(StatusMessage::FoundNewContentBlob).await.unwrap_or_else(
|e| panic!("Could not send status update, channel closed?: {}", e));
} else {
for _ in 0..duplicate {
content_tx.send(content_to_retrieve.clone()).await.unwrap_or_else(
|e| panic!("Could not send found content, channel closed?: {}", e));
status_tx.send(StatusMessage::FoundNewContentBlob).await.unwrap_or_else(
|e| panic!("Could not send status update, channel closed?: {}", e));
}
}
}
};
}
Expand All @@ -341,7 +357,7 @@ async fn handle_blob_response_error(


/// Retrieve the actual ContentUris found in the JSON body of content blobs.
#[tokio::main(flavor="multi_thread", worker_threads=200)]
#[tokio::main(flavor="multi_thread", worker_threads=20)]
pub async fn get_content(config: GetContentConfig, content_rx: Receiver<ContentToRetrieve>) {
content_rx.for_each_concurrent(config.threads, |content_to_retrieve| {
let client = config.client.clone();
Expand Down Expand Up @@ -373,6 +389,29 @@ async fn handle_content_response(
mut status_tx: Sender<StatusMessage>, mut content_error_tx: Sender<ContentToRetrieve>,
content_to_retrieve: ContentToRetrieve) {

if !resp.status().is_success() {
match content_error_tx.send(content_to_retrieve).await {
Err(e) => {
error!("Could not resend failed content, dropping it: {}", e);
status_tx.send(StatusMessage::ErrorContentBlob).await.unwrap_or_else(
|e| panic!("Could not send status update, channel closed?: {}", e)
);
},
_=> (),
}
if let Ok(text) = resp.text().await {
if text.to_lowercase().contains("too many request") {
match status_tx.send(StatusMessage::BeingThrottled).await {
Err(e) => {
error!("Could not send status message: {}", e);
},
_=> (),
}
}
}
return
}

match resp.text().await {
Ok(json) => {
result_tx.send((json, content_to_retrieve)).await.unwrap_or_else(
Expand Down
Loading

0 comments on commit 238f0e3

Please sign in to comment.