16
16
*
17
17
*/
18
18
19
+ use crate :: catalog:: manifest:: File ;
19
20
use crate :: hottier:: HotTierManager ;
20
21
use crate :: Mode ;
21
22
use crate :: {
@@ -295,6 +296,7 @@ impl TableProvider for StandardTableProvider {
295
296
let mut memory_exec = None ;
296
297
let mut cache_exec = None ;
297
298
let mut hot_tier_exec = None ;
299
+ let mut listing_exec = None ;
298
300
let object_store = state
299
301
. runtime_env ( )
300
302
. object_store_registry
@@ -307,7 +309,7 @@ impl TableProvider for StandardTableProvider {
307
309
. await
308
310
. map_err ( |err| DataFusionError :: Plan ( err. to_string ( ) ) ) ?;
309
311
let time_partition = object_store_format. time_partition ;
310
- let time_filters = extract_primary_filter ( filters, time_partition. clone ( ) ) ;
312
+ let mut time_filters = extract_primary_filter ( filters, time_partition. clone ( ) ) ;
311
313
if time_filters. is_empty ( ) {
312
314
return Err ( DataFusionError :: Plan ( "potentially unbounded query on time range. Table scanning requires atleast one time bound" . to_string ( ) ) ) ;
313
315
}
@@ -350,21 +352,28 @@ impl TableProvider for StandardTableProvider {
350
352
}
351
353
352
354
// Is query timerange is overlapping with older data.
355
+ // if true, then get listing table time filters and execution plan separately
353
356
if is_overlapping_query ( & merged_snapshot. manifest_list , & time_filters) {
354
- return legacy_listing_table (
355
- self . stream . clone ( ) ,
356
- memory_exec,
357
- glob_storage,
358
- object_store,
359
- & time_filters,
360
- self . schema . clone ( ) ,
361
- state,
362
- projection,
363
- filters,
364
- limit,
365
- time_partition. clone ( ) ,
366
- )
367
- . await ;
357
+ let listing_time_fiters =
358
+ return_listing_time_filters ( & merged_snapshot. manifest_list , & mut time_filters) ;
359
+
360
+ listing_exec = if let Some ( listing_time_filter) = listing_time_fiters {
361
+ legacy_listing_table (
362
+ self . stream . clone ( ) ,
363
+ glob_storage. clone ( ) ,
364
+ object_store. clone ( ) ,
365
+ & listing_time_filter,
366
+ self . schema . clone ( ) ,
367
+ state,
368
+ projection,
369
+ filters,
370
+ limit,
371
+ time_partition. clone ( ) ,
372
+ )
373
+ . await ?
374
+ } else {
375
+ None
376
+ } ;
368
377
}
369
378
370
379
let mut manifest_files = collect_from_snapshot (
@@ -377,35 +386,19 @@ impl TableProvider for StandardTableProvider {
377
386
. await ?;
378
387
379
388
if manifest_files. is_empty ( ) {
380
- return final_plan ( vec ! [ memory_exec] , projection, self . schema . clone ( ) ) ;
389
+ return final_plan (
390
+ vec ! [ listing_exec, memory_exec] ,
391
+ projection,
392
+ self . schema . clone ( ) ,
393
+ ) ;
381
394
}
382
395
383
396
// Based on entries in the manifest files, find them in the cache and create a physical plan.
384
397
if let Some ( cache_manager) = LocalCacheManager :: global ( ) {
385
- let ( cached, remainder) = cache_manager
386
- . partition_on_cached ( & self . stream , manifest_files, |file| & file. file_path )
387
- . await
388
- . map_err ( |err| DataFusionError :: External ( Box :: new ( err) ) ) ?;
389
-
390
- // Assign remaining entries back to manifest list
391
- // This is to be used for remote query
392
- manifest_files = remainder;
393
-
394
- let cached = cached
395
- . into_iter ( )
396
- . map ( |( mut file, cache_path) | {
397
- let cache_path =
398
- object_store:: path:: Path :: from_absolute_path ( cache_path) . unwrap ( ) ;
399
- file. file_path = cache_path. to_string ( ) ;
400
- file
401
- } )
402
- . collect ( ) ;
403
-
404
- let ( partitioned_files, statistics) = partitioned_files ( cached, & self . schema , 1 ) ;
405
- let plan = create_parquet_physical_plan (
406
- ObjectStoreUrl :: parse ( "file:///" ) . unwrap ( ) ,
407
- partitioned_files,
408
- statistics,
398
+ cache_exec = get_cache_exectuion_plan (
399
+ cache_manager,
400
+ & self . stream ,
401
+ & mut manifest_files,
409
402
self . schema . clone ( ) ,
410
403
projection,
411
404
filters,
@@ -414,41 +407,15 @@ impl TableProvider for StandardTableProvider {
414
407
time_partition. clone ( ) ,
415
408
)
416
409
. await ?;
417
-
418
- cache_exec = Some ( plan)
419
410
}
420
411
421
412
// Hot tier data fetch
422
413
if let Some ( hot_tier_manager) = HotTierManager :: global ( ) {
423
414
if hot_tier_manager. check_stream_hot_tier_exists ( & self . stream ) {
424
- let ( hot_tier_files, remainder) = hot_tier_manager
425
- . get_hot_tier_manifest_files ( & self . stream , manifest_files)
426
- . await
427
- . map_err ( |err| DataFusionError :: External ( Box :: new ( err) ) ) ?;
428
- // Assign remaining entries back to manifest list
429
- // This is to be used for remote query
430
- manifest_files = remainder;
431
-
432
- let hot_tier_files = hot_tier_files
433
- . into_iter ( )
434
- . map ( |mut file| {
435
- let path = CONFIG
436
- . parseable
437
- . hot_tier_storage_path
438
- . as_ref ( )
439
- . unwrap ( )
440
- . join ( & file. file_path ) ;
441
- file. file_path = path. to_str ( ) . unwrap ( ) . to_string ( ) ;
442
- file
443
- } )
444
- . collect ( ) ;
445
-
446
- let ( partitioned_files, statistics) =
447
- partitioned_files ( hot_tier_files, & self . schema , 1 ) ;
448
- let plan = create_parquet_physical_plan (
449
- ObjectStoreUrl :: parse ( "file:///" ) . unwrap ( ) ,
450
- partitioned_files,
451
- statistics,
415
+ hot_tier_exec = get_hottier_exectuion_plan (
416
+ hot_tier_manager,
417
+ & self . stream ,
418
+ & mut manifest_files,
452
419
self . schema . clone ( ) ,
453
420
projection,
454
421
filters,
@@ -457,14 +424,12 @@ impl TableProvider for StandardTableProvider {
457
424
time_partition. clone ( ) ,
458
425
)
459
426
. await ?;
460
-
461
- hot_tier_exec = Some ( plan)
462
427
}
463
428
}
464
429
if manifest_files. is_empty ( ) {
465
430
QUERY_CACHE_HIT . with_label_values ( & [ & self . stream ] ) . inc ( ) ;
466
431
return final_plan (
467
- vec ! [ memory_exec, cache_exec, hot_tier_exec] ,
432
+ vec ! [ listing_exec , memory_exec, cache_exec, hot_tier_exec] ,
468
433
projection,
469
434
self . schema . clone ( ) ,
470
435
) ;
@@ -485,7 +450,13 @@ impl TableProvider for StandardTableProvider {
485
450
. await ?;
486
451
487
452
Ok ( final_plan (
488
- vec ! [ memory_exec, cache_exec, hot_tier_exec, Some ( remote_exec) ] ,
453
+ vec ! [
454
+ listing_exec,
455
+ memory_exec,
456
+ cache_exec,
457
+ hot_tier_exec,
458
+ Some ( remote_exec) ,
459
+ ] ,
489
460
projection,
490
461
self . schema . clone ( ) ,
491
462
) ?)
@@ -516,10 +487,109 @@ impl TableProvider for StandardTableProvider {
516
487
}
517
488
}
518
489
490
+ #[ allow( clippy:: too_many_arguments) ]
491
+ async fn get_cache_exectuion_plan (
492
+ cache_manager : & LocalCacheManager ,
493
+ stream : & str ,
494
+ manifest_files : & mut Vec < File > ,
495
+ schema : Arc < Schema > ,
496
+ projection : Option < & Vec < usize > > ,
497
+ filters : & [ Expr ] ,
498
+ limit : Option < usize > ,
499
+ state : & SessionState ,
500
+ time_partition : Option < String > ,
501
+ ) -> Result < Option < Arc < dyn ExecutionPlan > > , DataFusionError > {
502
+ let ( cached, remainder) = cache_manager
503
+ . partition_on_cached ( stream, manifest_files. clone ( ) , |file : & File | {
504
+ & file. file_path
505
+ } )
506
+ . await
507
+ . map_err ( |err| DataFusionError :: External ( Box :: new ( err) ) ) ?;
508
+
509
+ // Assign remaining entries back to manifest list
510
+ // This is to be used for remote query
511
+ * manifest_files = remainder;
512
+
513
+ let cached = cached
514
+ . into_iter ( )
515
+ . map ( |( mut file, cache_path) | {
516
+ let cache_path = object_store:: path:: Path :: from_absolute_path ( cache_path) . unwrap ( ) ;
517
+ file. file_path = cache_path. to_string ( ) ;
518
+ file
519
+ } )
520
+ . collect ( ) ;
521
+
522
+ let ( partitioned_files, statistics) = partitioned_files ( cached, & schema, 1 ) ;
523
+ let plan = create_parquet_physical_plan (
524
+ ObjectStoreUrl :: parse ( "file:///" ) . unwrap ( ) ,
525
+ partitioned_files,
526
+ statistics,
527
+ schema. clone ( ) ,
528
+ projection,
529
+ filters,
530
+ limit,
531
+ state,
532
+ time_partition. clone ( ) ,
533
+ )
534
+ . await ?;
535
+
536
+ Ok ( Some ( plan) )
537
+ }
538
+
539
+ #[ allow( clippy:: too_many_arguments) ]
540
+ async fn get_hottier_exectuion_plan (
541
+ hot_tier_manager : & HotTierManager ,
542
+ stream : & str ,
543
+ manifest_files : & mut Vec < File > ,
544
+ schema : Arc < Schema > ,
545
+ projection : Option < & Vec < usize > > ,
546
+ filters : & [ Expr ] ,
547
+ limit : Option < usize > ,
548
+ state : & SessionState ,
549
+ time_partition : Option < String > ,
550
+ ) -> Result < Option < Arc < dyn ExecutionPlan > > , DataFusionError > {
551
+ let ( hot_tier_files, remainder) = hot_tier_manager
552
+ . get_hot_tier_manifest_files ( stream, manifest_files. clone ( ) )
553
+ . await
554
+ . map_err ( |err| DataFusionError :: External ( Box :: new ( err) ) ) ?;
555
+ // Assign remaining entries back to manifest list
556
+ // This is to be used for remote query
557
+ * manifest_files = remainder;
558
+
559
+ let hot_tier_files = hot_tier_files
560
+ . into_iter ( )
561
+ . map ( |mut file| {
562
+ let path = CONFIG
563
+ . parseable
564
+ . hot_tier_storage_path
565
+ . as_ref ( )
566
+ . unwrap ( )
567
+ . join ( & file. file_path ) ;
568
+ file. file_path = path. to_str ( ) . unwrap ( ) . to_string ( ) ;
569
+ file
570
+ } )
571
+ . collect ( ) ;
572
+
573
+ let ( partitioned_files, statistics) = partitioned_files ( hot_tier_files, & schema, 1 ) ;
574
+ let plan = create_parquet_physical_plan (
575
+ ObjectStoreUrl :: parse ( "file:///" ) . unwrap ( ) ,
576
+ partitioned_files,
577
+ statistics,
578
+ schema. clone ( ) ,
579
+ projection,
580
+ filters,
581
+ limit,
582
+ state,
583
+ time_partition. clone ( ) ,
584
+ )
585
+ . await ?;
586
+
587
+ Ok ( Some ( plan) )
588
+ }
589
+
519
590
#[ allow( clippy:: too_many_arguments) ]
520
591
async fn legacy_listing_table (
521
592
stream : String ,
522
- mem_exec : Option < Arc < dyn ExecutionPlan > > ,
523
593
glob_storage : Arc < dyn ObjectStorage + Send > ,
524
594
object_store : Arc < dyn ObjectStore > ,
525
595
time_filters : & [ PartialTimeFilter ] ,
@@ -529,7 +599,7 @@ async fn legacy_listing_table(
529
599
filters : & [ Expr ] ,
530
600
limit : Option < usize > ,
531
601
time_partition : Option < String > ,
532
- ) -> Result < Arc < dyn ExecutionPlan > , DataFusionError > {
602
+ ) -> Result < Option < Arc < dyn ExecutionPlan > > , DataFusionError > {
533
603
let remote_table = ListingTableBuilder :: new ( stream)
534
604
. populate_via_listing ( glob_storage. clone ( ) , object_store, time_filters)
535
605
. and_then ( |builder| async {
@@ -546,7 +616,7 @@ async fn legacy_listing_table(
546
616
} )
547
617
. await ?;
548
618
549
- final_plan ( vec ! [ mem_exec , remote_table] , projection , schema )
619
+ Ok ( remote_table)
550
620
}
551
621
552
622
fn final_plan (
@@ -581,7 +651,7 @@ fn reversed_mem_table(
581
651
MemTable :: try_new ( schema, vec ! [ records] )
582
652
}
583
653
584
- #[ derive( Debug ) ]
654
+ #[ derive( Debug , Clone ) ]
585
655
pub enum PartialTimeFilter {
586
656
Low ( Bound < NaiveDateTime > ) ,
587
657
High ( Bound < NaiveDateTime > ) ,
@@ -662,6 +732,64 @@ fn is_overlapping_query(
662
732
false
663
733
}
664
734
735
+ /// This function will accept time filters provided to the query and will split them
736
+ /// into listing time filters and manifest time filters
737
+ /// This makes parseable backwards compatible for when it did not have manifests
738
+ /// Logic-
739
+ /// The control flow will only come to this function if there exists data without manifest files
740
+ /// Two new time filter vec![] are created
741
+ /// For listing table time filters, we will use OG time filter low bound and either OG time filter upper bound
742
+ /// or manifest lower bound
743
+ /// For manifest time filter, we will manifest lower bound and OG upper bound
744
+ fn return_listing_time_filters (
745
+ manifest_list : & [ ManifestItem ] ,
746
+ time_filters : & mut Vec < PartialTimeFilter > ,
747
+ ) -> Option < Vec < PartialTimeFilter > > {
748
+ // vec to hold timestamps for listing
749
+ let mut vec_listing_timestamps = Vec :: new ( ) ;
750
+
751
+ let mut first_entry_lower_bound = manifest_list
752
+ . iter ( )
753
+ . map ( |file| file. time_lower_bound . naive_utc ( ) )
754
+ . min ( ) ?;
755
+
756
+ let mut new_time_filters = vec ! [ PartialTimeFilter :: Low ( Bound :: Included (
757
+ first_entry_lower_bound,
758
+ ) ) ] ;
759
+
760
+ time_filters. iter_mut ( ) . for_each ( |filter| {
761
+ match filter {
762
+ // since we've already determined that there is a need to list tables,
763
+ // we just need to check whether the filter's upper bound is < manifest lower bound
764
+ PartialTimeFilter :: High ( Bound :: Included ( upper) )
765
+ | PartialTimeFilter :: High ( Bound :: Excluded ( upper) ) => {
766
+ if upper. lt ( & & mut first_entry_lower_bound) {
767
+ // filter upper bound is less than manifest lower bound, continue using filter upper bound
768
+ vec_listing_timestamps. push ( filter. clone ( ) ) ;
769
+ } else {
770
+ // use manifest lower bound as excluded
771
+ vec_listing_timestamps. push ( PartialTimeFilter :: High ( Bound :: Excluded (
772
+ first_entry_lower_bound,
773
+ ) ) ) ;
774
+ }
775
+ new_time_filters. push ( filter. clone ( ) ) ;
776
+ }
777
+ _ => {
778
+ vec_listing_timestamps. push ( filter. clone ( ) ) ;
779
+ }
780
+ }
781
+ } ) ;
782
+
783
+ // update time_filters
784
+ * time_filters = new_time_filters;
785
+
786
+ if vec_listing_timestamps. len ( ) . gt ( & 0 ) {
787
+ Some ( vec_listing_timestamps)
788
+ } else {
789
+ None
790
+ }
791
+ }
792
+
665
793
pub fn include_now ( filters : & [ Expr ] , time_partition : Option < String > ) -> bool {
666
794
let current_minute = Utc :: now ( )
667
795
. with_second ( 0 )
0 commit comments