Skip to content

Commit

Permalink
refactor: reduce duplicated code, unwrap
Browse files Browse the repository at this point in the history
  • Loading branch information
SkuldNorniern committed Feb 3, 2025
1 parent 8192f3b commit feae9e9
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 29 deletions.
2 changes: 2 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub enum FluereError {
// Add new variants
FileNotFound(PathBuf),
ParameterMissing(String),
PluginError(String),
InvalidValue { field: String, value: String },
}

Expand All @@ -31,6 +32,7 @@ impl std::fmt::Display for FluereError {
Self::InvalidValue { field, value } => {
write!(f, "Invalid value '{}' for field '{}'", value, field)
}
Self::PluginError(e) => write!(f, "Plugin error: {}", e),
}
}
}
Expand Down
21 changes: 10 additions & 11 deletions src/net/live_fluereflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use crate::{
types::{Args, UDFlowKey},
utils::{cur_time_file, fluere_exporter},
FluereError,
error::OptionExt,
};
use std::{
borrow::Cow,
Expand Down Expand Up @@ -65,16 +66,14 @@ struct FlowSummary {
// It takes the command line arguments as input, which specify the network interface to capture from and other parameters.
// The function runs indefinitely, capturing packets and updating the terminal user interface with the captured data.
pub async fn online_packet_capture(arg: Args) -> Result<(), FluereError> {
let csv_file = arg
.files
.csv
.ok_or_else(|| FluereError::ConfigError("CSV file not specified".to_string()))?;
let use_mac = arg.parameters.use_mac.unwrap();
let interface_name = arg.interface.expect("interface not found");
let duration = arg.parameters.duration.unwrap();
let interval = arg.parameters.interval.unwrap();
let flow_timeout = arg.parameters.timeout.unwrap();
let _sleep_windows = arg.parameters.sleep_windows.unwrap();
let csv_file = arg.files.csv.required("this should be defaulted to `output` on construction")?;
//let enable_ipv6
let use_mac = arg.parameters.use_mac.required("this should be defaulted to `false` on construction")?;
let interface_name = arg.interface.required("interface should be provided")?;
let duration = arg.parameters.duration.required("this should be defaulted to `0(infinite)` on construction")?;
let interval = arg.parameters.interval.required("this should be defaulted to `30 minutes` on construction")?;
let flow_timeout = arg.parameters.timeout.required("this should be defaulted to `10 minutes` on construction")?;
let _sleep_windows = arg.parameters.sleep_windows.required("this should be defaulted to `false`, and now deprecated")?;
let config = Config::new();
let plugin_manager = PluginManager::new().expect("Failed to create plugin manager");
let plugin_worker = plugin_manager.start_worker();
Expand Down Expand Up @@ -276,7 +275,7 @@ pub async fn online_packet_capture(arg: Args) -> Result<(), FluereError> {
if is_reverse { "reverse" } else { "forward" }
);

if flags.fin == 1 || flags.rst == 1 {
if flags.is_finished() {
trace!("flow finished");
plugin_manager.process_flow_data(*flow).await.unwrap();
records.push(*flow);
Expand Down
10 changes: 5 additions & 5 deletions src/net/offline_fluereflows.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ use pcap::Capture;
use tokio::task;

pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> {
let _csv_file = arg.files.csv.required("csv file")?;
let file_name = arg.files.file.required("input file")?;
let use_mac = arg.parameters.use_mac.required("use_mac parameter")?;
let flow_timeout = arg.parameters.timeout.required("timeout parameter")?;
let _csv_file = arg.files.csv.required("this should be defaulted to `output` on construction")?;
let file_name = arg.files.file.required("pcap file path should be provided")?;
let use_mac = arg.parameters.use_mac.required("this should be defaulted to `false` on construction")?;
let flow_timeout = arg.parameters.timeout.required("this should be defaulted to `10 minutes` on construction")?;

let mut cap = Capture::from_file(file_name.clone())?;

Expand Down Expand Up @@ -137,7 +137,7 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> {
if is_reverse { "reverse" } else { "forward" }
);

if flags.fin == 1 || flags.rst == 1 {
if flags.is_finished() {
trace!("Flow finished");
trace!("Flow data: {:?}", flow);
records.push(*flow);
Expand Down
30 changes: 17 additions & 13 deletions src/net/online_fluereflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::{
types::{Args, UDFlowKey},
utils::{cur_time_file, fluere_exporter},
FluereError,
error::OptionExt,
};

use fluere_config::Config;
Expand All @@ -37,14 +38,14 @@ use tokio::{task, task::JoinHandle};
// It takes the command line arguments as input, which specify the network interface to capture from and other parameters.
// The function runs indefinitely, capturing packets and exporting the captured data to a CSV file.
pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
let csv_file = arg.files.csv.expect("this should be defaulted to `output` on construction");
let csv_file = arg.files.csv.required("this should be defaulted to `output` on construction")?;
//let enable_ipv6
let use_mac = arg.parameters.use_mac.expect("this should be defaulted to `false` on construction");
let interface_name = arg.interface.expect("interface not found");
let duration = arg.parameters.duration.expect("this should be defaulted to `0(infinite)` on construction");
let interval = arg.parameters.interval.expect("this should be defaulted to `30 minutes` on construction");
let flow_timeout = arg.parameters.timeout.expect("this should be defaulted to `10 minutes` on construction");
let _sleep_windows = arg.parameters.sleep_windows.expect("this should be defaulted to `false`, and now deprecated");
let use_mac = arg.parameters.use_mac.required("this should be defaulted to `false` on construction")?;
let interface_name = arg.interface.required("interface should be provided")?;
let duration = arg.parameters.duration.required("this should be defaulted to `0(infinite)` on construction")?;
let interval = arg.parameters.interval.required("this should be defaulted to `30 minutes` on construction")?;
let flow_timeout = arg.parameters.timeout.required("this should be defaulted to `10 minutes` on construction")?;
let _sleep_windows = arg.parameters.sleep_windows.required("this should be defaulted to `false`, and now deprecated")?;
let config = Config::new();
let plugin_manager = PluginManager::new().expect("Failed to create plugin manager");
let plugin_worker = plugin_manager.start_worker();
Expand Down Expand Up @@ -81,7 +82,10 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {

loop {
match cap.next_packet() {
Err(_) => continue,
Err(e) => {
trace!("Error capturing packet: {}", e);
continue;
}
Ok(packet) => {
trace!("received packet");

Expand Down Expand Up @@ -172,11 +176,11 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
trace!("flow key detail: {:?}", flow_key);

// Check if the flow has finished
if flags.fin == 1 || flags.rst == 1 {
if flags.is_finished() {
trace!("flow finished");
trace!("flow data: {:?}", flow);

plugin_manager.process_flow_data(*flow).await.unwrap();
plugin_manager.process_flow_data(*flow).await.map_err(|e| FluereError::PluginError(e.to_string()))?;
records.push(*flow);

active_flow.remove(flow_key);
Expand All @@ -203,7 +207,7 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
for key in keys {
if let Some(flow) = active_flow.remove(&key) {
trace!("flow expired");
plugin_manager.process_flow_data(flow).await.unwrap();
plugin_manager.process_flow_data(flow).await.map_err(|e| FluereError::PluginError(e.to_string()))?;
records.push(flow);
}
}
Expand Down Expand Up @@ -246,15 +250,15 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> {
for (_exp_time, keys) in flow_expirations.iter() {
for key in keys {
if let Some(flow) = active_flow.remove(key) {
plugin_manager.process_flow_data(flow).await.unwrap();
plugin_manager.process_flow_data(flow).await.map_err(|e| FluereError::PluginError(e.to_string()))?;
records.push(flow);
}
}
}

debug!("Captured in {:?}", start.elapsed());
for (_key, flow) in active_flow.iter() {
plugin_manager.process_flow_data(*flow).await.unwrap();
plugin_manager.process_flow_data(*flow).await.map_err(|e| FluereError::PluginError(e.to_string()))?;
records.push(*flow);
}
for task in tasks {
Expand Down
3 changes: 3 additions & 0 deletions src/net/types/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,7 @@ impl TcpFlags {
ns: flags[8],
}
}
pub fn is_finished(&self) -> bool {
self.fin == 1 || self.rst == 1
}
}

0 comments on commit feae9e9

Please sign in to comment.