@@ -12,8 +12,10 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashSet;
 use std::sync::Arc;
 
+use databend_common_catalog::plan::split_row_id;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::Projection;
 use databend_common_catalog::table_context::TableContext;
@@ -66,6 +68,7 @@ pub fn row_fetch_processor(
             input,
             output,
             row_id_col_offset,
+            max_threads,
             NativeRowsFetcher::<true>::create(
                 fuse_table.clone(),
                 projection.clone(),
@@ -79,6 +82,7 @@ pub fn row_fetch_processor(
             input,
             output,
             row_id_col_offset,
+            max_threads,
             NativeRowsFetcher::<false>::create(
                 fuse_table.clone(),
                 projection.clone(),
@@ -97,6 +101,7 @@ pub fn row_fetch_processor(
             input,
             output,
             row_id_col_offset,
+            max_threads,
             ParquetRowsFetcher::<true>::create(
                 fuse_table.clone(),
                 projection.clone(),
@@ -111,6 +116,7 @@ pub fn row_fetch_processor(
             input,
             output,
             row_id_col_offset,
+            max_threads,
             ParquetRowsFetcher::<false>::create(
                 fuse_table.clone(),
                 projection.clone(),
@@ -130,13 +136,17 @@ pub fn row_fetch_processor(
 pub trait RowsFetcher {
     async fn on_start(&mut self) -> Result<()>;
     async fn fetch(&mut self, row_ids: &[u64]) -> Result<DataBlock>;
+    fn clear_cache(&mut self);
 }
 
 pub struct TransformRowsFetcher<F: RowsFetcher> {
     row_id_col_offset: usize,
+    max_threads: usize,
     fetcher: F,
     need_wrap_nullable: bool,
     blocks: Vec<DataBlock>,
+    row_ids: Vec<u64>,
+    distinct_block_ids: HashSet<u64>,
 }
 
 #[async_trait::async_trait]
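The new `clear_cache` hook lets the transform tell the fetcher to drop per-batch state once a batch has been flushed. A minimal sketch of an implementor, assuming the fetcher memoizes deserialized blocks keyed by block prefix (`CachingFetcher` and the `cached_blocks` field are hypothetical, not the actual `NativeRowsFetcher`/`ParquetRowsFetcher` layout):

```rust
use std::collections::HashMap;

// Stand-in for databend_common_expression::DataBlock in this sketch.
struct DataBlock;

struct CachingFetcher {
    // block prefix -> block deserialized during the current batch
    cached_blocks: HashMap<u64, DataBlock>,
}

impl CachingFetcher {
    fn clear_cache(&mut self) {
        // TransformRowsFetcher guarantees that a block's rows never span
        // two batches, so nothing dropped here is needed by a later fetch().
        self.cached_blocks.clear();
    }
}
```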
@@ -151,26 +161,7 @@ where F: RowsFetcher + Send + Sync + 'static
     }
 
     async fn transform(&mut self, data: DataBlock) -> Result<Option<DataBlock>> {
-        self.blocks.push(data);
-        Ok(None)
-    }
-
-    #[async_backtrace::framed]
-    async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
-        if self.blocks.is_empty() {
-            return Ok(None);
-        }
-
-        let start_time = std::time::Instant::now();
-        let num_blocks = self.blocks.len();
-        let mut data = DataBlock::concat(&self.blocks)?;
-        self.blocks.clear();
-
         let num_rows = data.num_rows();
-        if num_rows == 0 {
-            return Ok(None);
-        }
-
         let entry = &data.columns()[self.row_id_col_offset];
         let value = entry
             .value
@@ -184,24 +175,43 @@ where F: RowsFetcher + Send + Sync + 'static
             value.column.into_number().unwrap().into_u_int64().unwrap()
         };
 
-        let fetched_block = self.fetcher.fetch(&row_id_column).await?;
+        // Process the row id column in block-sized batches.
+        // Ensure that rows of the same block are processed in the same batch (and thread).
+        let mut consumed_len = num_rows;
+        for (idx, row_id) in row_id_column.iter().enumerate() {
+            let (prefix, _) = split_row_id(*row_id);
 
-        for col in fetched_block.columns().iter() {
-            if self.need_wrap_nullable {
-                data.add_column(wrap_true_validity(col, num_rows));
-            } else {
-                data.add_column(col.clone());
+            // The batch is full: rows with a new prefix will be processed in the next batch.
+            if self.distinct_block_ids.len() >= self.max_threads * 2
+                && !self.distinct_block_ids.contains(&prefix)
+            {
+                consumed_len = idx;
+                break;
             }
+            self.distinct_block_ids.insert(prefix);
         }
 
-        log::info!(
-            "TransformRowsFetcher on_finish: num_rows: {}, input blocks: {} in {} milliseconds",
-            num_rows,
-            num_blocks,
-            start_time.elapsed().as_millis()
-        );
+        self.row_ids
+            .extend_from_slice(&row_id_column.as_slice()[0..consumed_len]);
+        self.blocks.push(data.slice(0..consumed_len));
 
-        Ok(Some(data))
+        if consumed_len < num_rows {
+            let block = self.flush().await;
+            for row_id in row_id_column.as_slice()[consumed_len..num_rows].iter() {
+                let (prefix, _) = split_row_id(*row_id);
+                self.distinct_block_ids.insert(prefix);
+                self.row_ids.push(*row_id);
+            }
+            self.blocks.push(data.slice(consumed_len..num_rows));
+            block
+        } else {
+            Ok(None)
+        }
+    }
+
+    #[async_backtrace::framed]
+    async fn on_finish(&mut self, _output: bool) -> Result<Option<DataBlock>> {
+        self.flush().await
     }
 }
 
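The batching rule above can be read in isolation: row ids are consumed in order, and a batch closes as soon as admitting a row from an unseen block would push the number of distinct blocks past `max_threads * 2`. A standalone sketch of that rule, with a hypothetical `split_row_id` (the real bit layout lives in `databend_common_catalog::plan`; `ROW_IDX_BITS` here is an assumption for illustration only):

```rust
use std::collections::HashSet;

// Assumed layout: high bits = block prefix, low bits = row index within the block.
const ROW_IDX_BITS: u64 = 16;

fn split_row_id(id: u64) -> (u64, u64) {
    (id >> ROW_IDX_BITS, id & ((1 << ROW_IDX_BITS) - 1))
}

/// Returns how many leading row ids fit into the current batch before a
/// row from an unseen block would exceed the distinct-block budget.
fn consume_batch(
    row_ids: &[u64],
    distinct_block_ids: &mut HashSet<u64>,
    max_threads: usize,
) -> usize {
    for (idx, row_id) in row_ids.iter().enumerate() {
        let (prefix, _) = split_row_id(*row_id);
        if distinct_block_ids.len() >= max_threads * 2
            && !distinct_block_ids.contains(&prefix)
        {
            return idx; // flush here; the remainder starts the next batch
        }
        distinct_block_ids.insert(prefix);
    }
    row_ids.len()
}
```

The `max_threads * 2` budget is presumably chosen so each fetch has enough distinct blocks to keep all worker threads busy while the per-batch cache stays bounded.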
@@ -212,16 +222,59 @@ where F: RowsFetcher + Send + Sync + 'static
         input: Arc<InputPort>,
         output: Arc<OutputPort>,
         row_id_col_offset: usize,
+        max_threads: usize,
         fetcher: F,
         need_wrap_nullable: bool,
     ) -> ProcessorPtr {
         ProcessorPtr::create(AsyncAccumulatingTransformer::create(input, output, Self {
             row_id_col_offset,
+            max_threads,
             fetcher,
             need_wrap_nullable,
             blocks: vec![],
+            row_ids: vec![],
+            distinct_block_ids: HashSet::new(),
         }))
     }
+
+    async fn flush(&mut self) -> Result<Option<DataBlock>> {
+        let blocks = std::mem::take(&mut self.blocks);
+        if blocks.is_empty() {
+            return Ok(None);
+        }
+
+        let start_time = std::time::Instant::now();
+        let num_blocks = blocks.len();
+        let mut data = DataBlock::concat(&blocks)?;
+        let num_rows = data.num_rows();
+        if num_rows == 0 {
+            return Ok(None);
+        }
+
+        let row_ids = std::mem::take(&mut self.row_ids);
+        self.distinct_block_ids.clear();
+        let fetched_block = self.fetcher.fetch(&row_ids).await?;
+        // Clear the cache after fetching: these blocks will never be fetched
+        // again in later batches, which the transform method guarantees.
+        self.fetcher.clear_cache();
+
+        for col in fetched_block.columns().iter() {
+            if self.need_wrap_nullable {
+                data.add_column(wrap_true_validity(col, num_rows));
+            } else {
+                data.add_column(col.clone());
+            }
+        }
+
+        log::info!(
+            "TransformRowsFetcher flush: num_rows: {}, input blocks: {} in {} milliseconds",
+            num_rows,
+            num_blocks,
+            start_time.elapsed().as_millis()
+        );
+
+        Ok(Some(data))
+    }
 }
 
 fn wrap_true_validity(column: &BlockEntry, num_rows: usize) -> BlockEntry {
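For a concrete feel of where a batch boundary lands, here is a quick check of the `consume_batch` sketch above (reusing its `ROW_IDX_BITS` and `split_row_id` assumptions), with two rows in each of five synthetic blocks and `max_threads = 2`, so at most four distinct blocks fit in a batch:

```rust
fn main() {
    // Two rows in each of blocks 0..5, encoded with the assumed layout.
    let row_ids: Vec<u64> = (0..5u64)
        .flat_map(|block| (0..2u64).map(move |row| (block << ROW_IDX_BITS) | row))
        .collect();

    let mut seen = std::collections::HashSet::new();
    let consumed = consume_batch(&row_ids, &mut seen, 2);

    // Blocks 0..4 fill the budget of 2 * 2 = 4 distinct blocks; the two
    // rows of block 4 are deferred to the next batch.
    assert_eq!(consumed, 8);
}
```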