Skip to content

Commit 10a663d

Browse files
iwanbk authored and LeeSmet committed
improve sweep object to use scan.
- Previously, object_metas returned _all_ keys, which is not scalable; change the sweep to use scan instead.
- Fix scan to stop once it finds data with timestamp > max_timestamp.
1 parent cbe5c4b commit 10a663d

File tree

5 files changed

+135
-51
lines changed

5 files changed

+135
-51
lines changed

zstor/src/actors/meta.rs

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,21 @@ pub struct IsReplaced {
5757
pub ci: ZdbConnectionInfo,
5858
}
5959

60+
#[derive(Message)]
61+
#[rtype(result = "Result<(usize, Option<Vec<u8>>, Vec<(String, MetaData)>), MetaStoreError>")]
62+
/// Message for scan [`MetaData`] objects in a [`MetaStore`] managed by a [`MetaStoreActor`].
63+
pub struct ScanMeta {
64+
/// Cursor to start scanning from.
65+
pub cursor: Option<Vec<u8>>,
66+
67+
/// Backend index to scan from.
68+
/// If none, it will use the backend with most keys
69+
pub backend_idx: Option<usize>,
70+
71+
/// Maximum timestamp to scan until.
72+
pub max_timestamp: Option<u64>,
73+
}
74+
6075
#[derive(Message)]
6176
#[rtype(result = "Result<Vec<(String, MetaData)>, MetaStoreError>")]
6277
/// Message for retrieving all [`MetaData`] objects in a [`MetaStore`] managed by a [`MetaStoreActor`].
@@ -208,6 +223,46 @@ impl Handler<IsReplaced> for MetaStoreActor {
208223
}
209224
}
210225

226+
impl Handler<ScanMeta> for MetaStoreActor {
227+
type Result =
228+
ResponseFuture<Result<(usize, Option<Vec<u8>>, Vec<(String, MetaData)>), MetaStoreError>>;
229+
230+
fn handle(&mut self, msg: ScanMeta, ctx: &mut Self::Context) -> Self::Result {
231+
let meta_store = self.meta_store.clone();
232+
let addr = ctx.address();
233+
234+
Box::pin(async move {
235+
let (new_cursor, backend_idx, keys) = match meta_store
236+
.scan_meta_keys(msg.cursor, msg.backend_idx, msg.max_timestamp)
237+
.await
238+
{
239+
Ok(res) => res,
240+
Err(e) => {
241+
return Err(e);
242+
}
243+
};
244+
245+
let mut metas = Vec::with_capacity(keys.len());
246+
247+
for key in keys {
248+
let meta: MetaData = match addr.send(LoadMetaByKey { key: key.clone() }).await {
249+
Ok(Ok(m)) => m.unwrap(),
250+
Ok(Err(e)) => {
251+
log::error!("Error loading meta by key:{} - {}", key, e);
252+
continue;
253+
}
254+
Err(e) => {
255+
log::error!("Error loading meta by key:{} - {}", key, e);
256+
continue;
257+
}
258+
};
259+
metas.push((key, meta));
260+
}
261+
Ok((new_cursor, backend_idx, metas))
262+
})
263+
}
264+
}
265+
211266
impl Handler<ObjectMetas> for MetaStoreActor {
212267
type Result = ResponseFuture<Result<Vec<(String, MetaData)>, MetaStoreError>>;
213268

@@ -336,7 +391,10 @@ impl Handler<RebuildAllMeta> for MetaStoreActor {
336391
}
337392
}
338393
}
339-
cursor = Some(new_cursor);
394+
if cursor.is_none() {
395+
break;
396+
}
397+
cursor = new_cursor;
340398
backend_idx = Some(idx);
341399
}
342400
})

zstor/src/actors/repairer.rs

