1
1
use anyhow:: Result ;
2
+ use caracat:: models:: Probe ;
2
3
use log:: trace;
3
4
use std:: io:: { stdin, BufRead } ;
5
+ use std:: path:: PathBuf ;
4
6
5
7
use crate :: auth:: { KafkaAuth , SaslAuth } ;
6
8
use crate :: client:: producer:: produce;
7
9
use crate :: config:: AppConfig ;
8
10
use crate :: probe:: decode_probe;
9
11
10
- pub async fn handle ( config : & AppConfig , agents : & str ) -> Result < ( ) > {
12
+ fn read_probes < R : BufRead > ( buf_reader : R ) -> Result < Vec < Probe > > {
13
+ let mut probes = Vec :: new ( ) ;
14
+ for line in buf_reader. lines ( ) {
15
+ let probe = line?;
16
+ probes. push ( decode_probe ( & probe) ?) ;
17
+ }
18
+ Ok ( probes)
19
+ }
20
+
21
+ pub async fn handle ( config : & AppConfig , agents : & str , probes_file : Option < PathBuf > ) -> Result < ( ) > {
11
22
trace ! ( "Client handler" ) ;
12
23
trace ! ( "{:?}" , config) ;
13
24
@@ -26,16 +37,24 @@ pub async fn handle(config: &AppConfig, agents: &str) -> Result<()> {
26
37
}
27
38
} ;
28
39
29
- // Get probes from stdin
30
- let mut probes = Vec :: new ( ) ;
31
- for line in stdin ( ) . lock ( ) . lines ( ) {
32
- let probe = line?;
33
- probes. push ( decode_probe ( & probe) ?) ;
34
- }
40
+ // Read probes from file or stdin
41
+ let probes = match probes_file {
42
+ Some ( probes_file) => {
43
+ let file = std:: fs:: File :: open ( probes_file) ?;
44
+ let buf_reader = std:: io:: BufReader :: new ( file) ;
45
+ read_probes ( buf_reader) ?
46
+ }
47
+ None => {
48
+ let stdin = stdin ( ) ;
49
+ let buf_reader = stdin. lock ( ) ;
50
+ read_probes ( buf_reader) ?
51
+ }
52
+ } ;
35
53
36
- // Split agents
54
+ // Split the agents
37
55
let agents = agents. split ( ',' ) . collect :: < Vec < & str > > ( ) ;
38
56
57
+ // Produce Kafka messages
39
58
produce ( config, auth, agents, probes) . await ;
40
59
41
60
Ok ( ( ) )
0 commit comments