33use std:: {
44 env,
55 fmt:: { self , Display } ,
6- fs,
7- io:: { self , Read } ,
6+ io,
87 num:: NonZeroU32 ,
9- path:: PathBuf ,
8+ path:: { Path , PathBuf } ,
109 str:: FromStr ,
1110} ;
1211
@@ -48,6 +47,8 @@ enum Error {
4847 LadingInspector ( #[ from] lading:: inspector:: Error ) ,
4948 #[ error( "Lading observer returned an error: {0}" ) ]
5049 LadingObserver ( #[ from] lading:: observer:: Error ) ,
50+ #[ error( "Failed to load or parse configuration: {0}" ) ]
51+ Config ( config:: Error ) ,
5152 #[ error( "Failed to deserialize Lading config: {0}" ) ]
5253 SerdeYaml ( #[ from] serde_yaml:: Error ) ,
5354 #[ error( "Lading failed to sync servers {0}" ) ]
@@ -58,7 +59,7 @@ enum Error {
5859 CapturePath ,
5960 #[ error( "Invalid path for prometheus socket" ) ]
6061 PrometheusPath ,
61- #[ error( "Invalid capture format, must be 'jsonl' or 'parquet '" ) ]
62+ #[ error( "Invalid capture format, must be 'jsonl', 'parquet', or 'multi '" ) ]
6263 InvalidCaptureFormat ,
6364 #[ error( transparent) ]
6465 Registration ( #[ from] lading_signal:: RegisterError ) ,
@@ -163,7 +164,7 @@ struct CliFlatLegacy {
163164 . args( & [ "experiment_duration_seconds" , "experiment_duration_infinite" ] ) ,
164165) ) ]
165166struct LadingArgs {
166- /// path on disk to the configuration file
167+ /// path on disk to a configuration file or directory containing config files
167168 #[ clap( long, default_value_t = default_config_path( ) ) ]
168169 config_path : String ,
169170 /// additional labels to apply to all captures, format KEY=VAL,KEY2=VAL
@@ -204,7 +205,7 @@ struct LadingArgs {
204205 /// time that capture metrics will expire by if they are not seen again, only useful when capture-path is set
205206 #[ clap( long) ]
206207 capture_expiriation_seconds : Option < u64 > ,
207- /// capture file format: jsonl or parquet (default: jsonl)
208+ /// capture file format: jsonl, parquet, or multi (default: jsonl)
208209 #[ clap( long, default_value = "jsonl" ) ]
209210 capture_format : String ,
210211 /// number of seconds to buffer before flushing capture file (default: 60)
@@ -255,30 +256,11 @@ struct RunCommand {
255256
256257#[ derive( Args ) ]
257258struct ConfigCheckCommand {
258- /// path on disk to the configuration file
259+ /// path on disk to a configuration file or directory containing config files
259260 #[ clap( long, default_value_t = default_config_path( ) ) ]
260261 config_path : String ,
261262}
262263
263- fn load_config_contents ( config_path : & str ) -> Result < String , Error > {
264- if let Ok ( env_var_value) = env:: var ( "LADING_CONFIG" ) {
265- debug ! ( "Using config from env var 'LADING_CONFIG'" ) ;
266- Ok ( env_var_value)
267- } else {
268- debug ! ( "Attempting to open configuration file at: {}" , config_path) ;
269- let mut file = fs:: OpenOptions :: new ( )
270- . read ( true )
271- . open ( config_path)
272- . map_err ( |err| {
273- error ! ( "Could not read config file '{}': {}" , config_path, err) ;
274- err
275- } ) ?;
276- let mut contents = String :: new ( ) ;
277- file. read_to_string ( & mut contents) ?;
278- Ok ( contents)
279- }
280- }
281-
282264fn parse_config ( contents : & str ) -> Result < Config , Error > {
283265 serde_yaml:: from_str ( contents) . map_err ( |err| {
284266 error ! ( "Configuration validation failed: {}" , err) ;
@@ -287,21 +269,33 @@ fn parse_config(contents: &str) -> Result<Config, Error> {
287269}
288270
289271fn validate_config ( config_path : & str ) -> Result < Config , Error > {
290- let contents = load_config_contents ( config_path) ?;
291- let config = parse_config ( & contents) ?;
292- info ! ( "Configuration file is valid" ) ;
293- Ok ( config)
272+ // Check if config is provided via environment variable
273+ if let Ok ( env_var_value) = env:: var ( "LADING_CONFIG" ) {
274+ debug ! ( "Using config from env var 'LADING_CONFIG'" ) ;
275+ let config = parse_config ( & env_var_value) ?;
276+ info ! ( "Configuration file is valid" ) ;
277+ Ok ( config)
278+ } else {
279+ // Load from path (file or directory)
280+ debug ! ( "Attempting to load configuration from: {config_path}" ) ;
281+ let config = config:: load_config_from_path ( Path :: new ( config_path) ) . map_err ( |err| {
282+ error ! ( "Could not load config from '{config_path}': {err}" ) ;
283+ Error :: Config ( err)
284+ } ) ?;
285+ info ! ( "Configuration file is valid" ) ;
286+ Ok ( config)
287+ }
294288}
295289
296290fn get_config ( args : & LadingArgs , config : Option < String > ) -> Result < Config , Error > {
297- let contents = if let Some ( config) = config {
298- config
291+ let mut config = if let Some ( contents) = config {
292+ // Config provided via environment variable - parse as single file
293+ parse_config ( & contents) ?
299294 } else {
300- load_config_contents ( & args. config_path ) ?
295+ // Load from path (auto-detect file or directory)
296+ config:: load_config_from_path ( Path :: new ( & args. config_path ) ) . map_err ( Error :: Config ) ?
301297 } ;
302298
303- let mut config = parse_config ( & contents) ?;
304-
305299 let target = if args. no_target {
306300 None
307301 } else if let Some ( pid) = args. target_pid {
@@ -352,6 +346,10 @@ fn get_config(args: &LadingArgs, config: Option<String>) -> Result<Config, Error
352346 flush_seconds : args. capture_flush_seconds ,
353347 compression_level : args. capture_compression_level ,
354348 } ,
349+ "multi" => config:: CaptureFormat :: Multi {
350+ flush_seconds : args. capture_flush_seconds ,
351+ compression_level : args. capture_compression_level ,
352+ } ,
355353 _ => return Err ( Error :: InvalidCaptureFormat ) ,
356354 } ;
357355
@@ -548,16 +546,14 @@ async fn inner_main(
548546 //
549547 // BLACKHOLE
550548 //
551- if let Some ( cfgs) = config. blackhole {
552- for cfg in cfgs {
553- let blackhole_server = blackhole:: Server :: new ( cfg, shutdown_watcher. clone ( ) ) ?;
554- let _bsrv = tokio:: spawn ( async {
555- match blackhole_server. run ( ) . await {
556- Ok ( ( ) ) => debug ! ( "blackhole shut down successfully" ) ,
557- Err ( err) => warn ! ( "blackhole failed with {:?}" , err) ,
558- }
559- } ) ;
560- }
549+ for cfg in config. blackhole {
550+ let blackhole_server = blackhole:: Server :: new ( cfg, shutdown_watcher. clone ( ) ) ?;
551+ let _bsrv = tokio:: spawn ( async {
552+ match blackhole_server. run ( ) . await {
553+ Ok ( ( ) ) => debug ! ( "blackhole shut down successfully" ) ,
554+ Err ( err) => warn ! ( "blackhole failed with {:?}" , err) ,
555+ }
556+ } ) ;
561557 }
562558
563559 //
0 commit comments