diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock
index d289332f6be..b8c48cd0db1 100644
--- a/quickwit/Cargo.lock
+++ b/quickwit/Cargo.lock
@@ -6285,6 +6285,7 @@ dependencies = [
"bytes",
"bytesize",
"elasticsearch-dsl",
+ "flate2",
"futures",
"futures-util",
"hex",
@@ -6339,6 +6340,7 @@ dependencies = [
"tracing-opentelemetry",
"utoipa",
"warp",
+ "zstd 0.13.0",
]
[[package]]
diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml
index 744fc648340..b59e73c3172 100644
--- a/quickwit/quickwit-serve/Cargo.toml
+++ b/quickwit/quickwit-serve/Cargo.toml
@@ -16,6 +16,7 @@ base64 = { workspace = true }
bytes = { workspace = true }
bytesize = { workspace = true }
elasticsearch-dsl = "0.4.15"
+flate2 = { workspace = true }
futures = { workspace = true }
futures-util = { workspace = true }
hex = { workspace = true }
@@ -46,6 +47,7 @@ tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
utoipa = { workspace = true }
warp = { workspace = true }
+zstd = { workspace = true }
quickwit-actors = { workspace = true }
quickwit-cluster = { workspace = true }
diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs
new file mode 100644
index 00000000000..c0be92cb214
--- /dev/null
+++ b/quickwit/quickwit-serve/src/decompression.rs
@@ -0,0 +1,87 @@
+// Copyright (C) 2024 Quickwit, Inc.
+//
+// Quickwit is offered under the AGPL v3.0 and as commercial software.
+// For commercial licensing, contact us at hello@quickwit.io.
+//
+// AGPL:
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Affero General Public License as
+// published by the Free Software Foundation, either version 3 of the
+// License, or (at your option) any later version.
+//
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Affero General Public License for more details.
+//
+// You should have received a copy of the GNU Affero General Public License
+// along with this program. If not, see .
+
+use std::io::Read;
+
+use bytes::Bytes;
+use flate2::read::GzDecoder;
+use thiserror::Error;
+use tokio::task;
+use warp::reject::Reject;
+use warp::Filter;
+
+/// There are two ways to decompress the body:
+/// - Stream the body through an async decompressor
+/// - Fetch the body and then decompress the bytes
+///
+/// The first approach lowers the latency, while the second approach is more CPU efficient.
+/// Ingesting data is usually CPU bound and there is considerable latency until the data is
+/// searchable, so the second approach is more suitable for this use case.
+async fn decompress_body(encoding: Option, body: Bytes) -> Result {
+ match encoding.as_deref() {
+ Some("gzip" | "x-gzip") => {
+ let decompressed = task::spawn_blocking(move || {
+ let mut decompressed = Vec::new();
+ let mut decoder = GzDecoder::new(body.as_ref());
+ decoder
+ .read_to_end(&mut decompressed)
+ .map_err(|_| warp::reject::custom(CorruptedData))?;
+ Result::<_, warp::Rejection>::Ok(Bytes::from(decompressed))
+ })
+ .await
+ .map_err(|_| warp::reject::custom(CorruptedData))??;
+ Ok(decompressed)
+ }
+ Some("zstd") => {
+ let decompressed = task::spawn_blocking(move || {
+ zstd::decode_all(body.as_ref())
+ .map(Bytes::from)
+ .map_err(|_| warp::reject::custom(CorruptedData))
+ })
+ .await
+ .map_err(|_| warp::reject::custom(CorruptedData))??;
+ Ok(decompressed)
+ }
+ Some(encoding) => Err(warp::reject::custom(UnsupportedEncoding(
+ encoding.to_string(),
+ ))),
+ _ => Ok(body),
+ }
+}
+
+#[derive(Debug, Error)]
+#[error("Error while decompressing the data")]
+pub(crate) struct CorruptedData;
+
+impl Reject for CorruptedData {}
+
+#[derive(Debug, Error)]
+#[error("Unsupported Content-Encoding {}. Supported encodings are 'gzip' and 'zstd'", self.0)]
+pub(crate) struct UnsupportedEncoding(String);
+
+impl Reject for UnsupportedEncoding {}
+
+/// Custom filter for optional decompression
+pub(crate) fn get_body_bytes() -> impl Filter + Clone {
+ warp::header::optional("content-encoding")
+ .and(warp::body::bytes())
+ .and_then(|encoding: Option, body: Bytes| async move {
+ decompress_body(encoding, body).await
+ })
+}
diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
index 4ea8e12e757..3825f97939f 100644
--- a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
+++ b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
@@ -27,6 +27,7 @@ use super::model::{
CatIndexQueryParams, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
MultiSearchQueryParams, SearchQueryParamsCount,
};
+use crate::decompression::get_body_bytes;
use crate::elasticsearch_api::model::{
ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams,
};
@@ -76,7 +77,7 @@ pub(crate) fn elastic_bulk_filter(
.and(warp::body::content_length_limit(
CONTENT_LENGTH_LIMIT.as_u64(),
))
- .and(warp::body::bytes())
+ .and(get_body_bytes())
.and(serde_qs::warp::query(serde_qs::Config::default()))
}
diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
index fa958abae4b..828b3556ba3 100644
--- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
+++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
@@ -32,6 +32,7 @@ use serde::Deserialize;
use thiserror::Error;
use warp::{Filter, Rejection};
+use crate::decompression::get_body_bytes;
use crate::format::extract_format_from_qs;
use crate::rest_api_response::into_rest_api_response;
use crate::{with_arg, BodyFormat};
@@ -80,7 +81,7 @@ fn ingest_filter(
.and(warp::body::content_length_limit(
config.content_length_limit.as_u64(),
))
- .and(warp::body::bytes())
+ .and(get_body_bytes())
.and(serde_qs::warp::query::(
serde_qs::Config::default(),
))
@@ -104,7 +105,7 @@ fn ingest_v2_filter(
.and(warp::body::content_length_limit(
config.content_length_limit.as_u64(),
))
- .and(warp::body::bytes())
+ .and(get_body_bytes())
.and(serde_qs::warp::query::(
serde_qs::Config::default(),
))
diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs
index a44495f9731..16d68d0f3bc 100644
--- a/quickwit/quickwit-serve/src/lib.rs
+++ b/quickwit/quickwit-serve/src/lib.rs
@@ -20,6 +20,7 @@
mod build_info;
mod cluster_api;
mod debugging_api;
+mod decompression;
mod delete_task_api;
mod elasticsearch_api;
mod format;
diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs
index a4265167ba8..d80c87377eb 100644
--- a/quickwit/quickwit-serve/src/rest.rs
+++ b/quickwit/quickwit-serve/src/rest.rs
@@ -34,6 +34,7 @@ use warp::{redirect, Filter, Rejection, Reply};
use crate::cluster_api::cluster_handler;
use crate::debugging_api::debugging_handler;
+use crate::decompression::{CorruptedData, UnsupportedEncoding};
use crate::delete_task_api::delete_task_api_handlers;
use crate::elasticsearch_api::elastic_api_handlers;
use crate::health_check_api::health_check_handlers;
@@ -273,6 +274,16 @@ fn get_status_with_error(rejection: Rejection) -> RestApiError {
service_code: ServiceErrorCode::UnsupportedMediaType,
message: error.to_string(),
}
+ } else if let Some(error) = rejection.find::() {
+ RestApiError {
+ service_code: ServiceErrorCode::UnsupportedMediaType,
+ message: error.to_string(),
+ }
+ } else if let Some(error) = rejection.find::() {
+ RestApiError {
+ service_code: ServiceErrorCode::BadRequest,
+ message: error.to_string(),
+ }
} else if let Some(error) = rejection.find::() {
RestApiError {
service_code: ServiceErrorCode::BadRequest,
diff --git a/quickwit/rest-api-tests/run_tests.py b/quickwit/rest-api-tests/run_tests.py
index 5b3fa39fab5..5e65e6668ff 100755
--- a/quickwit/rest-api-tests/run_tests.py
+++ b/quickwit/rest-api-tests/run_tests.py
@@ -52,12 +52,6 @@ def run_step(step, previous_result):
time.sleep(step["sleep_after"])
return result
-def load_data(path):
- if path.endswith("gz"):
- return gzip.open(path, 'rb').read()
- else:
- return open(path, 'rb').read()
-
def run_request_with_retry(run_req, expected_status_code=None, num_retries=10, wait_time=0.5):
for try_number in range(num_retries + 1):
r = run_req()
@@ -100,7 +94,8 @@ def run_request_step(method, step, previous_result):
body_from_file = step.get("body_from_file", None)
if body_from_file is not None:
body_from_file = osp.join(step["cwd"], body_from_file)
- kvargs["data"] = load_data(body_from_file)
+ kvargs["data"] = open(body_from_file, 'rb').read()
+
kvargs = resolve_previous_result(kvargs, previous_result)
ndjson = step.get("ndjson", None)
if ndjson is not None:
diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.elasticsearch.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.elasticsearch.yaml
index 1e29d7ffead..12390f6bade 100644
--- a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.elasticsearch.yaml
+++ b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.elasticsearch.yaml
@@ -102,6 +102,6 @@ method: POST
endpoint: _bulk
params:
refresh: "true"
-headers: {"Content-Type": "application/json"}
+headers: {"Content-Type": "application/json", "content-encoding": "gzip"}
body_from_file: gharchive-bulk.json.gz
diff --git a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml
index bdc10d1249b..2e227090a9c 100644
--- a/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml
+++ b/quickwit/rest-api-tests/scenarii/es_compatibility/_setup.quickwit.yaml
@@ -56,6 +56,6 @@ endpoint: _bulk
num_retries: 10
params:
refresh: "true"
-headers: {"Content-Type": "application/json"}
+headers: {"Content-Type": "application/json", "content-encoding": "gzip"}
body_from_file: gharchive-bulk.json.gz
sleep_after: 3