Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test cases for Loader #21

Merged
merged 4 commits into from
Mar 6, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,6 @@ travis-ci = {repository = "frugalos/raftlog"}
[dependencies]
futures = "0.1"
trackable = "0.2"

[dev-dependencies]
fibers = "0.1"
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
//!
//! [Raft]: https://raft.github.io/
#![warn(missing_docs)]
#[cfg(test)]
extern crate fibers;
extern crate futures;
#[macro_use]
extern crate trackable;
Expand All @@ -24,6 +26,7 @@ mod error;
mod io;
mod node_state;
mod replicated_log;
mod test_util;

/// クレート固有の`Result`型.
pub type Result<T> = ::std::result::Result<T, Error>;
135 changes: 17 additions & 118 deletions src/node_state/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -450,15 +450,20 @@ impl<IO: Io> Future for InstallSnapshot<IO> {
#[cfg(test)]
mod tests {
use super::*;
use log::{LogEntry, LogPrefix};
use std::collections::BTreeSet;
use trackable::result::TestResult;

use log::{LogEntry, LogPrefix};
use test_util::tests::TestIoBuilder;

#[test]
fn is_snapshot_installing_works() -> TestResult {
let cluster = make_cluster_config(3);
let io = NullIo(cluster.clone());
let node_id = cluster.members().last().expect("never fails");
let node_id: NodeId = "node1".into();
let io = TestIoBuilder::new()
.add_member(node_id.clone())
.add_member("node2".into())
.add_member("node3".into())
.finish();
let cluster = io.cluster.clone();
let mut common = Common::new(node_id.clone(), io, cluster.clone());
let prefix = LogPrefix {
tail: LogPosition::default(),
Expand All @@ -475,9 +480,13 @@ mod tests {

#[test]
fn is_focusing_on_installing_snapshot_works() -> TestResult {
let cluster = make_cluster_config(3);
let io = NullIo(cluster.clone());
let node_id = cluster.members().last().expect("never fails");
let node_id: NodeId = "node1".into();
let io = TestIoBuilder::new()
.add_member(node_id.clone())
.add_member("node2".into())
.add_member("node3".into())
.finish();
let cluster = io.cluster.clone();
let mut common = Common::new(node_id.clone(), io, cluster.clone());
let prev_term = Term::new(0);
let node_prefix = LogPrefix {
Expand Down Expand Up @@ -539,114 +548,4 @@ mod tests {

Ok(())
}

#[derive(Debug)]
/// Note: Give desired implementations if needed.
struct NullIo(ClusterConfig);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NullIo は削除して、まともにテストで使えるように TestIo として再実装。

impl Io for NullIo {
type SaveBallot = SaveBallotImpl;
type LoadBallot = LoadBallotImpl;
type SaveLog = SaveLogImpl;
type LoadLog = LoadLogImpl;
type Timeout = TimeoutImpl;

fn try_recv_message(&mut self) -> Result<Option<Message>> {
Ok(None)
}

fn send_message(&mut self, _message: Message) {}

fn save_ballot(&mut self, _ballot: Ballot) -> Self::SaveBallot {
SaveBallotImpl
}

fn load_ballot(&mut self) -> Self::LoadBallot {
LoadBallotImpl
}

fn save_log_prefix(&mut self, _prefix: LogPrefix) -> Self::SaveLog {
SaveLogImpl
}

fn save_log_suffix(&mut self, _suffix: &LogSuffix) -> Self::SaveLog {
SaveLogImpl
}

fn load_log(&mut self, _start: LogIndex, _end: Option<LogIndex>) -> Self::LoadLog {
LoadLogImpl(self.0.clone())
}

fn create_timeout(&mut self, _role: Role) -> Self::Timeout {
TimeoutImpl
}
}

#[derive(Debug)]
struct SaveBallotImpl;
impl Future for SaveBallotImpl {
type Item = ();
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(()))
}
}

#[derive(Debug)]
struct LoadBallotImpl;
impl Future for LoadBallotImpl {
type Item = Option<Ballot>;
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(None))
}
}

#[derive(Debug)]
struct SaveLogImpl;
impl Future for SaveLogImpl {
type Item = ();
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(()))
}
}

#[derive(Debug)]
struct LoadLogImpl(ClusterConfig);
impl Future for LoadLogImpl {
type Item = Log;
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
let prefix = LogPrefix {
tail: LogPosition::default(),
config: self.0.clone(),
snapshot: Vec::default(),
};
Ok(Async::Ready(Log::Prefix(prefix)))
}
}

#[derive(Debug)]
struct TimeoutImpl;
impl Future for TimeoutImpl {
type Item = ();
type Error = Error;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
Ok(Async::Ready(()))
}
}

/// Returns `ClusterConfig`.
fn make_cluster_config(size: usize) -> ClusterConfig {
let mut members = BTreeSet::new();
for i in 0..size {
members.insert(NodeId::new(i.to_string()));
}
ClusterConfig::new(members)
}
}
108 changes: 108 additions & 0 deletions src/node_state/loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,111 @@ where
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use election::Term;
use log::{LogEntry, LogPosition, LogPrefix, LogSuffix};
use node::NodeId;
use test_util::tests::TestIoBuilder;
use trackable::result::TestResult;

