
Add high throughput integration test
rdettai committed Jan 28, 2025
1 parent ffd816b commit eaceb17
Showing 8 changed files with 233 additions and 102 deletions.
7 changes: 6 additions & 1 deletion docs/internals/ingest-v2.md
Original file line number Diff line number Diff line change
@@ -4,7 +4,11 @@ Ingest V2 is the latest ingestion API that is designed to be more efficient and

## Architecture

Just like ingest V1, the new ingest uses [`mrecordlog`](https://github.com/quickwit-oss/mrecordlog) to persist ingested documents that are waiting to be indexed. But unlike V1, which always persists the documents locally on the node that receives them, ingest V2 can dynamically distribute them into WAL units called _shards_. The assigned shard can be local or on another indexer. The control plane is in charge of distributing the shards to balance the indexing work as well as possible across all indexer nodes. The progress within each shard is not tracked as an index metadata checkpoint anymore but in a dedicated metastore `shards` table.
Just like ingest V1, the new ingest uses [`mrecordlog`](https://github.com/quickwit-oss/mrecordlog) to persist ingested documents that are waiting to be indexed. But unlike V1, which always persists the documents locally on the node that receives them, ingest V2 can dynamically distribute them into WAL units called _shards_. Here are a few key behaviors of this new mechanism:
- When an indexer receives a document for ingestion, the assigned shard can be local or on another indexer.
- The control plane is in charge of distributing the shards to balance the indexing work as well as possible across all indexer nodes.
- Each shard has a throughput limit (5MB/s). If the ingest rate on an index exceeds the cumulative throughput of all its shards, the control plane schedules the creation of new shards. Note that while the cumulative throughput is exceeded on an index, the ingest API returns "too many requests" errors until the new shards are effectively created.
- The progress within each shard is tracked in a dedicated metastore `shards` table (instead of the index metadata checkpoint like for other sources).
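
Because shard creation happens asynchronously, clients are expected to treat these 429 responses as transient and retry. A minimal sketch of such a retry loop (this is a hypothetical client-side helper, not part of Quickwit's API; the endpoint is simulated by a closure returning status codes):

```rust
use std::thread::sleep;
use std::time::Duration;

/// Calls `send` until it stops returning HTTP 429, doubling the wait
/// between attempts. Returns the first non-429 status code.
fn ingest_with_retry(mut send: impl FnMut() -> u16, max_retries: u32) -> Result<u16, String> {
    let mut backoff = Duration::from_millis(100);
    for _ in 0..=max_retries {
        let status = send();
        if status != 429 {
            return Ok(status);
        }
        sleep(backoff);
        backoff *= 2;
    }
    Err("still throttled after retries".to_string())
}

fn main() {
    // Simulated ingest endpoint: throttles twice while new shards are created.
    let mut attempts = 0;
    let outcome = ingest_with_retry(
        || {
            attempts += 1;
            if attempts <= 2 { 429 } else { 200 }
        },
        5,
    );
    assert_eq!(outcome, Ok(200));
    println!("succeeded after {attempts} attempts"); // succeeded after 3 attempts
}
```

Exponential backoff keeps retrying clients from hammering the ingest API during the window in which the control plane is still opening the new shards.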

In the future, shard-based ingest will also be capable of writing a replica for each shard, ensuring high durability for documents that are waiting to be indexed (durability of already indexed documents is guaranteed by the object store).

@@ -33,3 +37,4 @@ See [full configuration example](https://github.com/quickwit-oss/quickwit/blob/m
- `ingest_api.replication_factor`, not working yet
- ingest V1 always writes to the WAL of the node receiving the request; V2 potentially forwards it to another node, dynamically assigned by the control plane to distribute the indexing work more evenly.
- ingest V2 parses and validates input documents synchronously. Schema and JSON formatting errors are returned in the ingest response (for ingest V1 those errors were available in the server logs only).
- ingest V2 returns transient 429 (too many requests) errors when the ingestion rate is too high.
2 changes: 1 addition & 1 deletion docs/operating/upgrades.md
@@ -1,5 +1,5 @@
---
title: Version 0.7 upgrade
title: Version upgrade
sidebar_position: 4
---

7 changes: 7 additions & 0 deletions quickwit/quickwit-cli/src/index.rs
@@ -1072,6 +1072,13 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> {
            println!("└ document: {}", failure.document);
        }
    }
    if response.num_too_many_requests > 0 {
        println!("Retried request counts:");
        println!(
            " 429 (too many requests) = {}",
            response.num_too_many_requests
        );
    }
    Ok(())
}

@@ -34,14 +34,12 @@ use quickwit_proto::jaeger::storage::v1::span_reader_plugin_client::SpanReaderPl
use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_client::LogsServiceClient;
use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_client::TraceServiceClient;
use quickwit_proto::types::NodeId;
use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::models::{CumulatedIngestResponse, IngestSource};
use quickwit_rest_client::rest_client::{
CommitType, QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL,
};
use quickwit_serve::tcp_listener::for_tests::TestTcpListenerResolver;
use quickwit_serve::{
serve_quickwit, ListSplitsQueryParams, RestIngestResponse, SearchRequestQueryString,
};
use quickwit_serve::{serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString};
use quickwit_storage::StorageResolver;
use reqwest::Url;
use serde_json::Value;
@@ -243,7 +241,7 @@ pub(crate) async fn ingest(
index_id: &str,
ingest_source: IngestSource,
commit_type: CommitType,
) -> anyhow::Result<RestIngestResponse> {
) -> anyhow::Result<CumulatedIngestResponse> {
let resp = client
.ingest(index_id, ingest_source, None, None, commit_type)
.await?;
100 changes: 86 additions & 14 deletions quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;
use std::time::{Duration, Instant};

use futures_util::FutureExt;
use itertools::Itertools;
@@ -23,9 +23,9 @@ use quickwit_indexing::actors::INDEXING_DIR_NAME;
use quickwit_metastore::SplitState;
use quickwit_proto::ingest::ParseFailureReason;
use quickwit_rest_client::error::{ApiError, Error};
use quickwit_rest_client::models::IngestSource;
use quickwit_rest_client::models::{CumulatedIngestResponse, IngestSource};
use quickwit_rest_client::rest_client::CommitType;
use quickwit_serve::{ListSplitsQueryParams, RestIngestResponse, RestParseFailure};
use quickwit_serve::{ListSplitsQueryParams, RestParseFailure, SearchRequestQueryString};
use serde_json::json;

use crate::ingest_json;
@@ -306,11 +306,11 @@ async fn test_ingest_v2_happy_path() {
.unwrap();
assert_eq!(
ingest_resp,
RestIngestResponse {
CumulatedIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
..Default::default()
},
);

@@ -332,6 +332,77 @@
sandbox.shutdown().await.unwrap();
}

#[tokio::test]
async fn test_ingest_v2_high_throughput() {
    let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
    let index_id = "test_high_throughput";
    let index_config = format!(
        r#"
        version: 0.8
        index_id: {index_id}
        doc_mapping:
          field_mappings:
            - name: body
              type: text
        indexing_settings:
          commit_timeout_secs: 1
        "#
    );
    sandbox
        .rest_client(QuickwitService::Indexer)
        .indexes()
        .create(index_config, ConfigFormat::Yaml, false)
        .await
        .unwrap();

    let body_size = 20 * 1000 * 1000;
    let line = json!({"body": "my dummy repeated payload"}).to_string();
    let num_docs = body_size / line.len();
    let body = std::iter::repeat_n(&line, num_docs).join("\n");
    let ingest_resp = ingest(
        &sandbox.rest_client(QuickwitService::Indexer),
        index_id,
        IngestSource::Str(body),
        CommitType::Auto,
    )
    .await
    .unwrap();
    assert_eq!(ingest_resp.num_docs_for_processing, num_docs as u64);
    assert_eq!(ingest_resp.num_ingested_docs, Some(num_docs as u64));
    assert_eq!(ingest_resp.num_rejected_docs, Some(0));
    // num_too_many_requests might actually be > 0

    let searcher_client = sandbox.rest_client(QuickwitService::Searcher);
    // wait for the docs to be indexed
    let start_time = Instant::now();
    loop {
        let res = searcher_client
            .search(
                index_id,
                SearchRequestQueryString {
                    query: "*".to_string(),
                    ..Default::default()
                },
            )
            .await;
        if let Ok(success_resp) = res {
            if success_resp.num_hits == num_docs as u64 {
                break;
            }
        }
        if start_time.elapsed() > Duration::from_secs(20) {
            panic!(
                "didn't manage to index {} docs in {:?}",
                num_docs,
                start_time.elapsed()
            );
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }

    sandbox.shutdown().await.unwrap();
}


#[tokio::test]
async fn test_commit_force() {
let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await;
@@ -372,11 +443,11 @@ async fn test_commit_force() {
.unwrap();
assert_eq!(
ingest_resp,
RestIngestResponse {
CumulatedIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
..Default::default()
},
);

@@ -452,20 +523,20 @@ async fn test_commit_wait_for() {
let (ingest_resp_1, ingest_resp_2) = tokio::join!(ingest_1_fut, ingest_2_fut);
assert_eq!(
ingest_resp_1,
RestIngestResponse {
CumulatedIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
..Default::default()
},
);
assert_eq!(
ingest_resp_2,
RestIngestResponse {
CumulatedIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
..Default::default()
},
);

@@ -523,11 +594,11 @@ async fn test_commit_auto() {
.unwrap();
assert_eq!(
ingest_resp,
RestIngestResponse {
CumulatedIngestResponse {
num_docs_for_processing: 1,
num_ingested_docs: Some(1),
num_rejected_docs: Some(0),
parse_failures: None,
..Default::default()
},
);

@@ -577,7 +648,7 @@ async fn test_detailed_ingest_response() {

assert_eq!(
ingest_resp,
RestIngestResponse {
CumulatedIngestResponse {
num_docs_for_processing: 2,
num_ingested_docs: Some(1),
num_rejected_docs: Some(1),
@@ -586,6 +657,7 @@
message: "failed to parse JSON document".to_string(),
reason: ParseFailureReason::InvalidJson,
}]),
..Default::default()
},
);
sandbox.shutdown().await.unwrap();
90 changes: 90 additions & 0 deletions quickwit/quickwit-rest-client/src/models.rs
@@ -15,6 +15,7 @@
use std::path::PathBuf;
use std::time::Duration;

use quickwit_serve::{RestIngestResponse, RestParseFailure};
use reqwest::StatusCode;
use serde::de::DeserializeOwned;

@@ -78,6 +79,43 @@ pub enum IngestSource {
Stdin,
}

#[derive(Debug, PartialEq, Default)]
pub struct CumulatedIngestResponse {
    pub num_docs_for_processing: u64,
    pub num_ingested_docs: Option<u64>,
    pub num_rejected_docs: Option<u64>,
    pub parse_failures: Option<Vec<RestParseFailure>>,
    pub num_too_many_requests: u64,
}

impl CumulatedIngestResponse {
    /// Aggregates ingest counts and errors.
    pub fn merge(self, other: RestIngestResponse) -> Self {
        Self {
            num_docs_for_processing: self.num_docs_for_processing + other.num_docs_for_processing,
            num_ingested_docs: apply_op(self.num_ingested_docs, other.num_ingested_docs, |a, b| {
                a + b
            }),
            num_rejected_docs: apply_op(self.num_rejected_docs, other.num_rejected_docs, |a, b| {
                a + b
            }),
            parse_failures: apply_op(self.parse_failures, other.parse_failures, |a, b| {
                a.into_iter().chain(b).collect()
            }),
            num_too_many_requests: self.num_too_many_requests,
        }
    }
}

fn apply_op<T>(a: Option<T>, b: Option<T>, f: impl Fn(T, T) -> T) -> Option<T> {
    match (a, b) {
        (Some(a), Some(b)) => Some(f(a, b)),
        (Some(a), None) => Some(a),
        (None, Some(b)) => Some(b),
        (None, None) => None,
    }
}

/// A structure that represent a timeout. Unlike Duration it can also represent an infinite or no
/// timeout value.
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Debug)]
@@ -133,3 +171,55 @@ impl Timeout {
}
}
}

#[cfg(test)]
mod test {
    use quickwit_proto::ingest::ParseFailureReason;

    use super::*;

    #[test]
    fn test_merge_responses() {
        let mut merged_response = CumulatedIngestResponse::default();
        let response1 = RestIngestResponse {
            num_docs_for_processing: 10,
            num_ingested_docs: Some(5),
            num_rejected_docs: Some(2),
            parse_failures: Some(vec![RestParseFailure {
                message: "error1".to_string(),
                document: "doc1".to_string(),
                reason: ParseFailureReason::InvalidJson,
            }]),
        };
        let response2 = RestIngestResponse {
            num_docs_for_processing: 15,
            num_ingested_docs: Some(10),
            num_rejected_docs: Some(3),
            parse_failures: Some(vec![RestParseFailure {
                message: "error2".to_string(),
                document: "doc2".to_string(),
                reason: ParseFailureReason::InvalidJson,
            }]),
        };
        merged_response = merged_response.merge(response1);
        merged_response = merged_response.merge(response2);
        assert_eq!(merged_response.num_docs_for_processing, 25);
        assert_eq!(merged_response.num_ingested_docs.unwrap(), 15);
        assert_eq!(merged_response.num_rejected_docs.unwrap(), 5);
        assert_eq!(
            merged_response.parse_failures.unwrap(),
            vec![
                RestParseFailure {
                    message: "error1".to_string(),
                    document: "doc1".to_string(),
                    reason: ParseFailureReason::InvalidJson,
                },
                RestParseFailure {
                    message: "error2".to_string(),
                    document: "doc2".to_string(),
                    reason: ParseFailureReason::InvalidJson,
                }
            ]
        );
    }
}
