File tree Expand file tree Collapse file tree 1 file changed +15
-3
lines changed Expand file tree Collapse file tree 1 file changed +15
-3
lines changed Original file line number Diff line number Diff line change @@ -21,7 +21,12 @@ use arrow_array::{RecordBatch, TimestampMillisecondArray};
21
21
use arrow_ipc:: reader:: StreamReader ;
22
22
use arrow_schema:: Schema ;
23
23
use itertools:: kmerge_by;
24
- use std:: { fs:: File , io:: BufReader , path:: PathBuf , sync:: Arc } ;
24
+ use std:: {
25
+ fs:: { self , File } ,
26
+ io:: BufReader ,
27
+ path:: PathBuf ,
28
+ sync:: Arc ,
29
+ } ;
25
30
26
31
use super :: {
27
32
adapt_batch,
@@ -39,8 +44,15 @@ impl MergedRecordReader {
39
44
let mut readers = Vec :: with_capacity ( files. len ( ) ) ;
40
45
41
46
for file in files {
42
- let reader = StreamReader :: try_new ( File :: open ( file) . unwrap ( ) , None ) . map_err ( |_| ( ) ) ?;
43
- readers. push ( reader) ;
47
+ //remove empty files before reading
48
+ if file. metadata ( ) . unwrap ( ) . len ( ) == 0 {
49
+ log:: error!( "Invalid file detected, removing it: {:?}" , file) ;
50
+ fs:: remove_file ( file) . unwrap ( ) ;
51
+ } else {
52
+ let reader =
53
+ StreamReader :: try_new ( File :: open ( file) . unwrap ( ) , None ) . map_err ( |_| ( ) ) ?;
54
+ readers. push ( reader) ;
55
+ }
44
56
}
45
57
46
58
Ok ( Self { readers } )
You can’t perform that action at this time.
0 commit comments