@@ -286,6 +286,13 @@ impl ReclusterTableInterpreter {
286
286
Ok ( false )
287
287
}
288
288
289
+ /// Builds physical plan for Hilbert clustering.
290
+ /// # Arguments
291
+ /// * `tbl` - Reference to the table being reclustered
292
+ /// * `push_downs` - Optional filter conditions to push down to storage
293
+ /// * `hilbert_info` - Cached Hilbert mapping information (built if None)
294
+ /// # Returns
295
+ /// * `Result<Option<PhysicalPlan>>` - The physical plan if reclustering is needed, None otherwise
289
296
async fn build_hilbert_plan (
290
297
& self ,
291
298
tbl : & Arc < dyn Table > ,
@@ -299,6 +306,7 @@ impl ReclusterTableInterpreter {
299
306
. do_hilbert_clustering ( tbl. clone ( ) , self . ctx . clone ( ) , push_downs. clone ( ) )
300
307
. await ?
301
308
else {
309
+ // No reclustering needed (e.g., table already optimally clustered)
302
310
return Ok ( None ) ;
303
311
} ;
304
312
@@ -311,38 +319,49 @@ impl ReclusterTableInterpreter {
311
319
let total_rows = recluster_info. removed_statistics . row_count as usize ;
312
320
let total_compressed = recluster_info. removed_statistics . compressed_byte_size as usize ;
313
321
322
+ // Determine rows per block based on data size and compression ratio
314
323
let rows_per_block =
315
324
block_thresholds. calc_rows_per_block ( total_bytes, total_rows, total_compressed) ;
325
+
326
+ // Calculate initial partition count based on data volume and block size
316
327
let mut total_partitions = std:: cmp:: max ( total_rows / rows_per_block, 1 ) ;
328
+
329
+ // Adjust number of partitions according to the block size thresholds
317
330
if total_partitions < block_thresholds. block_per_segment
318
331
&& block_thresholds. check_perfect_segment (
319
- block_thresholds. block_per_segment ,
332
+ block_thresholds. block_per_segment , // this effectively by-pass the total_blocks criteria
320
333
total_rows,
321
334
total_bytes,
322
335
total_compressed,
323
336
)
324
337
{
325
338
total_partitions = block_thresholds. block_per_segment ;
326
339
}
340
+
327
341
warn ! (
328
342
"Do hilbert recluster, total_bytes: {}, total_rows: {}, total_partitions: {}" ,
329
343
total_bytes, total_rows, total_partitions
330
344
) ;
331
345
346
+ // Create a subquery executor for running Hilbert mapping calculations
332
347
let subquery_executor = Arc :: new ( ServiceQueryExecutor :: new ( QueryContext :: create_from (
333
348
self . ctx . as_ref ( ) ,
334
349
) ) ) ;
350
+
335
351
let partitions = settings. get_hilbert_num_range_ids ( ) ? as usize ;
336
352
353
+ // Ensure Hilbert mapping information is built (if not already)
337
354
self . build_hilbert_info ( tbl, hilbert_info) . await ?;
338
355
let HilbertBuildInfo {
339
356
keys_bound,
340
357
index_bound,
341
358
query,
342
359
} = hilbert_info. as_ref ( ) . unwrap ( ) ;
343
360
361
+ // Variables will store the calculated bounds for Hilbert mapping
344
362
let mut variables = VecDeque :: new ( ) ;
345
363
364
+ // Execute the `kyes_bound` plan to calculate bounds for each clustering key
346
365
let keys_bounds = self
347
366
. execute_hilbert_plan (
348
367
& subquery_executor,
@@ -352,11 +371,15 @@ impl ReclusterTableInterpreter {
352
371
tbl,
353
372
)
354
373
. await ?;
374
+
375
+ // Store each clustering key's bounds in the variables collection
355
376
for entry in keys_bounds. columns ( ) . iter ( ) {
356
377
let v = entry. value . index ( 0 ) . unwrap ( ) . to_owned ( ) ;
357
378
variables. push_back ( v) ;
358
379
}
359
380
381
+ // Execute the `index_bound` plan to calculate the Hilbert index bounds
382
+ // i.e. `range_bound(..)(hilbert_range_index(..))`
360
383
let index_bounds = self
361
384
. execute_hilbert_plan (
362
385
& subquery_executor,
@@ -366,28 +389,38 @@ impl ReclusterTableInterpreter {
366
389
tbl,
367
390
)
368
391
. await ?;
392
+
393
+ // Add the Hilbert index bound to the front of variables
369
394
let val = index_bounds. value_at ( 0 , 0 ) . unwrap ( ) . to_owned ( ) ;
370
395
variables. push_front ( val) ;
371
396
372
- // reset the scan progress.
397
+ // Reset the scan progress to its original value
373
398
self . ctx . get_scan_progress ( ) . set ( & scan_progress_value) ;
399
+
374
400
let Plan :: Query {
375
401
s_expr,
376
402
metadata,
377
403
bind_context,
378
404
..
379
405
} = query
380
406
else {
381
- unreachable ! ( )
407
+ unreachable ! ( "Expected a Query plan, but got {:?}" , query . kind ( ) ) ;
382
408
} ;
409
+
410
+ // Replace placeholders in the expression
411
+ // `range_partition_id(hilbert_range_index(cluster_key, [$key_range_bound], ..), [$hilbert_index_range_bound])`
412
+ // with calculated constants.
383
413
let mut s_expr = replace_with_constant ( s_expr, & variables, total_partitions as u16 ) ;
414
+
384
415
if tbl. change_tracking_enabled ( ) {
385
416
s_expr = set_update_stream_columns ( & s_expr) ?;
386
417
}
418
+
387
419
metadata. write ( ) . replace_all_tables ( tbl. clone ( ) ) ;
388
420
let mut builder = PhysicalPlanBuilder :: new ( metadata. clone ( ) , self . ctx . clone ( ) , false ) ;
389
421
let mut plan = Box :: new ( builder. build ( & s_expr, bind_context. column_set ( ) ) . await ?) ;
390
422
423
+ // Check if the plan already has an exchange operator
391
424
let mut is_exchange = false ;
392
425
if let PhysicalPlan :: Exchange ( Exchange {
393
426
input,
@@ -399,16 +432,24 @@ impl ReclusterTableInterpreter {
399
432
plan = input. clone ( ) ;
400
433
}
401
434
435
+ // Determine if we need distributed execution
402
436
let cluster = self . ctx . get_cluster ( ) ;
403
437
let is_distributed = is_exchange || !cluster. is_empty ( ) ;
438
+
439
+ // For distributed execution, add an exchange operator to distribute work
404
440
if is_distributed {
441
+ // Create an expression for the partition column,
442
+ // i.e.`range_partition_id(hilbert_range_index({hilbert_keys_str}), [...]) AS _predicate`
405
443
let expr = scalar_expr_to_remote_expr (
406
444
& ScalarExpr :: BoundColumnRef ( BoundColumnRef {
407
445
span : None ,
408
446
column : bind_context. columns . last ( ) . unwrap ( ) . clone ( ) ,
409
447
} ) ,
410
448
plan. output_schema ( ) ?. as_ref ( ) ,
411
449
) ?;
450
+
451
+ // Add exchange operator for data distribution,
452
+ // shuffling data based on the hash of range partition IDs derived from the Hilbert index.
412
453
plan = Box :: new ( PhysicalPlan :: Exchange ( Exchange {
413
454
plan_id : 0 ,
414
455
input : plan,
@@ -422,13 +463,18 @@ impl ReclusterTableInterpreter {
422
463
let table_meta_timestamps = self
423
464
. ctx
424
465
. get_table_meta_timestamps ( tbl. as_ref ( ) , Some ( snapshot. clone ( ) ) ) ?;
466
+
467
+ // Create the Hilbert partition physical plan,
468
+ // collecting data into partitions and persist them
425
469
let plan = PhysicalPlan :: HilbertPartition ( Box :: new ( HilbertPartition {
426
470
plan_id : 0 ,
427
471
input : plan,
428
472
table_info : table_info. clone ( ) ,
429
473
num_partitions : total_partitions,
430
474
table_meta_timestamps,
431
475
} ) ) ;
476
+
477
+ // Finally, commit the newly clustered table
432
478
Ok ( Some ( Self :: add_commit_sink (
433
479
plan,
434
480
is_distributed,
0 commit comments