@@ -636,9 +636,18 @@ async fn search_partial_hits_phase_with_scroll(
636
636
/// metadata count.
637
637
///
638
638
/// This is done by exclusion, so we will need to keep it up to date if fields are added.
639
- pub fn is_metadata_count_request ( request : & SearchRequest ) -> bool {
639
+ pub fn is_metadata_count_request (
640
+ request : & SearchRequest ,
641
+ split_start_timestamp : Option < i64 > ,
642
+ split_end_timestamp : Option < i64 > ,
643
+ ) -> bool {
640
644
let query_ast: QueryAst = serde_json:: from_str ( & request. query_ast ) . unwrap ( ) ;
641
- is_metadata_count_request_with_ast ( & query_ast, request)
645
+ is_metadata_count_request_with_ast (
646
+ & query_ast,
647
+ request,
648
+ split_start_timestamp,
649
+ split_end_timestamp,
650
+ )
642
651
}
643
652
644
653
/// Check if the request is a count request without any filters, so we can just return the split
@@ -647,42 +656,53 @@ pub fn is_metadata_count_request(request: &SearchRequest) -> bool {
647
656
/// This is done by exclusion, so we will need to keep it up to date if fields are added.
648
657
///
649
658
/// The passed query_ast should match the serialized on in request.
650
- pub fn is_metadata_count_request_with_ast ( query_ast : & QueryAst , request : & SearchRequest ) -> bool {
659
+ pub fn is_metadata_count_request_with_ast (
660
+ query_ast : & QueryAst ,
661
+ request : & SearchRequest ,
662
+ split_start_timestamp : Option < i64 > ,
663
+ split_end_timestamp : Option < i64 > ,
664
+ ) -> bool {
651
665
if query_ast != & QueryAst :: MatchAll {
652
666
return false ;
653
667
}
654
668
if request. max_hits != 0 {
655
669
return false ;
656
670
}
657
671
658
- // If the start and end timestamp encompass the whole split, it is still a count query.
659
- // We remove this currently on the leaf level, but not yet on the root level.
660
- // There's a small advantage when we would do this on the root level, since we have the
661
- // counts available on the split. On the leaf it is currently required to open the split
662
- // to get the count.
663
- if request . start_timestamp . is_some ( ) || request . end_timestamp . is_some ( ) {
664
- return false ;
672
+ if let Some ( request_start_timestamp ) = request . start_timestamp {
673
+ let Some ( split_start_timestamp ) = split_start_timestamp else {
674
+ return false ;
675
+ } ;
676
+ if split_start_timestamp < request_start_timestamp {
677
+ return false ;
678
+ }
665
679
}
680
+ if let Some ( request_end_timestamp) = request. end_timestamp {
681
+ let Some ( split_end_timestamp) = split_end_timestamp else {
682
+ return false ;
683
+ } ;
684
+ if split_end_timestamp >= request_end_timestamp {
685
+ return false ;
686
+ }
687
+ }
688
+
666
689
if request. aggregation_request . is_some ( ) || !request. snippet_fields . is_empty ( ) {
667
690
return false ;
668
691
}
669
692
true
670
693
}
671
694
672
695
/// Get a leaf search response that returns the num_docs of the split
673
- pub fn get_count_from_metadata ( split_metadatas : & [ SplitMetadata ] ) -> Vec < LeafSearchResponse > {
674
- split_metadatas
675
- . iter ( )
676
- . map ( |metadata| LeafSearchResponse {
677
- num_hits : metadata. num_docs as u64 ,
678
- partial_hits : Vec :: new ( ) ,
679
- failed_splits : Vec :: new ( ) ,
680
- num_attempted_splits : 1 ,
681
- num_successful_splits : 1 ,
682
- intermediate_aggregation_result : None ,
683
- resource_stats : None ,
684
- } )
685
- . collect ( )
696
+ pub fn get_count_from_metadata ( metadata : & SplitMetadata ) -> LeafSearchResponse {
697
+ LeafSearchResponse {
698
+ num_hits : metadata. num_docs as u64 ,
699
+ partial_hits : Vec :: new ( ) ,
700
+ failed_splits : Vec :: new ( ) ,
701
+ num_attempted_splits : 1 ,
702
+ num_successful_splits : 1 ,
703
+ intermediate_aggregation_result : None ,
704
+ resource_stats : None ,
705
+ }
686
706
}
687
707
688
708
/// Returns true if the query is particularly memory intensive.
@@ -730,26 +750,32 @@ pub(crate) async fn search_partial_hits_phase(
730
750
split_metadatas : & [ SplitMetadata ] ,
731
751
cluster_client : & ClusterClient ,
732
752
) -> crate :: Result < LeafSearchResponse > {
733
- let leaf_search_responses: Vec < LeafSearchResponse > =
734
- if is_metadata_count_request ( search_request) {
735
- get_count_from_metadata ( split_metadatas)
753
+ let mut leaf_search_responses: Vec < LeafSearchResponse > =
754
+ Vec :: with_capacity ( split_metadatas. len ( ) ) ;
755
+ let mut leaf_search_jobs = Vec :: new ( ) ;
756
+ for split in split_metadatas {
757
+ let start_time = split. time_range . as_ref ( ) . map ( |x| x. start ( ) ) . copied ( ) ;
758
+ let end_time = split. time_range . as_ref ( ) . map ( |x| x. end ( ) ) . copied ( ) ;
759
+ if is_metadata_count_request ( search_request, start_time, end_time) {
760
+ leaf_search_responses. push ( get_count_from_metadata ( split) ) ;
736
761
} else {
737
- let jobs: Vec < SearchJob > = split_metadatas. iter ( ) . map ( SearchJob :: from) . collect ( ) ;
738
- let assigned_leaf_search_jobs = cluster_client
739
- . search_job_placer
740
- . assign_jobs ( jobs, & HashSet :: default ( ) )
741
- . await ?;
742
- let mut leaf_request_tasks = Vec :: new ( ) ;
743
- for ( client, client_jobs) in assigned_leaf_search_jobs {
744
- let leaf_request = jobs_to_leaf_request (
745
- search_request,
746
- indexes_metas_for_leaf_search,
747
- client_jobs,
748
- ) ?;
749
- leaf_request_tasks. push ( cluster_client. leaf_search ( leaf_request, client. clone ( ) ) ) ;
750
- }
751
- try_join_all ( leaf_request_tasks) . await ?
752
- } ;
762
+ leaf_search_jobs. push ( SearchJob :: from ( split) ) ;
763
+ }
764
+ }
765
+
766
+ if !leaf_search_jobs. is_empty ( ) {
767
+ let assigned_leaf_search_jobs = cluster_client
768
+ . search_job_placer
769
+ . assign_jobs ( leaf_search_jobs, & HashSet :: default ( ) )
770
+ . await ?;
771
+ let mut leaf_request_tasks = Vec :: new ( ) ;
772
+ for ( client, client_jobs) in assigned_leaf_search_jobs {
773
+ let leaf_request =
774
+ jobs_to_leaf_request ( search_request, indexes_metas_for_leaf_search, client_jobs) ?;
775
+ leaf_request_tasks. push ( cluster_client. leaf_search ( leaf_request, client. clone ( ) ) ) ;
776
+ }
777
+ leaf_search_responses. extend ( try_join_all ( leaf_request_tasks) . await ?) ;
778
+ }
753
779
754
780
// Creates a collector which merges responses into one
755
781
let merge_collector =
0 commit comments