12
12
use std:: fmt:: Debug ;
13
13
use std:: sync:: Arc ;
14
14
15
- use anyhow:: anyhow;
15
+ use anyhow:: { anyhow, Context } ;
16
16
use async_trait:: async_trait;
17
17
use azure_core:: StatusCode ;
18
18
use azure_identity:: create_default_credential;
19
19
use azure_storage:: { prelude:: * , CloudLocation , EMULATOR_ACCOUNT } ;
20
+ use azure_storage_blobs:: blob:: operations:: GetBlobResponse ;
20
21
use azure_storage_blobs:: prelude:: * ;
21
22
use bytes:: Bytes ;
23
+ use futures_util:: stream:: FuturesOrdered ;
22
24
use futures_util:: StreamExt ;
23
25
use tracing:: { info, warn} ;
24
26
use url:: Url ;
@@ -185,29 +187,12 @@ impl Blob for AzureBlob {
185
187
async fn get ( & self , key : & str ) -> Result < Option < SegmentedBytes > , ExternalError > {
186
188
let path = self . get_path ( key) ;
187
189
let blob = self . client . blob_client ( path) ;
188
- let mut segments: Vec < MaybeLgBytes > = vec ! [ ] ;
189
-
190
- // TODO: the default chunk size is 1MB. We have not tried tuning it,
191
- // but making this configurable / running some benchmarks could be
192
- // valuable.
193
- let mut stream = blob. get ( ) . into_stream ( ) ;
194
- while let Some ( value) = stream. next ( ) . await {
195
- let response = match value {
196
- Ok ( v) => v,
197
- Err ( e) => {
198
- if let Some ( e) = e. as_http_error ( ) {
199
- if e. status ( ) == StatusCode :: NotFound {
200
- return Ok ( None ) ;
201
- }
202
- }
203
-
204
- return Err ( ExternalError :: from ( anyhow ! (
205
- "Azure blob get error: {:?}" ,
206
- e
207
- ) ) ) ;
208
- }
209
- } ;
210
190
191
+ /// Fetch the body of a single [`GetBlobResponse`].
192
+ async fn fetch_chunk (
193
+ response : GetBlobResponse ,
194
+ metrics : S3BlobMetrics ,
195
+ ) -> Result < MaybeLgBytes , ExternalError > {
211
196
let content_length = response. blob . properties . content_length ;
212
197
213
198
// Here we're being quite defensive. If `content_length` comes back
@@ -216,35 +201,86 @@ impl Blob for AzureBlob {
216
201
// buffer into lgalloc.
217
202
let mut buffer = match content_length {
218
203
1 .. => {
219
- let region = self
220
- . metrics
204
+ let region = metrics
221
205
. lgbytes
222
206
. persist_azure
223
207
. new_region ( usize:: cast_from ( content_length) ) ;
224
208
PreSizedBuffer :: Sized ( region)
225
209
}
226
- 0 => PreSizedBuffer :: Unknown ( Vec :: new ( ) ) ,
210
+ 0 => PreSizedBuffer :: Unknown ( SegmentedBytes :: new ( ) ) ,
227
211
} ;
228
212
229
213
let mut body = response. data ;
230
214
while let Some ( value) = body. next ( ) . await {
231
215
let value = value. map_err ( |e| {
232
216
ExternalError :: from ( anyhow ! ( "Azure blob get body error: {}" , e) )
233
217
} ) ?;
234
- buffer. extend_from_slice ( & value) ;
218
+
219
+ match & mut buffer {
220
+ PreSizedBuffer :: Sized ( region) => region. extend_from_slice ( & value) ,
221
+ PreSizedBuffer :: Unknown ( segments) => segments. push ( value) ,
222
+ }
235
223
}
236
224
237
225
// Spill our bytes to lgalloc, if they aren't already.
238
- let lg_bytes = match buffer {
226
+ let lgbytes = match buffer {
239
227
PreSizedBuffer :: Sized ( region) => LgBytes :: from ( Arc :: new ( region) ) ,
240
- PreSizedBuffer :: Unknown ( buffer) => {
241
- self . metrics . lgbytes . persist_azure . try_mmap ( buffer)
228
+ // Now that we've collected all of the segments, we know the size of our region.
229
+ PreSizedBuffer :: Unknown ( segments) => {
230
+ let mut region = metrics. lgbytes . persist_azure . new_region ( segments. len ( ) ) ;
231
+ for segment in segments. into_segments ( ) {
232
+ region. extend_from_slice ( segment. as_ref ( ) ) ;
233
+ }
234
+ LgBytes :: from ( Arc :: new ( region) )
242
235
}
243
236
} ;
244
- segments. push ( MaybeLgBytes :: LgBytes ( lg_bytes) ) ;
237
+
238
+ // Report if the content-length header didn't match the number of
239
+ // bytes we read from the network.
240
+ if content_length != u64:: cast_from ( lgbytes. len ( ) ) {
241
+ metrics. get_invalid_resp . inc ( ) ;
242
+ }
243
+
244
+ Ok ( MaybeLgBytes :: LgBytes ( lgbytes) )
245
245
}
246
246
247
- Ok ( Some ( SegmentedBytes :: from ( segments) ) )
247
+ let mut requests = FuturesOrdered :: new ( ) ;
248
+ // TODO: the default chunk size is 1MB. We have not tried tuning it,
249
+ // but making this configurable / running some benchmarks could be
250
+ // valuable.
251
+ let mut stream = blob. get ( ) . into_stream ( ) ;
252
+
253
+ while let Some ( value) = stream. next ( ) . await {
254
+ // Return early if any of the individual fetch requests return an error.
255
+ let response = match value {
256
+ Ok ( v) => v,
257
+ Err ( e) => {
258
+ if let Some ( e) = e. as_http_error ( ) {
259
+ if e. status ( ) == StatusCode :: NotFound {
260
+ return Ok ( None ) ;
261
+ }
262
+ }
263
+
264
+ return Err ( ExternalError :: from ( anyhow ! (
265
+ "Azure blob get error: {:?}" ,
266
+ e
267
+ ) ) ) ;
268
+ }
269
+ } ;
270
+
271
+ // Drive all of the fetch requests concurrently.
272
+ let metrics = self . metrics . clone ( ) ;
273
+ requests. push_back ( fetch_chunk ( response, metrics) ) ;
274
+ }
275
+
276
+ // Await on all of our chunks.
277
+ let mut segments = SegmentedBytes :: with_capacity ( requests. len ( ) ) ;
278
+ while let Some ( body) = requests. next ( ) . await {
279
+ let segment = body. context ( "azure get body err" ) ?;
280
+ segments. push ( segment) ;
281
+ }
282
+
283
+ Ok ( Some ( segments) )
248
284
}
249
285
250
286
async fn list_keys_and_metadata (
@@ -343,16 +379,7 @@ impl Blob for AzureBlob {
343
379
/// that as we read bytes off the network.
344
380
enum PreSizedBuffer {
345
381
Sized ( MetricsRegion < u8 > ) ,
346
- Unknown ( Vec < u8 > ) ,
347
- }
348
-
349
- impl PreSizedBuffer {
350
- fn extend_from_slice ( & mut self , slice : & [ u8 ] ) {
351
- match self {
352
- PreSizedBuffer :: Sized ( region) => region. extend_from_slice ( slice) ,
353
- PreSizedBuffer :: Unknown ( buffer) => buffer. extend_from_slice ( slice) ,
354
- }
355
- }
382
+ Unknown ( SegmentedBytes ) ,
356
383
}
357
384
358
385
#[ cfg( test) ]
0 commit comments