Skip to content

Commit 58944fc

Browse files
authored
Record object storage request latencies (#5521)
1 parent 826f10f commit 58944fc

File tree

4 files changed

+70
-41
lines changed

4 files changed

+70
-41
lines changed

quickwit/quickwit-common/src/metrics.rs

Lines changed: 0 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -90,25 +90,6 @@ pub fn new_counter(
9090
counter
9191
}
9292

93-
pub fn new_counter_with_labels(
94-
name: &str,
95-
help: &str,
96-
subsystem: &str,
97-
const_labels: &[(&str, &str)],
98-
) -> IntCounter {
99-
let owned_const_labels: HashMap<String, String> = const_labels
100-
.iter()
101-
.map(|(label_name, label_value)| (label_name.to_string(), label_value.to_string()))
102-
.collect();
103-
let counter_opts = Opts::new(name, help)
104-
.namespace("quickwit")
105-
.subsystem(subsystem)
106-
.const_labels(owned_const_labels);
107-
let counter = IntCounter::with_opts(counter_opts).expect("failed to create counter");
108-
prometheus::register(Box::new(counter.clone())).expect("failed to register counter");
109-
counter
110-
}
111-
11293
pub fn new_counter_vec<const N: usize>(
11394
name: &str,
11495
help: &str,

quickwit/quickwit-search/src/metrics.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ impl Default for SearchMetrics {
7070
),
7171
root_search_request_duration_seconds: new_histogram_vec(
7272
"root_search_request_duration_seconds",
73-
"Duration of root search gRPC request in seconds.",
73+
"Duration of root search gRPC requests in seconds.",
7474
"search",
7575
&[("kind", "server")],
7676
["status"],
@@ -93,7 +93,7 @@ impl Default for SearchMetrics {
9393
),
9494
leaf_search_request_duration_seconds: new_histogram_vec(
9595
"leaf_search_request_duration_seconds",
96-
"Duration of leaf search gRPC request in seconds.",
96+
"Duration of leaf search gRPC requests in seconds.",
9797
"search",
9898
&[("kind", "server")],
9999
["status"],

quickwit/quickwit-storage/src/metrics.rs

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121

2222
use once_cell::sync::Lazy;
2323
use quickwit_common::metrics::{
24-
new_counter, new_counter_vec, new_counter_with_labels, new_gauge, IntCounter, IntCounterVec,
25-
IntGauge,
24+
new_counter, new_counter_vec, new_gauge, new_histogram_vec, Histogram, IntCounter,
25+
IntCounterVec, IntGauge,
2626
};
2727

2828
/// Counters associated to storage operations.
@@ -41,6 +41,11 @@ pub struct StorageMetrics {
4141
pub object_storage_put_parts: IntCounter,
4242
pub object_storage_download_num_bytes: IntCounter,
4343
pub object_storage_upload_num_bytes: IntCounter,
44+
45+
pub object_storage_delete_requests_total: IntCounter,
46+
pub object_storage_bulk_delete_requests_total: IntCounter,
47+
pub object_storage_delete_request_duration: Histogram,
48+
pub object_storage_bulk_delete_request_duration: Histogram,
4449
}
4550

4651
impl Default for StorageMetrics {
@@ -60,6 +65,32 @@ impl Default for StorageMetrics {
6065
];
6166
let get_slice_timeout_all_timeouts =
6267
get_slice_timeout_outcome_total_vec.with_label_values(["all_timeouts"]);
68+
69+
let object_storage_requests_total = new_counter_vec(
70+
"object_storage_requests_total",
71+
"Total number of object storage requests performed.",
72+
"storage",
73+
&[],
74+
["action"],
75+
);
76+
let object_storage_delete_requests_total =
77+
object_storage_requests_total.with_label_values(["delete_object"]);
78+
let object_storage_bulk_delete_requests_total =
79+
object_storage_requests_total.with_label_values(["delete_objects"]);
80+
81+
let object_storage_request_duration = new_histogram_vec(
82+
"object_storage_request_duration_seconds",
83+
"Duration of object storage requests in seconds.",
84+
"storage",
85+
&[],
86+
["action"],
87+
vec![0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0],
88+
);
89+
let object_storage_delete_request_duration =
90+
object_storage_request_duration.with_label_values(["delete_object"]);
91+
let object_storage_bulk_delete_request_duration =
92+
object_storage_request_duration.with_label_values(["delete_objects"]);
93+
6394
StorageMetrics {
6495
fast_field_cache: CacheMetrics::for_component("fastfields"),
6596
fd_cache_metrics: CacheMetrics::for_component("fd"),
@@ -107,6 +138,10 @@ impl Default for StorageMetrics {
107138
"storage",
108139
&[],
109140
),
141+
object_storage_delete_requests_total,
142+
object_storage_bulk_delete_requests_total,
143+
object_storage_delete_request_duration,
144+
object_storage_bulk_delete_request_duration,
110145
}
111146
}
112147
}
@@ -141,31 +176,31 @@ impl CacheMetrics {
141176
CACHE_METRICS_NAMESPACE,
142177
&[("component_name", component_name)],
143178
),
144-
hits_num_items: new_counter_with_labels(
179+
hits_num_items: new_counter(
145180
"cache_hits_total",
146181
"Number of cache hits by component",
147182
CACHE_METRICS_NAMESPACE,
148183
&[("component_name", component_name)],
149184
),
150-
hits_num_bytes: new_counter_with_labels(
185+
hits_num_bytes: new_counter(
151186
"cache_hits_bytes",
152187
"Number of cache hits in bytes by component",
153188
CACHE_METRICS_NAMESPACE,
154189
&[("component_name", component_name)],
155190
),
156-
misses_num_items: new_counter_with_labels(
191+
misses_num_items: new_counter(
157192
"cache_misses_total",
158193
"Number of cache misses by component",
159194
CACHE_METRICS_NAMESPACE,
160195
&[("component_name", component_name)],
161196
),
162-
evict_num_items: new_counter_with_labels(
197+
evict_num_items: new_counter(
163198
"cache_evict_total",
164199
"Number of cache entry evicted by component",
165200
CACHE_METRICS_NAMESPACE,
166201
&[("component_name", component_name)],
167202
),
168-
evict_num_bytes: new_counter_with_labels(
203+
evict_num_bytes: new_counter(
169204
"cache_evict_bytes",
170205
"Number of cache entry evicted in bytes by component",
171206
CACHE_METRICS_NAMESPACE,

quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -289,6 +289,12 @@ impl S3CompatibleObjectStorage {
289289
.byte_stream()
290290
.await
291291
.map_err(|io_error| Retry::Permanent(StorageError::from(io_error)))?;
292+
293+
crate::STORAGE_METRICS.object_storage_put_parts.inc();
294+
crate::STORAGE_METRICS
295+
.object_storage_upload_num_bytes
296+
.inc_by(len);
297+
292298
self.s3_client
293299
.put_object()
294300
.bucket(bucket)
@@ -304,11 +310,6 @@ impl S3CompatibleObjectStorage {
304310
Retry::Permanent(StorageError::from(sdk_error))
305311
}
306312
})?;
307-
308-
crate::STORAGE_METRICS.object_storage_put_parts.inc();
309-
crate::STORAGE_METRICS
310-
.object_storage_upload_num_bytes
311-
.inc_by(len);
312313
Ok(())
313314
}
314315

@@ -423,6 +424,7 @@ impl S3CompatibleObjectStorage {
423424
.map_err(StorageError::from)
424425
.map_err(Retry::Permanent)?;
425426
let md5 = BASE64_STANDARD.encode(part.md5.0);
427+
426428
crate::STORAGE_METRICS.object_storage_put_parts.inc();
427429
crate::STORAGE_METRICS
428430
.object_storage_upload_num_bytes
@@ -449,7 +451,7 @@ impl S3CompatibleObjectStorage {
449451
})?;
450452

451453
let completed_part = CompletedPart::builder()
452-
.set_e_tag(upload_part_output.e_tag().map(|tag| tag.to_string()))
454+
.set_e_tag(upload_part_output.e_tag)
453455
.part_number(part.part_number as i32)
454456
.build();
455457
Ok(completed_part)
@@ -538,13 +540,14 @@ impl S3CompatibleObjectStorage {
538540
Ok(())
539541
}
540542

541-
async fn create_get_object_request(
543+
async fn get_object(
542544
&self,
543545
path: &Path,
544546
range_opt: Option<Range<usize>>,
545547
) -> Result<GetObjectOutput, SdkError<GetObjectError>> {
546548
let key = self.key(path);
547549
let range_str = range_opt.map(|range| format!("bytes={}-{}", range.start, range.end - 1));
550+
548551
crate::STORAGE_METRICS.object_storage_get_total.inc();
549552

550553
let get_object_output = self
@@ -565,7 +568,7 @@ impl S3CompatibleObjectStorage {
565568
) -> StorageResult<Vec<u8>> {
566569
let cap = range_opt.as_ref().map(Range::len).unwrap_or(0);
567570
let get_object_output = aws_retry(&self.retry_params, || {
568-
self.create_get_object_request(path, range_opt.clone())
571+
self.get_object(path, range_opt.clone())
569572
})
570573
.await?;
571574
let mut buf: Vec<u8> = Vec::with_capacity(cap);
@@ -638,6 +641,12 @@ impl S3CompatibleObjectStorage {
638641
for (path_chunk, delete) in &mut delete_requests_it {
639642
let delete_objects_res: StorageResult<DeleteObjectsOutput> =
640643
aws_retry(&self.retry_params, || async {
644+
crate::STORAGE_METRICS
645+
.object_storage_bulk_delete_requests_total
646+
.inc();
647+
let _timer = crate::STORAGE_METRICS
648+
.object_storage_bulk_delete_request_duration
649+
.start_timer();
641650
self.s3_client
642651
.delete_objects()
643652
.bucket(self.bucket.clone())
@@ -752,10 +761,8 @@ impl Storage for S3CompatibleObjectStorage {
752761

753762
async fn copy_to(&self, path: &Path, output: &mut dyn SendableAsync) -> StorageResult<()> {
754763
let _permit = REQUEST_SEMAPHORE.acquire().await;
755-
let get_object_output = aws_retry(&self.retry_params, || {
756-
self.create_get_object_request(path, None)
757-
})
758-
.await?;
764+
let get_object_output =
765+
aws_retry(&self.retry_params, || self.get_object(path, None)).await?;
759766
let mut body_read = BufReader::new(get_object_output.body.into_async_read());
760767
let num_bytes_copied = tokio::io::copy_buf(&mut body_read, output).await?;
761768
STORAGE_METRICS
@@ -770,6 +777,12 @@ impl Storage for S3CompatibleObjectStorage {
770777
let bucket = self.bucket.clone();
771778
let key = self.key(path);
772779
let delete_res = aws_retry(&self.retry_params, || async {
780+
crate::STORAGE_METRICS
781+
.object_storage_delete_requests_total
782+
.inc();
783+
let _timer = crate::STORAGE_METRICS
784+
.object_storage_delete_request_duration
785+
.start_timer();
773786
self.s3_client
774787
.delete_object()
775788
.bucket(&bucket)
@@ -818,7 +831,7 @@ impl Storage for S3CompatibleObjectStorage {
818831
) -> crate::StorageResult<Box<dyn AsyncRead + Send + Unpin>> {
819832
let permit = REQUEST_SEMAPHORE.acquire().await;
820833
let get_object_output = aws_retry(&self.retry_params, || {
821-
self.create_get_object_request(path, Some(range.clone()))
834+
self.get_object(path, Some(range.clone()))
822835
})
823836
.await?;
824837
Ok(Box::new(S3AsyncRead {

0 commit comments

Comments
 (0)