1
+ // Licensed to the Apache Software Foundation (ASF) under one
2
+ // or more contributor license agreements. See the NOTICE file
3
+ // distributed with this work for additional information
4
+ // regarding copyright ownership. The ASF licenses this file
5
+ // to you under the Apache License, Version 2.0 (the
6
+ // "License"); you may not use this file except in compliance
7
+ // with the License. You may obtain a copy of the License at
8
+ //
9
+ // http://www.apache.org/licenses/LICENSE-2.0
10
+ //
11
+ // Unless required by applicable law or agreed to in writing,
12
+ // software distributed under the License is distributed on an
13
+ // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14
+ // KIND, either express or implied. See the License for the
15
+ // specific language governing permissions and limitations
16
+ // under the License.
17
+
1
18
use std:: collections:: HashMap ;
2
19
use std:: sync:: Arc ;
3
20
4
21
use futures:: channel:: mpsc;
5
22
use futures:: { StreamExt , TryStreamExt } ;
6
23
use tokio:: sync:: watch;
7
24
25
+ use crate :: runtime:: spawn;
8
26
use crate :: scan:: { DeleteFileContext , FileScanTaskDeleteFile } ;
9
27
use crate :: spec:: { DataContentType , DataFile , Struct } ;
10
28
use crate :: Result ;
@@ -29,7 +47,7 @@ impl DeleteFileIndex {
29
47
let ( tx, rx) = watch:: channel ( None ) ;
30
48
31
49
let delete_file_stream = receiver. boxed ( ) ;
32
- tokio :: spawn ( async move {
50
+ spawn ( async move {
33
51
let delete_files = delete_file_stream. try_collect :: < Vec < _ > > ( ) . await ;
34
52
let delete_file_index = delete_files. map ( DeleteFileIndex :: from_delete_files) ;
35
53
let delete_file_index = Arc :: new ( delete_file_index) ;
@@ -107,59 +125,36 @@ impl DeleteFileIndex {
107
125
data_file : & DataFile ,
108
126
seq_num : Option < i64 > ,
109
127
) -> Vec < FileScanTaskDeleteFile > {
110
- let mut results = vec ! [ ] ;
128
+ let mut deletes_queue = vec ! [ ] ;
111
129
112
130
if let Some ( deletes) = self . pos_deletes_by_path . get ( data_file. file_path ( ) ) {
113
- deletes
114
- . iter ( )
115
- . filter ( |& delete| {
116
- seq_num
117
- . map ( |seq_num| delete. manifest_entry . sequence_number ( ) > Some ( seq_num) )
118
- . unwrap_or_else ( || true )
119
- } )
120
- . for_each ( |delete| {
121
- results. push ( FileScanTaskDeleteFile {
122
- file_path : delete. manifest_entry . file_path ( ) . to_string ( ) ,
123
- file_type : delete. manifest_entry . content_type ( ) ,
124
- partition_spec_id : delete. partition_spec_id ,
125
- } )
126
- } ) ;
131
+ deletes_queue. extend ( deletes. iter ( ) ) ;
127
132
}
128
133
129
134
if let Some ( deletes) = self . pos_deletes_by_partition . get ( data_file. partition ( ) ) {
130
- deletes
131
- . iter ( )
132
- . filter ( |& delete| {
133
- seq_num
134
- . map ( |seq_num| delete. manifest_entry . sequence_number ( ) > Some ( seq_num) )
135
- . unwrap_or_else ( || true )
136
- } )
137
- . for_each ( |delete| {
138
- results. push ( FileScanTaskDeleteFile {
139
- file_path : delete. manifest_entry . file_path ( ) . to_string ( ) ,
140
- file_type : delete. manifest_entry . content_type ( ) ,
141
- partition_spec_id : delete. partition_spec_id ,
142
- } )
143
- } ) ;
135
+ deletes_queue. extend ( deletes. iter ( ) ) ;
144
136
}
145
137
146
138
if let Some ( deletes) = self . eq_deletes_by_partition . get ( data_file. partition ( ) ) {
147
- deletes
148
- . iter ( )
149
- . filter ( |& delete| {
150
- seq_num
151
- . map ( |seq_num| delete. manifest_entry . sequence_number ( ) > Some ( seq_num) )
152
- . unwrap_or_else ( || true )
153
- } )
154
- . for_each ( |delete| {
155
- results. push ( FileScanTaskDeleteFile {
156
- file_path : delete. manifest_entry . file_path ( ) . to_string ( ) ,
157
- file_type : delete. manifest_entry . content_type ( ) ,
158
- partition_spec_id : delete. partition_spec_id ,
159
- } )
160
- } ) ;
139
+ deletes_queue. extend ( deletes. iter ( ) ) ;
161
140
}
162
141
142
+ let mut results = vec ! [ ] ;
143
+ deletes_queue
144
+ . iter ( )
145
+ . filter ( |& delete| {
146
+ seq_num
147
+ . map ( |seq_num| delete. manifest_entry . sequence_number ( ) > Some ( seq_num) )
148
+ . unwrap_or_else ( || true )
149
+ } )
150
+ . for_each ( |delete| {
151
+ results. push ( FileScanTaskDeleteFile {
152
+ file_path : delete. manifest_entry . file_path ( ) . to_string ( ) ,
153
+ file_type : delete. manifest_entry . content_type ( ) ,
154
+ partition_spec_id : delete. partition_spec_id ,
155
+ } )
156
+ } ) ;
157
+
163
158
results
164
159
}
165
160
}
0 commit comments