
Commit 08e0013

Compress requests and responses for ingest v2 endpoints (#5779)
* Compress requests and responses for ingest v2 endpoints
* Switch to ZSTD
* Test code-generated client/server compression
* Make gRPC ingest client compression configurable
* Document feature and upgrade
1 parent 75eb6fa commit 08e0013
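In tonic, per-message compression is negotiated via the `grpc-encoding` / `grpc-accept-encoding` headers and is configured on the code-generated clients and servers. A minimal sketch of the mechanism this commit builds on, using a hypothetical `hello` service generated by `tonic-build` (the names are illustrative, not the actual Quickwit ingest types):

```rust
use tonic::codec::CompressionEncoding;
use tonic::transport::Channel;

// Hypothetical `tonic-build` output for a `hello` service.
use hello::hello_client::HelloClient;
use hello::hello_server::{Hello, HelloServer};

/// Client side: compress outgoing requests with zstd and advertise that
/// zstd-compressed responses are accepted.
fn compressed_client(channel: Channel) -> HelloClient<Channel> {
    HelloClient::new(channel)
        .send_compressed(CompressionEncoding::Zstd)
        .accept_compressed(CompressionEncoding::Zstd)
}

/// Server side: accept zstd-compressed requests and compress responses.
fn compressed_server<T: Hello>(service: T) -> HelloServer<T> {
    HelloServer::new(service)
        .accept_compressed(CompressionEncoding::Zstd)
        .send_compressed(CompressionEncoding::Zstd)
}
```

Both sides need the `zstd` feature of `tonic`, which is what the `Cargo.toml` change below enables.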

21 files changed (+462 -46 lines)

docs/configuration/node-config.md

Lines changed: 2 additions & 0 deletions
@@ -178,6 +178,7 @@ indexer:
 | `max_queue_memory_usage` | Maximum size in bytes of the in-memory Ingest queue. | `2GiB` |
 | `max_queue_disk_usage` | Maximum disk space in bytes taken by the Ingest queue. The minimum size is `256M`, and it must be at least `max_queue_memory_usage`. | `4GiB` |
 | `content_length_limit` | Maximum uncompressed payload size. Increasing this is discouraged; use a [file source](../ingest-data/sqs-files.md) instead. | `10MiB` |
+| `grpc_compression_algorithm` | Compression algorithm (`gzip` or `zstd`) to use for gRPC traffic between nodes for the ingest service. | `None` |
 
 Example:
 
@@ -186,6 +187,7 @@ ingest_api:
   max_queue_memory_usage: 2GiB
   max_queue_disk_usage: 4GiB
   content_length_limit: 10MiB
+  grpc_compression_algorithm: zstd
 ```
 
 ## Searcher configuration

docs/operating/upgrades.md

Lines changed: 1 addition & 1 deletion
@@ -23,4 +23,4 @@ No migration is done if `otel-traces-v0_7` already exists. If you want `service_
 
 Quickwit 0.9 introduces a new ingestion service to power the ingest and bulk APIs (v2). The new ingest is enabled and used by default, even though the legacy one (v1) remains enabled to finish indexing residual data in the legacy write ahead logs. Note that `ingest_api.max_queue_disk_usage` is enforced on both ingest versions separately, which means that the cumulated disk usage might be up to twice this limit.
 
-The control plane should be upgraded first in order to enable the new ingest source (v2) on all existing indexes. Ingested data into previously existing indexes on upgraded indexer nodes will not be picked by the indexing pipelines until the control plane is upgraded. Because the indexing plan is computed differently in 0.9, all pipelines will be restarted when upgrading the control plane. If possible, we recommend avoiding rolling upgrades for indexers. Instead, scale down the number of indexers to zero first, then upgrade the control plane and finally scale the upgraded indexers back up.
+The control plane should be upgraded first in order to enable the new ingest source (v2) on all existing indexes. Ingested data into previously existing indexes on upgraded indexer nodes will not be picked by the indexing pipelines until the control plane is upgraded. Because the indexing plan is computed differently in 0.9, all pipelines will be restarted when upgrading the control plane. If possible, we recommend avoiding rolling upgrades for indexers. Instead, scale down the number of indexers to zero first, then upgrade the control plane and finally scale the upgraded indexers back up. Finally, if you intend to enable compression for the ingest service (`ingest_api.grpc_compression_algorithm`), do so in two steps: first upgrade the indexer nodes with compression disabled, then, once they are all upgraded, enable compression in the node configuration and restart the indexer nodes.
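Concretely, the setting to flip in the second step is the one documented above in `node-config.md`. A minimal node configuration sketch (assuming every node speaking ingest v2 already runs the upgraded version):

```yaml
# Enable only after all indexer/ingester nodes have been upgraded.
ingest_api:
  grpc_compression_algorithm: zstd
```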

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Generated file; diff not rendered.

quickwit/Cargo.toml

Lines changed: 3 additions & 2 deletions
@@ -10,10 +10,10 @@ members = [
     "quickwit-common",
     "quickwit-config",
     "quickwit-control-plane",
-    "quickwit-index-management",
     "quickwit-datetime",
     "quickwit-directories",
     "quickwit-doc-mapper",
+    "quickwit-index-management",
     "quickwit-indexing",
     "quickwit-ingest",
     "quickwit-integration-tests",
@@ -245,9 +245,10 @@ tokio-stream = { version = "0.1", features = ["sync"] }
 tokio-util = { version = "0.7", features = ["full"] }
 toml = "0.7"
 tonic = { version = "0.13", features = [
-    "gzip",
     "_tls-any",
+    "gzip",
     "tls-native-roots",
+    "zstd",
 ] }
 tonic-build = "0.13"
 tonic-health = "0.13"

quickwit/quickwit-cluster/src/grpc_service.rs

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ pub(crate) async fn cluster_grpc_client(
 
     ClusterServiceClient::tower()
        .stack_layer(CLUSTER_GRPC_CLIENT_METRICS_LAYER.clone())
-        .build_from_channel(socket_addr, channel, MAX_MESSAGE_SIZE)
+        .build_from_channel(socket_addr, channel, MAX_MESSAGE_SIZE, None)
 }
 
 pub fn cluster_grpc_server(
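The new trailing `None` is the optional compression encoding now threaded through the code-generated client builders; the cluster client keeps compression off. As a sketch of how the node-config value could map onto that argument (an assumption about the wiring, with the setting values taken from the docs change above):

```rust
use tonic::codec::CompressionEncoding;

/// Sketch only: translate the `ingest_api.grpc_compression_algorithm` setting
/// into the optional encoding passed as the builders' last argument.
/// `None` (as in the cluster client above) leaves gRPC compression disabled.
fn grpc_compression_encoding(algorithm: Option<&str>) -> Option<CompressionEncoding> {
    match algorithm {
        Some("gzip") => Some(CompressionEncoding::Gzip),
        Some("zstd") => Some(CompressionEncoding::Zstd),
        _ => None,
    }
}
```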

quickwit/quickwit-codegen/example/src/codegen/hello.rs

Lines changed: 27 additions & 3 deletions
Generated file; diff not rendered.

quickwit/quickwit-codegen/example/src/lib.rs

Lines changed: 150 additions & 8 deletions
@@ -165,7 +165,9 @@ mod tests {
     use quickwit_common::tower::{BalanceChannel, Change, TimeoutLayer};
     use tokio::sync::mpsc::error::TrySendError;
     use tokio_stream::StreamExt;
+    use tonic::codec::CompressionEncoding;
     use tonic::transport::{Endpoint, Server};
+    use tonic::{Code, Status};
 
     use super::*;
     use crate::hello::MockHello;
@@ -263,9 +265,8 @@
 
     #[tokio::test]
     async fn test_hello_codegen_grpc() {
-        let grpc_server_adapter = HelloGrpcServerAdapter::new(HelloImpl::default());
-        let grpc_server: HelloGrpcServer<HelloGrpcServerAdapter> =
-            HelloGrpcServer::new(grpc_server_adapter);
+        let grpc_server =
+            HelloClient::new(HelloImpl::default()).as_grpc_service(MAX_GRPC_MESSAGE_SIZE);
         let addr: SocketAddr = "127.0.0.1:6666".parse().unwrap();
 
         tokio::spawn({
@@ -281,7 +282,7 @@
             "127.0.0.1:6666".parse().unwrap(),
             Endpoint::from_static("http://127.0.0.1:6666").connect_lazy(),
         );
-        let grpc_client = HelloClient::from_balance_channel(channel, MAX_GRPC_MESSAGE_SIZE);
+        let grpc_client = HelloClient::from_balance_channel(channel, MAX_GRPC_MESSAGE_SIZE, None);
 
         assert_eq!(
             grpc_client
@@ -340,7 +341,8 @@
 
         // The connectivity check fails if there is no client behind the channel.
         let (balance_channel, _): (BalanceChannel<SocketAddr>, _) = BalanceChannel::new();
-        let grpc_client = HelloClient::from_balance_channel(balance_channel, MAX_GRPC_MESSAGE_SIZE);
+        let grpc_client =
+            HelloClient::from_balance_channel(balance_channel, MAX_GRPC_MESSAGE_SIZE, None);
         assert_eq!(
             grpc_client
                 .check_connectivity()
@@ -351,6 +353,146 @@
         );
     }
 
+    #[tokio::test]
+    async fn test_hello_codegen_grpc_with_compression() {
+        #[derive(Debug, Clone)]
+        struct CheckCompression<S> {
+            inner: S,
+        }
+
+        impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for CheckCompression<S>
+        where
+            S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>
+                + Clone
+                + Send
+                + 'static,
+            S::Future: Send + 'static,
+            ReqBody: Send + 'static,
+        {
+            type Response = S::Response;
+            type Error = S::Error;
+            type Future = BoxFuture<Self::Response, Self::Error>;
+
+            fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
+                self.inner.poll_ready(cx)
+            }
+
+            fn call(&mut self, request: http::Request<ReqBody>) -> Self::Future {
+                let Some(grpc_encoding) = request.headers().get("grpc-encoding") else {
+                    panic!("request should be compressed");
+                };
+                assert!(grpc_encoding.to_str().unwrap().contains("zstd"));
+
+                let Some(grpc_accept_encoding) = request.headers().get("grpc-accept-encoding")
+                else {
+                    panic!("client should accept compressed responses");
+                };
+                assert!(grpc_accept_encoding.to_str().unwrap().contains("zstd"));
+                let fut = self.inner.call(request);
+
+                Box::pin(async move {
+                    let response = fut.await?;
+
+                    let grpc_status_code = Status::from_header_map(response.headers())
+                        .map(|status| status.code())
+                        .unwrap_or(Code::Ok);
+
+                    if grpc_status_code == Code::Ok {
+                        let Some(grpc_encoding) = response.headers().get("grpc-encoding") else {
+                            panic!("response should be compressed");
+                        };
+                        assert!(grpc_encoding.to_str().unwrap().contains("zstd"));
+                    }
+                    Ok(response)
+                })
+            }
+        }
+
+        #[derive(Debug, Clone)]
+        struct CheckCompressionLayer;
+
+        impl<S> Layer<S> for CheckCompressionLayer {
+            type Service = CheckCompression<S>;
+
+            fn layer(&self, inner: S) -> Self::Service {
+                Self::Service { inner }
+            }
+        }
+
+        let grpc_server =
+            HelloClient::new(HelloImpl::default()).as_grpc_service(MAX_GRPC_MESSAGE_SIZE);
+        let addr: SocketAddr = "127.0.0.1:33333".parse().unwrap();
+
+        tokio::spawn({
+            async move {
+                Server::builder()
+                    .layer(CheckCompressionLayer)
+                    .add_service(grpc_server)
+                    .serve(addr)
+                    .await
+                    .unwrap();
+            }
+        });
+        let channel = BalanceChannel::from_channel(
+            "127.0.0.1:33333".parse().unwrap(),
+            Endpoint::from_static("http://127.0.0.1:33333").connect_lazy(),
+        );
+        let grpc_client = HelloClient::from_balance_channel(
+            channel,
+            MAX_GRPC_MESSAGE_SIZE,
+            Some(CompressionEncoding::Zstd),
+        );
+
+        assert_eq!(
+            grpc_client
+                .hello(HelloRequest {
+                    name: "gRPC client".to_string()
+                })
+                .await
+                .unwrap(),
+            HelloResponse {
+                message: "Hello, gRPC client!".to_string()
+            }
+        );
+
+        assert!(matches!(
+            grpc_client
+                .hello(HelloRequest {
+                    name: "".to_string()
+                })
+                .await
+                .unwrap_err(),
+            HelloError::InvalidArgument(_)
+        ));
+
+        let (ping_stream_tx, ping_stream) = ServiceStream::new_bounded(1);
+        let mut pong_stream = grpc_client.ping(ping_stream).await.unwrap();
+
+        ping_stream_tx
+            .try_send(PingRequest {
+                name: "gRPC client".to_string(),
+            })
+            .unwrap();
+        assert_eq!(
+            pong_stream.next().await.unwrap().unwrap().message,
+            "Pong, gRPC client!"
+        );
+
+        ping_stream_tx
+            .try_send(PingRequest {
+                name: "stop".to_string(),
+            })
+            .unwrap();
+        assert!(pong_stream.next().await.is_none());
+
+        let error = ping_stream_tx
+            .try_send(PingRequest {
+                name: "stop".to_string(),
+            })
+            .unwrap_err();
+        assert!(matches!(error, TrySendError::Closed(_)));
+    }
+
     #[tokio::test]
     async fn test_hello_codegen_actor() {
         #[derive(Debug)]
@@ -646,7 +788,7 @@
             "127.0.0.1:7777".parse().unwrap(),
             Endpoint::from_static("http://127.0.0.1:7777").connect_lazy(),
         );
-        HelloClient::from_balance_channel(balance_channed, MAX_GRPC_MESSAGE_SIZE);
+        HelloClient::from_balance_channel(balance_channed, MAX_GRPC_MESSAGE_SIZE, None);
     }
 
     #[tokio::test]
@@ -734,7 +876,7 @@
             .timeout(Duration::from_millis(100))
             .connect_lazy();
         let max_message_size = ByteSize::mib(1);
-        let grpc_client = HelloClient::from_channel(addr, channel, max_message_size);
+        let grpc_client = HelloClient::from_channel(addr, channel, max_message_size, None);
 
         let error = grpc_client
             .hello(HelloRequest {
@@ -813,7 +955,7 @@
             // this test hangs forever if we comment out the TimeoutLayer, which
             // shows that a request without explicit timeout might hang forever
             .stack_layer(TimeoutLayer::new(Duration::from_secs(3)))
-            .build_from_balance_channel(balance_channel, ByteSize::mib(1));
+            .build_from_balance_channel(balance_channel, ByteSize::mib(1), None);
 
         let response_fut = async move {
             grpc_client
