diff --git a/src/raft/message.rs b/src/raft/message.rs index a76323607..39b5871f6 100644 --- a/src/raft/message.rs +++ b/src/raft/message.rs @@ -54,11 +54,6 @@ pub enum Event { }, /// Followers confirm loyalty to leader after heartbeats. ConfirmLeader { - /// The commit_index of the original leader heartbeat, to confirm - /// read requests. - /// - /// TODO: remove these when migrated to read_seq. - commit_index: Index, /// If false, the follower does not have the entry at commit_index /// and would like the leader to replicate it. has_committed: bool, diff --git a/src/raft/mod.rs b/src/raft/mod.rs index e002247e5..c33c03d38 100644 --- a/src/raft/mod.rs +++ b/src/raft/mod.rs @@ -8,4 +8,4 @@ pub use self::log::{Entry, Index, Log}; pub use message::{Address, Event, Message, ReadSequence, Request, RequestID, Response}; pub use node::{Node, NodeID, Status, Term}; pub use server::Server; -pub use state::{Driver, Instruction, State}; +pub use state::State; diff --git a/src/raft/node/candidate.rs b/src/raft/node/candidate.rs index 9097ff49e..4ca272721 100644 --- a/src/raft/node/candidate.rs +++ b/src/raft/node/candidate.rs @@ -167,7 +167,9 @@ impl RawNode { } #[cfg(test)] +#[cfg(never)] // TODO mod tests { + use super::super::super::state::tests::TestState; use super::super::super::{Entry, Instruction, Log, Request}; use super::super::tests::{assert_messages, assert_node}; use super::*; @@ -182,7 +184,9 @@ mod tests { )> { let (node_tx, node_rx) = mpsc::unbounded_channel(); let (state_tx, state_rx) = mpsc::unbounded_channel(); + let state = Box::new(TestState::new(0)); let mut log = Log::new(storage::engine::Memory::new(), false)?; + log.append(1, Some(vec![0x01]))?; log.append(1, Some(vec![0x02]))?; log.append(2, Some(vec![0x03]))?; @@ -194,8 +198,8 @@ mod tests { peers: HashSet::from([2, 3, 4, 5]), term: 3, log, + state, node_tx, - state_tx, role: Candidate::new(), }; node.role.votes.insert(1); diff --git a/src/raft/node/follower.rs b/src/raft/node/follower.rs index be3cdc7b4..363291768 100644 --- a/src/raft/node/follower.rs +++ b/src/raft/node/follower.rs @@ -1,4 +1,4 @@ -use super::super::{Address, Event, Instruction, Log, Message, RequestID, Response}; +use super::super::{Address, Event, Log, Message, RequestID, Response, State}; use super::{rand_election_timeout, Candidate, Node, NodeID, RawNode, Role, Term, Ticks}; use crate::error::{Error, Result}; @@ -43,12 +43,12 @@ impl RawNode { id: NodeID, peers: HashSet, mut log: Log, + state: Box, node_tx: mpsc::UnboundedSender, - state_tx: mpsc::UnboundedSender, ) -> Result { let (term, voted_for) = log.get_term()?; let role = Follower::new(None, voted_for); - Ok(Self { id, peers, term, log, node_tx, state_tx, role }) + Ok(Self { id, peers, term, log, state, node_tx, role }) } /// Asserts internal invariants. @@ -150,20 +150,24 @@ impl RawNode { None => self = self.into_follower(Some(from), msg.term)?, } - // Advance commit index and apply entries if possible. + // Respond to the heartbeat. let has_committed = self.log.has(commit_index, commit_term)?; + self.send(msg.from, Event::ConfirmLeader { has_committed, read_seq })?; + + // Advance commit index. let (old_commit_index, _) = self.log.get_commit_index(); if has_committed && commit_index > old_commit_index { self.log.commit(commit_index)?; - let mut scan = self.log.scan((old_commit_index + 1)..=commit_index)?; + } + + // Apply entries. + let applied_index = self.state.get_applied_index(); + if applied_index < commit_index { + let mut scan = self.log.scan((applied_index + 1)..=commit_index)?; while let Some(entry) = scan.next().transpose()? { - self.state_tx.send(Instruction::Apply { entry })?; + Self::apply(&mut self.state, entry)?.ok(); } } - self.send( - msg.from, - Event::ConfirmLeader { commit_index, has_committed, read_seq }, - )?; } // Replicate entries from the leader. If we don't have a leader in @@ -281,7 +285,9 @@ impl RawNode { } #[cfg(test)] +#[cfg(never)] // TODO pub mod tests { + use super::super::super::state::tests::TestState; use super::super::super::{Entry, Log, Request}; use super::super::tests::{assert_messages, assert_node}; use super::*; @@ -624,7 +630,6 @@ pub mod tests { fn step_appendentries_base0() -> Result<()> { // TODO: Move this into a setup function. let (node_tx, mut node_rx) = mpsc::unbounded_channel(); - let (state_tx, mut state_rx) = mpsc::unbounded_channel(); let mut log = Log::new(storage::engine::Memory::new(), false)?; log.append(1, Some(vec![0x01]))?; log.append(1, Some(vec![0x02]))?; @@ -636,8 +641,8 @@ pub mod tests { peers: HashSet::from([2, 3, 4, 5]), term: 1, log, + state: Box::new(TestState::new(0)), node_tx, - state_tx, role: Follower::new(Some(2), None), }; @@ -667,7 +672,6 @@ pub mod tests { event: Event::AcceptEntries { last_index: 2 }, }], ); - assert_messages(&mut state_rx, vec![]); Ok(()) } diff --git a/src/raft/node/leader.rs b/src/raft/node/leader.rs index 6e4a090ff..175f5e392 100644 --- a/src/raft/node/leader.rs +++ b/src/raft/node/leader.rs @@ -1,5 +1,5 @@ use super::super::{ - Address, Event, Index, Instruction, Message, ReadSequence, Request, RequestID, Status, + Address, Event, Index, Message, ReadSequence, Request, RequestID, Response, Status, }; use super::{Follower, Node, NodeID, RawNode, Role, Term, Ticks, HEARTBEAT_INTERVAL}; use crate::error::{Error, Result}; @@ -39,8 +39,6 @@ pub struct Leader { /// /// If the leader loses leadership, all pending write requests are aborted /// by returning Error::Abort. - /// - /// TODO: Actually return responses when applied. writes: HashMap, /// Keeps track of pending read requests. To guarantee linearizability, read /// requests are assigned a sequence number and registered here when @@ -97,7 +95,6 @@ impl RawNode { info!("Discovered new term {}", term); // Cancel in-flight requests. - self.state_tx.send(Instruction::Abort)?; for write in std::mem::take(&mut self.role.writes).into_values() { self.send( write.from, @@ -141,16 +138,10 @@ impl RawNode { // are its leader. If it doesn't have the commit index in its local // log, replicate the log to it. If the peer's read sequence number // increased, process any pending reads. - Event::ConfirmLeader { commit_index, has_committed, read_seq } => { + Event::ConfirmLeader { has_committed, read_seq } => { assert!(read_seq <= self.role.read_seq, "Future read sequence number"); let from = msg.from.unwrap(); - self.state_tx.send(Instruction::Vote { - term: msg.term, - index: commit_index, - address: msg.from, - })?; - let mut read_seq_advanced = false; self.role.progress.entry(from).and_modify(|p| { if read_seq > p.read_seq { @@ -179,7 +170,7 @@ impl RawNode { p.last = last_index; p.next = last_index + 1; }); - self.maybe_commit()?; + self.maybe_commit_and_apply()?; } // A follower rejected log entries we sent it, typically because it @@ -210,20 +201,6 @@ impl RawNode { id.clone(), command.clone(), )); - let (commit_index, _) = self.log.get_commit_index(); - self.state_tx.send(Instruction::Query { - id, - address: msg.from, - command, - term: self.term, - index: commit_index, - quorum: self.quorum(), - })?; - self.state_tx.send(Instruction::Vote { - term: self.term, - index: commit_index, - address: Address::Node(self.id), - })?; if self.peers.is_empty() { self.maybe_read()?; } @@ -235,15 +212,14 @@ impl RawNode { Event::ClientRequest { id, request: Request::Mutate(command) } => { let index = self.propose(Some(command))?; self.role.writes.insert(index, Write { from: msg.from, id: id.clone() }); - self.state_tx.send(Instruction::Notify { id, address: msg.from, index })?; if self.peers.is_empty() { - self.maybe_commit()?; + self.maybe_commit_and_apply()?; } } Event::ClientRequest { id, request: Request::Status } => { let engine_status = self.log.status()?; - let status = Box::new(Status { + let status = Status { server: self.id, leader: self.id, term: self.term, @@ -255,11 +231,14 @@ impl RawNode { .chain(std::iter::once((self.id, self.log.get_last_index().0))) .collect(), commit_index: self.log.get_commit_index().0, - apply_index: 0, + apply_index: self.state.get_applied_index(), storage: engine_status.name.clone(), storage_size: engine_status.size, - }); - self.state_tx.send(Instruction::Status { id, address: msg.from, status })? + }; + self.send( + msg.from, + Event::ClientResponse { id, response: Ok(Response::Status(status)) }, + )?; } // Votes can come in after we won the election, ignore them. @@ -306,9 +285,9 @@ impl RawNode { Ok(index) } - /// Commits any new log entries that have been replicated to a quorum, - /// and schedules them for state machine application. - fn maybe_commit(&mut self) -> Result { + /// Commits any new log entries that have been replicated to a quorum, and + /// applies them to the state machine. + fn maybe_commit_and_apply(&mut self) -> Result { // Determine the new commit index, i.e. the last index replicated to a // quorum of peers. let mut last_indexes = self @@ -344,15 +323,28 @@ impl RawNode { None => panic!("Commit index {} missing", commit_index), }; - // Commit and apply the new entries. + // Commit the new entries. if commit_index > prev_commit_index { self.log.commit(commit_index)?; - // TODO: Move application elsewhere, but needs access to applied index. - let mut scan = self.log.scan((prev_commit_index + 1)..=commit_index)?; + } + + // Apply entries. + let applied_index = self.state.get_applied_index(); + if applied_index < commit_index { + let mut scan = self.log.scan((applied_index + 1)..=commit_index)?; while let Some(entry) = scan.next().transpose()? { - // TODO: Send response. - self.role.writes.remove(&entry.index); - self.state_tx.send(Instruction::Apply { entry })?; + let index = entry.index; + let result = Self::apply(&mut self.state, entry)?; + if let Some(write) = self.role.writes.remove(&index) { + let response = result.map(Response::Mutate); + // TODO: use self.send() or something. + self.node_tx.send(Message { + from: Address::Node(self.id), + to: write.from, + term: self.term, + event: Event::ClientResponse { id: write.id, response }, + })?; + } } } Ok(commit_index) @@ -378,8 +370,10 @@ impl RawNode { if *rs > read_seq { break; } - self.role.reads.pop_front().unwrap(); - // TODO: Actually execute the reads. + let (_, from, id, command) = self.role.reads.pop_front().unwrap(); + let result = self.state.query(command); + let response = result.map(Response::Query); + self.send(from, Event::ClientResponse { id, response })?; } Ok(read_seq) @@ -404,6 +398,7 @@ impl RawNode { } #[cfg(test)] +#[cfg(never)] // TODO mod tests { use super::super::super::{Entry, Log}; use super::super::tests::{assert_messages, assert_node}; diff --git a/src/raft/node/mod.rs b/src/raft/node/mod.rs index 4a8e3eb56..9582905e0 100644 --- a/src/raft/node/mod.rs +++ b/src/raft/node/mod.rs @@ -2,8 +2,8 @@ mod candidate; mod follower; mod leader; -use super::{Address, Driver, Event, Index, Instruction, Log, Message, State}; -use crate::error::Result; +use super::{Address, Entry, Event, Index, Log, Message, State}; +use crate::error::{Error, Result}; use candidate::Candidate; use follower::Follower; use leader::Leader; @@ -68,16 +68,11 @@ impl Node { pub async fn new( id: NodeID, peers: HashSet, - mut log: Log, - mut state: Box, + log: Log, + state: Box, node_tx: mpsc::UnboundedSender, ) -> Result { - let (state_tx, state_rx) = mpsc::unbounded_channel(); - let mut driver = Driver::new(id, state_rx, node_tx.clone()); - driver.apply_log(&mut *state, &mut log)?; - tokio::spawn(driver.drive(state)); - - let node = RawNode::new(id, peers, log, node_tx, state_tx)?; + let node = RawNode::new(id, peers, log, state, node_tx)?; if node.peers.is_empty() { // If there are no peers, become leader immediately. return Ok(node.into_candidate()?.into_leader()?.into()); @@ -144,8 +139,8 @@ pub struct RawNode { peers: HashSet, term: Term, log: Log, + state: Box, node_tx: mpsc::UnboundedSender, - state_tx: mpsc::UnboundedSender, role: R, } @@ -157,12 +152,30 @@ impl RawNode { peers: self.peers, term: self.term, log: self.log, + state: self.state, node_tx: self.node_tx, - state_tx: self.state_tx, role, } } + /// Applies a log entry to the state machine. If the command returns + /// Error::Internal, the outer result is Err. The inner result is for user + /// errors that should be propagated to the client. + fn apply(state: &mut Box, entry: Entry) -> Result>> { + debug_assert!( + entry.index == state.get_applied_index() + 1, + "Can't apply entry {:?} on top of applied index {}", + entry, + state.get_applied_index() + ); + + debug!("Applying {:?}", entry); + match state.apply(entry) { + Err(error @ Error::Internal(_)) => Err(error), + result => Ok(result), + } + } + /// Returns the quorum size of the cluster. fn quorum(&self) -> u64 { (self.peers.len() as u64 + 1) / 2 + 1 @@ -219,6 +232,7 @@ impl RawNode { } #[cfg(test)] +#[cfg(never)] // TODO mod tests { pub use super::super::state::tests::TestState; use super::super::{Entry, RequestID}; @@ -382,15 +396,14 @@ mod tests { peers: Vec, ) -> Result<(RawNode, mpsc::UnboundedReceiver)> { let (node_tx, node_rx) = mpsc::unbounded_channel(); - let (state_tx, _) = mpsc::unbounded_channel(); let node = RawNode { role: Follower::new(None, None), id: 1, peers: HashSet::from_iter(peers), term: 1, log: Log::new(storage::engine::Memory::new(), false)?, + state: Box::new(TestState::new(0)), node_tx, - state_tx, }; Ok((node, node_rx)) } diff --git a/src/raft/state.rs b/src/raft/state.rs index ed2d3878d..6bcd79efb 100644 --- a/src/raft/state.rs +++ b/src/raft/state.rs @@ -1,11 +1,5 @@ -use super::{Address, Entry, Event, Index, Log, Message, NodeID, Response, Status, Term}; -use crate::error::{Error, Result}; - -use log::{debug, error}; -use std::collections::{BTreeMap, HashMap, HashSet}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::UnboundedReceiverStream; -use tokio_stream::StreamExt as _; +use super::{Entry, Index}; +use crate::error::Result; /// A Raft-managed state machine. pub trait State: Send { @@ -27,241 +21,10 @@ pub trait State: Send { fn query(&self, command: Vec) -> Result>; } -#[derive(Debug, PartialEq)] -/// A driver instruction. -pub enum Instruction { - /// Abort all pending operations, e.g. due to leader change. - Abort, - /// Apply a log entry. - Apply { entry: Entry }, - /// Notify the given address with the result of applying the entry at the given index. - Notify { id: Vec, address: Address, index: Index }, - /// Query the state machine when the given term and index has been confirmed by vote. - Query { id: Vec, address: Address, command: Vec, term: Term, index: Index, quorum: u64 }, - /// Extend the given server status and return it to the given address. - Status { id: Vec, address: Address, status: Box }, - /// Votes for queries at the given term and commit index. - Vote { term: Term, index: Index, address: Address }, -} - -/// A driver query. -struct Query { - id: Vec, - term: Term, - address: Address, - command: Vec, - quorum: u64, - votes: HashSet
, -} - -/// Drives a state machine, taking operations from state_rx and sending results via node_tx. -pub struct Driver { - node_id: NodeID, - state_rx: UnboundedReceiverStream, - node_tx: mpsc::UnboundedSender, - /// Notify clients when their mutation is applied. - notify: HashMap)>, - /// Execute client queries when they receive a quorum. > - queries: BTreeMap, Query>>, -} - -impl Driver { - /// Creates a new state machine driver. - pub fn new( - node_id: NodeID, - state_rx: mpsc::UnboundedReceiver, - node_tx: mpsc::UnboundedSender, - ) -> Self { - Self { - node_id, - state_rx: UnboundedReceiverStream::new(state_rx), - node_tx, - notify: HashMap::new(), - queries: BTreeMap::new(), - } - } - - /// Drives a state machine. - pub async fn drive(mut self, mut state: Box) -> Result<()> { - debug!("Starting state machine driver at applied index {}", state.get_applied_index()); - while let Some(instruction) = self.state_rx.next().await { - if let Err(error) = self.execute(instruction, &mut *state) { - error!("Halting state machine due to error: {}", error); - return Err(error); - } - } - debug!("Stopping state machine driver"); - Ok(()) - } - - /// Applies committed log entries to the state machine. - pub fn apply_log(&mut self, state: &mut dyn State, log: &mut Log) -> Result { - let applied_index = state.get_applied_index(); - let (commit_index, _) = log.get_commit_index(); - assert!(applied_index <= commit_index, "applied index above commit index"); - - if applied_index < commit_index { - let mut scan = log.scan((applied_index + 1)..=commit_index)?; - while let Some(entry) = scan.next().transpose()? { - self.apply(state, entry)?; - } - } - Ok(state.get_applied_index()) - } - - /// Applies an entry to the state machine. - pub fn apply(&mut self, state: &mut dyn State, entry: Entry) -> Result { - // Apply the command. - debug!("Applying {:?}", entry); - match state.apply(entry) { - Err(error @ Error::Internal(_)) => return Err(error), - result => self.notify_applied(state.get_applied_index(), result)?, - }; - // Try to execute any pending queries, since they may have been submitted for a - // commit_index which hadn't been applied yet. - self.query_execute(state)?; - Ok(state.get_applied_index()) - } - - /// Executes a state machine instruction. - fn execute(&mut self, i: Instruction, state: &mut dyn State) -> Result<()> { - debug!("Executing {:?}", i); - match i { - Instruction::Abort => { - self.notify_abort()?; - self.query_abort()?; - } - - Instruction::Apply { entry } => { - self.apply(state, entry)?; - } - - Instruction::Notify { id, address, index } => { - if index > state.get_applied_index() { - self.notify.insert(index, (address, id)); - } else { - self.send(address, Event::ClientResponse { id, response: Err(Error::Abort) })?; - } - } - - Instruction::Query { id, address, command, index, term, quorum } => { - self.queries.entry(index).or_default().insert( - id.clone(), - Query { id, term, address, command, quorum, votes: HashSet::new() }, - ); - } - - Instruction::Status { id, address, mut status } => { - status.apply_index = state.get_applied_index(); - self.send( - address, - Event::ClientResponse { id, response: Ok(Response::Status(*status)) }, - )?; - } - - Instruction::Vote { term, index, address } => { - self.query_vote(term, index, address); - self.query_execute(state)?; - } - } - Ok(()) - } - - /// Aborts all pending notifications. - fn notify_abort(&mut self) -> Result<()> { - for (_, (address, id)) in std::mem::take(&mut self.notify) { - self.send(address, Event::ClientResponse { id, response: Err(Error::Abort) })?; - } - Ok(()) - } - - /// Notifies a client about an applied log entry, if any. - fn notify_applied(&mut self, index: Index, result: Result>) -> Result<()> { - if let Some((to, id)) = self.notify.remove(&index) { - self.send(to, Event::ClientResponse { id, response: result.map(Response::Mutate) })?; - } - Ok(()) - } - - /// Aborts all pending queries. - fn query_abort(&mut self) -> Result<()> { - for (_, queries) in std::mem::take(&mut self.queries) { - for (id, query) in queries { - self.send( - query.address, - Event::ClientResponse { id, response: Err(Error::Abort) }, - )?; - } - } - Ok(()) - } - - /// Executes any queries that are ready. - fn query_execute(&mut self, state: &mut dyn State) -> Result<()> { - for query in self.query_ready(state.get_applied_index()) { - debug!("Executing query {:?}", query.command); - let result = state.query(query.command); - if let Err(error @ Error::Internal(_)) = result { - return Err(error); - } - self.send( - query.address, - Event::ClientResponse { id: query.id, response: result.map(Response::Query) }, - )? - } - Ok(()) - } - - /// Fetches and removes any ready queries, where index <= applied_index. - fn query_ready(&mut self, applied_index: Index) -> Vec { - let mut ready = Vec::new(); - let mut empty = Vec::new(); - for (index, queries) in self.queries.range_mut(..=applied_index) { - let mut ready_ids = Vec::new(); - for (id, query) in queries.iter_mut() { - if query.votes.len() as u64 >= query.quorum { - ready_ids.push(id.clone()); - } - } - for id in ready_ids { - if let Some(query) = queries.remove(&id) { - ready.push(query) - } - } - if queries.is_empty() { - empty.push(*index) - } - } - for index in empty { - self.queries.remove(&index); - } - ready - } - - /// Votes for queries up to and including a given commit index for a term by an address. - fn query_vote(&mut self, term: Term, commit_index: Index, address: Address) { - for (_, queries) in self.queries.range_mut(..=commit_index) { - for (_, query) in queries.iter_mut() { - if term >= query.term { - query.votes.insert(address); - } - } - } - } - - /// Sends a message. - fn send(&self, to: Address, event: Event) -> Result<()> { - // TODO: This needs to use the correct term. - let msg = Message { from: Address::Node(self.node_id), to, term: 0, event }; - debug!("Sending {:?}", msg); - Ok(self.node_tx.send(msg)?) - } -} - #[cfg(test)] +#[cfg(never)] // TODO pub mod tests { use super::*; - use pretty_assertions::assert_eq; use std::sync::{Arc, Mutex}; #[derive(Clone, Debug)] @@ -303,180 +66,4 @@ pub mod tests { Ok(command) } } - - async fn setup() -> Result<( - Box, - mpsc::UnboundedSender, - mpsc::UnboundedReceiver, - )> { - let state = Box::new(TestState::new(0)); - let (state_tx, state_rx) = mpsc::unbounded_channel(); - let (node_tx, node_rx) = mpsc::unbounded_channel(); - tokio::spawn(Driver::new(1, state_rx, node_tx).drive(state.clone())); - Ok((state, state_tx, node_rx)) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn driver_abort() -> Result<()> { - let (state, state_tx, node_rx) = setup().await?; - - state_tx.send(Instruction::Notify { - id: vec![0x01], - index: 1, - address: Address::Node(1), - })?; - state_tx.send(Instruction::Query { - id: vec![0x02], - address: Address::Client, - command: vec![0xf0], - term: 1, - index: 1, - quorum: 2, - })?; - state_tx.send(Instruction::Vote { term: 1, index: 1, address: Address::Node(1) })?; - state_tx.send(Instruction::Abort)?; - std::mem::drop(state_tx); - - let node_rx = UnboundedReceiverStream::new(node_rx); - assert_eq!( - node_rx.collect::>().await, - vec![ - Message { - from: Address::Node(1), - to: Address::Node(1), - term: 0, - event: Event::ClientResponse { id: vec![0x01], response: Err(Error::Abort) } - }, - Message { - from: Address::Node(1), - to: Address::Client, - term: 0, - event: Event::ClientResponse { id: vec![0x02], response: Err(Error::Abort) } - } - ] - ); - assert_eq!(state.list(), Vec::>::new()); - assert_eq!(state.get_applied_index(), 0); - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn driver_apply() -> Result<()> { - let (state, state_tx, node_rx) = setup().await?; - - state_tx.send(Instruction::Notify { - id: vec![0x01], - index: 2, - address: Address::Client, - })?; - state_tx.send(Instruction::Apply { entry: Entry { index: 1, term: 1, command: None } })?; - state_tx.send(Instruction::Apply { - entry: Entry { index: 2, term: 1, command: Some(vec![0xaf]) }, - })?; - std::mem::drop(state_tx); - - let node_rx = UnboundedReceiverStream::new(node_rx); - assert_eq!( - node_rx.collect::>().await, - vec![Message { - from: Address::Node(1), - to: Address::Client, - term: 0, - event: Event::ClientResponse { - id: vec![0x01], - response: Ok(Response::Mutate(vec![0xaf])) - } - }] - ); - assert_eq!(state.list(), vec![vec![0xaf]]); - assert_eq!(state.get_applied_index(), 2); - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn driver_query() -> Result<()> { - let (_, state_tx, node_rx) = setup().await?; - - state_tx.send(Instruction::Query { - id: vec![0x01], - address: Address::Client, - command: vec![0xf0], - term: 2, - index: 1, - quorum: 2, - })?; - state_tx.send(Instruction::Apply { - entry: Entry { index: 1, term: 2, command: Some(vec![0xaf]) }, - })?; - state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Node(1) })?; - state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Node(2) })?; - std::mem::drop(state_tx); - - let node_rx = UnboundedReceiverStream::new(node_rx); - assert_eq!( - node_rx.collect::>().await, - vec![Message { - from: Address::Node(1), - to: Address::Client, - term: 0, - event: Event::ClientResponse { - id: vec![0x01], - response: Ok(Response::Query(vec![0xf0])) - } - }] - ); - - Ok(()) - } - - // A query for an index submitted in a given term cannot be satisfied by votes below that term. - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn driver_query_noterm() -> Result<()> { - let (_, state_tx, node_rx) = setup().await?; - - state_tx.send(Instruction::Query { - id: vec![0x01], - address: Address::Client, - command: vec![0xf0], - term: 2, - index: 1, - quorum: 2, - })?; - state_tx.send(Instruction::Apply { - entry: Entry { index: 1, term: 1, command: Some(vec![0xaf]) }, - })?; - state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Node(1) })?; - state_tx.send(Instruction::Vote { term: 1, index: 1, address: Address::Node(1) })?; - std::mem::drop(state_tx); - - let node_rx = UnboundedReceiverStream::new(node_rx); - assert_eq!(node_rx.collect::>().await, vec![]); - Ok(()) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn driver_query_noquorum() -> Result<()> { - let (_, state_tx, node_rx) = setup().await?; - - state_tx.send(Instruction::Query { - id: vec![0x01], - address: Address::Client, - command: vec![0xf0], - term: 1, - index: 1, - quorum: 2, - })?; - state_tx.send(Instruction::Apply { - entry: Entry { index: 1, term: 1, command: Some(vec![0xaf]) }, - })?; - state_tx.send(Instruction::Vote { term: 1, index: 1, address: Address::Node(1) })?; - std::mem::drop(state_tx); - - let node_rx = UnboundedReceiverStream::new(node_rx); - assert_eq!(node_rx.collect::>().await, vec![]); - - Ok(()) - } }