From feae9e9721d56a12ac862ef864bb4736dc751ab7 Mon Sep 17 00:00:00 2001 From: SkuldNorniern Date: Mon, 3 Feb 2025 15:50:39 +0900 Subject: [PATCH] refactor: reduce duplicated code, unwrap --- src/error.rs | 2 ++ src/net/live_fluereflow.rs | 21 ++++++++++----------- src/net/offline_fluereflows.rs | 10 +++++----- src/net/online_fluereflow.rs | 30 +++++++++++++++++------------- src/net/types/flags.rs | 3 +++ 5 files changed, 37 insertions(+), 29 deletions(-) diff --git a/src/error.rs b/src/error.rs index 11a55a4..c5f9250 100644 --- a/src/error.rs +++ b/src/error.rs @@ -13,6 +13,7 @@ pub enum FluereError { // Add new variants FileNotFound(PathBuf), ParameterMissing(String), + PluginError(String), InvalidValue { field: String, value: String }, } @@ -31,6 +32,7 @@ impl std::fmt::Display for FluereError { Self::InvalidValue { field, value } => { write!(f, "Invalid value '{}' for field '{}'", value, field) } + Self::PluginError(e) => write!(f, "Plugin error: {}", e), } } } diff --git a/src/net/live_fluereflow.rs b/src/net/live_fluereflow.rs index 5da080a..6d67253 100644 --- a/src/net/live_fluereflow.rs +++ b/src/net/live_fluereflow.rs @@ -12,6 +12,7 @@ use crate::{ types::{Args, UDFlowKey}, utils::{cur_time_file, fluere_exporter}, FluereError, + error::OptionExt, }; use std::{ borrow::Cow, @@ -65,16 +66,14 @@ struct FlowSummary { // It takes the command line arguments as input, which specify the network interface to capture from and other parameters. // The function runs indefinitely, capturing packets and updating the terminal user interface with the captured data. pub async fn online_packet_capture(arg: Args) -> Result<(), FluereError> { - let csv_file = arg - .files - .csv - .ok_or_else(|| FluereError::ConfigError("CSV file not specified".to_string()))?; - let use_mac = arg.parameters.use_mac.unwrap(); - let interface_name = arg.interface.expect("interface not found"); - let duration = arg.parameters.duration.unwrap(); - let interval = arg.parameters.interval.unwrap(); - let flow_timeout = arg.parameters.timeout.unwrap(); - let _sleep_windows = arg.parameters.sleep_windows.unwrap(); + let csv_file = arg.files.csv.required("this should be defaulted to `output` on construction")?; + //let enable_ipv6 + let use_mac = arg.parameters.use_mac.required("this should be defaulted to `false` on construction")?; + let interface_name = arg.interface.required("interface should be provided")?; + let duration = arg.parameters.duration.required("this should be defaulted to `0(infinite)` on construction")?; + let interval = arg.parameters.interval.required("this should be defaulted to `30 minutes` on construction")?; + let flow_timeout = arg.parameters.timeout.required("this should be defaulted to `10 minutes` on construction")?; + let _sleep_windows = arg.parameters.sleep_windows.required("this should be defaulted to `false`, and now deprecated")?; let config = Config::new(); let plugin_manager = PluginManager::new().expect("Failed to create plugin manager"); let plugin_worker = plugin_manager.start_worker(); @@ -276,7 +275,7 @@ pub async fn online_packet_capture(arg: Args) -> Result<(), FluereError> { if is_reverse { "reverse" } else { "forward" } ); - if flags.fin == 1 || flags.rst == 1 { + if flags.is_finished() { trace!("flow finished"); plugin_manager.process_flow_data(*flow).await.unwrap(); records.push(*flow); diff --git a/src/net/offline_fluereflows.rs b/src/net/offline_fluereflows.rs index 5b845ba..7e0b1e6 100644 --- a/src/net/offline_fluereflows.rs +++ b/src/net/offline_fluereflows.rs @@ -24,10 +24,10 @@ use pcap::Capture; use tokio::task; pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> { - let _csv_file = arg.files.csv.required("csv file")?; - let file_name = arg.files.file.required("input file")?; - let use_mac = arg.parameters.use_mac.required("use_mac parameter")?; - let flow_timeout = arg.parameters.timeout.required("timeout parameter")?; + let _csv_file = arg.files.csv.required("this should be defaulted to `output` on construction")?; + let file_name = arg.files.file.required("pcap file path should be provided")?; + let use_mac = arg.parameters.use_mac.required("this should be defaulted to `false` on construction")?; + let flow_timeout = arg.parameters.timeout.required("this should be defaulted to `10 minutes` on construction")?; let mut cap = Capture::from_file(file_name.clone())?; @@ -137,7 +137,7 @@ pub async fn fluereflow_fileparse(arg: Args) -> Result<(), FluereError> { if is_reverse { "reverse" } else { "forward" } ); - if flags.fin == 1 || flags.rst == 1 { + if flags.is_finished() { trace!("Flow finished"); trace!("Flow data: {:?}", flow); records.push(*flow); diff --git a/src/net/online_fluereflow.rs b/src/net/online_fluereflow.rs index 1502a4f..c91d8c7 100644 --- a/src/net/online_fluereflow.rs +++ b/src/net/online_fluereflow.rs @@ -20,6 +20,7 @@ use crate::{ types::{Args, UDFlowKey}, utils::{cur_time_file, fluere_exporter}, FluereError, + error::OptionExt, }; use fluere_config::Config; @@ -37,14 +38,14 @@ use tokio::{task, task::JoinHandle}; // It takes the command line arguments as input, which specify the network interface to capture from and other parameters. // The function runs indefinitely, capturing packets and exporting the captured data to a CSV file. pub async fn packet_capture(arg: Args) -> Result<(), FluereError> { - let csv_file = arg.files.csv.expect("this should be defaulted to `output` on construction"); + let csv_file = arg.files.csv.required("this should be defaulted to `output` on construction")?; //let enable_ipv6 - let use_mac = arg.parameters.use_mac.expect("this should be defaulted to `false` on construction"); - let interface_name = arg.interface.expect("interface not found"); - let duration = arg.parameters.duration.expect("this should be defaulted to `0(infinite)` on construction"); - let interval = arg.parameters.interval.expect("this should be defaulted to `30 minutes` on construction"); - let flow_timeout = arg.parameters.timeout.expect("this should be defaulted to `10 minutes` on construction"); - let _sleep_windows = arg.parameters.sleep_windows.expect("this should be defaulted to `false`, and now deprecated"); + let use_mac = arg.parameters.use_mac.required("this should be defaulted to `false` on construction")?; + let interface_name = arg.interface.required("interface should be provided")?; + let duration = arg.parameters.duration.required("this should be defaulted to `0(infinite)` on construction")?; + let interval = arg.parameters.interval.required("this should be defaulted to `30 minutes` on construction")?; + let flow_timeout = arg.parameters.timeout.required("this should be defaulted to `10 minutes` on construction")?; + let _sleep_windows = arg.parameters.sleep_windows.required("this should be defaulted to `false`, and now deprecated")?; let config = Config::new(); let plugin_manager = PluginManager::new().expect("Failed to create plugin manager"); let plugin_worker = plugin_manager.start_worker(); @@ -81,7 +82,10 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> { loop { match cap.next_packet() { - Err(_) => continue, + Err(e) => { + trace!("Error capturing packet: {}", e); + continue; + } Ok(packet) => { trace!("received packet"); @@ -172,11 +176,11 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> { trace!("flow key detail: {:?}", flow_key); // Check if the flow has finished - if flags.fin == 1 || flags.rst == 1 { + if flags.is_finished() { trace!("flow finished"); trace!("flow data: {:?}", flow); - plugin_manager.process_flow_data(*flow).await.unwrap(); + plugin_manager.process_flow_data(*flow).await.map_err(|e| FluereError::PluginError(e.to_string()))?; records.push(*flow); active_flow.remove(flow_key); @@ -203,7 +207,7 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> { for key in keys { if let Some(flow) = active_flow.remove(&key) { trace!("flow expired"); - plugin_manager.process_flow_data(flow).await.unwrap(); + plugin_manager.process_flow_data(flow).await.map_err(|e| FluereError::PluginError(e.to_string()))?; records.push(flow); } } @@ -246,7 +250,7 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> { for (_exp_time, keys) in flow_expirations.iter() { for key in keys { if let Some(flow) = active_flow.remove(key) { - plugin_manager.process_flow_data(flow).await.unwrap(); + plugin_manager.process_flow_data(flow).await.map_err(|e| FluereError::PluginError(e.to_string()))?; records.push(flow); } } @@ -254,7 +258,7 @@ pub async fn packet_capture(arg: Args) -> Result<(), FluereError> { debug!("Captured in {:?}", start.elapsed()); for (_key, flow) in active_flow.iter() { - plugin_manager.process_flow_data(*flow).await.unwrap(); + plugin_manager.process_flow_data(*flow).await.map_err(|e| FluereError::PluginError(e.to_string()))?; records.push(*flow); } for task in tasks { diff --git a/src/net/types/flags.rs b/src/net/types/flags.rs index eec5aab..d6cf3d0 100644 --- a/src/net/types/flags.rs +++ b/src/net/types/flags.rs @@ -25,4 +25,7 @@ impl TcpFlags { ns: flags[8], } } + pub fn is_finished(&self) -> bool { + self.fin == 1 || self.rst == 1 + } }