Lines changed: 64 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,12 @@
11
use crate::actors::{
22
backends::{BackendManagerActor, RequestBackends, StateInterest},
3-
meta::{MetaStoreActor, ObjectMetas},
3+
meta::{MetaStoreActor, ScanMeta},
44
zstor::{Rebuild, ZstorActor},
55
};
66
use actix::prelude::*;
7-
use log::{error, warn};
7+
use log::{debug, error, warn};
88
use std::time::Duration;
9+
use std::time::{SystemTime, UNIX_EPOCH};
910

1011
/// Amount of time between starting a new sweep of the backend objects.
1112
const OBJECT_SWEEP_INTERVAL_SECONDS: u64 = 60 * 10;
@@ -65,52 +66,80 @@ impl Handler<SweepObjects> for RepairActor {
6566
let zstor = self.zstor.clone();
6667

6768
Box::pin(async move {
68-
let obj_metas = match meta.send(ObjectMetas).await {
69-
Err(e) => {
70-
error!("Could not request object metas from metastore: {}", e);
71-
return;
72-
}
73-
Ok(om) => match om {
74-
Err(e) => {
75-
error!("Could not get object metas from metastore: {}", e);
76-
return;
77-
}
78-
Ok(om) => om,
79-
},
80-
};
69+
let start_time = SystemTime::now()
70+
.duration_since(UNIX_EPOCH)
71+
.unwrap()
72+
.as_secs();
8173

82-
for (key, metadata) in obj_metas.into_iter() {
83-
let backend_requests = metadata
84-
.shards()
85-
.iter()
86-
.map(|shard_info| shard_info.zdb())
87-
.cloned()
88-
.collect::<Vec<_>>();
89-
let backends = match backend_manager
90-
.send(RequestBackends {
91-
backend_requests,
92-
interest: StateInterest::Readable,
74+
// start scanning from the beginning (cursor == None) and let the metastore choose the backend_id
75+
let mut cursor = None;
76+
let mut backend_idx = None;
77+
loop {
78+
// scan keys from the metastore
79+
let (idx, new_cursor, metas) = match meta
80+
.send(ScanMeta {
81+
cursor: cursor.clone(),
82+
backend_idx,
83+
max_timestamp: Some(start_time),
9384
})
9485
.await
9586
{
9687
Err(e) => {
97-
error!("Failed to request backends: {}", e);
88+
error!("Could not request meta keys from metastore: {}", e);
9889
return;
9990
}
100-
Ok(backends) => backends,
91+
Ok(result) => match result {
92+
Err(e) => {
93+
error!("Could not get meta keys from metastore: {}", e);
94+
return;
95+
}
96+
Ok(res) => res,
97+
},
10198
};
102-
let must_rebuild = backends.into_iter().any(|b| !matches!(b, Ok(Some(_))));
103-
if must_rebuild {
104-
if let Err(e) = zstor
105-
.send(Rebuild {
106-
file: None,
107-
key: Some(key),
99+
100+
// iterate over the keys and check if the backends are healthy
101+
// if not, rebuild the object
102+
for (key, metadata) in metas.into_iter() {
103+
let backend_requests = metadata
104+
.shards()
105+
.iter()
106+
.map(|shard_info| shard_info.zdb())
107+
.cloned()
108+
.collect::<Vec<_>>();
109+
let backends = match backend_manager
110+
.send(RequestBackends {
111+
backend_requests,
112+
interest: StateInterest::Readable,
108113
})
109114
.await
110115
{
111-
warn!("Failed to rebuild data: {}", e);
116+
Err(e) => {
117+
error!("Failed to request backends: {}", e);
118+
return;
119+
}
120+
Ok(backends) => backends,
121+
};
122+
let must_rebuild = backends.into_iter().any(|b| !matches!(b, Ok(Some(_))));
123+
if must_rebuild {
124+
if let Err(e) = zstor
125+
.send(Rebuild {
126+
file: None,
127+
key: Some(key),
128+
})
129+
.await
130+
{
131+
warn!("Failed to rebuild data: {}", e);
132+
}
112133
}
113134
}
135+
136+
if new_cursor.is_none() {
137+
debug!("there is no more old data to rebuild");
138+
break;
139+
}
140+
141+
cursor = new_cursor;
142+
backend_idx = Some(idx);
114143
}
115144
})
116145
}

zstor/src/meta.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -213,13 +213,14 @@ pub trait MetaStore {
213213
/// If `cursor` is `None`, the scan will start from the beginning.
214214
/// If `backend_idx` is `None`, the scan will use backend which has the most keys.
215215
///
216-
/// Returns the backend index and cursor for the next scan and the keys themselves
216+
/// Returns the backend index and cursor for the next scan and the keys themselves.
217+
/// If the scan encounters a key with timestamp > max_timestamp, it stops and the cursor will be `None`
217218
async fn scan_meta_keys(
218219
&self,
219220
cursor: Option<Vec<u8>>,
220221
backend_idx: Option<usize>,
221222
max_timestamp: Option<u64>,
222-
) -> Result<(usize, Vec<u8>, Vec<String>), MetaStoreError>;
223+
) -> Result<(usize, Option<Vec<u8>>, Vec<String>), MetaStoreError>;
223224

224225
/// Get the (key, metadata) for all stored objects
225226
async fn object_metas(&self) -> Result<Vec<(String, MetaData)>, MetaStoreError>;

zstor/src/zdb.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -818,15 +818,15 @@ impl UserKeyZdb {
818818
cursor: Option<Vec<u8>>,
819819
prefix: Option<&str>,
820820
max_timestamp: Option<u64>,
821-
) -> ZdbResult<(Vec<u8>, Vec<String>)> {
821+
) -> ZdbResult<(Option<Vec<u8>>, Vec<String>)> {
822822
let (cursor, entries): (Vec<u8>, Vec<ScanEntry>) = self.internal.scan(cursor).await?;
823823

824824
let mut keys = Vec::new();
825825
for entry in &entries {
826826
// check timestamp
827-
if let Some(ts) = max_timestamp {
828-
if entry[0].2 > ts {
829-
continue;
827+
if let Some(max_timestamp) = max_timestamp {
828+
if entry[0].2 > max_timestamp {
829+
return Ok((None, keys));
830830
}
831831
}
832832
// check prefix
@@ -841,7 +841,7 @@ impl UserKeyZdb {
841841
}
842842
}
843843

844-
Ok((cursor, keys))
844+
Ok((Some(cursor), keys))
845845
}
846846

847847
/// Get a stream which yields all the keys in the namespace.

zstor/src/zdb_meta.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ where
272272
prefix: Option<&str>,
273273
backend_idx: Option<usize>,
274274
max_timestamp: Option<u64>,
275-
) -> ZdbMetaStoreResult<(usize, Vec<u8>, Vec<String>)> {
275+
) -> ZdbMetaStoreResult<(usize, Option<Vec<u8>>, Vec<String>)> {
276276
let most_keys_idx = match backend_idx {
277277
Some(idx) => idx,
278278
None => self.get_most_keys_backend().await?,
@@ -613,16 +613,12 @@ where
613613
cursor: Option<Vec<u8>>,
614614
backend_idx: Option<usize>,
615615
max_timestamp: Option<u64>,
616-
) -> Result<(usize, Vec<u8>, Vec<String>), MetaStoreError> {
616+
) -> Result<(usize, Option<Vec<u8>>, Vec<String>), MetaStoreError> {
617617
let prefix = format!("/{}/meta/", self.prefix);
618618

619-
match self
620-
.scan_keys(cursor, Some(&prefix), backend_idx, max_timestamp)
619+
self.scan_keys(cursor, Some(&prefix), backend_idx, max_timestamp)
621620
.await
622-
{
623-
Ok((backend_idx, cursor, keys)) => Ok((backend_idx, cursor, keys)),
624-
Err(e) => Err(MetaStoreError::from(e)),
625-
}
621+
.map_err(MetaStoreError::from)
626622
}
627623

628624
async fn object_metas(&self) -> Result<Vec<(String, MetaData)>, MetaStoreError> {

0 commit comments

Comments
 (0)