Skip to content

Commit fdc729f

Browse files
authored
Optimize FindTraceIdsCollector (#2832)
1 parent cb0a0e0 commit fdc729f

File tree

16 files changed

+965
-370
lines changed

16 files changed

+965
-370
lines changed

quickwit/Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-aws/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ tokio = { workspace = true }
2525
tracing = { workspace = true }
2626

2727
[dev-dependencies]
28-
quickwit-actors = { workspace = true }
28+
quickwit-actors = { workspace = true, features = ["testsuite"] }
2929

3030
[features]
3131
kinesis = ["rusoto_kinesis"]

quickwit/quickwit-cli/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ predicates = { workspace = true }
7676
reqwest = { workspace = true }
7777
serial_test = { workspace = true }
7878

79+
quickwit-actors = { workspace = true, features = ["testsuite"] }
7980
quickwit-common = { workspace = true, features = ["testsuite"] }
8081
quickwit-config = { workspace = true, features = ["testsuite"] }
8182
quickwit-metastore = { workspace = true, features = ["testsuite"] }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
// Copyright (C) 2023 Quickwit, Inc.
2+
//
3+
// Quickwit is offered under the AGPL v3.0 and as commercial software.
4+
// For commercial licensing, contact us at [email protected].
5+
//
6+
// AGPL:
7+
// This program is free software: you can redistribute it and/or modify
8+
// it under the terms of the GNU Affero General Public License as
9+
// published by the Free Software Foundation, either version 3 of the
10+
// License, or (at your option) any later version.
11+
//
12+
// This program is distributed in the hope that it will be useful,
13+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
// GNU Affero General Public License for more details.
16+
//
17+
// You should have received a copy of the GNU Affero General Public License
18+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
20+
use std::collections::BinaryHeap;
21+
use std::iter::FusedIterator;
22+
23+
// TODO: Remove this once `BinaryHeap::into_iter_sorted` is stabilized.
24+
25+
#[must_use = "iterators are lazy and do nothing unless consumed"]
26+
#[derive(Clone, Debug)]
27+
pub struct IntoIterSorted<T> {
28+
inner: BinaryHeap<T>,
29+
}
30+
31+
impl<T> IntoIterSorted<T> {
32+
pub fn new(instance: BinaryHeap<T>) -> Self {
33+
Self { inner: instance }
34+
}
35+
}
36+
37+
impl<T: Ord> Iterator for IntoIterSorted<T> {
38+
type Item = T;
39+
40+
#[inline]
41+
fn next(&mut self) -> Option<T> {
42+
self.inner.pop()
43+
}
44+
45+
#[inline]
46+
fn size_hint(&self) -> (usize, Option<usize>) {
47+
let exact = self.inner.len();
48+
(exact, Some(exact))
49+
}
50+
}
51+
52+
impl<T: Ord> ExactSizeIterator for IntoIterSorted<T> {}
53+
54+
impl<T: Ord> FusedIterator for IntoIterSorted<T> {}

quickwit/quickwit-common/src/lib.rs

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
mod checklist;
2323
mod coolid;
2424

25+
pub mod binary_heap;
2526
mod file_entry;
2627
pub mod fs;
2728
pub mod io;

quickwit/quickwit-jaeger/src/lib.rs

+66-108
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ use prost::Message;
3131
use prost_types::{Duration as WellKnownDuration, Timestamp as WellKnownTimestamp};
3232
use quickwit_config::JaegerConfig;
3333
use quickwit_opentelemetry::otlp::{
34-
Event as QwEvent, Link as QwLink, Span as QwSpan, SpanFingerprint, SpanKind as QwSpanKind,
35-
SpanStatus as QwSpanStatus, OTEL_TRACE_INDEX_ID,
34+
B64TraceId, Event as QwEvent, Link as QwLink, Span as QwSpan, SpanFingerprint,
35+
SpanKind as QwSpanKind, SpanStatus as QwSpanStatus, TraceId, OTEL_TRACE_INDEX_ID,
3636
};
3737
use quickwit_proto::jaeger::api_v2::{
3838
KeyValue as JaegerKeyValue, Log as JaegerLog, Process as JaegerProcess, Span as JaegerSpan,
@@ -45,9 +45,10 @@ use quickwit_proto::jaeger::storage::v1::{
4545
SpansResponseChunk, TraceQueryParameters,
4646
};
4747
use quickwit_proto::{ListTermsRequest, SearchRequest};
48-
use quickwit_search::SearchService;
48+
use quickwit_search::{FindTraceIdsCollector, SearchService};
4949
use serde::Deserialize;
5050
use serde_json::Value as JsonValue;
51+
use tantivy::collector::Collector;
5152
use time::format_description::well_known::Rfc3339;
5253
use time::OffsetDateTime;
5354
use tokio::sync::mpsc;
@@ -63,9 +64,6 @@ mod metrics;
6364
// OpenTelemetry to Jaeger Transformation
6465
// <https://opentelemetry.io/docs/reference/specification/trace/sdk_exporters/jaeger/>
6566

66-
/// A base64-encoded 16-byte array.
67-
type TraceId = String;
68-
6967
type TimeIntervalSecs = RangeInclusive<i64>;
7068

7169
type JaegerResult<T> = Result<T, Status>;
@@ -171,13 +169,12 @@ impl JaegerService {
171169
.query
172170
.ok_or_else(|| Status::invalid_argument("Query is empty."))?;
173171

174-
let (trace_ids_b64, _) = self.find_trace_ids(trace_query).await?;
175-
debug!(trace_ids=?trace_ids_b64, "`find_trace_ids` response");
176-
177-
let trace_ids = trace_ids_b64
172+
let (trace_ids, _) = self.find_trace_ids(trace_query).await?;
173+
let trace_ids = trace_ids
178174
.into_iter()
179-
.map(|trace_id_b64| base64_decode(trace_id_b64.as_bytes(), "trace ID"))
180-
.collect::<Result<_, _>>()?;
175+
.map(|trace_id| trace_id.b64_decode().to_vec())
176+
.collect();
177+
debug!(trace_ids=?trace_ids, "`find_trace_ids` response");
181178
let response = FindTraceIDsResponse { trace_ids };
182179
Ok(response)
183180
}
@@ -212,7 +209,10 @@ impl JaegerService {
212209
request_start: Instant,
213210
) -> JaegerResult<SpanStream> {
214211
debug!(request=?request, "`get_trace` request");
215-
let trace_id = BASE64_STANDARD.encode(request.trace_id);
212+
debug_assert_eq!(request.trace_id.len(), 16);
213+
let trace_id = TraceId::try_from(request.trace_id)
214+
.map_err(|error| Status::invalid_argument(error.to_string()))?
215+
.b64_encode();
216216
let end = OffsetDateTime::now_utc().unix_timestamp();
217217
let start = end - self.lookback_period_secs;
218218
let search_window = start..=end;
@@ -226,7 +226,7 @@ impl JaegerService {
226226
async fn find_trace_ids(
227227
&self,
228228
trace_query: TraceQueryParameters,
229-
) -> Result<(Vec<TraceId>, TimeIntervalSecs), Status> {
229+
) -> Result<(Vec<B64TraceId>, TimeIntervalSecs), Status> {
230230
let index_id = OTEL_TRACE_INDEX_ID.to_string();
231231
let span_kind_opt = None;
232232
let min_span_start_timestamp_secs_opt = trace_query.start_time_min.map(|ts| ts.seconds);
@@ -269,15 +269,14 @@ impl JaegerService {
269269
return Ok((Vec::new(), 0..=0));
270270
};
271271
let trace_ids = collect_trace_ids(&agg_result_json)?;
272-
debug!(trace_ids=?trace_ids.0, "The query matched {} traces.", trace_ids.0.len());
273-
272+
debug!("The query matched {} traces.", trace_ids.0.len());
274273
Ok(trace_ids)
275274
}
276275

277276
#[instrument("stream_spans", skip_all, fields(num_traces=%trace_ids.len(), num_spans=Empty, num_bytes=Empty))]
278277
async fn stream_spans(
279278
&self,
280-
trace_ids: &[TraceId],
279+
trace_ids: &[B64TraceId],
281280
search_window: TimeIntervalSecs,
282281
operation_name: &'static str,
283282
request_start: Instant,
@@ -294,8 +293,10 @@ impl JaegerService {
294293
query.push_str(" OR ");
295294
}
296295
query.push_str("trace_id:");
297-
query.push_str(trace_id);
296+
query.push_str(trace_id.as_str())
298297
}
298+
debug!(query=%query, "Fetch spans query");
299+
299300
let search_request = SearchRequest {
300301
index_id: OTEL_TRACE_INDEX_ID.to_string(),
301302
query,
@@ -650,28 +651,12 @@ fn build_search_query(
650651
}
651652

652653
fn build_aggregations_query(num_traces: usize) -> String {
653-
// DANGER: The fast field is truncated to seconds but the aggregation returns timestamps in
654-
// microseconds by appending a bunch of zeros.
655-
let query = format!(
656-
r#"{{
657-
"trace_ids": {{
658-
"terms": {{
659-
"field": "trace_id",
660-
"size": {num_traces},
661-
"order": {{
662-
"max_span_start_timestamp_micros": "desc"
663-
}}
664-
}},
665-
"aggs": {{
666-
"max_span_start_timestamp_micros": {{
667-
"max": {{
668-
"field": "span_start_timestamp_secs"
669-
}}
670-
}}
671-
}}
672-
}}
673-
}}"#,
674-
);
654+
let query = serde_json::to_string(&FindTraceIdsCollector {
655+
num_traces,
656+
trace_id_field_name: "trace_id".to_string(),
657+
span_timestamp_field_name: "span_start_timestamp_secs".to_string(),
658+
})
659+
.expect("The collector should be JSON serializable.");
675660
debug!(query=%query, "Aggregations query");
676661
query
677662
}
@@ -953,44 +938,21 @@ fn qw_event_to_jaeger_log(event: QwEvent) -> Result<JaegerLog, Status> {
953938
Ok(log)
954939
}
955940

956-
#[derive(Deserialize)]
957-
struct TraceIdsAggResult {
958-
trace_ids: TraceIdBuckets,
959-
}
960-
961-
#[derive(Deserialize)]
962-
struct TraceIdBuckets {
963-
#[serde(default)]
964-
buckets: Vec<TraceIdBucket>,
965-
}
966-
967-
#[derive(Deserialize)]
968-
struct TraceIdBucket {
969-
key: String,
970-
max_span_start_timestamp_micros: MetricValue,
971-
}
972-
973-
#[derive(Deserialize)]
974-
struct MetricValue {
975-
value: f64,
976-
}
977-
978-
fn collect_trace_ids(agg_result_json: &str) -> Result<(Vec<TraceId>, TimeIntervalSecs), Status> {
979-
let agg_result: TraceIdsAggResult = json_deserialize(agg_result_json, "trace IDs aggregation")?;
980-
if agg_result.trace_ids.buckets.is_empty() {
941+
fn collect_trace_ids(trace_ids_json: &str) -> Result<(Vec<B64TraceId>, TimeIntervalSecs), Status> {
942+
let collector_fruit: <FindTraceIdsCollector as Collector>::Fruit =
943+
json_deserialize(trace_ids_json, "trace IDs aggregation")?;
944+
if collector_fruit.is_empty() {
981945
return Ok((Vec::new(), 0..=0));
982946
}
983-
let mut trace_ids = Vec::with_capacity(agg_result.trace_ids.buckets.len());
947+
let mut trace_ids = Vec::with_capacity(collector_fruit.len());
984948
let mut start = i64::MAX;
985949
let mut end = i64::MIN;
986950

987-
for bucket in agg_result.trace_ids.buckets {
988-
trace_ids.push(bucket.key);
989-
start = start.min(bucket.max_span_start_timestamp_micros.value as i64);
990-
end = end.max(bucket.max_span_start_timestamp_micros.value as i64);
951+
for trace_id in collector_fruit {
952+
trace_ids.push(trace_id.trace_id);
953+
start = start.min(trace_id.span_timestamp.into_timestamp_secs());
954+
end = end.max(trace_id.span_timestamp.into_timestamp_secs());
991955
}
992-
let start = start / 1_000_000;
993-
let end = end / 1_000_000;
994956
Ok((trace_ids, start..=end))
995957
}
996958

@@ -1011,7 +973,7 @@ where T: Deserialize<'a> {
1011973
match serde_json::from_str(json) {
1012974
Ok(deserialized) => Ok(deserialized),
1013975
Err(error) => {
1014-
error!("Failed to deserialize {label}: {error:?}",);
976+
error!("Failed to deserialize {label}: {error:?}");
1015977
Err(Status::internal(format!("Failed to deserialize {json}.")))
1016978
}
1017979
}
@@ -1020,11 +982,8 @@ where T: Deserialize<'a> {
1020982
#[cfg(test)]
1021983
mod tests {
1022984
use quickwit_proto::jaeger::api_v2::ValueType;
1023-
use quickwit_search::{encode_term_for_test, MockSearchService};
985+
use quickwit_search::{encode_term_for_test, MockSearchService, QuickwitAggregations};
1024986
use serde_json::json;
1025-
use tantivy::aggregation::agg_req::{
1026-
Aggregation, Aggregations, BucketAggregationType, MetricAggregation,
1027-
};
1028987

1029988
use super::*;
1030989

@@ -1478,21 +1437,16 @@ mod tests {
14781437
#[test]
14791438
fn test_build_aggregations_query() {
14801439
let aggregations_query = build_aggregations_query(77);
1481-
let aggregations: Aggregations = serde_json::from_str(&aggregations_query).unwrap();
1482-
let aggregation = aggregations.get("trace_ids").unwrap();
1483-
let Aggregation::Bucket(ref bucket_aggregation) = aggregation else {
1484-
panic!("Expected a bucket aggregation!");
1440+
let aggregations: QuickwitAggregations = serde_json::from_str(&aggregations_query).unwrap();
1441+
let QuickwitAggregations::FindTraceIdsAggregation(collector) = aggregations else {
1442+
panic!("Expected find trace IDs aggregation!");
14851443
};
1486-
let BucketAggregationType::Terms(ref terms_aggregation) = bucket_aggregation.bucket_agg else {
1487-
panic!("Expected a terms aggregation!");
1488-
};
1489-
assert_eq!(terms_aggregation.field, "trace_id");
1490-
assert_eq!(terms_aggregation.size.unwrap(), 77);
1491-
1492-
let Aggregation::Metric(MetricAggregation::Max(max_aggregation)) = bucket_aggregation.sub_aggregation.get("max_span_start_timestamp_micros").unwrap() else {
1493-
panic!("Expected a max metric aggregation!");
1494-
};
1495-
assert_eq!(max_aggregation.field, "span_start_timestamp_secs");
1444+
assert_eq!(collector.num_traces, 77);
1445+
assert_eq!(collector.trace_id_field_name, "trace_id");
1446+
assert_eq!(
1447+
collector.span_timestamp_field_name,
1448+
"span_start_timestamp_secs"
1449+
);
14961450
}
14971451

14981452
#[test]
@@ -1738,31 +1692,35 @@ mod tests {
17381692
#[test]
17391693
fn test_collect_trace_ids() {
17401694
{
1741-
let agg_result_json = r#"{"trace_ids": {}}"#;
1695+
let agg_result_json = r#"[]"#;
17421696
let (trace_ids, _span_timestamps_range) = collect_trace_ids(agg_result_json).unwrap();
17431697
assert!(trace_ids.is_empty());
17441698
}
17451699
{
1746-
let agg_result_json = r#"{
1747-
"trace_ids": {
1748-
"buckets": [
1749-
{"key": "jIr1E97+2DJBcBnOb/wjQg==", "doc_count": 3, "max_span_start_timestamp_micros": {"value": 1674611393000000.0 }}]}}"#;
1700+
let agg_result_json = r#"[
1701+
{
1702+
"trace_id": "AQEBAQEBAQEBAQEBAQEBAQ==",
1703+
"span_timestamp": 1736522020000000
1704+
}
1705+
]"#;
17501706
let (trace_ids, span_timestamps_range) = collect_trace_ids(agg_result_json).unwrap();
1751-
assert_eq!(trace_ids, &["jIr1E97+2DJBcBnOb/wjQg=="]);
1752-
assert_eq!(span_timestamps_range, 1674611393..=1674611393);
1707+
assert_eq!(trace_ids.len(), 1);
1708+
assert_eq!(span_timestamps_range, 1736522020..=1736522020);
17531709
}
17541710
{
1755-
let agg_result_json = r#"{
1756-
"trace_ids": {
1757-
"buckets": [
1758-
{"key": "FKvicG794620BNsewGCknA==", "doc_count": 7, "max_span_start_timestamp_micros": { "value": 1674611388000000.0 }},
1759-
{"key": "jIr1E97+2DJBcBnOb/wjQg==", "doc_count": 3, "max_span_start_timestamp_micros": { "value": 1674611393000000.0 }}]}}"#;
1711+
let agg_result_json = r#"[
1712+
{
1713+
"trace_id": "AQIDBAUGBwgJCgsMDQ4PEA==",
1714+
"span_timestamp": 1736522020000000
1715+
},
1716+
{
1717+
"trace_id": "AgICAgICAgICAgICAgICAg==",
1718+
"span_timestamp": 1704899620000000
1719+
}
1720+
]"#;
17601721
let (trace_ids, span_timestamps_range) = collect_trace_ids(agg_result_json).unwrap();
1761-
assert_eq!(
1762-
trace_ids,
1763-
&["FKvicG794620BNsewGCknA==", "jIr1E97+2DJBcBnOb/wjQg=="]
1764-
);
1765-
assert_eq!(span_timestamps_range, 1674611388..=1674611393);
1722+
assert_eq!(trace_ids.len(), 2);
1723+
assert_eq!(span_timestamps_range, 1704899620..=1736522020);
17661724
}
17671725
}
17681726

0 commit comments

Comments
 (0)