From eaceb17be8beb7a67ac7dd4f6953844895d3c02b Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Tue, 28 Jan 2025 12:10:08 +0100 Subject: [PATCH] Add high throughput integration test --- docs/internals/ingest-v2.md | 7 +- docs/operating/upgrades.md | 2 +- quickwit/quickwit-cli/src/index.rs | 7 ++ .../src/test_utils/cluster_sandbox.rs | 8 +- .../src/tests/ingest_v2_tests.rs | 100 +++++++++++++++--- quickwit/quickwit-rest-client/src/models.rs | 90 ++++++++++++++++ .../quickwit-rest-client/src/rest_client.rs | 54 +++++++--- .../quickwit-serve/src/ingest_api/response.rs | 67 ------------ 8 files changed, 233 insertions(+), 102 deletions(-) diff --git a/docs/internals/ingest-v2.md b/docs/internals/ingest-v2.md index 4c46b2f824a..049c9347c2c 100644 --- a/docs/internals/ingest-v2.md +++ b/docs/internals/ingest-v2.md @@ -4,7 +4,11 @@ Ingest V2 is the latest ingestion API that is designed to be more efficient and ## Architecture -Just like ingest V1, the new ingest uses [`mrecordlog`](https://github.com/quickwit-oss/mrecordlog) to persist ingested documents that are waiting to be indexed. But unlike V1, which always persists the documents locally on the node that receives them, ingest V2 can dynamically distribute them into WAL units called _shards_. The assigned shard can be local or on another indexer. The control plane is in charge of distributing the shards to balance the indexing work as well as possible across all indexer nodes. The progress within each shard is not tracked as an index metadata checkpoint anymore but in a dedicated metastore `shards` table. +Just like ingest V1, the new ingest uses [`mrecordlog`](https://github.com/quickwit-oss/mrecordlog) to persist ingested documents that are waiting to be indexed. But unlike V1, which always persists the documents locally on the node that receives them, ingest V2 can dynamically distribute them into WAL units called _shards_. 
Here are a few key behaviors of this new mechanism: +- When an indexer receives a document for ingestion, the assigned shard can be local or on another indexer. +- The control plane is in charge of distributing the shards to balance the indexing work as well as possible across all indexer nodes. +- Each shard has a throughput limit (5MB/s). If the ingest rate on an index becomes greater than the cumulative throughput of all its shards, the control plane schedules the creation of new shards. Note that when the cumulative throughput is exceeded on an index, the ingest API returns "too many requests" errors until the new shards are actually created. +- The progress within each shard is tracked in a dedicated metastore `shards` table (instead of the index metadata checkpoint, as is done for other sources). In the future, the shard based ingest will also be capable of writing a replica for each shard, thus ensuring a high durability of the documents that are waiting to be indexed (durability of the indexed documents is guarantied by the object store). @@ -33,3 +37,4 @@ See [full configuration example](https://github.com/quickwit-oss/quickwit/blob/m - `ingest_api.replication_factor`, not working yet - ingest V1 always writes to the WAL of the node receiving the request, V2 potentially forwards it to another node, dynamically assigned by the control plane to distribute the indexing work more evenly. - ingest V2 parses and validates input documents synchronously. Schema and JSON formatting errors are returned in the ingest response (for ingest V1 those errors were available in the server logs only). 
+- ingest V2 returns transient 429 (too many requests) errors when the ingestion rate is too fast diff --git a/docs/operating/upgrades.md b/docs/operating/upgrades.md index ec60855baeb..053e53972b4 100644 --- a/docs/operating/upgrades.md +++ b/docs/operating/upgrades.md @@ -1,5 +1,5 @@ --- -title: Version 0.7 upgrade +title: Version upgrade sidebar_position: 4 --- diff --git a/quickwit/quickwit-cli/src/index.rs b/quickwit/quickwit-cli/src/index.rs index 41851ea52f7..eaf55d172c8 100644 --- a/quickwit/quickwit-cli/src/index.rs +++ b/quickwit/quickwit-cli/src/index.rs @@ -1072,6 +1072,13 @@ pub async fn ingest_docs_cli(args: IngestDocsArgs) -> anyhow::Result<()> { println!("└ document: {}", failure.document); } } + if response.num_too_many_requests > 0 { + println!("Retried request counts:"); + println!( + " 429 (too many requests) = {}", + response.num_too_many_requests + ); + } Ok(()) } diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 40d5c70277a..be5db89be9b 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -34,14 +34,12 @@ use quickwit_proto::jaeger::storage::v1::span_reader_plugin_client::SpanReaderPl use quickwit_proto::opentelemetry::proto::collector::logs::v1::logs_service_client::LogsServiceClient; use quickwit_proto::opentelemetry::proto::collector::trace::v1::trace_service_client::TraceServiceClient; use quickwit_proto::types::NodeId; -use quickwit_rest_client::models::IngestSource; +use quickwit_rest_client::models::{CumulatedIngestResponse, IngestSource}; use quickwit_rest_client::rest_client::{ CommitType, QuickwitClient, QuickwitClientBuilder, DEFAULT_BASE_URL, }; use quickwit_serve::tcp_listener::for_tests::TestTcpListenerResolver; -use quickwit_serve::{ - serve_quickwit, ListSplitsQueryParams, RestIngestResponse, 
SearchRequestQueryString, -}; +use quickwit_serve::{serve_quickwit, ListSplitsQueryParams, SearchRequestQueryString}; use quickwit_storage::StorageResolver; use reqwest::Url; use serde_json::Value; @@ -243,7 +241,7 @@ pub(crate) async fn ingest( index_id: &str, ingest_source: IngestSource, commit_type: CommitType, -) -> anyhow::Result { +) -> anyhow::Result { let resp = client .ingest(index_id, ingest_source, None, None, commit_type) .await?; diff --git a/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs b/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs index 80cb081b9a1..673e7b7ffee 100644 --- a/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs +++ b/quickwit/quickwit-integration-tests/src/tests/ingest_v2_tests.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::time::Duration; +use std::time::{Duration, Instant}; use futures_util::FutureExt; use itertools::Itertools; @@ -23,9 +23,9 @@ use quickwit_indexing::actors::INDEXING_DIR_NAME; use quickwit_metastore::SplitState; use quickwit_proto::ingest::ParseFailureReason; use quickwit_rest_client::error::{ApiError, Error}; -use quickwit_rest_client::models::IngestSource; +use quickwit_rest_client::models::{CumulatedIngestResponse, IngestSource}; use quickwit_rest_client::rest_client::CommitType; -use quickwit_serve::{ListSplitsQueryParams, RestIngestResponse, RestParseFailure}; +use quickwit_serve::{ListSplitsQueryParams, RestParseFailure, SearchRequestQueryString}; use serde_json::json; use crate::ingest_json; @@ -306,11 +306,11 @@ async fn test_ingest_v2_happy_path() { .unwrap(); assert_eq!( ingest_resp, - RestIngestResponse { + CumulatedIngestResponse { num_docs_for_processing: 1, num_ingested_docs: Some(1), num_rejected_docs: Some(0), - parse_failures: None, + ..Default::default() }, ); @@ -332,6 +332,77 @@ async fn test_ingest_v2_happy_path() { sandbox.shutdown().await.unwrap(); } 
+#[tokio::test] +async fn test_ingest_v2_high_throughput() { + let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; + let index_id = "test_high_throughput"; + let index_config = format!( + r#" + version: 0.8 + index_id: {index_id} + doc_mapping: + field_mappings: + - name: body + type: text + indexing_settings: + commit_timeout_secs: 1 + "# + ); + sandbox + .rest_client(QuickwitService::Indexer) + .indexes() + .create(index_config, ConfigFormat::Yaml, false) + .await + .unwrap(); + + let body_size = 20 * 1000 * 1000; + let line = json!({"body": "my dummy repeated payload"}).to_string(); + let num_docs = body_size / line.len(); + let body = std::iter::repeat_n(&line, num_docs).join("\n"); + let ingest_resp = ingest( + &sandbox.rest_client(QuickwitService::Indexer), + index_id, + IngestSource::Str(body), + CommitType::Auto, + ) + .await + .unwrap(); + assert_eq!(ingest_resp.num_docs_for_processing, num_docs as u64); + assert_eq!(ingest_resp.num_ingested_docs, Some(num_docs as u64)); + assert_eq!(ingest_resp.num_rejected_docs, Some(0)); + // num_too_many_requests might actually be > 0 + + let searcher_client = sandbox.rest_client(QuickwitService::Searcher); + // wait for the docs to be indexed + let start_time = Instant::now(); + loop { + let res = searcher_client + .search( + index_id, + SearchRequestQueryString { + query: "*".to_string(), + ..Default::default() + }, + ) + .await; + if let Ok(success_resp) = res { + if success_resp.num_hits == num_docs as u64 { + break; + } + } + if start_time.elapsed() > Duration::from_secs(20) { + panic!( + "didn't manage to index {} docs in {:?}", + num_docs, + start_time.elapsed() + ); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + + sandbox.shutdown().await.unwrap(); +} + #[tokio::test] async fn test_commit_force() { let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; @@ -372,11 +443,11 @@ async fn test_commit_force() { .unwrap(); assert_eq!( ingest_resp, - 
RestIngestResponse { + CumulatedIngestResponse { num_docs_for_processing: 1, num_ingested_docs: Some(1), num_rejected_docs: Some(0), - parse_failures: None, + ..Default::default() }, ); @@ -452,20 +523,20 @@ async fn test_commit_wait_for() { let (ingest_resp_1, ingest_resp_2) = tokio::join!(ingest_1_fut, ingest_2_fut); assert_eq!( ingest_resp_1, - RestIngestResponse { + CumulatedIngestResponse { num_docs_for_processing: 1, num_ingested_docs: Some(1), num_rejected_docs: Some(0), - parse_failures: None, + ..Default::default() }, ); assert_eq!( ingest_resp_2, - RestIngestResponse { + CumulatedIngestResponse { num_docs_for_processing: 1, num_ingested_docs: Some(1), num_rejected_docs: Some(0), - parse_failures: None, + ..Default::default() }, ); @@ -523,11 +594,11 @@ async fn test_commit_auto() { .unwrap(); assert_eq!( ingest_resp, - RestIngestResponse { + CumulatedIngestResponse { num_docs_for_processing: 1, num_ingested_docs: Some(1), num_rejected_docs: Some(0), - parse_failures: None, + ..Default::default() }, ); @@ -577,7 +648,7 @@ async fn test_detailed_ingest_response() { assert_eq!( ingest_resp, - RestIngestResponse { + CumulatedIngestResponse { num_docs_for_processing: 2, num_ingested_docs: Some(1), num_rejected_docs: Some(1), @@ -586,6 +657,7 @@ async fn test_detailed_ingest_response() { message: "failed to parse JSON document".to_string(), reason: ParseFailureReason::InvalidJson, }]), + ..Default::default() }, ); sandbox.shutdown().await.unwrap(); diff --git a/quickwit/quickwit-rest-client/src/models.rs b/quickwit/quickwit-rest-client/src/models.rs index e110571121a..878aa74fe2b 100644 --- a/quickwit/quickwit-rest-client/src/models.rs +++ b/quickwit/quickwit-rest-client/src/models.rs @@ -15,6 +15,7 @@ use std::path::PathBuf; use std::time::Duration; +use quickwit_serve::{RestIngestResponse, RestParseFailure}; use reqwest::StatusCode; use serde::de::DeserializeOwned; @@ -78,6 +79,43 @@ pub enum IngestSource { Stdin, } +#[derive(Debug, PartialEq, Default)] +pub 
struct CumulatedIngestResponse { + pub num_docs_for_processing: u64, + pub num_ingested_docs: Option, + pub num_rejected_docs: Option, + pub parse_failures: Option>, + pub num_too_many_requests: u64, +} + +impl CumulatedIngestResponse { + /// Aggregates ingest counts and errors. + pub fn merge(self, other: RestIngestResponse) -> Self { + Self { + num_docs_for_processing: self.num_docs_for_processing + other.num_docs_for_processing, + num_ingested_docs: apply_op(self.num_ingested_docs, other.num_ingested_docs, |a, b| { + a + b + }), + num_rejected_docs: apply_op(self.num_rejected_docs, other.num_rejected_docs, |a, b| { + a + b + }), + parse_failures: apply_op(self.parse_failures, other.parse_failures, |a, b| { + a.into_iter().chain(b).collect() + }), + num_too_many_requests: self.num_too_many_requests, + } + } +} + +fn apply_op(a: Option, b: Option, f: impl Fn(T, T) -> T) -> Option { + match (a, b) { + (Some(a), Some(b)) => Some(f(a, b)), + (Some(a), None) => Some(a), + (None, Some(b)) => Some(b), + (None, None) => None, + } +} + /// A structure that represent a timeout. Unlike Duration it can also represent an infinite or no /// timeout value. 
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Debug)] @@ -133,3 +171,55 @@ impl Timeout { } } } + +#[cfg(test)] +mod test { + use quickwit_proto::ingest::ParseFailureReason; + + use super::*; + + #[test] + fn test_merge_responses() { + let mut merged_response = CumulatedIngestResponse::default(); + let response1 = RestIngestResponse { + num_docs_for_processing: 10, + num_ingested_docs: Some(5), + num_rejected_docs: Some(2), + parse_failures: Some(vec![RestParseFailure { + message: "error1".to_string(), + document: "doc1".to_string(), + reason: ParseFailureReason::InvalidJson, + }]), + }; + let response2 = RestIngestResponse { + num_docs_for_processing: 15, + num_ingested_docs: Some(10), + num_rejected_docs: Some(3), + parse_failures: Some(vec![RestParseFailure { + message: "error2".to_string(), + document: "doc2".to_string(), + reason: ParseFailureReason::InvalidJson, + }]), + }; + merged_response = merged_response.merge(response1); + merged_response = merged_response.merge(response2); + assert_eq!(merged_response.num_docs_for_processing, 25); + assert_eq!(merged_response.num_ingested_docs.unwrap(), 15); + assert_eq!(merged_response.num_rejected_docs.unwrap(), 5); + assert_eq!( + merged_response.parse_failures.unwrap(), + vec![ + RestParseFailure { + message: "error1".to_string(), + document: "doc1".to_string(), + reason: ParseFailureReason::InvalidJson, + }, + RestParseFailure { + message: "error2".to_string(), + document: "doc2".to_string(), + reason: ParseFailureReason::InvalidJson, + } + ] + ); + } +} diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs index 401163b2d2e..a8cff9f99cf 100644 --- a/quickwit/quickwit-rest-client/src/rest_client.rs +++ b/quickwit/quickwit-rest-client/src/rest_client.rs @@ -23,16 +23,14 @@ pub use quickwit_ingest::CommitType; use quickwit_metastore::{IndexMetadata, Split, SplitInfo}; use quickwit_proto::ingest::Shard; use 
quickwit_search::SearchResponseRest; -use quickwit_serve::{ - ListSplitsQueryParams, ListSplitsResponse, RestIngestResponse, SearchRequestQueryString, -}; +use quickwit_serve::{ListSplitsQueryParams, ListSplitsResponse, SearchRequestQueryString}; use reqwest::header::{HeaderMap, HeaderValue, CONTENT_TYPE}; use reqwest::{Client, ClientBuilder, Method, StatusCode, Url}; use serde::Serialize; use serde_json::json; use crate::error::Error; -use crate::models::{ApiResponse, IngestSource, Timeout}; +use crate::models::{ApiResponse, CumulatedIngestResponse, IngestSource, Timeout}; use crate::BatchLineReader; pub const DEFAULT_BASE_URL: &str = "http://127.0.0.1:7280"; @@ -263,7 +261,7 @@ impl QuickwitClient { batch_size_limit_opt: Option, mut on_ingest_event: Option<&mut (dyn FnMut(IngestEvent) + Sync)>, last_block_commit: CommitType, - ) -> Result { + ) -> Result { let ingest_path = format!("{index_id}/ingest"); let mut query_params = HashMap::new(); // TODO(#5604) @@ -283,7 +281,7 @@ impl QuickwitClient { BatchLineReader::from_string(ingest_payload, batch_size_limit) } }; - let mut cumulated_resp = RestIngestResponse::default(); + let mut cumulated_resp = CumulatedIngestResponse::default(); while let Some(batch) = batch_reader.next_batch().await? 
{ loop { let timeout = if !batch_reader.has_next() && last_block_commit != CommitType::Auto { @@ -312,15 +310,16 @@ impl QuickwitClient { ) .await?; if response.status_code() == StatusCode::TOO_MANY_REQUESTS { - if let Some(event_fn) = &mut on_ingest_event { - event_fn(IngestEvent::Sleep) - } - tokio::time::sleep(Duration::from_millis(500)).await; + cumulated_resp.num_too_many_requests += 1; } else { let current_parsed_resp = response.deserialize().await?; cumulated_resp = cumulated_resp.merge(current_parsed_resp); break; } + if let Some(event_fn) = &mut on_ingest_event { + event_fn(IngestEvent::Sleep) + } + tokio::time::sleep(Duration::from_millis(500)).await; } if let Some(event_fn) = &mut on_ingest_event { event_fn(IngestEvent::IngestedDocBatch(batch.len())) @@ -750,7 +749,7 @@ mod test { use wiremock::{Mock, MockServer, ResponseTemplate}; use crate::error::Error; - use crate::models::IngestSource; + use crate::models::{CumulatedIngestResponse, IngestSource}; use crate::rest_client::QuickwitClientBuilder; #[tokio::test] @@ -847,7 +846,16 @@ mod test { .ingest("my-index", ingest_source, None, None, CommitType::Auto) .await .unwrap(); - assert_eq!(actual_response, mock_response); + assert_eq!( + actual_response, + CumulatedIngestResponse { + num_docs_for_processing: 2, + num_ingested_docs: Some(2), + num_rejected_docs: Some(0), + parse_failures: Some(Vec::new()), + ..Default::default() + } + ); } #[tokio::test] @@ -882,7 +890,16 @@ mod test { .ingest("my-index", ingest_source, None, None, CommitType::Force) .await .unwrap(); - assert_eq!(actual_response, mock_response); + assert_eq!( + actual_response, + CumulatedIngestResponse { + num_docs_for_processing: 2, + num_ingested_docs: Some(2), + num_rejected_docs: Some(0), + parse_failures: Some(Vec::new()), + ..Default::default() + } + ); } #[tokio::test] @@ -917,7 +934,16 @@ mod test { .ingest("my-index", ingest_source, None, None, CommitType::WaitFor) .await .unwrap(); - assert_eq!(actual_response, mock_response); + 
assert_eq!( + actual_response, + CumulatedIngestResponse { + num_docs_for_processing: 2, + num_ingested_docs: Some(2), + num_rejected_docs: Some(0), + parse_failures: Some(Vec::new()), + ..Default::default() + } + ); } #[tokio::test] diff --git a/quickwit/quickwit-serve/src/ingest_api/response.rs b/quickwit/quickwit-serve/src/ingest_api/response.rs index ab0e96165d1..c649814d133 100644 --- a/quickwit/quickwit-serve/src/ingest_api/response.rs +++ b/quickwit/quickwit-serve/src/ingest_api/response.rs @@ -98,32 +98,8 @@ impl RestIngestResponse { } Ok(resp) } - - /// Aggregates ingest counts and errors. - pub fn merge(self, other: Self) -> Self { - Self { - num_docs_for_processing: self.num_docs_for_processing + other.num_docs_for_processing, - num_ingested_docs: apply_op(self.num_ingested_docs, other.num_ingested_docs, |a, b| { - a + b - }), - num_rejected_docs: apply_op(self.num_rejected_docs, other.num_rejected_docs, |a, b| { - a + b - }), - parse_failures: apply_op(self.parse_failures, other.parse_failures, |a, b| { - a.into_iter().chain(b).collect() - }), - } - } } -fn apply_op(a: Option, b: Option, f: impl Fn(T, T) -> T) -> Option { - match (a, b) { - (Some(a), Some(b)) => Some(f(a, b)), - (Some(a), None) => Some(a), - (None, Some(b)) => Some(b), - (None, None) => None, - } -} #[cfg(test)] mod tests { use quickwit_proto::ingest::router::{IngestFailure, IngestFailureReason, IngestSuccess}; @@ -204,47 +180,4 @@ mod tests { let result = RestIngestResponse::from_ingest_v2(failure_resp, None, 10); assert!(result.is_err()); } - - #[test] - fn test_merge_responses() { - let response1 = RestIngestResponse { - num_docs_for_processing: 10, - num_ingested_docs: Some(5), - num_rejected_docs: Some(2), - parse_failures: Some(vec![RestParseFailure { - message: "error1".to_string(), - document: "doc1".to_string(), - reason: ParseFailureReason::InvalidJson, - }]), - }; - let response2 = RestIngestResponse { - num_docs_for_processing: 15, - num_ingested_docs: Some(10), - 
num_rejected_docs: Some(3), - parse_failures: Some(vec![RestParseFailure { - message: "error2".to_string(), - document: "doc2".to_string(), - reason: ParseFailureReason::InvalidJson, - }]), - }; - let merged_response = response1.merge(response2); - assert_eq!(merged_response.num_docs_for_processing, 25); - assert_eq!(merged_response.num_ingested_docs.unwrap(), 15); - assert_eq!(merged_response.num_rejected_docs.unwrap(), 5); - assert_eq!( - merged_response.parse_failures.unwrap(), - vec![ - RestParseFailure { - message: "error1".to_string(), - document: "doc1".to_string(), - reason: ParseFailureReason::InvalidJson, - }, - RestParseFailure { - message: "error2".to_string(), - document: "doc2".to_string(), - reason: ParseFailureReason::InvalidJson, - } - ] - ); - } }