diff --git a/Cargo.toml b/Cargo.toml index ca714b8..c0c9903 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,6 @@ travis-ci = {repository = "frugalos/raftlog"} [dependencies] futures = "0.1" trackable = "0.2" + +[dev-dependencies] +fibers = "0.1" \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index b20e67b..c109d06 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,8 @@ //! //! [Raft]: https://raft.github.io/ #![warn(missing_docs)] +#[cfg(test)] +extern crate fibers; extern crate futures; #[macro_use] extern crate trackable; @@ -24,6 +26,7 @@ mod error; mod io; mod node_state; mod replicated_log; +mod test_util; /// クレート固有の`Result`型. pub type Result = ::std::result::Result; diff --git a/src/node_state/common/mod.rs b/src/node_state/common/mod.rs index cf1dbaa..700a5a6 100644 --- a/src/node_state/common/mod.rs +++ b/src/node_state/common/mod.rs @@ -450,15 +450,20 @@ impl Future for InstallSnapshot { #[cfg(test)] mod tests { use super::*; - use log::{LogEntry, LogPrefix}; - use std::collections::BTreeSet; use trackable::result::TestResult; + use log::{LogEntry, LogPrefix}; + use test_util::tests::TestIoBuilder; + #[test] fn is_snapshot_installing_works() -> TestResult { - let cluster = make_cluster_config(3); - let io = NullIo(cluster.clone()); - let node_id = cluster.members().last().expect("never fails"); + let node_id: NodeId = "node1".into(); + let io = TestIoBuilder::new() + .add_member(node_id.clone()) + .add_member("node2".into()) + .add_member("node3".into()) + .finish(); + let cluster = io.cluster.clone(); let mut common = Common::new(node_id.clone(), io, cluster.clone()); let prefix = LogPrefix { tail: LogPosition::default(), @@ -475,9 +480,13 @@ mod tests { #[test] fn is_focusing_on_installing_snapshot_works() -> TestResult { - let cluster = make_cluster_config(3); - let io = NullIo(cluster.clone()); - let node_id = cluster.members().last().expect("never fails"); + let node_id: NodeId = "node1".into(); + let io = TestIoBuilder::new() + .add_member(node_id.clone()) + .add_member("node2".into()) + .add_member("node3".into()) + .finish(); + let cluster = io.cluster.clone(); let mut common = Common::new(node_id.clone(), io, cluster.clone()); let prev_term = Term::new(0); let node_prefix = LogPrefix { @@ -539,114 +548,4 @@ mod tests { Ok(()) } - - #[derive(Debug)] - /// Note: Give desired implementations if needed. - struct NullIo(ClusterConfig); - impl Io for NullIo { - type SaveBallot = SaveBallotImpl; - type LoadBallot = LoadBallotImpl; - type SaveLog = SaveLogImpl; - type LoadLog = LoadLogImpl; - type Timeout = TimeoutImpl; - - fn try_recv_message(&mut self) -> Result> { - Ok(None) - } - - fn send_message(&mut self, _message: Message) {} - - fn save_ballot(&mut self, _ballot: Ballot) -> Self::SaveBallot { - SaveBallotImpl - } - - fn load_ballot(&mut self) -> Self::LoadBallot { - LoadBallotImpl - } - - fn save_log_prefix(&mut self, _prefix: LogPrefix) -> Self::SaveLog { - SaveLogImpl - } - - fn save_log_suffix(&mut self, _suffix: &LogSuffix) -> Self::SaveLog { - SaveLogImpl - } - - fn load_log(&mut self, _start: LogIndex, _end: Option) -> Self::LoadLog { - LoadLogImpl(self.0.clone()) - } - - fn create_timeout(&mut self, _role: Role) -> Self::Timeout { - TimeoutImpl - } - } - - #[derive(Debug)] - struct SaveBallotImpl; - impl Future for SaveBallotImpl { - type Item = (); - type Error = Error; - - fn poll(&mut self) -> Poll { - Ok(Async::Ready(())) - } - } - - #[derive(Debug)] - struct LoadBallotImpl; - impl Future for LoadBallotImpl { - type Item = Option; - type Error = Error; - - fn poll(&mut self) -> Poll { - Ok(Async::Ready(None)) - } - } - - #[derive(Debug)] - struct SaveLogImpl; - impl Future for SaveLogImpl { - type Item = (); - type Error = Error; - - fn poll(&mut self) -> Poll { - Ok(Async::Ready(())) - } - } - - #[derive(Debug)] - struct LoadLogImpl(ClusterConfig); - impl Future for LoadLogImpl { - type Item = Log; - type Error = Error; - - fn poll(&mut self) -> Poll { - let prefix = LogPrefix { - tail: LogPosition::default(), - config: self.0.clone(), - snapshot: Vec::default(), - }; - Ok(Async::Ready(Log::Prefix(prefix))) - } - } - - #[derive(Debug)] - struct TimeoutImpl; - impl Future for TimeoutImpl { - type Item = (); - type Error = Error; - - fn poll(&mut self) -> Poll { - Ok(Async::Ready(())) - } - } - - /// Returns `ClusterConfig`. - fn make_cluster_config(size: usize) -> ClusterConfig { - let mut members = BTreeSet::new(); - for i in 0..size { - members.insert(NodeId::new(i.to_string())); - } - ClusterConfig::new(members) - } } diff --git a/src/node_state/loader.rs b/src/node_state/loader.rs index 681cc40..05c6a91 100644 --- a/src/node_state/loader.rs +++ b/src/node_state/loader.rs @@ -98,3 +98,111 @@ where } } } + +#[cfg(test)] +mod tests { + use super::*; + use election::Term; + use log::{LogEntry, LogPosition, LogPrefix, LogSuffix}; + use node::NodeId; + use test_util::tests::TestIoBuilder; + use trackable::result::TestResult; + + #[test] + fn it_works() -> TestResult { + let node_id: NodeId = "node1".into(); + let io = TestIoBuilder::new().add_member(node_id.clone()).finish(); + let mut handle = io.handle(); + let cluster = io.cluster.clone(); + let mut common = Common::new(node_id.clone(), io, cluster.clone()); + let mut loader = Loader::new(&mut common); + + // prefix には空の snapshot があり、tail は 1 を指している。 + // suffix には position 1 から 1 エントリが保存されている。 + // term は変更なし。 + let term = Term::new(1); + let suffix_head = LogIndex::new(1); + let prefix_tail = LogPosition { + prev_term: term.clone(), + index: suffix_head.clone(), + }; + handle.set_initial_log_prefix(LogPrefix { + tail: prefix_tail.clone(), + config: cluster.clone(), + snapshot: vec![], + }); + handle.set_initial_log_suffix( + suffix_head.clone(), + LogSuffix { + head: LogPosition { + prev_term: term.clone(), + index: suffix_head.clone(), + }, + entries: vec![LogEntry::Noop { term: term.clone() }], + }, + ); + loop { + if let Some(next) = track!(loader.run_once(&mut common))? { + assert!(next.is_candidate()); + // term は変化なし + assert_eq!(term, common.log().tail().prev_term); + // 追記されたログエントリの tail が 1 つ先に進んでいる + assert_eq!(LogIndex::new(2), common.log().tail().index); + // consumed と committed は prefix の状態のまま + assert_eq!(prefix_tail.index, common.log().consumed_tail().index); + assert_eq!(prefix_tail.index, common.log().committed_tail().index); + break; + } + } + + Ok(()) + } + + #[test] + fn it_fails_if_log_suffix_contains_older_term() -> TestResult { + let node_id: NodeId = "node1".into(); + let io = TestIoBuilder::new().add_member(node_id.clone()).finish(); + let mut handle = io.handle(); + let cluster = io.cluster.clone(); + let mut common = Common::new(node_id.clone(), io, cluster.clone()); + let mut loader = Loader::new(&mut common); + + // 古い term のログが紛れ込んでいるとエラーになる + let term = Term::new(308); + let suffix_head = LogIndex::new(28405496); + let prefix_tail = LogPosition { + prev_term: term.clone(), + index: suffix_head.clone(), + }; + handle.set_initial_log_prefix(LogPrefix { + tail: prefix_tail.clone(), + config: cluster.clone(), + snapshot: vec![], + }); + handle.set_initial_log_suffix( + suffix_head.clone(), + LogSuffix { + head: LogPosition { + prev_term: term.clone(), + index: suffix_head.clone(), + }, + entries: vec![ + LogEntry::Noop { term: term.clone() }, + LogEntry::Noop { + term: Term::new(term.as_u64() - 1), + }, + ], + }, + ); + + // Error: Other (cause; assertion failed: `self.last_record().head.prev_term < tail.prev_term`; last_record.head=LogPosition { prev_term: Term(308), index: LogIndex(28405496) }, tail=LogPosition { prev_term: Term(307), index: LogIndex(28405498) }) + //HISTORY: + // [0] at src/log/history.rs:104 + // [1] at src/node_state/common/mod.rs:78 + // [2] at src/node_state/loader.rs:58 + // [3] at src/node_state/loader.rs:198 + assert!(loader.run_once(&mut common).is_err()); + + Ok(()) + } +} diff --git a/src/node_state/mod.rs b/src/node_state/mod.rs index fb089a0..17863c1 100644 --- a/src/node_state/mod.rs +++ b/src/node_state/mod.rs @@ -35,11 +35,7 @@ impl NodeState { NodeState { common, role } } pub fn is_loading(&self) -> bool { - if let RoleState::Loader(_) = self.role { - true - } else { - false - } + self.role.is_loader() } pub fn start_election(&mut self) { if let RoleState::Follower(_) = self.role { @@ -152,3 +148,58 @@ pub enum RoleState { /// リーダ (詳細はRaftの論文を参照) Leader(Leader), } + +impl RoleState { + /// Returns true if this role state is `Loader`. + pub fn is_loader(&self) -> bool { + if let RoleState::Loader(_) = self { + true + } else { + false + } + } + + /// Returns true if this role state is `Candidate`. + #[cfg(test)] + pub fn is_candidate(&self) -> bool { + if let RoleState::Candidate(_) = self { + true + } else { + false + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use test_util::tests::TestIoBuilder; + + #[test] + fn node_state_is_loading_works() { + let io = TestIoBuilder::new().finish(); + let cluster = io.cluster.clone(); + let node = NodeState::load("test".into(), cluster, io); + assert!(node.is_loading()); + } + + #[test] + fn role_state_is_loader_works() { + let io = TestIoBuilder::new().finish(); + let cluster = io.cluster.clone(); + let mut common = Common::new("test".into(), io, cluster); + let state = RoleState::Loader(Loader::new(&mut common)); + assert!(state.is_loader()); + assert!(!state.is_candidate()); + } + + #[test] + fn role_state_is_candidate_works() { + let io = TestIoBuilder::new().finish(); + let cluster = io.cluster.clone(); + let mut common = Common::new("test".into(), io, cluster); + let state = RoleState::Candidate(Candidate::new(&mut common)); + assert!(!state.is_loader()); + assert!(state.is_candidate()); + } +} diff --git a/src/test_util.rs b/src/test_util.rs new file mode 100644 index 0000000..cb77ebc --- /dev/null +++ b/src/test_util.rs @@ -0,0 +1,236 @@ +//! テスト用のユーティリティ群。 +#[cfg(test)] +pub mod tests { + use fibers::time::timer; + use futures::{Async, Future, Poll}; + use std::collections::{BTreeSet, HashMap}; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + use trackable::error::ErrorKindExt; + + use cluster::{ClusterConfig, ClusterMembers}; + use election::{Ballot, Role}; + use io::Io; + use log::{Log, LogIndex, LogPrefix, LogSuffix}; + use message::Message; + use node::NodeId; + use {Error, ErrorKind, Result}; + + /// `TestIo`を生成する。主にクラスタ構成をするために存在する。 + /// `Log` や `Ballot` の設定は直接 `TestIo` に対して行えばよい。 + #[derive(Debug)] + pub struct TestIoBuilder { + members: ClusterMembers, + } + + impl TestIoBuilder { + pub fn new() -> Self { + Self { + members: BTreeSet::new(), + } + } + + pub fn add_member(mut self, node_id: NodeId) -> Self { + self.members.insert(node_id); + self + } + + pub fn finish(&self) -> TestIo { + TestIo { + leader_timeout: Duration::from_millis(5), + follower_timeout: Duration::from_millis(10), + candidate_timeout: Duration::from_millis(15), + cluster: ClusterConfig::new(self.members.clone()), + ballots: Arc::new(Mutex::new(Vec::new())), + logs: Arc::new(Mutex::new(HashMap::new())), + } + } + } + + /// `TestIo` を操作するためのハンドル。 + #[derive(Clone)] + pub struct TestIoHandle { + pub cluster: ClusterConfig, + logs: Arc), Log>>>, + } + + impl TestIoHandle { + /// 最初にロードされる `LogPrefix` をセットする。 + pub fn set_initial_log_prefix(&mut self, prefix: LogPrefix) { + let mut logs = self.logs.lock().expect("Never fails"); + logs.insert((LogIndex::new(0), None), Log::Prefix(prefix)); + } + + /// 最初にロードされる `LogSuffix` をセットする。 + /// `set_initial_log_prefix` を使った場合は suffix もセットしないと整合性が崩れるので注意。 + pub fn set_initial_log_suffix(&mut self, start: LogIndex, suffix: LogSuffix) { + let mut logs = self.logs.lock().expect("Never fails"); + logs.insert((start, None), Log::Suffix(suffix)); + } + + /// ログを追加する。2回目以降のログ読み込みを想定しているため end は常に指定する。 + #[allow(dead_code)] + pub fn append_log(&mut self, start: LogIndex, end: LogIndex, log: Log) { + let mut logs = self.logs.lock().expect("Never fails"); + logs.insert((start, Some(end)), log); + } + } + + /// テスト用の `Io` 実装。 + /// テストシナリオが多岐に渡ることが想定されるため、この struct では最小限の機能のみ提供する。 + /// 各 field の整合性は担保しないため、テストコード側で担保すること。 + #[derive(Debug)] + pub struct TestIo { + pub leader_timeout: Duration, + pub follower_timeout: Duration, + pub candidate_timeout: Duration, + /// クラスタ構成。 + pub cluster: ClusterConfig, + /// `LoadBallot` でロードされる。 + pub ballots: Arc>>, + /// `LoadLog` でロードされる。 + pub logs: Arc), Log>>>, + } + + impl TestIo { + pub fn handle(&self) -> TestIoHandle { + TestIoHandle { + cluster: self.cluster.clone(), + logs: self.logs.clone(), + } + } + } + + impl Io for TestIo { + type SaveBallot = NoopSaveBallot; + type LoadBallot = LoadBallotImpl; + type SaveLog = NoopSaveLog; + type LoadLog = LoadLogImpl; + type Timeout = FibersTimeout; + + fn try_recv_message(&mut self) -> Result> { + Ok(None) + } + + fn send_message(&mut self, _message: Message) {} + + fn save_ballot(&mut self, _ballot: Ballot) -> Self::SaveBallot { + NoopSaveBallot + } + + fn load_ballot(&mut self) -> Self::LoadBallot { + let mut ballots = self.ballots.lock().expect("Never fails"); + LoadBallotImpl(ballots.pop()) + } + + fn save_log_prefix(&mut self, _prefix: LogPrefix) -> Self::SaveLog { + NoopSaveLog + } + + fn save_log_suffix(&mut self, _suffix: &LogSuffix) -> Self::SaveLog { + NoopSaveLog + } + + fn load_log(&mut self, start: LogIndex, end: Option) -> Self::LoadLog { + let mut logs = self.logs.lock().expect("Never fails"); + if let Some(log) = logs.remove(&(start, end)) { + match log { + Log::Prefix(prefix) => { + return LoadLogImpl { + prefix: Some(prefix), + suffix: None, + }; + } + Log::Suffix(suffix) => { + return LoadLogImpl { + prefix: None, + suffix: Some(suffix), + }; + } + } + } + LoadLogImpl { + prefix: None, + suffix: Some(LogSuffix::default()), + } + } + + fn create_timeout(&mut self, role: Role) -> Self::Timeout { + match role { + Role::Leader => FibersTimeout(timer::timeout(self.leader_timeout)), + Role::Follower => FibersTimeout(timer::timeout(self.follower_timeout)), + Role::Candidate => FibersTimeout(timer::timeout(self.candidate_timeout)), + } + } + } + + /// 現時点では必要ないので何もしない。 + #[derive(Debug)] + pub struct NoopSaveBallot; + impl Future for NoopSaveBallot { + type Item = (); + type Error = Error; + fn poll(&mut self) -> Poll { + Ok(Async::Ready(())) + } + } + + /// 引数で与えられた `Ballot` を返す `LoadBallot` 実装。 + #[derive(Debug)] + pub struct LoadBallotImpl(Option); + impl Future for LoadBallotImpl { + type Item = Option; + type Error = Error; + fn poll(&mut self) -> Poll { + Ok(Async::Ready(self.0.clone())) + } + } + + /// 現時点では必要ないので何もしない。 + #[derive(Debug)] + pub struct NoopSaveLog; + impl Future for NoopSaveLog { + type Item = (); + type Error = Error; + fn poll(&mut self) -> Poll { + Ok(Async::Ready(())) + } + } + + /// `LogPrefix` か `LogSuffix` のどちらかをロードする `LoadLog` 実装。 + #[derive(Debug)] + pub struct LoadLogImpl { + prefix: Option, + suffix: Option, + } + impl Future for LoadLogImpl { + type Item = Log; + type Error = Error; + fn poll(&mut self) -> Poll { + if let Some(prefix) = self.prefix.clone() { + return Ok(Async::Ready(Log::Prefix(prefix))); + } + if let Some(suffix) = self.suffix.clone() { + return Ok(Async::Ready(Log::Suffix(suffix))); + } + + Err(ErrorKind::InconsistentState + .cause("Neither prefix or suffix is not given") + .into()) + } + } + + /// fibers を使ったタイムアウトの実装。 + #[derive(Debug)] + pub struct FibersTimeout(timer::Timeout); + impl Future for FibersTimeout { + type Item = (); + type Error = Error; + + fn poll(&mut self) -> Poll { + self.0 + .poll() + .map_err(|_| ErrorKind::Other.cause("Broken timer").into()) + } + } +}