Skip to content

Commit ea35a53

Browse files
authored
Add in flight get slice metric (#5726)
1 parent 1ed7ece commit ea35a53

File tree

5 files changed

+52
-8
lines changed

5 files changed

+52
-8
lines changed

quickwit/quickwit-storage/src/local_file_storage.rs

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use quickwit_config::StorageBackend;
2828
use tokio::io::{AsyncRead, AsyncReadExt, AsyncSeekExt, AsyncWriteExt};
2929
use tracing::warn;
3030

31+
use crate::metrics::object_storage_get_slice_in_flight_guards;
3132
use crate::storage::SendableAsync;
3233
use crate::{
3334
BulkDeleteError, DebouncedStorage, DeleteFailure, OwnedBytes, Storage, StorageError,
@@ -214,6 +215,7 @@ impl Storage for LocalFileStorage {
214215
// step, as there would be if using tokio async File.
215216
let mut file = std::fs::File::open(full_path)?;
216217
file.seek(SeekFrom::Start(range.start as u64))?;
218+
let _in_flight_guards = object_storage_get_slice_in_flight_guards(range.len());
217219
let mut content_bytes: Vec<u8> = Vec::with_capacity(range.len());
218220
#[allow(clippy::uninit_vec)]
219221
unsafe {

quickwit/quickwit-storage/src/metrics.rs

+31-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616

1717
use once_cell::sync::Lazy;
1818
use quickwit_common::metrics::{
19-
new_counter, new_counter_vec, new_gauge, new_histogram_vec, Histogram, IntCounter,
19+
new_counter, new_counter_vec, new_gauge, new_histogram_vec, GaugeGuard, Histogram, IntCounter,
2020
IntCounterVec, IntGauge,
2121
};
2222

@@ -32,6 +32,8 @@ pub struct StorageMetrics {
3232
pub get_slice_timeout_all_timeouts: IntCounter,
3333
pub object_storage_get_total: IntCounter,
3434
pub object_storage_get_errors_total: IntCounterVec<1>,
35+
pub object_storage_get_slice_in_flight_count: IntGauge,
36+
pub object_storage_get_slice_in_flight_num_bytes: IntGauge,
3537
pub object_storage_put_total: IntCounter,
3638
pub object_storage_put_parts: IntCounter,
3739
pub object_storage_download_num_bytes: IntCounter,
@@ -97,7 +99,8 @@ impl Default for StorageMetrics {
9799
get_slice_timeout_all_timeouts,
98100
object_storage_get_total: new_counter(
99101
"object_storage_gets_total",
100-
"Number of objects fetched.",
102+
"Number of objects fetched. Might be lower than get_slice_timeout_outcome if \
103+
queries are debounced.",
101104
"storage",
102105
&[],
103106
),
@@ -108,6 +111,19 @@ impl Default for StorageMetrics {
108111
&[],
109112
["code"],
110113
),
114+
object_storage_get_slice_in_flight_count: new_gauge(
115+
"object_storage_get_slice_in_flight_count",
116+
"Number of GetObject for which the memory was allocated but the download is still \
117+
in progress.",
118+
"storage",
119+
&[],
120+
),
121+
object_storage_get_slice_in_flight_num_bytes: new_gauge(
122+
"object_storage_get_slice_in_flight_num_bytes",
123+
"Memory allocated for GetObject requests that are still in progress.",
124+
"storage",
125+
&[],
126+
),
111127
object_storage_put_total: new_counter(
112128
"object_storage_puts_total",
113129
"Number of objects uploaded. May differ from object_storage_requests_parts due to \
@@ -212,3 +228,16 @@ pub static STORAGE_METRICS: Lazy<StorageMetrics> = Lazy::new(StorageMetrics::def
212228
#[cfg(test)]
213229
pub static CACHE_METRICS_FOR_TESTS: Lazy<CacheMetrics> =
214230
Lazy::new(|| CacheMetrics::for_component("fortest"));
231+
232+
pub fn object_storage_get_slice_in_flight_guards(
233+
get_request_size: usize,
234+
) -> (GaugeGuard<'static>, GaugeGuard<'static>) {
235+
let mut bytes_guard = GaugeGuard::from_gauge(
236+
&crate::STORAGE_METRICS.object_storage_get_slice_in_flight_num_bytes,
237+
);
238+
bytes_guard.add(get_request_size as i64);
239+
let mut count_guard =
240+
GaugeGuard::from_gauge(&crate::STORAGE_METRICS.object_storage_get_slice_in_flight_count);
241+
count_guard.add(1);
242+
(bytes_guard, count_guard)
243+
}

quickwit/quickwit-storage/src/object_storage/azure_blob_storage.rs

+10-6
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use tokio_util::io::StreamReader;
4444
use tracing::{instrument, warn};
4545

4646
use crate::debouncer::DebouncedStorage;
47+
use crate::metrics::object_storage_get_slice_in_flight_guards;
4748
use crate::storage::SendableAsync;
4849
use crate::{
4950
BulkDeleteError, DeleteFailure, MultiPartPolicy, PutPayload, Storage, StorageError,
@@ -194,18 +195,21 @@ impl AzureBlobStorage {
194195
) -> StorageResult<Vec<u8>> {
195196
let name = self.blob_name(path);
196197
let capacity = range_opt.as_ref().map(Range::len).unwrap_or(0);
197-
198198
retry(&self.retry_params, || async {
199-
let mut response_stream = if let Some(range) = range_opt.as_ref() {
200-
self.container_client
199+
let (mut response_stream, _in_flight_guards) = if let Some(range) = range_opt.as_ref() {
200+
let stream = self
201+
.container_client
201202
.blob_client(&name)
202203
.get()
203204
.range(range.clone())
204-
.into_stream()
205+
.into_stream();
206+
// only record ranged get request as being in flight
207+
let in_flight_guards = object_storage_get_slice_in_flight_guards(capacity);
208+
(stream, Some(in_flight_guards))
205209
} else {
206-
self.container_client.blob_client(&name).get().into_stream()
210+
let stream = self.container_client.blob_client(&name).get().into_stream();
211+
(stream, None)
207212
};
208-
209213
let mut buf: Vec<u8> = Vec::with_capacity(capacity);
210214
download_all(&mut response_stream, &mut buf).await?;
211215

quickwit/quickwit-storage/src/object_storage/s3_compatible_storage.rs

+4
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use tokio::io::{AsyncRead, AsyncReadExt, AsyncWriteExt, BufReader, ReadBuf};
4444
use tokio::sync::Semaphore;
4545
use tracing::{info, instrument, warn};
4646

47+
use crate::metrics::object_storage_get_slice_in_flight_guards;
4748
use crate::object_storage::MultiPartPolicy;
4849
use crate::storage::SendableAsync;
4950
use crate::{
@@ -566,6 +567,9 @@ impl S3CompatibleObjectStorage {
566567
self.get_object(path, range_opt.clone())
567568
})
568569
.await?;
570+
// only record ranged get request as being in flight
571+
let _in_flight_guards =
572+
range_opt.map(|range| object_storage_get_slice_in_flight_guards(range.len()));
569573
let mut buf: Vec<u8> = Vec::with_capacity(cap);
570574
download_all(get_object_output.body, &mut buf).await?;
571575
Ok(buf)

quickwit/quickwit-storage/src/opendal_storage/base.rs

+5
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use opendal::Operator;
2222
use quickwit_common::uri::Uri;
2323
use tokio::io::{AsyncRead, AsyncWriteExt};
2424

25+
use crate::metrics::object_storage_get_slice_in_flight_guards;
2526
use crate::storage::SendableAsync;
2627
use crate::{
2728
BulkDeleteError, OwnedBytes, PutPayload, Storage, StorageError, StorageErrorKind,
@@ -96,7 +97,11 @@ impl Storage for OpendalStorage {
9697

9798
async fn get_slice(&self, path: &Path, range: Range<usize>) -> StorageResult<OwnedBytes> {
9899
let path = path.as_os_str().to_string_lossy();
100+
let size = range.len();
99101
let range = range.start as u64..range.end as u64;
102+
// Unlike other object store implementations, in flight requests are
103+
// recorded before issuing the query to the object store.
104+
let _inflight_guards = object_storage_get_slice_in_flight_guards(size);
100105
let storage_content = self.op.read_with(&path).range(range).await?;
101106

102107
Ok(OwnedBytes::new(storage_content))

0 commit comments

Comments
 (0)