diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index d289332f6be..b8c48cd0db1 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -6285,6 +6285,7 @@ dependencies = [ "bytes", "bytesize", "elasticsearch-dsl", + "flate2", "futures", "futures-util", "hex", @@ -6339,6 +6340,7 @@ dependencies = [ "tracing-opentelemetry", "utoipa", "warp", + "zstd 0.13.0", ] [[package]] diff --git a/quickwit/quickwit-serve/Cargo.toml b/quickwit/quickwit-serve/Cargo.toml index 744fc648340..b59e73c3172 100644 --- a/quickwit/quickwit-serve/Cargo.toml +++ b/quickwit/quickwit-serve/Cargo.toml @@ -16,6 +16,7 @@ base64 = { workspace = true } bytes = { workspace = true } bytesize = { workspace = true } elasticsearch-dsl = "0.4.15" +flate2 = { workspace = true } futures = { workspace = true } futures-util = { workspace = true } hex = { workspace = true } @@ -46,6 +47,7 @@ tracing = { workspace = true } tracing-opentelemetry = { workspace = true } utoipa = { workspace = true } warp = { workspace = true } +zstd = { workspace = true } quickwit-actors = { workspace = true } quickwit-cluster = { workspace = true } diff --git a/quickwit/quickwit-serve/src/decompression.rs b/quickwit/quickwit-serve/src/decompression.rs new file mode 100644 index 00000000000..a77fc2c3b7f --- /dev/null +++ b/quickwit/quickwit-serve/src/decompression.rs @@ -0,0 +1,90 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use core::fmt; +use std::io::Read; + +use bytes::Bytes; +use flate2::read::GzDecoder; +use tokio::task; +use warp::reject::Reject; +use warp::Filter; + +/// There are two ways to decompress the body: +/// - Stream the body through an async decompressor +/// - Fetch the body and then decompress the bytes +/// +/// The first approach lowers the latency, while the second approach is more CPU efficient. +/// Ingesting data is usually CPU bound and there is considerable latency until the data is +/// searchable, so the second approach is more suitable for this use case. +async fn decompress_body(encoding: Option, body: Bytes) -> Result { + match encoding.as_deref() { + Some("gzip" | "x-gzip") => { + let decompressed = task::spawn_blocking(move || { + let mut decompressed = Vec::new(); + let mut decoder = GzDecoder::new(body.as_ref()); + decoder + .read_to_end(&mut decompressed) + .map_err(|_| warp::reject())?; + Result::<_, warp::Rejection>::Ok(Bytes::from(decompressed)) + }) + .await + .map_err(|_| warp::reject())??; + Ok(decompressed) + } + Some("zstd") => { + let decompressed = task::spawn_blocking(move || { + zstd::decode_all(body.as_ref()) + .map(Bytes::from) + .map_err(|_| warp::reject()) + }) + .await + .map_err(|_| warp::reject())??; + Ok(decompressed) + } + Some(encoding) => Err(warp::reject::custom(UnsupportedEncoding( + encoding.to_string(), + ))), + _ => Ok(body), + } +} + +#[derive(Debug)] +struct UnsupportedEncoding(String); + +impl fmt::Display for UnsupportedEncoding { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "Unsupported Content-Encoding {}. Supported encodings are 'gzip' and 'zstd'.", + self.0 + ) + } +} + +impl Reject for UnsupportedEncoding {} + +/// Custom filter for optional decompression +pub(crate) fn get_body_bytes() -> impl Filter + Clone { + warp::header::optional("content-encoding") + .and(warp::body::bytes()) + .and_then(|encoding: Option, body: Bytes| async move { + decompress_body(encoding, body).await + }) +} diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs index 4ea8e12e757..3825f97939f 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/filter.rs @@ -27,6 +27,7 @@ use super::model::{ CatIndexQueryParams, FieldCapabilityQueryParams, FieldCapabilityRequestBody, MultiSearchQueryParams, SearchQueryParamsCount, }; +use crate::decompression::get_body_bytes; use crate::elasticsearch_api::model::{ ElasticBulkOptions, ScrollQueryParams, SearchBody, SearchQueryParams, }; @@ -76,7 +77,7 @@ pub(crate) fn elastic_bulk_filter( .and(warp::body::content_length_limit( CONTENT_LENGTH_LIMIT.as_u64(), )) - .and(warp::body::bytes()) + .and(get_body_bytes()) .and(serde_qs::warp::query(serde_qs::Config::default())) } diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs index fa958abae4b..828b3556ba3 100644 --- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs @@ -32,6 +32,7 @@ use serde::Deserialize; use thiserror::Error; use warp::{Filter, Rejection}; +use crate::decompression::get_body_bytes; use crate::format::extract_format_from_qs; use crate::rest_api_response::into_rest_api_response; use crate::{with_arg, BodyFormat}; @@ -80,7 +81,7 @@ fn ingest_filter( .and(warp::body::content_length_limit( config.content_length_limit.as_u64(), )) - .and(warp::body::bytes()) + .and(get_body_bytes()) .and(serde_qs::warp::query::( serde_qs::Config::default(), )) @@ -104,7 +105,7 @@ fn ingest_v2_filter( .and(warp::body::content_length_limit( config.content_length_limit.as_u64(), )) - .and(warp::body::bytes()) + .and(get_body_bytes()) .and(serde_qs::warp::query::( serde_qs::Config::default(), )) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index fa5c992121f..6e1bd75dcc1 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -20,6 +20,7 @@ mod build_info; mod cluster_api; mod debugging_api; +mod decompression; mod delete_task_api; mod elasticsearch_api; mod format; diff --git a/quickwit/rest-api-tests/run_tests.py b/quickwit/rest-api-tests/run_tests.py index 5b3fa39fab5..6fbf7f91d46 100755 --- a/quickwit/rest-api-tests/run_tests.py +++ b/quickwit/rest-api-tests/run_tests.py @@ -100,7 +100,10 @@ def run_request_step(method, step, previous_result): body_from_file = step.get("body_from_file", None) if body_from_file is not None: body_from_file = osp.join(step["cwd"], body_from_file) - kvargs["data"] = load_data(body_from_file) + kvargs["data"] = open(body_from_file, 'rb').read() + if body_from_file.endswith("gz"): + kvargs.setdefault("headers")["content-encoding"] = "gzip" + kvargs = resolve_previous_result(kvargs, previous_result) ndjson = step.get("ndjson", None) if ndjson is not None: