Skip to content

Commit cbaa65a

Browse files
PSeitzfulmicoton
andauthored
switch from serde_json to postcard (#3170)
* switch from json to postcard switch to postcard for better datastructure support when sending intermediate results between nodes * Apply suggestions from code review Co-authored-by: Paul Masurel <[email protected]> * Update quickwit/quickwit-proto/protos/quickwit/search_api.proto Co-authored-by: Paul Masurel <[email protected]> --------- Co-authored-by: Paul Masurel <[email protected]>
1 parent 625b25f commit cbaa65a

File tree

12 files changed

+99
-78
lines changed

12 files changed

+99
-78
lines changed

quickwit/Cargo.lock

+10-9
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ quickwit-serve = { version = "0.5.0", path = "./quickwit-serve" }
225225
quickwit-storage = { version = "0.5.0", path = "./quickwit-storage" }
226226
quickwit-telemetry = { version = "0.5.0", path = "./quickwit-telemetry" }
227227

228-
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "80df1d9", default-features = false, features = [
228+
tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "c599bf3", default-features = false, features = [
229229
"mmap",
230230
"lz4-compression",
231231
"zstd-compression",

quickwit/quickwit-opentelemetry/src/otlp/mod.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,7 @@ use quickwit_proto::opentelemetry::proto::common::v1::any_value::Value as OtlpVa
2727
use quickwit_proto::opentelemetry::proto::common::v1::{
2828
AnyValue as OtlpAnyValue, KeyValue as OtlpKeyValue,
2929
};
30-
use serde;
31-
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
30+
use serde::{self, de, Deserialize, Deserializer, Serialize, Serializer};
3231
use serde_json::{Number as JsonNumber, Value as JsonValue};
3332
use tracing::warn;
3433

quickwit/quickwit-proto/protos/quickwit/search_api.proto

+6-2
Original file line numberDiff line numberDiff line change
@@ -240,8 +240,12 @@ message LeafSearchResponse {
240240
// num_attempted_splits = num_successful_splits + num_failed_splits.
241241
uint64 num_attempted_splits = 4;
242242

243-
// json serialized intermediate aggregation_result.
244-
optional string intermediate_aggregation_result = 5;
243+
// Deprecated json serialized intermediate aggregation_result.
244+
reserved 5;
245+
246+
// postcard serialized intermediate aggregation_result.
247+
optional bytes intermediate_aggregation_result = 6;
248+
245249

246250
}
247251

quickwit/quickwit-proto/src/quickwit.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -199,10 +199,10 @@ pub struct LeafSearchResponse {
199199
/// num_attempted_splits = num_successful_splits + num_failed_splits.
200200
#[prost(uint64, tag = "4")]
201201
pub num_attempted_splits: u64,
202-
/// json serialized intermediate aggregation_result.
203-
#[prost(string, optional, tag = "5")]
202+
/// postcard serialized intermediate aggregation_result.
203+
#[prost(bytes = "vec", optional, tag = "5")]
204204
pub intermediate_aggregation_result: ::core::option::Option<
205-
::prost::alloc::string::String,
205+
::prost::alloc::vec::Vec<u8>,
206206
>,
207207
}
208208
#[derive(Serialize, Deserialize, utoipa::ToSchema)]

quickwit/quickwit-search/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ itertools = { workspace = true }
2121
lru = { workspace = true }
2222
mockall = { workspace = true }
2323
once_cell = { workspace = true }
24+
postcard = { workspace = true }
2425
rayon = { workspace = true }
2526
serde = { workspace = true }
2627
serde_json = { workspace = true }

quickwit/quickwit-search/src/cluster_client.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -168,17 +168,18 @@ fn merge_leaf_search_results(
168168
.append(&mut retry_response.partial_hits);
169169
let intermediate_aggregation_result = initial_response
170170
.intermediate_aggregation_result
171-
.map::<crate::Result<_>, _>(|res1_str| {
171+
.map::<crate::Result<_>, _>(|res1_bytes| {
172172
if let Some(res2_str) = retry_response.intermediate_aggregation_result.as_ref()
173173
{
174174
let mut res1: IntermediateAggregationResults =
175-
serde_json::from_str(&res1_str)?;
176-
let res2: IntermediateAggregationResults = serde_json::from_str(res2_str)?;
175+
postcard::from_bytes(res1_bytes.as_slice())?;
176+
let res2: IntermediateAggregationResults =
177+
postcard::from_bytes(res2_str.as_slice())?;
177178
res1.merge_fruits(res2)?;
178-
let res = serde_json::to_string(&res1)?;
179-
Ok(res)
179+
let serialized = postcard::to_allocvec(&res1)?;
180+
Ok(serialized)
180181
} else {
181-
Ok(res1_str)
182+
Ok(res1_bytes)
182183
}
183184
})
184185
.transpose()?;

quickwit/quickwit-search/src/collector.rs

+25-15
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ use tantivy::aggregation::{AggregationLimits, AggregationSegmentCollector};
3131
use tantivy::collector::{Collector, SegmentCollector};
3232
use tantivy::columnar::ColumnType;
3333
use tantivy::fastfield::Column;
34-
use tantivy::{DocId, Score, SegmentOrdinal, SegmentReader};
34+
use tantivy::{DocId, Score, SegmentOrdinal, SegmentReader, TantivyError};
3535

3636
use crate::filters::{create_timestamp_filter_builder, TimestampFilter, TimestampFilterBuilder};
3737
use crate::find_trace_ids_collector::{FindTraceIdsCollector, FindTraceIdsSegmentCollector};
@@ -267,15 +267,16 @@ impl SegmentCollector for QuickwitSegmentCollector {
267267
.collect();
268268

269269
let intermediate_aggregation_result = match self.aggregation {
270-
Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => Some(
271-
serde_json::to_string(&collector.harvest())
272-
.expect("Collector fruit should be JSON serializable."),
273-
),
270+
Some(AggregationSegmentCollectors::FindTraceIdsSegmentCollector(collector)) => {
271+
let fruit = collector.harvest();
272+
let serialized =
273+
postcard::to_allocvec(&fruit).expect("Collector fruit should be serializable.");
274+
Some(serialized)
275+
}
274276
Some(AggregationSegmentCollectors::TantivyAggregationSegmentCollector(collector)) => {
275-
Some(
276-
serde_json::to_string(&collector.harvest()?)
277-
.expect("Collector fruit should be JSON serializable."),
278-
)
277+
let serialized = postcard::to_allocvec(&collector.harvest()?)
278+
.expect("Collector fruit should be serializable.");
279+
Some(serialized)
279280
}
280281
None => None,
281282
};
@@ -438,15 +439,19 @@ impl Collector for QuickwitCollector {
438439
}
439440
}
440441

442+
fn map_error(err: postcard::Error) -> TantivyError {
443+
TantivyError::InternalError(format!("Merge Result Postcard Error: {}", err))
444+
}
445+
441446
/// Merges a set of Leaf Results.
442447
fn merge_leaf_responses(
443448
aggregations_opt: &Option<QuickwitAggregations>,
444-
leaf_responses: Vec<LeafSearchResponse>,
449+
mut leaf_responses: Vec<LeafSearchResponse>,
445450
max_hits: usize,
446451
) -> tantivy::Result<LeafSearchResponse> {
447452
// Optimization: No merging needed if there is only one result.
448453
if leaf_responses.len() == 1 {
449-
return Ok(leaf_responses.into_iter().next().unwrap_or_default()); //< default is actually never called
454+
return Ok(leaf_responses.pop().unwrap());
450455
}
451456
let merged_intermediate_aggregation_result = match aggregations_opt {
452457
Some(QuickwitAggregations::FindTraceIdsAggregation(collector)) => {
@@ -457,21 +462,24 @@ fn merge_leaf_responses(
457462
.filter_map(|leaf_response| {
458463
leaf_response.intermediate_aggregation_result.as_ref().map(
459464
|intermediate_aggregation_result| {
460-
serde_json::from_str(intermediate_aggregation_result)
465+
postcard::from_bytes(intermediate_aggregation_result.as_slice())
466+
.map_err(map_error)
461467
},
462468
)
463469
})
464470
.collect::<Result<_, _>>()?;
465471
let merged_fruit = collector.merge_fruits(fruits)?;
466-
Some(serde_json::to_string(&merged_fruit)?)
472+
let serialized = postcard::to_allocvec(&merged_fruit).map_err(map_error)?;
473+
Some(serialized)
467474
}
468475
Some(QuickwitAggregations::TantivyAggregations(_)) => {
469476
let fruits: Vec<IntermediateAggregationResults> = leaf_responses
470477
.iter()
471478
.filter_map(|leaf_response| {
472479
leaf_response.intermediate_aggregation_result.as_ref().map(
473480
|intermediate_aggregation_result| {
474-
serde_json::from_str(intermediate_aggregation_result)
481+
postcard::from_bytes(intermediate_aggregation_result.as_slice())
482+
.map_err(map_error)
475483
},
476484
)
477485
})
@@ -483,7 +491,9 @@ fn merge_leaf_responses(
483491
for fruit in fruit_iter {
484492
merged_fruit.merge_fruits(fruit)?;
485493
}
486-
Some(serde_json::to_string(&merged_fruit)?)
494+
let serialized = postcard::to_allocvec(&merged_fruit).map_err(map_error)?;
495+
496+
Some(serialized)
487497
} else {
488498
None
489499
}

quickwit/quickwit-search/src/error.rs

+6
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ impl From<TantivyError> for SearchError {
7575
}
7676
}
7777

78+
impl From<postcard::Error> for SearchError {
79+
fn from(error: postcard::Error) -> Self {
80+
SearchError::InternalError(format!("Postcard error: {error}"))
81+
}
82+
}
83+
7884
impl From<serde_json::Error> for SearchError {
7985
fn from(serde_error: serde_json::Error) -> Self {
8086
SearchError::InternalError(format!("Serde error: {serde_error}"))

quickwit/quickwit-search/src/leaf.rs

+1
Original file line numberDiff line numberDiff line change
@@ -347,6 +347,7 @@ async fn leaf_search_single_split(
347347
.map_err(|_| {
348348
crate::SearchError::InternalError(format!("Leaf search panicked. split={split_id}"))
349349
})??;
350+
350351
Ok(leaf_search_response)
351352
}
352353

quickwit/quickwit-search/src/lib.rs

+13-28
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,8 @@ mod tests;
4646
pub use collector::QuickwitAggregations;
4747
use metrics::SEARCH_METRICS;
4848
use quickwit_doc_mapper::DocMapper;
49-
use root::validate_request;
49+
use root::{finalize_aggregation, validate_request};
5050
use service::SearcherContext;
51-
use tantivy::aggregation::AggregationLimits;
5251
use tantivy::query::Query as TantivyQuery;
5352
use tantivy::schema::NamedFieldDocument;
5453

@@ -66,8 +65,6 @@ use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
6665
use quickwit_metastore::{ListSplitsQuery, Metastore, SplitMetadata, SplitState};
6766
use quickwit_proto::{Hit, PartialHit, SearchRequest, SearchResponse, SplitIdAndFooterOffsets};
6867
use quickwit_storage::StorageUriResolver;
69-
use tantivy::aggregation::agg_result::AggregationResults;
70-
use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;
7168
use tantivy::DocAddress;
7269

7370
pub use crate::client::{create_search_service_client, SearchServiceClient};
@@ -193,6 +190,7 @@ pub async fn single_node_search(
193190
// Validates the query by effectively building it against the current schema.
194191
doc_mapper.query(doc_mapper.schema(), search_request)?;
195192
let searcher_context = Arc::new(SearcherContext::new(SearcherConfig::default()));
193+
196194
let leaf_search_response = leaf_search(
197195
searcher_context.clone(),
198196
search_request,
@@ -229,30 +227,17 @@ pub async fn single_node_search(
229227
})
230228
.collect();
231229
let elapsed = start_instant.elapsed();
232-
let aggregation = if let Some(intermediate_aggregation_result) =
233-
leaf_search_response.intermediate_aggregation_result
234-
{
235-
let aggregations: QuickwitAggregations =
236-
serde_json::from_str(search_request.aggregation_request.as_ref().expect(
237-
"Aggregation should be present since we are processing an intermediate \
238-
aggregation result.",
239-
))?;
240-
match aggregations {
241-
QuickwitAggregations::FindTraceIdsAggregation(_) => {
242-
// There is nothing to merge here because there is only one leaf response.
243-
Some(intermediate_aggregation_result)
244-
}
245-
QuickwitAggregations::TantivyAggregations(aggregations) => {
246-
let res: IntermediateAggregationResults =
247-
serde_json::from_str(&intermediate_aggregation_result)?;
248-
let res: AggregationResults =
249-
res.into_final_result(aggregations, &AggregationLimits::default())?;
250-
Some(serde_json::to_string(&res)?)
251-
}
252-
}
253-
} else {
254-
None
255-
};
230+
231+
let aggregations: Option<QuickwitAggregations> = search_request
232+
.aggregation_request
233+
.as_ref()
234+
.map(|agg| serde_json::from_str(agg))
235+
.transpose()?;
236+
237+
let aggregation = finalize_aggregation(
238+
leaf_search_response.intermediate_aggregation_result,
239+
aggregations,
240+
)?;
256241
Ok(SearchResponse {
257242
aggregation,
258243
num_hits: leaf_search_response.num_hits,

0 commit comments

Comments
 (0)