
Commit 6e2e630

Keeping details of ingest error longer. (#5258)
This PR:
- logs an error on router timeout (as there is already an error on the ingester side)
- adds metrics on the persist subrequests, as measured at the end of the router work
- surfaces the cause of the rate limiting (e.g. WAL full vs. shard rate limiting) in the rate limiting error
- stops lifting the rate limiting of all subrequests into a request-level rate limiting error: we now just return a 200, and the individual subrequests report a rate limiting error instead
1 parent ded426c commit 6e2e630
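
The last bullet above is the behavioral core of the change: rate limiting now stays at the subrequest level. A minimal sketch of the new request-level shape, using hypothetical types rather than Quickwit's actual ones (`SubrequestOutcome`, `IngestResponse`, and `finalize` are illustrative names):

```rust
// Minimal sketch of the router-level behavior change; hypothetical types,
// not the actual Quickwit code.
#[derive(Debug, PartialEq, Eq)]
enum SubrequestOutcome {
    Success,
    ShardRateLimited,
    WalFull,
}

#[derive(Debug)]
struct IngestResponse {
    // One outcome per persist subrequest.
    outcomes: Vec<SubrequestOutcome>,
}

// Before this commit, a request whose subrequests were all rate limited was
// itself turned into a request-level rate limiting error (HTTP 429 / gRPC
// ResourceExhausted). Now the request succeeds (HTTP 200) and each rate
// limited subrequest reports its own failure reason.
fn finalize(outcomes: Vec<SubrequestOutcome>) -> IngestResponse {
    IngestResponse { outcomes }
}
```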

19 files changed, +366 -119 lines changed

quickwit/quickwit-ingest/src/error.rs

Lines changed: 9 additions & 7 deletions
```diff
@@ -24,7 +24,7 @@ use quickwit_actors::AskError;
 use quickwit_common::rate_limited_error;
 use quickwit_common::tower::BufferError;
 pub(crate) use quickwit_proto::error::{grpc_error_to_grpc_status, grpc_status_to_service_error};
-use quickwit_proto::ingest::IngestV2Error;
+use quickwit_proto::ingest::{IngestV2Error, RateLimitingCause};
 use quickwit_proto::types::IndexId;
 use quickwit_proto::{tonic, GrpcServiceError, ServiceError, ServiceErrorCode};
 use serde::{Deserialize, Serialize};
@@ -43,8 +43,8 @@ pub enum IngestServiceError {
     InvalidPosition(String),
     #[error("io error {0}")]
     IoError(String),
-    #[error("rate limited")]
-    RateLimited,
+    #[error("rate limited {0}")]
+    RateLimited(RateLimitingCause),
     #[error("ingest service is unavailable ({0})")]
     Unavailable(String),
 }
@@ -89,7 +89,9 @@ impl From<IngestV2Error> for IngestServiceError {
             IngestV2Error::ShardNotFound { .. } => {
                 IngestServiceError::Internal("shard not found".to_string())
             }
-            IngestV2Error::TooManyRequests => IngestServiceError::RateLimited,
+            IngestV2Error::TooManyRequests(rate_limiting_cause) => {
+                IngestServiceError::RateLimited(rate_limiting_cause)
+            }
         }
     }
 }
@@ -115,7 +117,7 @@ impl ServiceError for IngestServiceError {
                 rate_limited_error!(limit_per_min = 6, "ingest/io internal error: {io_err}");
                 ServiceErrorCode::Internal
             }
-            Self::RateLimited => ServiceErrorCode::TooManyRequests,
+            Self::RateLimited(_) => ServiceErrorCode::TooManyRequests,
             Self::Unavailable(_) => ServiceErrorCode::Unavailable,
         }
     }
@@ -131,7 +133,7 @@ impl GrpcServiceError for IngestServiceError {
     }
 
     fn new_too_many_requests() -> Self {
-        Self::RateLimited
+        Self::RateLimited(RateLimitingCause::Unknown)
     }
 
     fn new_unavailable(error_msg: String) -> Self {
@@ -158,7 +160,7 @@ impl From<IngestServiceError> for tonic::Status {
             IngestServiceError::Internal(_) => tonic::Code::Internal,
             IngestServiceError::InvalidPosition(_) => tonic::Code::InvalidArgument,
             IngestServiceError::IoError { .. } => tonic::Code::Internal,
-            IngestServiceError::RateLimited => tonic::Code::ResourceExhausted,
+            IngestServiceError::RateLimited(_) => tonic::Code::ResourceExhausted,
             IngestServiceError::Unavailable(_) => tonic::Code::Unavailable,
         };
         let message = error.to_string();
```
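
`RateLimited` now carries a `RateLimitingCause`, and the `{0}` in `#[error("rate limited {0}")]` requires that cause to implement `Display`. The enum lives in `quickwit_proto::ingest` and is not shown in this diff; below is a minimal sketch of its shape, limited to the two variants this commit uses (`Unknown`, `WalFull`) plus a shard rate limiting variant inferred from the `shard_rate_limited` metric label added in metrics.rs:

```rust
// Minimal sketch, NOT the actual enum from quickwit_proto::ingest: only
// `Unknown` and `WalFull` are confirmed by this diff; `ShardRateLimited` is
// inferred from the new `shard_rate_limited` metric label.
use std::fmt;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RateLimitingCause {
    Unknown,
    /// The write-ahead log hit its disk or memory limit.
    WalFull,
    /// A single shard exceeded its ingestion rate limit.
    ShardRateLimited,
}

// `#[error("rate limited {0}")]` formats the cause, so `Display` is required.
impl fmt::Display for RateLimitingCause {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            RateLimitingCause::Unknown => "unknown",
            RateLimitingCause::WalFull => "wal full",
            RateLimitingCause::ShardRateLimited => "shard rate limited",
        };
        write!(f, "{label}")
    }
}
```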

quickwit/quickwit-ingest/src/ingest_api_service.rs

Lines changed: 3 additions & 2 deletions
```diff
@@ -27,6 +27,7 @@ use quickwit_actors::{
 };
 use quickwit_common::runtimes::RuntimeType;
 use quickwit_common::tower::Cost;
+use quickwit_proto::ingest::RateLimitingCause;
 use tracing::{error, info};
 use ulid::Ulid;
 
@@ -166,7 +167,7 @@ impl IngestApiService {
 
         if disk_used > self.disk_limit {
             info!("ingestion rejected due to disk limit");
-            return Err(IngestServiceError::RateLimited);
+            return Err(IngestServiceError::RateLimited(RateLimitingCause::WalFull));
         }
 
         if self
@@ -175,7 +176,7 @@ impl IngestApiService {
             .is_err()
         {
             info!("ingest request rejected due to memory limit");
-            return Err(IngestServiceError::RateLimited);
+            return Err(IngestServiceError::RateLimited(RateLimitingCause::WalFull));
         }
         let mut num_docs = 0usize;
         let mut notifications = Vec::new();
```
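
Both rejections report `WalFull`: whether the disk limit or the memory limit trips, the write-ahead log is saturated from the client's point of view. A condensed, self-contained sketch of the guard pattern (the `Wal` struct and `check_capacity` are illustrative; the diff only shows `disk_limit` and the returned error):

```rust
// Illustrative sketch of the WAL capacity guard; field and method names are
// assumptions except `disk_limit`, which appears in the diff above.
#[derive(Debug)]
enum RateLimitingCause {
    WalFull,
}

#[derive(Debug)]
enum IngestServiceError {
    RateLimited(RateLimitingCause),
}

struct Wal {
    disk_used: u64,
    disk_limit: u64,
    memory_used: u64,
    memory_limit: u64,
}

impl Wal {
    // Reject with `WalFull` when either bound would be exceeded; both limits
    // protect the same resource, so they share one cause.
    fn check_capacity(&self, cost: u64) -> Result<(), IngestServiceError> {
        if self.disk_used + cost > self.disk_limit
            || self.memory_used + cost > self.memory_limit
        {
            return Err(IngestServiceError::RateLimited(RateLimitingCause::WalFull));
        }
        Ok(())
    }
}
```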

quickwit/quickwit-ingest/src/ingest_v2/ingester.rs

Lines changed: 20 additions & 23 deletions
```diff
@@ -37,7 +37,7 @@ use quickwit_common::pretty::PrettyDisplay;
 use quickwit_common::pubsub::{EventBroker, EventSubscriber};
 use quickwit_common::rate_limiter::{RateLimiter, RateLimiterSettings};
 use quickwit_common::tower::Pool;
-use quickwit_common::{rate_limited_warn, ServiceStream};
+use quickwit_common::{rate_limited_error, rate_limited_warn, ServiceStream};
 use quickwit_proto::control_plane::{
     AdviseResetShardsRequest, ControlPlaneService, ControlPlaneServiceClient,
 };
@@ -544,7 +544,7 @@ impl Ingester {
                     index_uid: subrequest.index_uid,
                     source_id: subrequest.source_id,
                     shard_id: subrequest.shard_id,
-                    reason: PersistFailureReason::ResourceExhausted as i32,
+                    reason: PersistFailureReason::WalFull as i32,
                 };
                 persist_failures.push(persist_failure);
                 continue;
@@ -562,7 +562,7 @@ impl Ingester {
                     index_uid: subrequest.index_uid,
                     source_id: subrequest.source_id,
                     shard_id: subrequest.shard_id,
-                    reason: PersistFailureReason::RateLimited as i32,
+                    reason: PersistFailureReason::ShardRateLimited as i32,
                 };
                 persist_failures.push(persist_failure);
                 continue;
@@ -695,9 +695,7 @@ impl Ingester {
                         PersistFailureReason::ShardNotFound
                     }
                     ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed,
-                    ReplicateFailureReason::ResourceExhausted => {
-                        PersistFailureReason::ResourceExhausted
-                    }
+                    ReplicateFailureReason::WalFull => PersistFailureReason::WalFull,
                 };
                 let persist_failure = PersistFailure {
                     subrequest_id: replicate_failure.subrequest_id,
@@ -889,13 +887,12 @@ impl Ingester {
 
         let mut state_guard = self.state.lock_partially().await?;
 
-        let shard =
-            state_guard
-                .shards
-                .get_mut(&queue_id)
-                .ok_or_else(|| IngestV2Error::ShardNotFound {
-                    shard_id: open_fetch_stream_request.shard_id().clone(),
-                })?;
+        let shard = state_guard.shards.get_mut(&queue_id).ok_or_else(|| {
+            rate_limited_error!(limit_per_min=6, queue_id=%queue_id, "shard not found");
+            IngestV2Error::ShardNotFound {
+                shard_id: open_fetch_stream_request.shard_id().clone(),
+            }
+        })?;
         // An indexer can only know about a newly opened shard if it has been scheduled by the
         // control plane, which confirms that the shard was correctly opened in the
         // metastore.
@@ -2039,10 +2036,7 @@ mod tests {
         assert_eq!(persist_response.failures.len(), 1);
 
         let persist_failure = &persist_response.failures[0];
-        assert_eq!(
-            persist_failure.reason(),
-            PersistFailureReason::ResourceExhausted
-        );
+        assert_eq!(persist_failure.reason(), PersistFailureReason::WalFull);
     }
 
     #[tokio::test]
@@ -2102,7 +2096,10 @@ mod tests {
         assert_eq!(persist_response.failures.len(), 1);
 
         let persist_failure = &persist_response.failures[0];
-        assert_eq!(persist_failure.reason(), PersistFailureReason::RateLimited);
+        assert_eq!(
+            persist_failure.reason(),
+            PersistFailureReason::ShardRateLimited
+        );
     }
 
     // This test should be run manually and independently of other tests with the `failpoints`
@@ -2725,7 +2722,10 @@ mod tests {
         assert_eq!(persist_failure.index_uid(), &index_uid);
         assert_eq!(persist_failure.source_id, "test-source");
         assert_eq!(persist_failure.shard_id(), ShardId::from(1));
-        assert_eq!(persist_failure.reason(), PersistFailureReason::RateLimited);
+        assert_eq!(
+            persist_failure.reason(),
+            PersistFailureReason::ShardRateLimited
+        );
 
         let state_guard = ingester.state.lock_fully().await.unwrap();
         assert_eq!(state_guard.shards.len(), 1);
@@ -2802,10 +2802,7 @@ mod tests {
         assert_eq!(persist_failure.index_uid(), &index_uid);
         assert_eq!(persist_failure.source_id, "test-source");
         assert_eq!(persist_failure.shard_id(), ShardId::from(1));
-        assert_eq!(
-            persist_failure.reason(),
-            PersistFailureReason::ResourceExhausted
-        );
+        assert_eq!(persist_failure.reason(), PersistFailureReason::WalFull);
 
         let state_guard = ingester.state.lock_fully().await.unwrap();
         assert_eq!(state_guard.shards.len(), 1);
```
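
The net effect in the ingester is that the old catch-all `ResourceExhausted` reason splits into `WalFull` and `ShardRateLimited`, and the distinction survives from the follower back to the router. A sketch of the resulting mapping, with both enums trimmed to the variants visible in this diff (note that `ReplicateFailureReason` carries no rate limiting variant here, consistent with the shard rate limiter running on the leader before replication):

```rust
// Sketch of the reason mapping after this commit; both enums are trimmed to
// the variants visible in the diff above, not the full protobuf definitions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ReplicateFailureReason {
    ShardNotFound,
    ShardClosed,
    WalFull,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PersistFailureReason {
    ShardNotFound,
    ShardClosed,
    WalFull,
    ShardRateLimited,
}

// Mirrors the match in `Ingester`: replication failures map one-to-one onto
// persist failures, and only the leader-side persist path produces
// `ShardRateLimited`.
fn persist_reason(reason: ReplicateFailureReason) -> PersistFailureReason {
    match reason {
        ReplicateFailureReason::ShardNotFound => PersistFailureReason::ShardNotFound,
        ReplicateFailureReason::ShardClosed => PersistFailureReason::ShardClosed,
        ReplicateFailureReason::WalFull => PersistFailureReason::WalFull,
    }
}
```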

quickwit/quickwit-ingest/src/ingest_v2/metrics.rs

Lines changed: 55 additions & 1 deletion
```diff
@@ -21,9 +21,61 @@ use mrecordlog::ResourceUsage;
 use once_cell::sync::Lazy;
 use quickwit_common::metrics::{
     exponential_buckets, linear_buckets, new_counter_vec, new_gauge, new_gauge_vec, new_histogram,
-    new_histogram_vec, Histogram, HistogramVec, IntCounterVec, IntGauge, IntGaugeVec,
+    new_histogram_vec, Histogram, HistogramVec, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
 };
 
+// Counter vec counting the different outcomes of ingest requests, as
+// measured at the end of the router work.
+//
+// The counters count persist subrequests.
+pub(crate) struct IngestResultMetrics {
+    pub success: IntCounter,
+    pub circuit_breaker: IntCounter,
+    pub unspecified: IntCounter,
+    pub index_not_found: IntCounter,
+    pub source_not_found: IntCounter,
+    pub internal: IntCounter,
+    pub no_shards_available: IntCounter,
+    pub shard_rate_limited: IntCounter,
+    pub wal_full: IntCounter,
+    pub timeout: IntCounter,
+    pub router_timeout: IntCounter,
+    pub router_load_shedding: IntCounter,
+    pub load_shedding: IntCounter,
+    pub shard_not_found: IntCounter,
+    pub unavailable: IntCounter,
+}
+
+impl Default for IngestResultMetrics {
+    fn default() -> Self {
+        let ingest_result_total_vec = new_counter_vec::<1>(
+            "ingest_result_total",
+            "Number of ingest requests by result",
+            "ingest",
+            &[],
+            ["result"],
+        );
+        Self {
+            success: ingest_result_total_vec.with_label_values(["success"]),
+            circuit_breaker: ingest_result_total_vec.with_label_values(["circuit_breaker"]),
+            unspecified: ingest_result_total_vec.with_label_values(["unspecified"]),
+            index_not_found: ingest_result_total_vec.with_label_values(["index_not_found"]),
+            source_not_found: ingest_result_total_vec.with_label_values(["source_not_found"]),
+            internal: ingest_result_total_vec.with_label_values(["internal"]),
+            no_shards_available: ingest_result_total_vec.with_label_values(["no_shards_available"]),
+            shard_rate_limited: ingest_result_total_vec.with_label_values(["shard_rate_limited"]),
+            wal_full: ingest_result_total_vec.with_label_values(["wal_full"]),
+            timeout: ingest_result_total_vec.with_label_values(["timeout"]),
+            router_timeout: ingest_result_total_vec.with_label_values(["router_timeout"]),
+            router_load_shedding: ingest_result_total_vec
+                .with_label_values(["router_load_shedding"]),
+            load_shedding: ingest_result_total_vec.with_label_values(["load_shedding"]),
+            unavailable: ingest_result_total_vec.with_label_values(["unavailable"]),
+            shard_not_found: ingest_result_total_vec.with_label_values(["shard_not_found"]),
+        }
+    }
+}
+
 pub(super) struct IngestV2Metrics {
     pub reset_shards_operations_total: IntCounterVec<1>,
     pub open_shards: IntGauge,
@@ -34,11 +86,13 @@ pub(super) struct IngestV2Metrics {
     pub wal_acquire_lock_request_duration_secs: HistogramVec<2>,
     pub wal_disk_used_bytes: IntGauge,
     pub wal_memory_used_bytes: IntGauge,
+    pub ingest_results: IngestResultMetrics,
 }
 
 impl Default for IngestV2Metrics {
     fn default() -> Self {
         Self {
+            ingest_results: IngestResultMetrics::default(),
             reset_shards_operations_total: new_counter_vec(
                 "reset_shards_operations_total",
                 "Total number of reset shards operations performed.",
```

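Each persist subrequest is meant to bump exactly one of these counters once the router finishes its work, exported under `ingest_result_total{result="..."}`. A self-contained sketch of that dispatch, with `AtomicU64` standing in for the prometheus `IntCounter`s above (the `Outcome` enum and `record` method are illustrative, not Quickwit's API):

```rust
// Illustrative dispatch, one increment per persist subrequest. `AtomicU64`
// stands in for prometheus `IntCounter` so the snippet is self-contained;
// only a few of the fifteen labels are reproduced here.
use std::sync::atomic::{AtomicU64, Ordering};

#[derive(Default)]
struct IngestResultMetrics {
    success: AtomicU64,
    shard_rate_limited: AtomicU64,
    wal_full: AtomicU64,
    shard_not_found: AtomicU64,
}

enum Outcome {
    Success,
    ShardRateLimited,
    WalFull,
    ShardNotFound,
}

impl IngestResultMetrics {
    // Keyed by the same label values the diff registers under
    // `ingest_result_total{result=...}`.
    fn record(&self, outcome: Outcome) {
        let counter = match outcome {
            Outcome::Success => &self.success,
            Outcome::ShardRateLimited => &self.shard_rate_limited,
            Outcome::WalFull => &self.wal_full,
            Outcome::ShardNotFound => &self.shard_not_found,
        };
        counter.fetch_add(1, Ordering::Relaxed);
    }
}
```
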
quickwit/quickwit-ingest/src/ingest_v2/replication.rs

Lines changed: 2 additions & 2 deletions
```diff
@@ -624,7 +624,7 @@ impl ReplicationTask {
                     index_uid: subrequest.index_uid,
                     source_id: subrequest.source_id,
                     shard_id: subrequest.shard_id,
-                    reason: ReplicateFailureReason::ResourceExhausted as i32,
+                    reason: ReplicateFailureReason::WalFull as i32,
                 };
                 replicate_failures.push(replicate_failure);
                 continue;
@@ -1626,7 +1626,7 @@ mod tests {
         assert_eq!(replicate_failure_0.shard_id(), ShardId::from(1));
         assert_eq!(
             replicate_failure_0.reason(),
-            ReplicateFailureReason::ResourceExhausted
+            ReplicateFailureReason::WalFull
         );
     }
 }
```
