Skip to content

Commit

Permalink
improve: improve memory efficiency of flow capture
Browse files Browse the repository at this point in the history
  • Loading branch information
SkuldNorniern committed Jun 11, 2024
1 parent 8769cbe commit cea1610
Showing 1 changed file with 17 additions and 33 deletions.
50 changes: 17 additions & 33 deletions src/net/online_fluereflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use std::{
collections::HashMap,
fs,
mem::take,
time::{Duration, Instant},
};

Expand All @@ -29,9 +30,8 @@ use fluere_config::Config;
use fluere_plugin::PluginManager;
use fluereflow::FluereRecord;

use tokio::task;

use log::{debug, info, trace};
use tokio::task;

// This function captures packets from a network interface and converts them into NetFlow data.
// It takes the command line arguments as input, which specify the network interface to capture from and other parameters.
Expand All @@ -53,17 +53,13 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
.await
.expect("Failed to load plugins");

let interface = find_device(interface_name.as_str())?;
let interface = find_device(&interface_name)?;
let mut cap_device = CaptureDevice::new(interface.clone()).map_err(NetError::from)?;
let cap = &mut cap_device.capture;

let file_dir = "./output";
match fs::create_dir_all(<&str>::clone(&file_dir)) {
Ok(_) => {
trace!("Created directory: {}", file_dir)
}
Err(error) => panic!("Problem creating directory: {:?}", error),
};
fs::create_dir_all(file_dir)
.unwrap_or_else(|error| panic!("Problem creating directory: {:?}", error));

let start = Instant::now();
let mut last_export = Instant::now();
Expand All @@ -78,16 +74,12 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
let mut active_flow: HashMap<Key, FluereRecord> = HashMap::new();
let mut tasks = vec![];
let mut export_tasks = vec![];
// let mut packet_count = 0;
// let export_rt =

loop {
match cap.next_packet() {
Err(_) => {
continue;
}
Err(_) => continue,
Ok(packet) => {
trace!("received packet");
// trace!("packet: {:?}", );

let (mut key_value, mut reverse_key) = match parse_keys(packet.clone()) {
Ok(keys) => keys,
Expand Down Expand Up @@ -149,14 +141,14 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
packet.header.ts.tv_sec as u64,
packet.header.ts.tv_usec as u64,
);
//println!("time: {:?}", time);
let pkt = flowdata.min_pkt;
let ttl = flowdata.min_ttl;
// trace!(
// "current inputed flow{:?}",
// active_flow.get(&key_value).unwrap()
// );
let flow_key = if is_reverse { &reverse_key } else { &key_value };

if let Some(flow) = active_flow.get_mut(flow_key) {
let update_key = UDFlowKey {
doctets,
Expand Down Expand Up @@ -190,7 +182,7 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
if last_export.elapsed() >= Duration::from_millis(interval) && interval != 0 {
let mut expired_flows = vec![];
let mut expired_flow_data = vec![];
// packet_count = 0;

debug!("Calculating timeout start");
for (key, flow) in active_flow.iter() {
if flow_timeout > 0 && flow.last < (time - (flow_timeout * 1000)) {
Expand All @@ -204,19 +196,14 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
}
}

// Send the expired flows to the plugins
debug!(
"Sending {} expired flows to plugins start",
expired_flows.len()
);

let cloned_plugin_manager = plugin_manager.clone();
let plugin_manager_clone = plugin_manager.clone();
tasks.push(task::spawn(async move {
for flow in &expired_flow_data {
cloned_plugin_manager
.process_flow_data(*flow)
.await
.unwrap();
plugin_manager_clone.process_flow_data(*flow).await.unwrap();
}
debug!(
"Sending {} expired flows to plugins done",
Expand All @@ -225,21 +212,18 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
}));

active_flow.retain(|key, _| !expired_flows.contains(key));
let cloned_records = records.clone();
records.clear();
let records_to_export = take(&mut records);
debug!("Calculating timeout done");

let file_path_clone = file_path.clone();
//let file = fs::File::create(file_path_clone).unwrap();
info!("Export {} Started", file_path_clone);
export_tasks.push(task::spawn(async move {
fluere_exporter(cloned_records, file).await;
fluere_exporter(records_to_export, file).await;
info!("Export {} Finished", file_path_clone);
}));

// let result = tasks.await;
info!("running without blockng");
file_path = cur_time_file(csv_file.as_str(), file_dir, ".csv");
info!("running without blocking");
file_path = cur_time_file(&csv_file, file_dir, ".csv");
file = fs::File::create(file_path.as_ref()).unwrap();
last_export = Instant::now();
}
Expand Down Expand Up @@ -270,9 +254,9 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
let _ = task.await;
}

let cloned_records = records.clone();
let records_to_export = take(&mut records);
export_tasks.push(task::spawn(async {
fluere_exporter(cloned_records, file).await;
fluere_exporter(records_to_export, file).await;
}));
plugin_manager.await_completion(plugin_worker).await;
drop(plugin_manager);
Expand Down

0 comments on commit cea1610

Please sign in to comment.