Skip to content

Commit 2e36400

Browse files
authored
Record search metrics on cancelation (#5743)
* Record search metrics on cancelation * Refactor to clarify leaf search levels * Improve root search cancellation states * Replace leaf search cancelation tracking task with future * Replace root search cancelation traking task with future * Refactor metrics trackers to their own file
1 parent bbcbdf7 commit 2e36400

File tree

10 files changed

+255
-139
lines changed

10 files changed

+255
-139
lines changed

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-search/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ http = { workspace = true }
2222
itertools = { workspace = true }
2323
mockall = { workspace = true }
2424
once_cell = { workspace = true }
25+
pin-project = { workspace = true }
2526
postcard = { workspace = true }
2627
prost = { workspace = true }
2728
rayon = { workspace = true }

quickwit/quickwit-search/src/leaf.rs

Lines changed: 24 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ use tokio::task::JoinError;
4545
use tracing::*;
4646

4747
use crate::collector::{IncrementalCollector, make_collector_for_split, make_merge_collector};
48-
use crate::metrics::SEARCH_METRICS;
4948
use crate::root::is_metadata_count_request_with_ast;
5049
use crate::search_permit_provider::{SearchPermit, compute_initial_memory_allocation};
5150
use crate::service::{SearcherContext, deserialize_doc_mapper};
@@ -1175,9 +1174,10 @@ impl CanSplitDoBetter {
11751174
}
11761175
}
11771176

1178-
/// `multi_leaf_search` searches multiple indices and multiple splits.
1177+
/// Searches multiple splits, potentially in multiple indexes, sitting on different storages and
1178+
/// having different doc mappings.
11791179
#[instrument(skip_all, fields(index = ?leaf_search_request.search_request.as_ref().unwrap().index_id_patterns))]
1180-
pub async fn multi_leaf_search(
1180+
pub async fn multi_index_leaf_search(
11811181
searcher_context: Arc<SearcherContext>,
11821182
leaf_search_request: LeafSearchRequest,
11831183
storage_resolver: &StorageResolver,
@@ -1225,18 +1225,25 @@ pub async fn multi_leaf_search(
12251225
})?
12261226
.clone();
12271227

1228-
let leaf_request_future = tokio::spawn(
1229-
resolve_storage_and_leaf_search(
1230-
searcher_context.clone(),
1231-
search_request.clone(),
1232-
index_uri,
1233-
storage_resolver.clone(),
1234-
leaf_search_request_ref.split_offsets,
1235-
doc_mapper,
1236-
aggregation_limits.clone(),
1237-
)
1238-
.in_current_span(),
1239-
);
1228+
let leaf_request_future = tokio::spawn({
1229+
let storage_resolver = storage_resolver.clone();
1230+
let searcher_context = searcher_context.clone();
1231+
let search_request = search_request.clone();
1232+
let aggregation_limits = aggregation_limits.clone();
1233+
async move {
1234+
let storage = storage_resolver.resolve(&index_uri).await?;
1235+
single_doc_mapping_leaf_search(
1236+
searcher_context,
1237+
search_request,
1238+
storage,
1239+
leaf_search_request_ref.split_offsets,
1240+
doc_mapper,
1241+
aggregation_limits,
1242+
)
1243+
.await
1244+
}
1245+
.in_current_span()
1246+
});
12401247
leaf_request_tasks.push(leaf_request_future);
12411248
}
12421249

@@ -1269,29 +1276,6 @@ pub async fn multi_leaf_search(
12691276
.context("failed to merge split search responses")?
12701277
}
12711278

