Skip to content

Commit 4c99db3

Browse files
committed
Add Dyn object-safe storage engine, and use for Raft log
1 parent 2f9fd16 commit 4c99db3

File tree

10 files changed

+91
-92
lines changed

10 files changed

+91
-92
lines changed

Diff for: src/bin/toydb.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,10 @@ async fn main() -> Result<()> {
3737
let path = std::path::Path::new(&cfg.data_dir);
3838
let raft_log = match cfg.storage_raft.as_str() {
3939
"bitcask" | "" => raft::Log::new(
40-
Box::new(storage::engine::BitCask::new_compact(
41-
path.join("log"),
42-
cfg.compact_threshold,
43-
)?),
40+
storage::engine::BitCask::new_compact(path.join("log"), cfg.compact_threshold)?,
4441
cfg.sync,
4542
)?,
46-
"memory" => raft::Log::new(Box::new(storage::engine::Memory::new()), false)?,
43+
"memory" => raft::Log::new(storage::engine::Memory::new(), false)?,
4744
name => return Err(Error::Config(format!("Unknown Raft storage engine {}", name))),
4845
};
4946
let raft_state: Box<dyn raft::State> = match cfg.storage_sql.as_str() {

Diff for: src/raft/log.rs

+14-75
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,9 @@ impl KeyPrefix {
5656
}
5757
/// A Raft log.
5858
pub struct Log {
59-
/// The underlying storage engine.
60-
engine: Box<dyn Engine>,
59+
/// The underlying storage engine. Uses a trait object to avoid leaking
60+
/// generic type parameters throughout the Raft module.
61+
engine: Box<dyn storage::engine::Dyn>,
6162
/// The index of the last stored entry.
6263
last_index: Index,
6364
/// The term of the last stored entry.
@@ -72,7 +73,7 @@ pub struct Log {
7273

7374
impl Log {
7475
/// Creates a new log, using the given storage engine.
75-
pub fn new(mut engine: Box<dyn Engine>, sync: bool) -> Result<Self> {
76+
pub fn new(mut engine: impl storage::engine::Engine + 'static, sync: bool) -> Result<Self> {
7677
let (last_index, last_term) = engine
7778
.scan_prefix(&KeyPrefix::Entry.encode()?)
7879
.last()
@@ -86,7 +87,14 @@ impl Log {
8687
.map(|v| bincode::deserialize(&v))
8788
.transpose()?
8889
.unwrap_or((0, 0));
89-
Ok(Self { engine, last_index, last_term, commit_index, commit_term, sync })
90+
Ok(Self {
91+
engine: Box::new(engine),
92+
last_index,
93+
last_term,
94+
commit_index,
95+
commit_term,
96+
sync,
97+
})
9098
}
9199

92100
/// Decodes an entry from a log key/value pair.
@@ -210,7 +218,7 @@ impl Log {
210218
std::ops::Bound::Included(Key::Entry(Index::MAX).encode()?)
211219
}
212220
};
213-
Ok(self.engine.scan(from, to).map(|r| r.and_then(|(k, v)| Self::decode_entry(&k, &v))))
221+
Ok(self.engine.scan((from, to)).map(|r| r.and_then(|(k, v)| Self::decode_entry(&k, &v))))
214222
}
215223

216224
/// Splices a set of entries into the log. The entries must be contiguous,
@@ -260,83 +268,14 @@ impl Log {
260268
}
261269
}
262270

263-
/// A Raft log storage engine. This is a wrapper trait around storage::Engine
264-
/// with a blanket implementation, to make it object-safe (this is otherwise
265-
/// prevented by the generic scan() method). This wrapper allows Log to use a
266-
/// trait object for the engine, which prevents leaking generics throughout the
267-
/// Raft implementation.
268-
///
269-
/// TODO: Consider getting rid of this and using generics throughout.
270-
pub trait Engine: std::fmt::Display + Send + Sync {
271-
fn delete(&mut self, key: &[u8]) -> Result<()>;
272-
273-
fn flush(&mut self) -> Result<()>;
274-
275-
fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
276-
277-
#[allow(clippy::type_complexity)]
278-
fn scan(
279-
&mut self,
280-
from: std::ops::Bound<Vec<u8>>,
281-
to: std::ops::Bound<Vec<u8>>,
282-
) -> Box<dyn DoubleEndedIterator<Item = Result<(Vec<u8>, Vec<u8>)>> + '_>;
283-
284-
#[allow(clippy::type_complexity)]
285-
fn scan_prefix(
286-
&mut self,
287-
prefix: &[u8],
288-
) -> Box<dyn DoubleEndedIterator<Item = Result<(Vec<u8>, Vec<u8>)>> + '_>;
289-
290-
fn set(&mut self, key: &[u8], value: Vec<u8>) -> Result<()>;
291-
292-
fn status(&mut self) -> Result<storage::engine::Status>;
293-
}
294-
295-
impl<E: storage::engine::Engine> Engine for E {
296-
fn delete(&mut self, key: &[u8]) -> Result<()> {
297-
self.delete(key)
298-
}
299-
300-
fn flush(&mut self) -> Result<()> {
301-
self.flush()
302-
}
303-
304-
fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
305-
self.get(key)
306-
}
307-
308-
fn scan(
309-
&mut self,
310-
from: std::ops::Bound<Vec<u8>>,
311-
to: std::ops::Bound<Vec<u8>>,
312-
) -> Box<dyn DoubleEndedIterator<Item = Result<(Vec<u8>, Vec<u8>)>> + '_> {
313-
Box::new(self.scan((from, to)))
314-
}
315-
316-
fn scan_prefix(
317-
&mut self,
318-
prefix: &[u8],
319-
) -> Box<dyn DoubleEndedIterator<Item = Result<(Vec<u8>, Vec<u8>)>> + '_> {
320-
Box::new(self.scan_prefix(prefix))
321-
}
322-
323-
fn set(&mut self, key: &[u8], value: Vec<u8>) -> Result<()> {
324-
self.set(key, value)
325-
}
326-
327-
fn status(&mut self) -> Result<storage::engine::Status> {
328-
self.status()
329-
}
330-
}
331-
332271
#[cfg(test)]
333272
mod tests {
334273
use super::*;
335274
use crate::storage::engine::Memory;
336275
use pretty_assertions::assert_eq;
337276

338277
fn setup() -> Log {
339-
Log::new(Box::new(Memory::new()), false).expect("empty engine should never fail to open")
278+
Log::new(Memory::new(), false).expect("empty engine should never fail to open")
340279
}
341280

342281
#[test]

Diff for: src/raft/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ mod node;
44
mod server;
55
mod state;
66

7-
pub use self::log::{Engine, Entry, Index, Log};
7+
pub use log::{Entry, Index, Log};
88
pub use message::{Address, Event, Message, Request, RequestID, Response};
99
pub use node::{Node, NodeID, Status, Term};
1010
pub use server::Server;

Diff for: src/raft/node/candidate.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,7 @@ mod tests {
176176
)> {
177177
let (node_tx, node_rx) = mpsc::unbounded_channel();
178178
let (state_tx, state_rx) = mpsc::unbounded_channel();
179-
let mut log = Log::new(Box::new(storage::engine::Memory::new()), false)?;
179+
let mut log = Log::new(storage::engine::Memory::new(), false)?;
180180
log.append(1, Some(vec![0x01]))?;
181181
log.append(1, Some(vec![0x02]))?;
182182
log.append(2, Some(vec![0x03]))?;

Diff for: src/raft/node/follower.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -284,7 +284,7 @@ pub mod tests {
284284
)> {
285285
let (node_tx, node_rx) = mpsc::unbounded_channel();
286286
let (state_tx, state_rx) = mpsc::unbounded_channel();
287-
let mut log = Log::new(Box::new(storage::engine::Memory::new()), false)?;
287+
let mut log = Log::new(storage::engine::Memory::new(), false)?;
288288
log.append(1, Some(vec![0x01]))?;
289289
log.append(1, Some(vec![0x02]))?;
290290
log.append(2, Some(vec![0x03]))?;
@@ -604,7 +604,7 @@ pub mod tests {
604604
// TODO: Move this into a setup function.
605605
let (node_tx, mut node_rx) = mpsc::unbounded_channel();
606606
let (state_tx, mut state_rx) = mpsc::unbounded_channel();
607-
let mut log = Log::new(Box::new(storage::engine::Memory::new()), false)?;
607+
let mut log = Log::new(storage::engine::Memory::new(), false)?;
608608
log.append(1, Some(vec![0x01]))?;
609609
log.append(1, Some(vec![0x02]))?;
610610
log.append(2, Some(vec![0x03]))?;

Diff for: src/raft/node/leader.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ mod tests {
306306
let (node_tx, node_rx) = mpsc::unbounded_channel();
307307
let (state_tx, state_rx) = mpsc::unbounded_channel();
308308
let peers = HashSet::from([2, 3, 4, 5]);
309-
let mut log = Log::new(Box::new(storage::engine::Memory::new()), false)?;
309+
let mut log = Log::new(storage::engine::Memory::new(), false)?;
310310
log.append(1, Some(vec![0x01]))?;
311311
log.append(1, Some(vec![0x02]))?;
312312
log.append(2, Some(vec![0x03]))?;

Diff for: src/raft/node/mod.rs

+6-6
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,7 @@ mod tests {
383383
id: 1,
384384
peers: HashSet::from_iter(peers),
385385
term: 1,
386-
log: Log::new(Box::new(storage::engine::Memory::new()), false)?,
386+
log: Log::new(storage::engine::Memory::new(), false)?,
387387
node_tx,
388388
state_tx,
389389
};
@@ -396,7 +396,7 @@ mod tests {
396396
let node = Node::new(
397397
1,
398398
HashSet::from([2, 3]),
399-
Log::new(Box::new(storage::engine::Memory::new()), false)?,
399+
Log::new(storage::engine::Memory::new(), false)?,
400400
Box::new(TestState::new(0)),
401401
node_tx,
402402
)
@@ -415,7 +415,7 @@ mod tests {
415415
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
416416
async fn new_state_apply_all() -> Result<()> {
417417
let (node_tx, _) = mpsc::unbounded_channel();
418-
let mut log = Log::new(Box::new(storage::engine::Memory::new()), false)?;
418+
let mut log = Log::new(storage::engine::Memory::new(), false)?;
419419
log.append(1, Some(vec![0x01]))?;
420420
log.append(2, None)?;
421421
log.append(2, Some(vec![0x02]))?;
@@ -433,7 +433,7 @@ mod tests {
433433
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
434434
async fn new_state_apply_partial() -> Result<()> {
435435
let (node_tx, _) = mpsc::unbounded_channel();
436-
let mut log = Log::new(Box::new(storage::engine::Memory::new()), false)?;
436+
let mut log = Log::new(storage::engine::Memory::new(), false)?;
437437
log.append(1, Some(vec![0x01]))?;
438438
log.append(2, None)?;
439439
log.append(2, Some(vec![0x02]))?;
@@ -452,7 +452,7 @@ mod tests {
452452
#[should_panic(expected = "applied index above commit index")]
453453
async fn new_state_apply_missing() {
454454
let (node_tx, _) = mpsc::unbounded_channel();
455-
let mut log = Log::new(Box::new(storage::engine::Memory::new()), false).unwrap();
455+
let mut log = Log::new(storage::engine::Memory::new(), false).unwrap();
456456
log.append(1, Some(vec![0x01])).unwrap();
457457
log.append(2, None).unwrap();
458458
log.append(2, Some(vec![0x02])).unwrap();
@@ -469,7 +469,7 @@ mod tests {
469469
let node = Node::new(
470470
1,
471471
HashSet::new(),
472-
Log::new(Box::new(storage::engine::Memory::new()), false)?,
472+
Log::new(storage::engine::Memory::new(), false)?,
473473
Box::new(TestState::new(0)),
474474
node_tx,
475475
)

Diff for: src/storage/engine/dyn.rs

+61
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
use crate::error::Result;
2+
3+
use super::{Engine, Status};
4+
5+
/// A variant of Engine that's object-safe. This is otherwise prevented by the
6+
/// generic Engine::scan() method. This uses dynamic dispatch, including for the
7+
/// scan iterator, which comes with a minor performance penalty.
8+
pub trait Dyn: Send + Sync {
9+
fn delete(&mut self, key: &[u8]) -> Result<()>;
10+
11+
fn flush(&mut self) -> Result<()>;
12+
13+
fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>>;
14+
15+
fn scan(
16+
&mut self,
17+
range: (std::ops::Bound<Vec<u8>>, std::ops::Bound<Vec<u8>>),
18+
) -> ScanIterator<'_>;
19+
20+
fn scan_prefix(&mut self, prefix: &[u8]) -> ScanIterator<'_>;
21+
22+
fn set(&mut self, key: &[u8], value: Vec<u8>) -> Result<()>;
23+
24+
fn status(&mut self) -> Result<Status>;
25+
}
26+
27+
type ScanIterator<'a> = Box<dyn DoubleEndedIterator<Item = Result<(Vec<u8>, Vec<u8>)>> + 'a>;
28+
29+
/// Blanket implementation of Dyn for the Engine trait.
30+
impl<E: Engine> Dyn for E {
31+
fn delete(&mut self, key: &[u8]) -> Result<()> {
32+
self.delete(key)
33+
}
34+
35+
fn flush(&mut self) -> Result<()> {
36+
self.flush()
37+
}
38+
39+
fn get(&mut self, key: &[u8]) -> Result<Option<Vec<u8>>> {
40+
self.get(key)
41+
}
42+
43+
fn scan(
44+
&mut self,
45+
range: (std::ops::Bound<Vec<u8>>, std::ops::Bound<Vec<u8>>),
46+
) -> ScanIterator<'_> {
47+
Box::new(self.scan(range))
48+
}
49+
50+
fn scan_prefix(&mut self, prefix: &[u8]) -> ScanIterator<'_> {
51+
Box::new(self.scan_prefix(prefix))
52+
}
53+
54+
fn set(&mut self, key: &[u8], value: Vec<u8>) -> Result<()> {
55+
self.set(key, value)
56+
}
57+
58+
fn status(&mut self) -> Result<Status> {
59+
self.status()
60+
}
61+
}

Diff for: src/storage/engine/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
mod bitcask;
2+
mod r#dyn;
23
mod memory;
34

45
#[cfg(test)]
56
pub use super::debug::Engine as Debug;
67
pub use bitcask::BitCask;
78
pub use memory::Memory;
9+
pub use r#dyn::Dyn;
810

911
use crate::error::Result;
1012

Diff for: tests/setup.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ pub async fn server(
7979
let mut srv = Server::new(
8080
id,
8181
peers,
82-
raft::Log::new(Box::new(storage::engine::BitCask::new(dir.path().join("log"))?), false)?,
82+
raft::Log::new(storage::engine::BitCask::new(dir.path().join("log"))?, false)?,
8383
Box::new(sql::engine::Raft::new_state(storage::engine::Memory::new())?),
8484
)
8585
.await?;

0 commit comments

Comments
 (0)