Skip to content

Commit 1ba9a17

Browse files
iwanbkLeeSmet
authored andcommitted
add retry mechanism on data loading when rebuild the data.
We only retry on temporary errors like timeout. retry also only enabled on data rebuild, not retrieve, because latency is not crucial during data rebuild.
1 parent 7101627 commit 1ba9a17

File tree

2 files changed

+39
-5
lines changed

2 files changed

+39
-5
lines changed

zstor/src/actors/zstor.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ impl Handler<Retrieve> for ZstorActor {
258258
)
259259
})?;
260260

261-
let shards = load_data(&metadata).await?;
261+
let shards = load_data(&metadata, 1).await?;
262262

263263
pipeline
264264
.send(RecoverFile {
@@ -335,7 +335,7 @@ impl Handler<Rebuild> for ZstorActor {
335335
};
336336

337337
// load the data from the storage backends
338-
let input = load_data(&old_metadata).await?;
338+
let input = load_data(&old_metadata, 3).await?;
339339
let existing_data = input.clone();
340340

341341
// rebuild the data (in memory only)
@@ -454,7 +454,8 @@ impl Handler<ReloadConfig> for ZstorActor {
454454
}
455455
}
456456

457-
async fn load_data(metadata: &MetaData) -> ZstorResult<Vec<Option<Vec<u8>>>> {
457+
/// load data from the storage backends
458+
async fn load_data(metadata: &MetaData, max_attempts: u64) -> ZstorResult<Vec<Option<Vec<u8>>>> {
458459
// attempt to retrieve all shards
459460
let mut shard_loads: Vec<JoinHandle<(usize, Result<(_, _), ZstorError>)>> =
460461
Vec::with_capacity(metadata.shards().len());
@@ -468,7 +469,7 @@ async fn load_data(metadata: &MetaData) -> ZstorResult<Vec<Option<Vec<u8>>>> {
468469
Ok(ok) => ok,
469470
Err(e) => return (idx, Err(e.into())),
470471
};
471-
match db.get(&key).await {
472+
match db.get_with_retry(&key, max_attempts).await {
472473
Ok(potential_shard) => match potential_shard {
473474
Some(shard) => (idx, Ok((shard, chksum))),
474475
None => (

zstor/src/zdb.rs

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -759,6 +759,39 @@ impl SequentialZdb {
759759
Ok(Some(data))
760760
}
761761

762+
/// get data from the zdb with a retry mechanism.
763+
/// The retry will only happen at temporary errors,
764+
/// currently only timeouts.
765+
pub async fn get_with_retry(
766+
&self,
767+
keys: &[Key],
768+
max_attempts: u64,
769+
) -> ZdbResult<Option<Vec<u8>>> {
770+
if max_attempts < 2 {
771+
return self.get(keys).await;
772+
}
773+
774+
let mut last_error = None;
775+
776+
for attempt in 0..max_attempts {
777+
match self.get(keys).await {
778+
Ok(result) => return Ok(result),
779+
Err(e) => {
780+
if e.internal == ErrorCause::Timeout {
781+
last_error = Some(e);
782+
if attempt < max_attempts - 1 {
783+
debug!("timeout error on attempt {}, retrying", attempt + 1);
784+
}
785+
continue;
786+
}
787+
return Err(e);
788+
}
789+
}
790+
}
791+
792+
Err(last_error.unwrap())
793+
}
794+
762795
/// Returns the [`ZdbConnectionInfo`] object used to connect to this db.
763796
#[inline]
764797
pub fn connection_info(&self) -> &ZdbConnectionInfo {
@@ -1037,7 +1070,7 @@ impl ZdbError {
10371070
}
10381071

10391072
/// The cause of a zero db error.
1040-
#[derive(Debug)]
1073+
#[derive(Debug, PartialEq)]
10411074
enum ErrorCause {
10421075
Redis(redis::RedisError),
10431076
Other(String),

0 commit comments

Comments
 (0)