Skip to content

Commit 693c9e9

Browse files
committed
support compressed data
closes #3990
1 parent b630f9b commit 693c9e9

File tree

7 files changed

+104
-4
lines changed

7 files changed

+104
-4
lines changed

quickwit/Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-serve/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ base64 = { workspace = true }
1616
bytes = { workspace = true }
1717
bytesize = { workspace = true }
1818
elasticsearch-dsl = "0.4.15"
19+
flate2 = { workspace = true }
1920
futures = { workspace = true }
2021
futures-util = { workspace = true }
2122
hex = { workspace = true }
@@ -46,6 +47,7 @@ tracing = { workspace = true }
4647
tracing-opentelemetry = { workspace = true }
4748
utoipa = { workspace = true }
4849
warp = { workspace = true }
50+
zstd = { workspace = true }
4951

5052
quickwit-actors = { workspace = true }
5153
quickwit-cluster = { workspace = true }
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
// Copyright (C) 2024 Quickwit, Inc.
2+
//
3+
// Quickwit is offered under the AGPL v3.0 and as commercial software.
4+
// For commercial licensing, contact us at hello@quickwit.io.
5+
//
6+
// AGPL:
7+
// This program is free software: you can redistribute it and/or modify
8+
// it under the terms of the GNU Affero General Public License as
9+
// published by the Free Software Foundation, either version 3 of the
10+
// License, or (at your option) any later version.
11+
//
12+
// This program is distributed in the hope that it will be useful,
13+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
// GNU Affero General Public License for more details.
16+
//
17+
// You should have received a copy of the GNU Affero General Public License
18+
// along with this program. If not, see <http://www.gnu.org/licenses/>.
19+
20+
use core::fmt;
21+
use std::io::Read;
22+
23+
use bytes::Bytes;
24+
use flate2::read::GzDecoder;
25+
use tokio::task;
26+
use warp::reject::Reject;
27+
use warp::Filter;
28+
29+
/// There are two ways to decompress the body:
30+
/// - Stream the body through an async decompressor
31+
/// - Fetch the body and then decompress the bytes
32+
///
33+
/// The first approach lowers the latency, while the second approach is more CPU efficient.
34+
/// Ingesting data is usually CPU bound and there is considerable latency until the data is
35+
/// searchable, so the second approach is more suitable for this use case.
36+
async fn decompress_body(encoding: Option<String>, body: Bytes) -> Result<Bytes, warp::Rejection> {
37+
match encoding.as_deref() {
38+
Some("gzip" | "x-gzip") => {
39+
let decompressed = task::spawn_blocking(move || {
40+
let mut decompressed = Vec::new();
41+
let mut decoder = GzDecoder::new(body.as_ref());
42+
decoder
43+
.read_to_end(&mut decompressed)
44+
.map_err(|_| warp::reject())?;
45+
Result::<_, warp::Rejection>::Ok(Bytes::from(decompressed))
46+
})
47+
.await
48+
.map_err(|_| warp::reject())??;
49+
Ok(decompressed)
50+
}
51+
Some("zstd") => {
52+
let decompressed = task::spawn_blocking(move || {
53+
zstd::decode_all(body.as_ref())
54+
.map(Bytes::from)
55+
.map_err(|_| warp::reject())
56+
})
57+
.await
58+
.map_err(|_| warp::reject())??;
59+
Ok(decompressed)
60+
}
61+
Some(encoding) => Err(warp::reject::custom(UnsupportedEncoding(
62+
encoding.to_string(),
63+
))),
64+
_ => Ok(body),
65+
}
66+
}
67+
68+
/// Error describing a `Content-Encoding` value that the decompression code does not handle.
///
/// The wrapped string is the encoding name that was received.
#[derive(Debug)]
struct UnsupportedEncoding(String);

impl fmt::Display for UnsupportedEncoding {
    /// Writes a message naming the rejected encoding and listing the supported ones.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let Self(encoding) = self;
        f.write_fmt(format_args!(
            "Unsupported Content-Encoding {}. Supported encodings are 'gzip' and 'zstd'.",
            encoding
        ))
    }
}
80+
81+
// Marker impl: lets `UnsupportedEncoding` be wrapped with `warp::reject::custom`, as
// `decompress_body` does for unknown encodings.
impl Reject for UnsupportedEncoding {}
82+
83+
/// Custom filter for optional decompression
84+
pub(crate) fn get_body_bytes() -> impl Filter<Extract = (Bytes,), Error = warp::Rejection> + Clone {
85+
warp::header::optional("content-encoding")
86+
.and(warp::body::bytes())
87+
.and_then(|encoding: Option<String>, body: Bytes| async move {
88+
decompress_body(encoding, body).await
89+
})
90+
}

quickwit/quickwit-serve/src/elasticsearch_api/filter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use super::model::{
2727
CatIndexQueryParams, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
2828
MultiSearchQueryParams, SearchQueryParamsCount,
2929
};
30+
use crate::decompression::get_body_bytes;
3031
use crate::elasticsearch_api::model::{
3132
ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams,
3233
};
@@ -76,7 +77,7 @@ pub(crate) fn elastic_bulk_filter(
7677
.and(warp::body::content_length_limit(
7778
CONTENT_LENGTH_LIMIT.as_u64(),
7879
))
79-
.and(warp::body::bytes())
80+
.and(get_body_bytes())
8081
.and(serde_qs::warp::query(serde_qs::Config::default()))
8182
}
8283

quickwit/quickwit-serve/src/ingest_api/rest_handler.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ use serde::Deserialize;
3232
use thiserror::Error;
3333
use warp::{Filter, Rejection};
3434

35+
use crate::decompression::get_body_bytes;
3536
use crate::format::extract_format_from_qs;
3637
use crate::rest_api_response::into_rest_api_response;
3738
use crate::{with_arg, BodyFormat};
@@ -80,7 +81,7 @@ fn ingest_filter(
8081
.and(warp::body::content_length_limit(
8182
config.content_length_limit.as_u64(),
8283
))
83-
.and(warp::body::bytes())
84+
.and(get_body_bytes())
8485
.and(serde_qs::warp::query::<IngestOptions>(
8586
serde_qs::Config::default(),
8687
))
@@ -104,7 +105,7 @@ fn ingest_v2_filter(
104105
.and(warp::body::content_length_limit(
105106
config.content_length_limit.as_u64(),
106107
))
107-
.and(warp::body::bytes())
108+
.and(get_body_bytes())
108109
.and(serde_qs::warp::query::<IngestOptions>(
109110
serde_qs::Config::default(),
110111
))

quickwit/quickwit-serve/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
mod build_info;
2121
mod cluster_api;
2222
mod debugging_api;
23+
mod decompression;
2324
mod delete_task_api;
2425
mod elasticsearch_api;
2526
mod format;

quickwit/rest-api-tests/run_tests.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,10 @@ def run_request_step(method, step, previous_result):
100100
body_from_file = step.get("body_from_file", None)
101101
if body_from_file is not None:
102102
body_from_file = osp.join(step["cwd"], body_from_file)
103-
kvargs["data"] = load_data(body_from_file)
103+
kvargs["data"] = open(body_from_file, 'rb').read()
104+
if body_from_file.endswith("gz"):
105+
kvargs.setdefault("headers")["content-encoding"] = "gzip"
106+
104107
kvargs = resolve_previous_result(kvargs, previous_result)
105108
ndjson = step.get("ndjson", None)
106109
if ndjson is not None:

0 commit comments

Comments
 (0)