1272-
/// Resolves storage and calls leaf_search
1273-
#[allow(clippy::too_many_arguments)]
1274-
async fn resolve_storage_and_leaf_search(
1275-
searcher_context: Arc<SearcherContext>,
1276-
search_request: Arc<SearchRequest>,
1277-
index_uri: quickwit_common::uri::Uri,
1278-
storage_resolver: StorageResolver,
1279-
splits: Vec<SplitIdAndFooterOffsets>,
1280-
doc_mapper: Arc<DocMapper>,
1281-
aggregations_limits: AggregationLimitsGuard,
1282-
) -> crate::Result<LeafSearchResponse> {
1283-
let storage = storage_resolver.resolve(&index_uri).await?;
1284-
leaf_search(
1285-
searcher_context.clone(),
1286-
search_request.clone(),
1287-
storage.clone(),
1288-
splits,
1289-
doc_mapper,
1290-
aggregations_limits,
1291-
)
1292-
.await
1293-
}
1294-
12951279
/// Optimizes the search_request based on CanSplitDoBetter
12961280
/// Returns true if the split can return better results
12971281
fn check_optimize_search_request(
@@ -1315,14 +1299,14 @@ fn disable_search_request_hits(search_request: &mut SearchRequest) {
13151299
search_request.sort_fields.clear();
13161300
}
13171301

1318-
/// `leaf` step of search.
1302+
/// Searches multiple splits for a specific index and a single doc mapping
13191303
///
13201304
/// The leaf search collects all kind of information, and returns a set of
13211305
/// [PartialHit](quickwit_proto::search::PartialHit) candidates. The root will be in
13221306
/// charge to consolidate, identify the actual final top hits to display, and
13231307
/// fetch the actual documents to convert the partial hits into actual Hits.
13241308
#[instrument(skip_all, fields(index = ?request.index_id_patterns))]
1325-
pub async fn leaf_search(
1309+
pub async fn single_doc_mapping_leaf_search(
13261310
searcher_context: Arc<SearcherContext>,
13271311
request: Arc<SearchRequest>,
13281312
index_storage: Arc<dyn Storage>,
@@ -1444,15 +1428,6 @@ pub async fn leaf_search(
14441428
.await
14451429
.context("failed to merge split search responses");
14461430

1447-
let label_values = match leaf_search_response_reresult {
1448-
Ok(Ok(_)) => ["success"],
1449-
_ => ["error"],
1450-
};
1451-
SEARCH_METRICS
1452-
.leaf_search_targeted_splits
1453-
.with_label_values(label_values)
1454-
.observe(num_splits as f64);
1455-
14561431
Ok(leaf_search_response_reresult??)
14571432
}
14581433

quickwit/quickwit-search/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ mod leaf_cache;
2929
mod list_fields;
3030
mod list_fields_cache;
3131
mod list_terms;
32+
mod metrics_trackers;
3233
mod retry;
3334
mod root;
3435
mod scroll_context;

quickwit/quickwit-search/src/list_fields.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,7 +279,7 @@ pub struct IndexMetasForLeafSearch {
279279
}
280280

281281
/// Performs a distributed list fields request.
282-
/// 1. Sends leaf request over gRPC to multiple leaf nodes.
282+
/// 1. Sends leaf requests over gRPC to multiple leaf nodes.
283283
/// 2. Merges the search results.
284284
/// 3. Builds the response and returns.
285285
pub async fn root_list_fields(

quickwit/quickwit-search/src/list_terms.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use crate::search_permit_provider::compute_initial_memory_allocation;
3939
use crate::{ClusterClient, SearchError, SearchJob, SearcherContext, resolve_index_patterns};
4040

4141
/// Performs a distributed list terms.
42-
/// 1. Sends leaf request over gRPC to multiple leaf nodes.
42+
/// 1. Sends leaf requests over gRPC to multiple leaf nodes.
4343
/// 2. Merges the search results.
4444
/// 3. Builds the response and returns.
4545
/// this is much simpler than `root_search` as it doesn't need to get actual docs.
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
// Copyright 2021-Present Datadog, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// See https://prometheus.io/docs/practices/naming/
16+
17+
use std::pin::Pin;
18+
use std::task::{Context, Poll, ready};
19+
use std::time::Instant;
20+
21+
use pin_project::{pin_project, pinned_drop};
22+
use quickwit_proto::search::LeafSearchResponse;
23+
24+
use crate::SearchError;
25+
use crate::metrics::SEARCH_METRICS;
26+
27+
// root
28+
29+
pub enum RootSearchMetricsStep {
30+
Plan,
31+
Exec { num_targeted_splits: usize },
32+
}
33+
34+
/// Wrapper around the plan and search futures to track metrics.
35+
#[pin_project(PinnedDrop)]
36+
pub struct RootSearchMetricsFuture<F> {
37+
#[pin]
38+
pub tracked: F,
39+
pub start: Instant,
40+
pub step: RootSearchMetricsStep,
41+
pub is_success: Option<bool>,
42+
}
43+
44+
#[pinned_drop]
45+
impl<F> PinnedDrop for RootSearchMetricsFuture<F> {
46+
fn drop(self: Pin<&mut Self>) {
47+
let (num_targeted_splits, status) = match (&self.step, self.is_success) {
48+
// is is a partial success, actual success is recorded during the search step
49+
(RootSearchMetricsStep::Plan, Some(true)) => return,
50+
(RootSearchMetricsStep::Plan, Some(false)) => (0, "plan-error"),
51+
(RootSearchMetricsStep::Plan, None) => (0, "plan-cancelled"),
52+
(
53+
RootSearchMetricsStep::Exec {
54+
num_targeted_splits,
55+
},
56+
Some(true),
57+
) => (*num_targeted_splits, "success"),
58+
(
59+
RootSearchMetricsStep::Exec {
60+
num_targeted_splits,
61+
},
62+
Some(false),
63+
) => (*num_targeted_splits, "error"),
64+
(
65+
RootSearchMetricsStep::Exec {
66+
num_targeted_splits,
67+
},
68+
None,
69+
) => (*num_targeted_splits, "cancelled"),
70+
};
71+
72+
let label_values = [status];
73+
SEARCH_METRICS
74+
.root_search_requests_total
75+
.with_label_values(label_values)
76+
.inc();
77+
SEARCH_METRICS
78+
.root_search_request_duration_seconds
79+
.with_label_values(label_values)
80+
.observe(self.start.elapsed().as_secs_f64());
81+
SEARCH_METRICS
82+
.root_search_targeted_splits
83+
.with_label_values(label_values)
84+
.observe(num_targeted_splits as f64);
85+
}
86+
}
87+
88+
impl<F, R, E> Future for RootSearchMetricsFuture<F>
89+
where F: Future<Output = Result<R, E>>
90+
{
91+
type Output = Result<R, E>;
92+
93+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
94+
let this = self.project();
95+
let response = ready!(this.tracked.poll(cx));
96+
*this.is_success = Some(response.is_ok());
97+
Poll::Ready(Ok(response?))
98+
}
99+
}
100+
101+
// leaf
102+
103+
/// Wrapper around the search future to track metrics.
104+
#[pin_project(PinnedDrop)]
105+
pub struct LeafSearchMetricsFuture<F>
106+
where F: Future<Output = Result<LeafSearchResponse, SearchError>>
107+
{
108+
#[pin]
109+
pub tracked: F,
110+
pub start: Instant,
111+
pub targeted_splits: usize,
112+
pub status: Option<&'static str>,
113+
}
114+
115+
#[pinned_drop]
116+
impl<F> PinnedDrop for LeafSearchMetricsFuture<F>
117+
where F: Future<Output = Result<LeafSearchResponse, SearchError>>
118+
{
119+
fn drop(self: Pin<&mut Self>) {
120+
let label_values = [self.status.unwrap_or("cancelled")];
121+
SEARCH_METRICS
122+
.leaf_search_requests_total
123+
.with_label_values(label_values)
124+
.inc();
125+
SEARCH_METRICS
126+
.leaf_search_request_duration_seconds
127+
.with_label_values(label_values)
128+
.observe(self.start.elapsed().as_secs_f64());
129+
SEARCH_METRICS
130+
.leaf_search_targeted_splits
131+
.with_label_values(label_values)
132+
.observe(self.targeted_splits as f64);
133+
}
134+
}
135+
136+
impl<F> Future for LeafSearchMetricsFuture<F>
137+
where F: Future<Output = Result<LeafSearchResponse, SearchError>>
138+
{
139+
type Output = Result<LeafSearchResponse, SearchError>;
140+
141+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
142+
let this = self.project();
143+
let response = ready!(this.tracked.poll(cx));
144+
*this.status = if response.is_ok() {
145+
Some("success")
146+
} else {
147+
Some("error")
148+
};
149+
Poll::Ready(Ok(response?))
150+
}
151+
}

0 commit comments

Comments
 (0)