-
Notifications
You must be signed in to change notification settings - Fork 455
Add number of throttled query in CLI response #5655
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,5 +1,5 @@ | ||
--- | ||
title: Version 0.7 upgrade | ||
title: Version upgrade | ||
sidebar_position: 4 | ||
--- | ||
|
||
|
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -12,7 +12,7 @@ | |||||||||||||||||||||||||
// See the License for the specific language governing permissions and | ||||||||||||||||||||||||||
// limitations under the License. | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
use std::time::Duration; | ||||||||||||||||||||||||||
use std::time::{Duration, Instant}; | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
use futures_util::FutureExt; | ||||||||||||||||||||||||||
use itertools::Itertools; | ||||||||||||||||||||||||||
|
@@ -23,9 +23,9 @@ use quickwit_indexing::actors::INDEXING_DIR_NAME; | |||||||||||||||||||||||||
use quickwit_metastore::SplitState; | ||||||||||||||||||||||||||
use quickwit_proto::ingest::ParseFailureReason; | ||||||||||||||||||||||||||
use quickwit_rest_client::error::{ApiError, Error}; | ||||||||||||||||||||||||||
use quickwit_rest_client::models::IngestSource; | ||||||||||||||||||||||||||
use quickwit_rest_client::models::{CumulatedIngestResponse, IngestSource}; | ||||||||||||||||||||||||||
use quickwit_rest_client::rest_client::CommitType; | ||||||||||||||||||||||||||
use quickwit_serve::{ListSplitsQueryParams, RestIngestResponse, RestParseFailure}; | ||||||||||||||||||||||||||
use quickwit_serve::{ListSplitsQueryParams, RestParseFailure, SearchRequestQueryString}; | ||||||||||||||||||||||||||
use serde_json::json; | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
use crate::ingest_json; | ||||||||||||||||||||||||||
|
@@ -306,11 +306,11 @@ async fn test_ingest_v2_happy_path() { | |||||||||||||||||||||||||
.unwrap(); | ||||||||||||||||||||||||||
assert_eq!( | ||||||||||||||||||||||||||
ingest_resp, | ||||||||||||||||||||||||||
RestIngestResponse { | ||||||||||||||||||||||||||
CumulatedIngestResponse { | ||||||||||||||||||||||||||
num_docs_for_processing: 1, | ||||||||||||||||||||||||||
num_ingested_docs: Some(1), | ||||||||||||||||||||||||||
num_rejected_docs: Some(0), | ||||||||||||||||||||||||||
parse_failures: None, | ||||||||||||||||||||||||||
..Default::default() | ||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
@@ -332,6 +332,83 @@ async fn test_ingest_v2_happy_path() { | |||||||||||||||||||||||||
sandbox.shutdown().await.unwrap(); | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
#[tokio::test] | ||||||||||||||||||||||||||
async fn test_ingest_v2_high_throughput() { | ||||||||||||||||||||||||||
let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; | ||||||||||||||||||||||||||
let index_id = "test_high_throughput"; | ||||||||||||||||||||||||||
let index_config = format!( | ||||||||||||||||||||||||||
r#" | ||||||||||||||||||||||||||
version: 0.8 | ||||||||||||||||||||||||||
index_id: {index_id} | ||||||||||||||||||||||||||
doc_mapping: | ||||||||||||||||||||||||||
field_mappings: | ||||||||||||||||||||||||||
- name: body | ||||||||||||||||||||||||||
type: text | ||||||||||||||||||||||||||
indexing_settings: | ||||||||||||||||||||||||||
commit_timeout_secs: 1 | ||||||||||||||||||||||||||
"# | ||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||
sandbox | ||||||||||||||||||||||||||
.rest_client(QuickwitService::Indexer) | ||||||||||||||||||||||||||
.indexes() | ||||||||||||||||||||||||||
.create(index_config, ConfigFormat::Yaml, false) | ||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||
.unwrap(); | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
let body_size = 20 * 1000 * 1000; | ||||||||||||||||||||||||||
let line = json!({"body": "my dummy repeated payload"}).to_string(); | ||||||||||||||||||||||||||
let num_docs = body_size / line.len(); | ||||||||||||||||||||||||||
let body = std::iter::repeat_n(&line, num_docs).join("\n"); | ||||||||||||||||||||||||||
let ingest_resp = sandbox | ||||||||||||||||||||||||||
.rest_client(QuickwitService::Indexer) | ||||||||||||||||||||||||||
.ingest( | ||||||||||||||||||||||||||
index_id, | ||||||||||||||||||||||||||
IngestSource::Str(body), | ||||||||||||||||||||||||||
// TODO: when using the default 10MiB batch size, we get persist | ||||||||||||||||||||||||||
// timeouts with code 500 on some lower performance machines (e.g. | ||||||||||||||||||||||||||
// Github runners). We should investigate why this happens exactly. | ||||||||||||||||||||||||||
Some(5_000_000), | ||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @guilload I didn't find a good explanation for why this timeout occur here in the persist quickwit/quickwit/quickwit-ingest/src/ingest_v2/router.rs Lines 432 to 443 in ce4501f
Persisting 10MB should not take 6 sec, even on a slow system and in debug mode, should it? |
||||||||||||||||||||||||||
None, | ||||||||||||||||||||||||||
CommitType::Auto, | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
.await | ||||||||||||||||||||||||||
.unwrap(); | ||||||||||||||||||||||||||
assert_eq!(ingest_resp.num_docs_for_processing, num_docs as u64); | ||||||||||||||||||||||||||
assert_eq!(ingest_resp.num_ingested_docs, Some(num_docs as u64)); | ||||||||||||||||||||||||||
assert_eq!(ingest_resp.num_rejected_docs, Some(0)); | ||||||||||||||||||||||||||
// num_too_many_requests might actually be > 0 | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
let searcher_client = sandbox.rest_client(QuickwitService::Searcher); | ||||||||||||||||||||||||||
// wait for the docs to be indexed | ||||||||||||||||||||||||||
let start_time = Instant::now(); | ||||||||||||||||||||||||||
loop { | ||||||||||||||||||||||||||
let res = searcher_client | ||||||||||||||||||||||||||
.search( | ||||||||||||||||||||||||||
index_id, | ||||||||||||||||||||||||||
SearchRequestQueryString { | ||||||||||||||||||||||||||
query: "*".to_string(), | ||||||||||||||||||||||||||
..Default::default() | ||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||
.await; | ||||||||||||||||||||||||||
if let Ok(success_resp) = res { | ||||||||||||||||||||||||||
if success_resp.num_hits == num_docs as u64 { | ||||||||||||||||||||||||||
break; | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
if start_time.elapsed() > Duration::from_secs(20) { | ||||||||||||||||||||||||||
panic!( | ||||||||||||||||||||||||||
"didn't manage to index {} docs in {:?}", | ||||||||||||||||||||||||||
num_docs, | ||||||||||||||||||||||||||
start_time.elapsed() | ||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
tokio::time::sleep(Duration::from_secs(1)).await; | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
sandbox.shutdown().await.unwrap(); | ||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
#[tokio::test] | ||||||||||||||||||||||||||
async fn test_commit_force() { | ||||||||||||||||||||||||||
let sandbox = ClusterSandboxBuilder::build_and_start_standalone().await; | ||||||||||||||||||||||||||
|
@@ -372,11 +449,11 @@ async fn test_commit_force() { | |||||||||||||||||||||||||
.unwrap(); | ||||||||||||||||||||||||||
assert_eq!( | ||||||||||||||||||||||||||
ingest_resp, | ||||||||||||||||||||||||||
RestIngestResponse { | ||||||||||||||||||||||||||
CumulatedIngestResponse { | ||||||||||||||||||||||||||
num_docs_for_processing: 1, | ||||||||||||||||||||||||||
num_ingested_docs: Some(1), | ||||||||||||||||||||||||||
num_rejected_docs: Some(0), | ||||||||||||||||||||||||||
parse_failures: None, | ||||||||||||||||||||||||||
..Default::default() | ||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
@@ -452,20 +529,20 @@ async fn test_commit_wait_for() { | |||||||||||||||||||||||||
let (ingest_resp_1, ingest_resp_2) = tokio::join!(ingest_1_fut, ingest_2_fut); | ||||||||||||||||||||||||||
assert_eq!( | ||||||||||||||||||||||||||
ingest_resp_1, | ||||||||||||||||||||||||||
RestIngestResponse { | ||||||||||||||||||||||||||
CumulatedIngestResponse { | ||||||||||||||||||||||||||
num_docs_for_processing: 1, | ||||||||||||||||||||||||||
num_ingested_docs: Some(1), | ||||||||||||||||||||||||||
num_rejected_docs: Some(0), | ||||||||||||||||||||||||||
parse_failures: None, | ||||||||||||||||||||||||||
..Default::default() | ||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||
assert_eq!( | ||||||||||||||||||||||||||
ingest_resp_2, | ||||||||||||||||||||||||||
RestIngestResponse { | ||||||||||||||||||||||||||
CumulatedIngestResponse { | ||||||||||||||||||||||||||
num_docs_for_processing: 1, | ||||||||||||||||||||||||||
num_ingested_docs: Some(1), | ||||||||||||||||||||||||||
num_rejected_docs: Some(0), | ||||||||||||||||||||||||||
parse_failures: None, | ||||||||||||||||||||||||||
..Default::default() | ||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
@@ -523,11 +600,11 @@ async fn test_commit_auto() { | |||||||||||||||||||||||||
.unwrap(); | ||||||||||||||||||||||||||
assert_eq!( | ||||||||||||||||||||||||||
ingest_resp, | ||||||||||||||||||||||||||
RestIngestResponse { | ||||||||||||||||||||||||||
CumulatedIngestResponse { | ||||||||||||||||||||||||||
num_docs_for_processing: 1, | ||||||||||||||||||||||||||
num_ingested_docs: Some(1), | ||||||||||||||||||||||||||
num_rejected_docs: Some(0), | ||||||||||||||||||||||||||
parse_failures: None, | ||||||||||||||||||||||||||
..Default::default() | ||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||
|
||||||||||||||||||||||||||
|
@@ -577,7 +654,7 @@ async fn test_detailed_ingest_response() { | |||||||||||||||||||||||||
|
||||||||||||||||||||||||||
assert_eq!( | ||||||||||||||||||||||||||
ingest_resp, | ||||||||||||||||||||||||||
RestIngestResponse { | ||||||||||||||||||||||||||
CumulatedIngestResponse { | ||||||||||||||||||||||||||
num_docs_for_processing: 2, | ||||||||||||||||||||||||||
num_ingested_docs: Some(1), | ||||||||||||||||||||||||||
num_rejected_docs: Some(1), | ||||||||||||||||||||||||||
|
@@ -586,6 +663,7 @@ async fn test_detailed_ingest_response() { | |||||||||||||||||||||||||
message: "failed to parse JSON document".to_string(), | ||||||||||||||||||||||||||
reason: ParseFailureReason::InvalidJson, | ||||||||||||||||||||||||||
}]), | ||||||||||||||||||||||||||
..Default::default() | ||||||||||||||||||||||||||
}, | ||||||||||||||||||||||||||
); | ||||||||||||||||||||||||||
sandbox.shutdown().await.unwrap(); | ||||||||||||||||||||||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,7 @@ | |
use std::path::PathBuf; | ||
use std::time::Duration; | ||
|
||
use quickwit_serve::{RestIngestResponse, RestParseFailure}; | ||
use reqwest::StatusCode; | ||
use serde::de::DeserializeOwned; | ||
use serde::{Deserialize, Serialize}; | ||
|
@@ -94,6 +95,43 @@ pub enum IngestSource { | |
Stdin, | ||
} | ||
|
||
#[derive(Debug, PartialEq, Default)] | ||
pub struct CumulatedIngestResponse { | ||
pub num_docs_for_processing: u64, | ||
pub num_ingested_docs: Option<u64>, | ||
pub num_rejected_docs: Option<u64>, | ||
pub parse_failures: Option<Vec<RestParseFailure>>, | ||
pub num_too_many_requests: u64, | ||
} | ||
|
||
impl CumulatedIngestResponse { | ||
/// Aggregates ingest counts and errors. | ||
pub fn merge(self, other: RestIngestResponse) -> Self { | ||
Self { | ||
num_docs_for_processing: self.num_docs_for_processing + other.num_docs_for_processing, | ||
num_ingested_docs: apply_op(self.num_ingested_docs, other.num_ingested_docs, |a, b| { | ||
a + b | ||
}), | ||
num_rejected_docs: apply_op(self.num_rejected_docs, other.num_rejected_docs, |a, b| { | ||
a + b | ||
}), | ||
parse_failures: apply_op(self.parse_failures, other.parse_failures, |a, b| { | ||
a.into_iter().chain(b).collect() | ||
}), | ||
num_too_many_requests: self.num_too_many_requests, | ||
} | ||
} | ||
Comment on lines
+109
to
+123
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I moved this back here as it makes more sense than in the API model because accumulating responses is quite specific to the rest client. |
||
} | ||
|
||
fn apply_op<T>(a: Option<T>, b: Option<T>, f: impl Fn(T, T) -> T) -> Option<T> { | ||
match (a, b) { | ||
(Some(a), Some(b)) => Some(f(a, b)), | ||
(Some(a), None) => Some(a), | ||
(None, Some(b)) => Some(b), | ||
(None, None) => None, | ||
} | ||
} | ||
|
||
/// A structure that represent a timeout. Unlike Duration it can also represent an infinite or no | ||
/// timeout value. | ||
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash, Default, Debug)] | ||
|
@@ -149,3 +187,55 @@ impl Timeout { | |
} | ||
} | ||
} | ||
|
||
#[cfg(test)] | ||
mod test { | ||
use quickwit_proto::ingest::ParseFailureReason; | ||
|
||
use super::*; | ||
|
||
#[test] | ||
fn test_merge_responses() { | ||
let mut merged_response = CumulatedIngestResponse::default(); | ||
let response1 = RestIngestResponse { | ||
num_docs_for_processing: 10, | ||
num_ingested_docs: Some(5), | ||
num_rejected_docs: Some(2), | ||
parse_failures: Some(vec![RestParseFailure { | ||
message: "error1".to_string(), | ||
document: "doc1".to_string(), | ||
reason: ParseFailureReason::InvalidJson, | ||
}]), | ||
}; | ||
let response2 = RestIngestResponse { | ||
num_docs_for_processing: 15, | ||
num_ingested_docs: Some(10), | ||
num_rejected_docs: Some(3), | ||
parse_failures: Some(vec![RestParseFailure { | ||
message: "error2".to_string(), | ||
document: "doc2".to_string(), | ||
reason: ParseFailureReason::InvalidJson, | ||
}]), | ||
}; | ||
merged_response = merged_response.merge(response1); | ||
merged_response = merged_response.merge(response2); | ||
assert_eq!(merged_response.num_docs_for_processing, 25); | ||
assert_eq!(merged_response.num_ingested_docs.unwrap(), 15); | ||
assert_eq!(merged_response.num_rejected_docs.unwrap(), 5); | ||
assert_eq!( | ||
merged_response.parse_failures.unwrap(), | ||
vec![ | ||
RestParseFailure { | ||
message: "error1".to_string(), | ||
document: "doc1".to_string(), | ||
reason: ParseFailureReason::InvalidJson, | ||
}, | ||
RestParseFailure { | ||
message: "error2".to_string(), | ||
document: "doc2".to_string(), | ||
reason: ParseFailureReason::InvalidJson, | ||
} | ||
] | ||
); | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.