Skip to content

Commit b045483

Browse files
authored
Prevents single split searches from different leaf_search from interleaving (#5509)
* Prevents single split searches from different leaf_search from interleaving. Before this PR, we just used a semaphore to acquire a permit and start a new tokio task to run our single split search. In pseudo code, a leaf_search would look like: ``` for split in splits { let permit = semaphore.acquire().await; tokio::spawn(async move { single_split_search(split); drop(permit) }) } ``` The problem with this is that it allows interleaving split search from one search request with another one. This interleaving strongly impacts search latency. For instance, one can imagine two queries A and B with 3 splits each. Executing as follows | A | A | A | B | B | B | offers a much short latency for A than | A | B | B | A | B | A | This PR also adds two metrics to monitor the number of queue single split search. * CR comments
1 parent 8572903 commit b045483

File tree

8 files changed

+261
-20
lines changed

8 files changed

+261
-20
lines changed

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-search/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ quickwit-storage = { workspace = true }
5050
[dev-dependencies]
5151
assert-json-diff = { workspace = true }
5252
proptest = { workspace = true }
53+
rand = { workspace = true }
5354
serde_json = { workspace = true }
5455

5556
quickwit-indexing = { workspace = true, features = ["testsuite"] }

quickwit/quickwit-search/src/leaf.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use tracing::*;
5050
use crate::collector::{make_collector_for_split, make_merge_collector, IncrementalCollector};
5151
use crate::metrics::SEARCH_METRICS;
5252
use crate::root::is_metadata_count_request_with_ast;
53+
use crate::search_permit_provider::SearchPermit;
5354
use crate::service::{deserialize_doc_mapper, SearcherContext};
5455
use crate::{QuickwitAggregations, SearchError};
5556

@@ -1183,7 +1184,6 @@ async fn resolve_storage_and_leaf_search(
11831184
aggregations_limits: AggregationLimitsGuard,
11841185
) -> crate::Result<LeafSearchResponse> {
11851186
let storage = storage_resolver.resolve(&index_uri).await?;
1186-
11871187
leaf_search(
11881188
searcher_context.clone(),
11891189
search_request.clone(),
@@ -1259,10 +1259,16 @@ pub async fn leaf_search(
12591259
let incremental_merge_collector = IncrementalCollector::new(merge_collector);
12601260
let incremental_merge_collector = Arc::new(Mutex::new(incremental_merge_collector));
12611261

1262-
for (split, mut request) in split_with_req {
1263-
let leaf_split_search_permit = searcher_context.leaf_search_split_semaphore
1264-
.clone()
1265-
.acquire_owned()
1262+
// We acquire all of the leaf search permits to make sure our single split search tasks
1263+
// do no interleave with other leaf search requests.
1264+
let permit_futures = searcher_context
1265+
.search_permit_provider
1266+
.get_permits(split_with_req.len());
1267+
1268+
for ((split, mut request), permit_fut) in
1269+
split_with_req.into_iter().zip(permit_futures.into_iter())
1270+
{
1271+
let leaf_split_search_permit = permit_fut
12661272
.instrument(info_span!("waiting_for_leaf_search_split_semaphore"))
12671273
.await
12681274
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
@@ -1355,7 +1361,7 @@ async fn leaf_search_single_split_wrapper(
13551361
split: SplitIdAndFooterOffsets,
13561362
split_filter: Arc<RwLock<CanSplitDoBetter>>,
13571363
incremental_merge_collector: Arc<Mutex<IncrementalCollector>>,
1358-
leaf_split_search_permit: tokio::sync::OwnedSemaphorePermit,
1364+
search_permit: SearchPermit,
13591365
aggregations_limits: AggregationLimitsGuard,
13601366
) {
13611367
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
@@ -1374,7 +1380,7 @@ async fn leaf_search_single_split_wrapper(
13741380
.await;
13751381

13761382
// We explicitly drop it, to highlight it to the reader
1377-
std::mem::drop(leaf_split_search_permit);
1383+
std::mem::drop(search_permit);
13781384

13791385
if leaf_search_single_split_res.is_ok() {
13801386
timer.observe_duration();

quickwit/quickwit-search/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ mod service;
4444
pub(crate) mod top_k_collector;
4545

4646
mod metrics;
47+
mod search_permit_provider;
4748

4849
#[cfg(test)]
4950
mod tests;

quickwit/quickwit-search/src/list_terms.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,10 +331,12 @@ pub async fn leaf_list_terms(
331331
let index_storage_clone = index_storage.clone();
332332
let searcher_context_clone = searcher_context.clone();
333333
async move {
334-
let _leaf_split_search_permit = searcher_context_clone.leaf_search_split_semaphore.clone()
335-
.acquire_owned()
334+
let _leaf_split_search_permit = searcher_context_clone
335+
.search_permit_provider
336+
.get_permit()
336337
.await
337338
.expect("Failed to acquire permit. This should never happen! Please, report on https://github.com/quickwit-oss/quickwit/issues.");
339+
338340
// TODO dedicated counter and timer?
339341
crate::SEARCH_METRICS.leaf_searches_splits_total.inc();
340342
let timer = crate::SEARCH_METRICS

quickwit/quickwit-search/src/metrics.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121

2222
use once_cell::sync::Lazy;
2323
use quickwit_common::metrics::{
24-
exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_histogram,
25-
new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec,
24+
exponential_buckets, linear_buckets, new_counter, new_counter_vec, new_gauge_vec,
25+
new_histogram, new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge,
2626
};
2727

2828
pub struct SearchMetrics {
@@ -35,6 +35,8 @@ pub struct SearchMetrics {
3535
pub leaf_searches_splits_total: IntCounter,
3636
pub leaf_search_split_duration_secs: Histogram,
3737
pub job_assigned_total: IntCounterVec<1>,
38+
pub leaf_search_single_split_tasks_pending: IntGauge,
39+
pub leaf_search_single_split_tasks_ongoing: IntGauge,
3840
}
3941

4042
impl Default for SearchMetrics {
@@ -50,6 +52,14 @@ impl Default for SearchMetrics {
5052
.copied()
5153
.collect();
5254

55+
let leaf_search_single_split_tasks = new_gauge_vec::<1>(
56+
"leaf_search_single_split_tasks",
57+
"Number of single split search tasks pending or ongoing",
58+
"search",
59+
&[],
60+
["status"], // takes values "ongoing" or "pending"
61+
);
62+
5363
SearchMetrics {
5464
root_search_requests_total: new_counter_vec(
5565
"root_search_requests_total",
@@ -110,6 +120,10 @@ impl Default for SearchMetrics {
110120
"search",
111121
exponential_buckets(0.001, 2.0, 15).unwrap(),
112122
),
123+
leaf_search_single_split_tasks_ongoing: leaf_search_single_split_tasks
124+
.with_label_values(["ongoing"]),
125+
leaf_search_single_split_tasks_pending: leaf_search_single_split_tasks
126+
.with_label_values(["pending"]),
113127
job_assigned_total: new_counter_vec(
114128
"job_assigned_total",
115129
"Number of job assigned to searchers, per affinity rank.",
Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
// Copyright (C) 2024 Quickwit, Inc.
2+
//
3+
// Quickwit is offered under the AGPL v3.0 and as commercial software.
4+
// For commercial licensing, contact us at [email protected].
5+
//
6+
// AGPL:
7+
// This program is free software: you can redistribute it and/or modify
8+
// it under the terms of the GNU Affero General Public License as
9+
// published by the Free Software Foundation, either version 3 of the
10+
// License, or (at your option) any later version.
11+
//
12+
// This program is distributed in the hope that it will be useful,
13+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
// GNU Affero General Public License for more details.
16+
//
17+
// You should have received a copy of the GNU Affero General Public License
18+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
20+
use std::collections::VecDeque;
21+
use std::sync::{Arc, Mutex};
22+
23+
use quickwit_common::metrics::GaugeGuard;
24+
use tokio::sync::oneshot;
25+
26+
/// `SearchPermitProvider` is a distributor of permits to perform single split
27+
/// search operation.
28+
///
29+
/// Requests are served in order.
30+
#[derive(Clone)]
31+
pub struct SearchPermitProvider {
32+
inner_arc: Arc<Mutex<InnerSearchPermitProvider>>,
33+
}
34+
35+
impl SearchPermitProvider {
36+
pub fn new(num_permits: usize) -> SearchPermitProvider {
37+
SearchPermitProvider {
38+
inner_arc: Arc::new(Mutex::new(InnerSearchPermitProvider {
39+
num_permits_available: num_permits,
40+
permits_requests: VecDeque::new(),
41+
})),
42+
}
43+
}
44+
45+
/// Returns a future permit in the form of a oneshot Receiver channel.
46+
///
47+
/// At this point the permit is not acquired yet.
48+
#[must_use]
49+
pub fn get_permit(&self) -> oneshot::Receiver<SearchPermit> {
50+
let mut permits_lock = self.inner_arc.lock().unwrap();
51+
permits_lock.get_permit(&self.inner_arc)
52+
}
53+
54+
/// Returns a list of future permits in the form of oneshot Receiver channels.
55+
///
56+
/// The permits returned are guaranteed to be resolved in order.
57+
/// In addition, the permits are guaranteed to be resolved before permits returned by
58+
/// subsequent calls to this function (or `get_permit`).
59+
#[must_use]
60+
pub fn get_permits(&self, num_permits: usize) -> Vec<oneshot::Receiver<SearchPermit>> {
61+
let mut permits_lock = self.inner_arc.lock().unwrap();
62+
permits_lock.get_permits(num_permits, &self.inner_arc)
63+
}
64+
}
65+
66+
struct InnerSearchPermitProvider {
67+
num_permits_available: usize,
68+
permits_requests: VecDeque<oneshot::Sender<SearchPermit>>,
69+
}
70+
71+
impl InnerSearchPermitProvider {
72+
fn get_permit(
73+
&mut self,
74+
inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>,
75+
) -> oneshot::Receiver<SearchPermit> {
76+
let (tx, rx) = oneshot::channel();
77+
self.permits_requests.push_back(tx);
78+
self.assign_available_permits(inner_arc);
79+
rx
80+
}
81+
82+
fn get_permits(
83+
&mut self,
84+
num_permits: usize,
85+
inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>,
86+
) -> Vec<oneshot::Receiver<SearchPermit>> {
87+
let mut permits = Vec::with_capacity(num_permits);
88+
for _ in 0..num_permits {
89+
let (tx, rx) = oneshot::channel();
90+
self.permits_requests.push_back(tx);
91+
permits.push(rx);
92+
}
93+
self.assign_available_permits(inner_arc);
94+
permits
95+
}
96+
97+
fn recycle_permit(&mut self, inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>) {
98+
self.num_permits_available += 1;
99+
self.assign_available_permits(inner_arc);
100+
}
101+
102+
fn assign_available_permits(&mut self, inner_arc: &Arc<Mutex<InnerSearchPermitProvider>>) {
103+
while self.num_permits_available > 0 {
104+
let Some(sender) = self.permits_requests.pop_front() else {
105+
break;
106+
};
107+
let mut ongoing_gauge_guard = GaugeGuard::from_gauge(
108+
&crate::SEARCH_METRICS.leaf_search_single_split_tasks_ongoing,
109+
);
110+
ongoing_gauge_guard.add(1);
111+
let send_res = sender.send(SearchPermit {
112+
_ongoing_gauge_guard: ongoing_gauge_guard,
113+
inner_arc: inner_arc.clone(),
114+
recycle_on_drop: true,
115+
});
116+
match send_res {
117+
Ok(()) => {
118+
self.num_permits_available -= 1;
119+
}
120+
Err(search_permit) => {
121+
search_permit.drop_without_recycling_permit();
122+
}
123+
}
124+
}
125+
crate::SEARCH_METRICS
126+
.leaf_search_single_split_tasks_pending
127+
.set(self.permits_requests.len() as i64);
128+
}
129+
}
130+
131+
pub struct SearchPermit {
132+
_ongoing_gauge_guard: GaugeGuard<'static>,
133+
inner_arc: Arc<Mutex<InnerSearchPermitProvider>>,
134+
recycle_on_drop: bool,
135+
}
136+
137+
impl SearchPermit {
138+
fn drop_without_recycling_permit(mut self) {
139+
self.recycle_on_drop = false;
140+
drop(self);
141+
}
142+
}
143+
144+
impl Drop for SearchPermit {
145+
fn drop(&mut self) {
146+
if !self.recycle_on_drop {
147+
return;
148+
}
149+
let mut inner_guard = self.inner_arc.lock().unwrap();
150+
inner_guard.recycle_permit(&self.inner_arc.clone());
151+
}
152+
}
153+
154+
#[cfg(test)]
155+
mod tests {
156+
use tokio::task::JoinSet;
157+
158+
use super::*;
159+
160+
#[tokio::test]
161+
async fn test_search_permits_get_permits_future() {
162+
// We test here that `get_permits_futures` does not interleave
163+
let search_permits = SearchPermitProvider::new(1);
164+
let mut all_futures = Vec::new();
165+
let first_batch_of_permits = search_permits.get_permits(10);
166+
assert_eq!(first_batch_of_permits.len(), 10);
167+
all_futures.extend(
168+
first_batch_of_permits
169+
.into_iter()
170+
.enumerate()
171+
.map(move |(i, fut)| ((1, i), fut)),
172+
);
173+
174+
let second_batch_of_permits = search_permits.get_permits(10);
175+
assert_eq!(second_batch_of_permits.len(), 10);
176+
all_futures.extend(
177+
second_batch_of_permits
178+
.into_iter()
179+
.enumerate()
180+
.map(move |(i, fut)| ((2, i), fut)),
181+
);
182+
183+
use rand::seq::SliceRandom;
184+
// not super useful, considering what join set does, but still a tiny bit more sound.
185+
all_futures.shuffle(&mut rand::thread_rng());
186+
187+
let mut join_set = JoinSet::new();
188+
for (res, fut) in all_futures {
189+
join_set.spawn(async move {
190+
let permit = fut.await;
191+
(res, permit)
192+
});
193+
}
194+
let mut ordered_result: Vec<(usize, usize)> = Vec::with_capacity(20);
195+
while let Some(Ok(((batch_id, order), _permit))) = join_set.join_next().await {
196+
ordered_result.push((batch_id, order));
197+
}
198+
199+
assert_eq!(ordered_result.len(), 20);
200+
for (i, res) in ordered_result[0..10].iter().enumerate() {
201+
assert_eq!(res, &(1, i));
202+
}
203+
for (i, res) in ordered_result[10..20].iter().enumerate() {
204+
assert_eq!(res, &(2, i));
205+
}
206+
}
207+
208+
#[tokio::test]
209+
async fn test_search_permits_receiver_race_condition() {
210+
// Here we test that we don't have a problem if the Receiver is dropped.
211+
// In particular, we want to check that there is not a race condition where drop attempts to
212+
// lock the mutex.
213+
let search_permits = SearchPermitProvider::new(1);
214+
let permit_rx = search_permits.get_permit();
215+
let permit_rx2 = search_permits.get_permit();
216+
drop(permit_rx2);
217+
drop(permit_rx);
218+
let _permit_rx = search_permits.get_permit();
219+
}
220+
}

0 commit comments

Comments
 (0)