Compiled wheels
ddbnl committed May 2, 2022
1 parent d138594 commit d1eb0bb
Showing 10 changed files with 106 additions and 41 deletions.
Binary file modified Linux/LINUX-OfficeAuditLogCollector-V2.0
Binary file not shown.
Binary file not shown.
Binary file not shown.
2 changes: 1 addition & 1 deletion Source/AuditLogCollector.py
@@ -98,7 +98,7 @@ def receive_results_from_rust_engine(self):
break
last_received = now
except EOFError: # RustEngine throws this error when all content has been retrieved
print("Finished run")
logging.info("Rust engine finished receiving all content")
break
else:
content_json, content_id, content_expiration, content_type = result
28 changes: 25 additions & 3 deletions Source/RustEngine/src/api_connection.rs
@@ -25,8 +25,8 @@ pub fn get_api_connection(tenant_id: String, client_id: String, secret_key: Stri
api.login();
api
}
/// Abstraction of an API connection to Azure Management APIs. Can be used to login to the API and
/// retrieve log content.
/// Abstraction of an API connection to Azure Management APIs. Can be used to login to the API
/// which sets the headers. These headers can then be used to make authenticated requests.
pub struct ApiConnection {
pub tenant_id: String,
pub client_id: String,
@@ -61,7 +61,11 @@ impl ApiConnection {
}
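The body of login and the header construction are collapsed in this diff. As a rough sketch of what "setting the headers" means here, assuming the standard Azure AD client-credentials flow (the helper name and error handling are hypothetical, not taken from the elided body):

use reqwest::header::{HeaderMap, HeaderValue, AUTHORIZATION};

// Hypothetical helper: turn an access token obtained during login into the
// header map used for subsequent authenticated API requests.
fn build_auth_headers(access_token: &str) -> HeaderMap {
    let mut headers = HeaderMap::new();
    let bearer = format!("Bearer {}", access_token);
    headers.insert(AUTHORIZATION, HeaderValue::from_str(&bearer).unwrap());
    headers
}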


/// Create a URL that can retrieve the first page of content for each passed content type.
/// Create a URL that can retrieve the first page of content for each passed content type. Each
/// content type can have multiple runs specified. A run consists of a start and end date to
/// retrieve data for. The max time span is 24 hours, so if the user wants to retrieve e.g. 72
/// hours, we need 3 runs of 24 hours each. The runs object looks like e.g.:
/// Runs{Audit.Exchange: [(start_date, end_date), (start_date, end_date), (start_date, end_date)]}
pub fn create_base_urls(
content_types: Vec<String>, tenant_id: String, publisher_id: String,
runs: HashMap<String, Vec<(String, String)>>) -> Vec<(String, String)> {
@@ -83,6 +87,11 @@ pub fn create_base_urls(
}
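For example, a runs object asking for 72 hours of Audit.Exchange content could look like this sketch (the dates are placeholders; the real collector computes them from the requested time span):

use std::collections::HashMap;

fn example_runs() -> HashMap<String, Vec<(String, String)>> {
    let mut runs = HashMap::new();
    // 72 hours split into three 24-hour runs, as described above.
    runs.insert("Audit.Exchange".to_string(), vec![
        ("2022-05-01T00:00".to_string(), "2022-05-02T00:00".to_string()),
        ("2022-05-02T00:00".to_string(), "2022-05-03T00:00".to_string()),
        ("2022-05-03T00:00".to_string(), "2022-05-04T00:00".to_string()),
    ]);
    runs
}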


/// Get available content blobs to retrieve. A base URL receives the initial page of content blobs.
/// The response header may specify 'NextPageUri'; if it exists, it holds the URL of the next page
/// of content, which is sent over the blobs_tx channel to be retrieved as well. If no additional
/// pages exist, a status message is sent to indicate that all content blobs for this content type
/// have been retrieved.
#[tokio::main(flavor="multi_thread", worker_threads=200)]
pub async fn get_content_blobs(config: GetBlobConfig, blobs_rx: Receiver<(String, String)>) {
blobs_rx.for_each_concurrent(config.threads, |(content_type, url)| {
@@ -112,6 +121,9 @@ pub async fn get_content_blobs(config: GetBlobConfig, blobs_rx: Receiver<(String
}
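The closure body is collapsed above; it presumably issues one authenticated GET per queued page. A self-contained sketch of the same for_each_concurrent pattern (names and logging are hypothetical):

use futures::channel::mpsc::Receiver;
use futures::stream::StreamExt;

// Sketch: fetch every queued (content_type, url) pair concurrently, bounded
// by the configured number of in-flight requests.
async fn fetch_pages_sketch(
    client: reqwest::Client,
    headers: reqwest::header::HeaderMap,
    threads: usize,
    blobs_rx: Receiver<(String, String)>,
) {
    blobs_rx.for_each_concurrent(threads, |(content_type, url)| {
        let client = client.clone();
        let headers = headers.clone();
        async move {
            match client.get(&url).headers(headers).send().await {
                Ok(resp) => println!("page for {}: {}", content_type, resp.status()),
                Err(e) => eprintln!("request for {} failed: {}", content_type, e),
            }
        }
    }).await;
}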


/// Deal with the response of a successful content blob request. Try to decode it into JSON to
/// retrieve the content URIs of the content inside the blob. Also check the response header for
/// another page of content blobs.
async fn handle_blob_response(
resp: reqwest::Response, blobs_tx: Sender<(String, String)>,
mut status_tx: Sender<StatusMessage>, content_tx: Sender<ContentToRetrieve>,
@@ -138,6 +150,7 @@ async fn handle_blob_response(
}


/// Determine if a content blob response header contains a reference to another page of blobs.
async fn handle_blob_response_paging(
resp: &reqwest::Response, mut blobs_tx: Sender<(String, String)>,
mut status_tx: Sender<StatusMessage>, content_type: String) {
@@ -155,6 +168,9 @@ async fn handle_blob_response_paging(
};
}
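A sketch of the check described above (the 'NextPageUri' header name comes from the doc comment; the status-message detail is summarized in a comment rather than implemented):

use futures::channel::mpsc::Sender;

// Sketch: queue the next page of blobs if the response announces one.
fn paging_sketch(
    resp: &reqwest::Response,
    blobs_tx: &mut Sender<(String, String)>,
    content_type: String,
) {
    if let Some(next) = resp.headers().get("NextPageUri") {
        let next_url = next.to_str().unwrap().to_string();
        blobs_tx.try_send((content_type, next_url)).unwrap();
    } else {
        // No more pages: the real code sends a FinishedContentBlobs status
        // message here so the message loop can decrement its counter.
    }
}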


/// Deal with successfully received and decoded content blobs. Send the URIs of the content over
/// the content_tx channel for the content thread to retrieve.
async fn handle_blob_response_content_uris(
mut status_tx: Sender<StatusMessage>, mut content_tx: Sender<ContentToRetrieve>,
content_type: String, content_json: JsonList) {
@@ -185,6 +201,7 @@ async fn handle_blob_response_content_uris(
};
}
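The loop body is collapsed above. A hedged sketch of what queuing the content URIs plausibly looks like; the 'contentUri', 'contentId' and 'contentExpiration' field names are the ones the Office 365 Management Activity API uses, and the struct fields mirror ContentToRetrieve as used elsewhere in this diff:

use futures::channel::mpsc::Sender;
use std::collections::HashMap;

type JsonList = Vec<HashMap<String, serde_json::Value>>;

struct ContentToRetrieve {
    content_type: String,
    content_id: String,
    expiration: String,
    url: String,
}

// Sketch: turn each decoded blob description into a ContentToRetrieve and
// queue it for the content-retrieval thread.
fn queue_content_sketch(
    content_tx: &mut Sender<ContentToRetrieve>,
    content_type: String,
    content_json: JsonList,
) {
    for blob in content_json {
        let content = ContentToRetrieve {
            content_type: content_type.clone(),
            content_id: blob["contentId"].as_str().unwrap_or_default().to_string(),
            expiration: blob["contentExpiration"].as_str().unwrap_or_default().to_string(),
            url: blob["contentUri"].as_str().unwrap_or_default().to_string(),
        };
        content_tx.try_send(content).unwrap();
    }
}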

/// Deal with an error while requesting a content blob.
async fn handle_blob_response_error(
mut status_tx: Sender<StatusMessage>, mut blob_error_tx: Sender<(String, String)>,
content_type: String, url: String) {
@@ -198,6 +215,8 @@
}
}
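The body is collapsed, but given the signature it plausibly just hands the failed URL to the message loop for a retry decision; a minimal sketch under that assumption:

use futures::channel::mpsc::Sender;

// Sketch: queue the failed page on the error channel; the message loop in
// lib.rs (below) retries it or gives up after the configured retry count.
fn report_page_error_sketch(
    blob_error_tx: &mut Sender<(String, String)>,
    content_type: String,
    url: String,
) {
    blob_error_tx.try_send((content_type, url)).unwrap();
}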


/// Retrieve the actual ContentUris found in the JSON body of content blobs.
#[tokio::main(flavor="multi_thread", worker_threads=200)]
pub async fn get_content(config: GetContentConfig, content_rx: Receiver<ContentToRetrieve>) {
content_rx.for_each_concurrent(config.threads, |content_to_retrieve| {
@@ -224,6 +243,7 @@
}


/// Deal with a successful content request response.
async fn handle_content_response(
resp: reqwest::Response, result_tx: std::sync::mpsc::Sender<(String, ContentToRetrieve)>,
mut status_tx: Sender<StatusMessage>, mut content_error_tx: Sender<ContentToRetrieve>,
@@ -247,6 +267,8 @@
}
}
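A sketch of the success path: the body text travels over the std::sync::mpsc results channel, the same channel RustEngine::get_result drains from Python. The plain-text body assumption is mine, and the metadata is simplified to just the content id here (the real code sends the full ContentToRetrieve):

// Sketch: forward retrieved log content plus metadata to the results channel.
async fn forward_content_sketch(
    resp: reqwest::Response,
    result_tx: std::sync::mpsc::Sender<(String, String)>,
    content_id: String,
) {
    let body = resp.text().await.unwrap();
    result_tx.send((body, content_id)).unwrap();
    // The real code also sends a RetrievedContentBlob status message so the
    // message loop can decrement awaiting_content_blobs.
}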


/// Deal with an error response when requesting a contentUri.
async fn handle_content_response_error(
mut status_tx: Sender<StatusMessage>, mut content_error_tx: Sender<ContentToRetrieve>,
content_to_retrieve: ContentToRetrieve) {
7 changes: 6 additions & 1 deletion Source/RustEngine/src/data_structures.rs
@@ -3,6 +3,7 @@ use std::collections::HashMap;
use reqwest::header::HeaderMap;
use serde_derive::{Deserialize};

/// List of JSON responses (used to represent content blobs)
pub type JsonList = Vec<HashMap<String, serde_json::Value>>;
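For instance, a page of blob descriptions can be decoded straight into this alias (a minimal sketch; the real decoding happens in api_connection.rs):

fn parse_blob_page(body: &str) -> serde_json::Result<JsonList> {
    // The API returns a JSON array of objects, one per content blob.
    serde_json::from_str(body)
}

// e.g. parse_blob_page(r#"[{"contentId": "1", "contentUri": "https://..."}]"#)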


@@ -13,6 +14,10 @@ pub struct AuthResult {
pub access_token: String,
}


/// Representation of content we need to retrieve. ID, expiration and content type are passed to
/// Python along with the retrieved content. ID and expiration are needed to skip already-known
/// logs; content type is needed for categorization in outputs.
pub struct ContentToRetrieve {
pub content_type: String,
pub content_id: String,
@@ -69,13 +74,13 @@ pub struct MessageLoopConfig {
}


/// These stats are passed back to Python after a run has finished, to show to the end user.
pub struct RunStatistics {
pub blobs_found: usize,
pub blobs_successful: usize,
pub blobs_error: usize,
pub blobs_retried: usize,
}

impl RunStatistics {
pub fn new() -> RunStatistics {
RunStatistics {
110 changes: 74 additions & 36 deletions Source/RustEngine/src/lib.rs
@@ -11,7 +11,16 @@ use crate::data_structures::{ContentToRetrieve, RunStatistics};
mod api_connection;
mod data_structures;


#[pyclass]
/// # Rust Engine
/// A class instantiated in Python. Python will call the run_once method below, which will start
/// three background threads responsible for retrieving content. Python will then call
/// the get_result method in a loop to drain the results from the results channel until it is
/// disconnected. The three background threads are:
/// - blob_thread: find content blobs and send results to content channel
/// - content_thread: retrieve content blobs from content channel, send results to results channel
/// - message_loop_thread: keep track of progress, terminate after all content is retrieved
pub struct RustEngine {
tenant_id: String,
client_id: String,
@@ -47,6 +56,8 @@ impl RustEngine {
}
}

/// Non-blocking. Call once to start retrieving logs, which will arrive in the results_rx
/// receiver. Call get_result iteratively to drain the results channel.
pub fn run_once(&mut self) {
let api = api_connection::get_api_connection(
self.tenant_id.clone(), self.client_id.clone(),
@@ -58,9 +69,14 @@
self.stats_rx = Some(stats_rx);
}

/// ValueError means there is nothing in the channel right now, but more will come. EOFError
/// means all results have been received and no more will come. The message loop closes the
/// results channel when all content has been retrieved.
pub fn get_result(&self) -> PyResult<(String, String, String, String)> {
match self.result_rx.as_ref().unwrap().try_recv() {
Ok((i,j) ) => Ok((i, j.content_id, j.expiration, j.content_type)),
Ok((i,j) ) => {
Ok((i, j.content_id, j.expiration, j.content_type))
},
Err(std::sync::mpsc::TryRecvError::Empty) => {
Err(pyo3::exceptions::PyValueError::new_err("No logs ready"))
},
@@ -70,13 +86,15 @@
}
}
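The remaining match arm is collapsed above; based on the doc comment and the EOFError handling on the Python side of this diff, it plausibly reads as follows (the message text is assumed):

// Sketch of the collapsed arm: a disconnected channel means the message loop
// has closed it, i.e. all content has been retrieved.
// Err(std::sync::mpsc::TryRecvError::Disconnected) => {
//     Err(pyo3::exceptions::PyEOFError::new_err("All content retrieved"))
// }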

/// Receive the run results. This can only happen when the message_loop thread has exited its
/// loop, so if we can return the results we know the engine has stopped.
pub fn stop(&self) -> PyResult<(usize, usize, usize, usize)> {
Ok(self.stats_rx.as_ref().unwrap().try_recv().unwrap())
}
}


/// # Initialize a config object for each sub thread to run
/// Initialize a config object for each sub thread to run
/// - Blob thread: Collect available content blobs
/// - Content thread: Collect the blobs found by blob thread
/// - Message loop: Communicates with other threads to handle retries and terminate when finished
@@ -85,24 +103,31 @@ fn initialize_configs(
runs: HashMap<String, Vec<(String, String)>>, retries: usize, threads:usize)
-> (data_structures::GetBlobConfig, data_structures::GetContentConfig,
data_structures::MessageLoopConfig, Receiver<(String, String)>, Receiver<ContentToRetrieve>,
std::sync::mpsc::Receiver<(String, ContentToRetrieve)>, std::sync::mpsc::Receiver<(usize, usize, usize, usize)>) {
std::sync::mpsc::Receiver<(String, ContentToRetrieve)>,
std::sync::mpsc::Receiver<(usize, usize, usize, usize)>) {

let urls = api_connection::create_base_urls(
content_types.clone(), api.tenant_id, api.publisher_id, runs);

// Create channels to communicate with async closures
let (status_tx, status_rx): (Sender<data_structures::StatusMessage>, Receiver<data_structures::StatusMessage>) = channel(100000);
let (blobs_tx, blobs_rx): (Sender<(String, String)>, Receiver<(String, String)>) = channel(100000);
let (status_tx, status_rx):
(Sender<data_structures::StatusMessage>, Receiver<data_structures::StatusMessage>) =
channel(100000);
let (blobs_tx, blobs_rx): (Sender<(String, String)>, Receiver<(String, String)>) =
channel(100000);
let (blob_error_tx, blob_error_rx):
(Sender<(String, String)>, Receiver<(String, String)>) = channel(100000);
let (content_tx, content_rx): (Sender<ContentToRetrieve>, Receiver<ContentToRetrieve>) = channel(100000);
let (content_error_tx, content_error_rx): (Sender<ContentToRetrieve>, Receiver<ContentToRetrieve>) = channel(100000000);
let (content_tx, content_rx): (Sender<ContentToRetrieve>, Receiver<ContentToRetrieve>) =
channel(100000);
let (content_error_tx, content_error_rx):
(Sender<ContentToRetrieve>, Receiver<ContentToRetrieve>) = channel(100000000);
let (result_tx, result_rx):
(std::sync::mpsc::Sender<(String, ContentToRetrieve)>, std::sync::mpsc::Receiver<(String, ContentToRetrieve)>) =
(std::sync::mpsc::Sender<(String, ContentToRetrieve)>,
std::sync::mpsc::Receiver<(String, ContentToRetrieve)>) =
std::sync::mpsc::channel();
let (stats_tx, stats_rx):
(std::sync::mpsc::Sender<(usize, usize, usize, usize)>, std::sync::mpsc::Receiver<(usize, usize, usize, usize)>) =
std::sync::mpsc::channel();
(std::sync::mpsc::Sender<(usize, usize, usize, usize)>,
std::sync::mpsc::Receiver<(usize, usize, usize, usize)>) = std::sync::mpsc::channel();

let blob_config = data_structures::GetBlobConfig { client: reqwest::Client::new(), headers: api.headers.clone(),
status_tx: status_tx.clone(), blobs_tx: blobs_tx.clone(),
@@ -121,7 +146,8 @@ fn initialize_configs(
}
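The bounded futures channels created above are what tie the stages together; a tiny self-contained sketch of the send/receive pattern they follow (names and values hypothetical):

use futures::channel::mpsc::channel;

fn main() {
    // A bounded channel like the ones above: try_send fails once the buffer
    // is full, which keeps one stage from outrunning the next.
    let (mut tx, mut rx) = channel::<(String, String)>(100000);
    tx.try_send(("Audit.Exchange".into(), "https://example.invalid/page1".into())).unwrap();
    if let Ok(Some((content_type, url))) = rx.try_next() {
        println!("{} -> {}", content_type, url);
    }
}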


/// Get all the available log content for a list of content types.
/// Get all the available log content for a list of content types and runs (start and end times
/// of the content to retrieve).
fn get_available_content(api: api_connection::ApiConnection, content_types: Vec<String>,
runs: HashMap<String, Vec<(String, String)>>, threads: usize,
retries: usize)
@@ -131,37 +157,26 @@ fn get_available_content(api: api_connection::ApiConnection, content_types: Vec<
let (blob_config, content_config, message_loop_config,
blobs_rx, content_rx, result_rx, stats_rx)
= initialize_configs(api, content_types, runs, retries, threads);
let (blob_handle, content_handle, message_loop_handle)
= spawn_blob_collector(blob_config, content_config, message_loop_config, blobs_rx,
content_rx);
spawn_blob_collector(blob_config, content_config, message_loop_config, blobs_rx, content_rx);
(result_rx, stats_rx)
}

/// Spawn a thread running the actual collect. This allows the main thread to keep track of
/// statistics and handle any errors in the collector and/or output interfaces.
/// Spawn threads running the actual collectors, and a message loop thread to keep track of
/// progress and terminate once finished.
fn spawn_blob_collector(
blob_config: data_structures::GetBlobConfig, content_config: data_structures::GetContentConfig,
message_loop_config: data_structures::MessageLoopConfig, blobs_rx: Receiver<(String, String)>,
content_rx: Receiver<(ContentToRetrieve)>)
-> (std::thread::JoinHandle<()>, std::thread::JoinHandle<()>, std::thread::JoinHandle<()>) {

let blob_handle = thread::spawn( move || {
api_connection::get_content_blobs(blob_config, blobs_rx);
});
let content_handle = thread::spawn( move || {
api_connection::get_content(content_config, content_rx);
});

let msg_loop_handle = thread::spawn(move || {
message_loop(message_loop_config)
});
content_rx: Receiver<(ContentToRetrieve)>) {

(blob_handle, content_handle, msg_loop_handle)
thread::spawn( move || {api_connection::get_content_blobs(blob_config, blobs_rx);});
thread::spawn( move || {api_connection::get_content(content_config, content_rx);});
thread::spawn(move || {message_loop(message_loop_config)});
}

/// Start receiving messages from the status and stats channels. Status channel will send a
/// termination signal once the async closures are done retrieving content. Stats channels
/// receives updates on how the async closures are doing while running.
/// Receive status updates to keep track of when all content has been retrieved. Also handle
/// retrying any failed content, or dropping it after too many retries. Every time content is
/// found, awaiting_content_blobs is incremented; every time content is retrieved or could not
/// be retrieved, awaiting_content_blobs is decremented. When it reaches 0 we know we are done.
#[tokio::main]
pub async fn message_loop(mut config: data_structures::MessageLoopConfig) {

@@ -171,23 +186,38 @@ pub async fn message_loop(mut config: data_structures::MessageLoopConfig) {
config.blobs_tx.clone().send((content_type, base_url)).await.unwrap();
awaiting_content_types += 1;
}
let mut awaiting_content_blobs: usize = 0; // Incremented and decremented by loop
// Keep track of found and retrieved content blobs
let mut awaiting_content_blobs: usize = 0;
// Keep track of retry count for failed blobs
let mut retry_map :HashMap<String, usize> = HashMap::new();
// Keep stats to return to python after run finishes
let mut stats = RunStatistics::new();
// Loop ends with the run itself, signalling the program is done.
loop {
// Receive status messages indicating found content and retrieved content. When all blobs
// have been found, and all found blobs have been retrieved, we are done.
match config.status_rx.try_next() {
Ok(Some(msg)) => {
match msg {
// awaiting_content_types is initially the number of content types times the number of
// runs for each content type. When retrieving pages, if we don't get a NextPageUri
// response header, we know we have found all possible blobs for that content type, and
// we decrement awaiting_content_types. When it hits 0 we know we have found all
// content that can possibly be retrieved.
data_structures::StatusMessage::FinishedContentBlobs => {
if awaiting_content_types == 0 {
} else {
if awaiting_content_types > 0 {
awaiting_content_types -= 1;
}
},
// We have found a new content blob while iterating through the pages of them.
// It has been queued up to be retrieved.
data_structures::StatusMessage::FoundNewContentBlob => {
awaiting_content_blobs +=1;
stats.blobs_found += 1;
},
// A queued up content blob has actually been retrieved so we are done with it.
// When awaiting_content_blobs hits 0 we are done retrieving all actual content
// and we can exit.
data_structures::StatusMessage::RetrievedContentBlob => {
awaiting_content_blobs -= 1;
stats.blobs_successful += 1;
@@ -196,6 +226,9 @@ pub async fn message_loop(mut config: data_structures::MessageLoopConfig) {
break;
}
},
// A queued up content blob could not be retrieved so we are done with it.
// When awaiting_content_blobs hits 0 we are done retrieving all actual content
// and we can exit.
data_structures::StatusMessage::ErrorContentBlob => {
awaiting_content_blobs -= 1;
stats.blobs_error += 1;
Expand All @@ -209,6 +242,8 @@ pub async fn message_loop(mut config: data_structures::MessageLoopConfig) {
},
_ => ()
}
// Check the channel for content pages that could not be retrieved and retry them the
// user-defined number of times. If a page still fails after that many attempts, give up.
match config.blob_error_rx.try_next() {
Ok(Some((content_type, url))) => {
if retry_map.contains_key(&url) == true {
@@ -233,6 +268,8 @@ pub async fn message_loop(mut config: data_structures::MessageLoopConfig) {
},
_ => (),
};
// Check the channel for content blobs that could not be retrieved and retry them the
// user-defined number of times. If a blob still fails after that many attempts, give up.
match config.content_error_rx.try_next() {
Ok(Some(content)) => {
if retry_map.contains_key(&content.url) == true {
@@ -263,6 +300,7 @@ pub async fn message_loop(mut config: data_structures::MessageLoopConfig) {
awaiting_content_types, awaiting_content_blobs}
*/
}
// We send back stats after exiting the loop, signalling the end of the run.
config.stats_tx.send((stats.blobs_found, stats.blobs_successful, stats.blobs_retried,
stats.blobs_error)).unwrap();
}
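The retry bookkeeping used in the loop above, in isolation (a sketch; the real code also re-queues the URL on the appropriate channel and updates the statistics):

use std::collections::HashMap;

// Sketch: count attempts per URL and retry until the user-defined limit.
fn should_retry(retry_map: &mut HashMap<String, usize>, url: &str, retries: usize) -> bool {
    let attempts = retry_map.entry(url.to_string()).or_insert(0);
    *attempts += 1;
    *attempts <= retries
}

// Inside the loop: if should_retry(&mut retry_map, &url, config.retries) { resend }
// else { count it as an error and decrement awaiting_content_blobs }.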
Binary file modified Source/requirements.txt
Binary file not shown.
Binary file removed Windows/GUI-OfficeAuditLogCollector-v1.1.exe
Binary file not shown.
Binary file modified Windows/WIN-OfficeAuditLogCollector-V2.0.exe
Binary file not shown.
