Skip to content

Commit

Permalink
Merge pull request #21 from shinnya/add-tests
Browse files Browse the repository at this point in the history
Add test cases for Loader
  • Loading branch information
yuezato authored Mar 6, 2019
2 parents ca18f26 + 6f738d1 commit 3ebb628
Show file tree
Hide file tree
Showing 6 changed files with 423 additions and 123 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ travis-ci = {repository = "frugalos/raftlog"}
[dependencies]
futures = "0.1"
trackable = "0.2"

[dev-dependencies]
fibers = "0.1"
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
//!
//! [Raft]: https://raft.github.io/
#![warn(missing_docs)]
#[cfg(test)]
extern crate fibers;
extern crate futures;
#[macro_use]
extern crate trackable;
Expand All @@ -24,6 +26,7 @@ mod error;
mod io;
mod node_state;
mod replicated_log;
mod test_util;

/// クレート固有の`Result`型.
pub type Result<T> = ::std::result::Result<T, Error>;
135 changes: 17 additions & 118 deletions src/node_state/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,15 +450,20 @@ impl<IO: Io> Future for InstallSnapshot<IO> {
#[cfg(test)]
mod tests {
use super::*;
use log::{LogEntry, LogPrefix};
use std::collections::BTreeSet;
use trackable::result::TestResult;

use log::{LogEntry, LogPrefix};
use test_util::tests::TestIoBuilder;

#[test]
fn is_snapshot_installing_works() -> TestResult {
let cluster = make_cluster_config(3);
let io = NullIo(cluster.clone());
let node_id = cluster.members().last().expect("never fails");
let node_id: NodeId = "node1".into();
let io = TestIoBuilder::new()
.add_member(node_id.clone())
.add_member("node2".into())
.add_member("node3".into())
.finish();
let cluster = io.cluster.clone();
let mut common = Common::new(node_id.clone(), io, cluster.clone());
let prefix = LogPrefix {
tail: LogPosition::default(),
Expand All @@ -475,9 +480,13 @@ mod tests {

#[test]
fn is_focusing_on_installing_snapshot_works() -> TestResult {
let cluster = make_cluster_config(3);
let io = NullIo(cluster.clone());
let node_id = cluster.members().last().expect("never fails");
let node_id: NodeId = "node1".into();
let io = TestIoBuilder::new()
.add_member(node_id.clone())
.add_member("node2".into())
.add_member("node3".into())
.finish();
let cluster = io.cluster.clone();
let mut common = Common::new(node_id.clone(), io, cluster.clone());
let prev_term = Term::new(0);
let node_prefix = LogPrefix {
Expand Down Expand Up @@ -539,114 +548,4 @@ mod tests {

Ok(())
}

#[derive(Debug)]
/// Note: Give desired implementations if needed.
struct NullIo(ClusterConfig);
impl Io for NullIo {
type SaveBallot = SaveBallotImpl;
type LoadBallot = LoadBallotImpl;
type SaveLog = SaveLogImpl;
type LoadLog = LoadLogImpl;
type Timeout = TimeoutImpl;

fn try_recv_message(&mut self) -> Result<Option<Message>> {
Ok(None)
}

fn send_message(&mut self, _message: Message) {}

fn save_ballot(&mut self, _ballot: Ballot) -> Self::SaveBallot {
SaveBallotImpl
}

fn load_ballot(&mut self) -> Self::LoadBallot {
LoadBallotImpl
}

fn save_log_prefix(&mut self, _prefix: LogPrefix) -> Self::SaveLog {
SaveLogImpl
}

fn save_log_suffix(&mut self, _suffix: &LogSuffix) -> Self::SaveLog {
SaveLogImpl
}

fn load_log(&mut self, _start: LogIndex, _end: Option<LogIndex>) -> Self::LoadLog {
LoadLogImpl(self.0.clone())
}

fn create_timeout(&mut self, _role: Role) -> Self::Timeout {
TimeoutImpl
}
}

#[derive(Debug)]
struct SaveBallotImpl;
impl Future for SaveBallotImpl {
type Item = ();
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(()))
}
}

#[derive(Debug)]
struct LoadBallotImpl;
impl Future for LoadBallotImpl {
type Item = Option<Ballot>;
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(None))
}
}

#[derive(Debug)]
struct SaveLogImpl;
impl Future for SaveLogImpl {
type Item = ();
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(()))
}
}

#[derive(Debug)]
struct LoadLogImpl(ClusterConfig);
impl Future for LoadLogImpl {
type Item = Log;
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let prefix = LogPrefix {
tail: LogPosition::default(),
config: self.0.clone(),
snapshot: Vec::default(),
};
Ok(Async::Ready(Log::Prefix(prefix)))
}
}

#[derive(Debug)]
struct TimeoutImpl;
impl Future for TimeoutImpl {
type Item = ();
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(()))
}
}

/// Returns `ClusterConfig`.
fn make_cluster_config(size: usize) -> ClusterConfig {
let mut members = BTreeSet::new();
for i in 0..size {
members.insert(NodeId::new(i.to_string()));
}
ClusterConfig::new(members)
}
}
108 changes: 108 additions & 0 deletions src/node_state/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,111 @@ where
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use election::Term;
use log::{LogEntry, LogPosition, LogPrefix, LogSuffix};
use node::NodeId;
use test_util::tests::TestIoBuilder;
use trackable::result::TestResult;