#[test]
fn it_works() -> TestResult {
let node_id: NodeId = "node1".into();
let io = TestIoBuilder::new().add_member(node_id.clone()).finish();
let mut handle = io.handle();
let cluster = io.cluster.clone();
let mut common = Common::new(node_id.clone(), io, cluster.clone());
let mut loader = Loader::new(&mut common);

// prefix には空の snapshot があり、tail は 1 を指している。
// suffix には position 1 から 1 エントリが保存されている。
// term は変更なし。
let term = Term::new(1);
let suffix_head = LogIndex::new(1);
let prefix_tail = LogPosition {
prev_term: term.clone(),
index: suffix_head.clone(),
};
handle.set_initial_log_prefix(LogPrefix {
tail: prefix_tail.clone(),
config: cluster.clone(),
snapshot: vec![],
});
handle.set_initial_log_suffix(
suffix_head.clone(),
LogSuffix {
head: LogPosition {
prev_term: term.clone(),
index: suffix_head.clone(),
},
entries: vec![LogEntry::Noop { term: term.clone() }],
},
);
loop {
if let Some(next) = track!(loader.run_once(&mut common))? {
assert!(next.is_candidate());
// term は変化なし
assert_eq!(term, common.log().tail().prev_term);
// 追記されたログエントリの tail が 1 つ先に進んでいる
assert_eq!(LogIndex::new(2), common.log().tail().index);
// consumed と committed は prefix の状態のまま
assert_eq!(prefix_tail.index, common.log().consumed_tail().index);
assert_eq!(prefix_tail.index, common.log().committed_tail().index);
break;
}
}

Ok(())
}

#[test]
fn it_fails_if_log_suffix_contains_older_term() -> TestResult {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#18 を再現させるテストコード。

let node_id: NodeId = "node1".into();
let io = TestIoBuilder::new().add_member(node_id.clone()).finish();
let mut handle = io.handle();
let cluster = io.cluster.clone();
let mut common = Common::new(node_id.clone(), io, cluster.clone());
let mut loader = Loader::new(&mut common);

// 古い term のログが紛れ込んでいるとエラーになる
let term = Term::new(308);
let suffix_head = LogIndex::new(28405496);
let prefix_tail = LogPosition {
prev_term: term.clone(),
index: suffix_head.clone(),
};
handle.set_initial_log_prefix(LogPrefix {
tail: prefix_tail.clone(),
config: cluster.clone(),
snapshot: vec![],
});
handle.set_initial_log_suffix(
suffix_head.clone(),
LogSuffix {
head: LogPosition {
prev_term: term.clone(),
index: suffix_head.clone(),
},
entries: vec![
LogEntry::Noop { term: term.clone() },
LogEntry::Noop {
term: Term::new(term.as_u64() - 1),
},
],
},
);

// Error: Other (cause; assertion failed: `self.last_record().head.prev_term < tail.prev_term`; last_record.head=LogPosition { prev_term: Term(308), index: LogIndex(28405496) }, tail=LogPosition { prev_term: Term(307), index: LogIndex(28405498) })
//HISTORY:
// [0] at src/log/history.rs:104
// [1] at src/node_state/common/mod.rs:78
// [2] at src/node_state/loader.rs:58
// [3] at src/node_state/loader.rs:198
assert!(loader.run_once(&mut common).is_err());

Ok(())
}
}
61 changes: 56 additions & 5 deletions src/node_state/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,7 @@ impl<IO: Io> NodeState<IO> {
NodeState { common, role }
}
pub fn is_loading(&self) -> bool {
if let RoleState::Loader(_) = self.role {
true
} else {
false
}
self.role.is_loader()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

コードを整理して、触ったコードに関わる箇所はテストを追加。

}
pub fn start_election(&mut self) {
if let RoleState::Follower(_) = self.role {
Expand Down Expand Up @@ -152,3 +148,58 @@ pub enum RoleState<IO: Io> {
/// リーダ (詳細はRaftの論文を参照)
Leader(Leader<IO>),
}

impl<IO: Io> RoleState<IO> {
/// Returns true if this role state is `Loader`.
pub fn is_loader(&self) -> bool {
if let RoleState::Loader(_) = self {
true
} else {
false
}
}

/// Returns true if this role state is `Candidate`.
#[cfg(test)]
pub fn is_candidate(&self) -> bool {
if let RoleState::Candidate(_) = self {
true
} else {
false
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use test_util::tests::TestIoBuilder;

#[test]
fn node_state_is_loading_works() {
let io = TestIoBuilder::new().finish();
let cluster = io.cluster.clone();
let node = NodeState::load("test".into(), cluster, io);
assert!(node.is_loading());
}

#[test]
fn role_state_is_loader_works() {
let io = TestIoBuilder::new().finish();
let cluster = io.cluster.clone();
let mut common = Common::new("test".into(), io, cluster);
let state = RoleState::Loader(Loader::new(&mut common));
assert!(state.is_loader());
assert!(!state.is_candidate());
}

#[test]
fn role_state_is_candidate_works() {
let io = TestIoBuilder::new().finish();
let cluster = io.cluster.clone();
let mut common = Common::new("test".into(), io, cluster);
let state = RoleState::Candidate(Candidate::new(&mut common));
assert!(!state.is_loader());
assert!(state.is_candidate());
}
}
Loading