#[test]
fn it_works() -> TestResult {
let node_id: NodeId = "node1".into();
let io = TestIoBuilder::new().add_member(node_id.clone()).finish();
let mut handle = io.handle();
let cluster = io.cluster.clone();
let mut common = Common::new(node_id.clone(), io, cluster.clone());
let mut loader = Loader::new(&mut common);

// prefix には空の snapshot があり、tail は 1 を指している。
// suffix には position 1 から 1 エントリが保存されている。
// term は変更なし。
let term = Term::new(1);
let suffix_head = LogIndex::new(1);
let prefix_tail = LogPosition {
prev_term: term.clone(),
index: suffix_head.clone(),
};
handle.set_initial_log_prefix(LogPrefix {
tail: prefix_tail.clone(),
config: cluster.clone(),
snapshot: vec![],
});
handle.set_initial_log_suffix(
suffix_head.clone(),
LogSuffix {
head: LogPosition {
prev_term: term.clone(),
index: suffix_head.clone(),
},
entries: vec![LogEntry::Noop { term: term.clone() }],
},
);
loop {
if let Some(next) = track!(loader.run_once(&mut common))? {
assert!(next.is_candidate());
// term は変化なし
assert_eq!(term, common.log().tail().prev_term);
// 追記されたログエントリの tail が 1 つ先に進んでいる
assert_eq!(LogIndex::new(2), common.log().tail().index);
// consumed と committed は prefix の状態のまま
assert_eq!(prefix_tail.index, common.log().consumed_tail().index);
assert_eq!(prefix_tail.index, common.log().committed_tail().index);
break;
}
}

Ok(())
}

#[test]
fn it_fails_if_log_suffix_contains_older_term() -> TestResult {
let node_id: NodeId = "node1".into();
let io = TestIoBuilder::new().add_member(node_id.clone()).finish();
let mut handle = io.handle();
let cluster = io.cluster.clone();
let mut common = Common::new(node_id.clone(), io, cluster.clone());
let mut loader = Loader::new(&mut common);

// 古い term のログが紛れ込んでいるとエラーになる
let term = Term::new(308);
let suffix_head = LogIndex::new(28405496);
let prefix_tail = LogPosition {
prev_term: term.clone(),
index: suffix_head.clone(),
};
handle.set_initial_log_prefix(LogPrefix {
tail: prefix_tail.clone(),
config: cluster.clone(),
snapshot: vec![],
});
handle.set_initial_log_suffix(
suffix_head.clone(),
LogSuffix {
head: LogPosition {
prev_term: term.clone(),
index: suffix_head.clone(),
},
entries: vec![
LogEntry::Noop { term: term.clone() },
LogEntry::Noop {
term: Term::new(term.as_u64() - 1),
},
],
},
);

// Error: Other (cause; assertion failed: `self.last_record().head.prev_term < tail.prev_term`; last_record.head=LogPosition { prev_term: Term(308), index: LogIndex(28405496) }, tail=LogPosition { prev_term: Term(307), index: LogIndex(28405498) })
//HISTORY:
// [0] at src/log/history.rs:104
// [1] at src/node_state/common/mod.rs:78
// [2] at src/node_state/loader.rs:58
// [3] at src/node_state/loader.rs:198
assert!(loader.run_once(&mut common).is_err());

Ok(())
}
}
61 changes: 56 additions & 5 deletions src/node_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@ impl<IO: Io> NodeState<IO> {
NodeState { common, role }
}
pub fn is_loading(&self) -> bool {
if let RoleState::Loader(_) = self.role {
true
} else {
false
}
self.role.is_loader()
}
pub fn start_election(&mut self) {
if let RoleState::Follower(_) = self.role {
Expand Down Expand Up @@ -152,3 +148,58 @@ pub enum RoleState<IO: Io> {
/// リーダ (詳細はRaftの論文を参照)
Leader(Leader<IO>),
}

impl<IO: Io> RoleState<IO> {
/// Returns true if this role state is `Loader`.
pub fn is_loader(&self) -> bool {
if let RoleState::Loader(_) = self {
true
} else {
false
}
}

/// Returns true if this role state is `Candidate`.
#[cfg(test)]
pub fn is_candidate(&self) -> bool {
if let RoleState::Candidate(_) = self {
true
} else {
false
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use test_util::tests::TestIoBuilder;

#[test]
fn node_state_is_loading_works() {
let io = TestIoBuilder::new().finish();
let cluster = io.cluster.clone();
let node = NodeState::load("test".into(), cluster, io);
assert!(node.is_loading());
}

#[test]
fn role_state_is_loader_works() {
let io = TestIoBuilder::new().finish();
let cluster = io.cluster.clone();
let mut common = Common::new("test".into(), io, cluster);
let state = RoleState::Loader(Loader::new(&mut common));
assert!(state.is_loader());
assert!(!state.is_candidate());
}

#[test]
fn role_state_is_candidate_works() {
let io = TestIoBuilder::new().finish();
let cluster = io.cluster.clone();
let mut common = Common::new("test".into(), io, cluster);
let state = RoleState::Candidate(Candidate::new(&mut common));
assert!(!state.is_loader());
assert!(state.is_candidate());
}
}
Loading

0 comments on commit 3ebb628

Please sign in to comment.