From 9eeb2e2d72abc1162df27170f2e5494a2f5fe40e Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sat, 25 Nov 2023 12:58:52 +0100 Subject: [PATCH] wip --- src/raft/message.rs | 5 - src/raft/mod.rs | 2 +- src/raft/node/candidate.rs | 39 ++-- src/raft/node/follower.rs | 43 ++-- src/raft/node/leader.rs | 111 +++++----- src/raft/node/mod.rs | 59 ++++-- src/raft/state.rs | 418 +------------------------------------ 7 files changed, 123 insertions(+), 554 deletions(-) diff --git a/src/raft/message.rs b/src/raft/message.rs index cdf93a877..39b5871f6 100644 --- a/src/raft/message.rs +++ b/src/raft/message.rs @@ -54,11 +54,6 @@ pub enum Event { }, /// Followers confirm loyalty to leader after heartbeats. ConfirmLeader { - /// The commit_index of the original leader heartbeat, to confirm - /// read requests. - /// - /// TODO: remove this when migrated to read_seq. - commit_index: Index, /// If false, the follower does not have the entry at commit_index /// and would like the leader to replicate it. has_committed: bool, diff --git a/src/raft/mod.rs b/src/raft/mod.rs index e002247e5..c33c03d38 100644 --- a/src/raft/mod.rs +++ b/src/raft/mod.rs @@ -8,4 +8,4 @@ pub use self::log::{Entry, Index, Log}; pub use message::{Address, Event, Message, ReadSequence, Request, RequestID, Response}; pub use node::{Node, NodeID, Status, Term}; pub use server::Server; -pub use state::{Driver, Instruction, State}; +pub use state::State; diff --git a/src/raft/node/candidate.rs b/src/raft/node/candidate.rs index ecf1e3b3d..e3fa5a50a 100644 --- a/src/raft/node/candidate.rs +++ b/src/raft/node/candidate.rs @@ -168,21 +168,19 @@ impl RawNode { #[cfg(test)] mod tests { - use super::super::super::{Entry, Instruction, Log, Request}; + use super::super::super::state::tests::TestState; + use super::super::super::{Entry, Log, Request}; use super::super::tests::{assert_messages, assert_node}; use super::*; use crate::storage; use tokio::sync::mpsc; #[allow(clippy::type_complexity)] - fn setup() -> Result<( - RawNode, - mpsc::UnboundedReceiver, - mpsc::UnboundedReceiver, - )> { + fn setup() -> Result<(RawNode, mpsc::UnboundedReceiver)> { let (node_tx, node_rx) = mpsc::unbounded_channel(); - let (state_tx, state_rx) = mpsc::unbounded_channel(); + let state = Box::new(TestState::new(0)); let mut log = Log::new(storage::engine::Memory::new(), false)?; + log.append(1, Some(vec![0x01]))?; log.append(1, Some(vec![0x02]))?; log.append(2, Some(vec![0x03]))?; @@ -194,18 +192,18 @@ mod tests { peers: HashSet::from([2, 3, 4, 5]), term: 3, log, + state, node_tx, - state_tx, role: Candidate::new(), }; node.role.votes.insert(1); - Ok((node, node_rx, state_rx)) + Ok((node, node_rx)) } #[test] // Heartbeat for current term converts to follower and emits ConfirmLeader. fn step_heartbeat_current_term() -> Result<()> { - let (candidate, mut node_rx, mut state_rx) = setup()?; + let (candidate, mut node_rx) = setup()?; let mut node = candidate.step(Message { from: Address::Node(2), to: Address::Node(1), @@ -219,10 +217,9 @@ mod tests { from: Address::Node(1), to: Address::Node(2), term: 3, - event: Event::ConfirmLeader { commit_index: 2, has_committed: true, read_seq: 7 }, + event: Event::ConfirmLeader { has_committed: true, read_seq: 7 }, }], ); - assert_messages(&mut state_rx, vec![]); Ok(()) } @@ -230,7 +227,7 @@ mod tests { // Heartbeat for future term converts to follower and emits ConfirmLeader // event. fn step_heartbeat_future_term() -> Result<()> { - let (candidate, mut node_rx, mut state_rx) = setup()?; + let (candidate, mut node_rx) = setup()?; let mut node = candidate.step(Message { from: Address::Node(2), to: Address::Node(1), @@ -244,17 +241,16 @@ mod tests { from: Address::Node(1), to: Address::Node(2), term: 4, - event: Event::ConfirmLeader { commit_index: 2, has_committed: true, read_seq: 7 }, + event: Event::ConfirmLeader { has_committed: true, read_seq: 7 }, }], ); - assert_messages(&mut state_rx, vec![]); Ok(()) } #[test] // Heartbeat for past term is ignored fn step_heartbeat_past_term() -> Result<()> { - let (candidate, mut node_rx, mut state_rx) = setup()?; + let (candidate, mut node_rx) = setup()?; let mut node = candidate.step(Message { from: Address::Node(2), to: Address::Node(1), @@ -263,13 +259,12 @@ mod tests { })?; assert_node(&mut node).is_candidate().term(3); assert_messages(&mut node_rx, vec![]); - assert_messages(&mut state_rx, vec![]); Ok(()) } #[test] fn step_grantvote() -> Result<()> { - let (candidate, mut node_rx, mut state_rx) = setup()?; + let (candidate, mut node_rx) = setup()?; let peers = candidate.peers.clone(); let mut node = Node::Candidate(candidate); @@ -282,7 +277,6 @@ mod tests { })?; assert_node(&mut node).is_candidate().term(3); assert_messages(&mut node_rx, vec![]); - assert_messages(&mut state_rx, vec![]); // However, the second external vote makes us leader node = node.step(Message { @@ -320,14 +314,13 @@ mod tests { } assert_messages(&mut node_rx, vec![]); - assert_messages(&mut state_rx, vec![]); Ok(()) } #[test] // ClientRequest returns Error::Abort. fn step_clientrequest() -> Result<()> { - let (candidate, mut node_rx, mut state_rx) = setup()?; + let (candidate, mut node_rx) = setup()?; let mut node = Node::Candidate(candidate); node = node.step(Message { @@ -346,13 +339,12 @@ mod tests { event: Event::ClientResponse { id: vec![0x01], response: Err(Error::Abort) }, }], ); - assert_messages(&mut state_rx, vec![]); Ok(()) } #[test] fn tick() -> Result<()> { - let (candidate, mut node_rx, mut state_rx) = setup()?; + let (candidate, mut node_rx) = setup()?; let timeout = candidate.role.election_timeout; let mut node = Node::Candidate(candidate); @@ -372,7 +364,6 @@ mod tests { event: Event::SolicitVote { last_index: 3, last_term: 2 }, }], ); - assert_messages(&mut state_rx, vec![]); Ok(()) } } diff --git a/src/raft/node/follower.rs b/src/raft/node/follower.rs index be3cdc7b4..4b9d7f994 100644 --- a/src/raft/node/follower.rs +++ b/src/raft/node/follower.rs @@ -1,4 +1,4 @@ -use super::super::{Address, Event, Instruction, Log, Message, RequestID, Response}; +use super::super::{Address, Event, Log, Message, RequestID, Response, State}; use super::{rand_election_timeout, Candidate, Node, NodeID, RawNode, Role, Term, Ticks}; use crate::error::{Error, Result}; @@ -10,13 +10,13 @@ use tokio::sync::mpsc; #[derive(Clone, Debug, PartialEq)] pub struct Follower { /// The leader, or None if just initialized. - leader: Option, + pub(super) leader: Option, /// The number of ticks since the last message from the leader. leader_seen: Ticks, /// The leader_seen timeout before triggering an election. election_timeout: Ticks, /// The node we voted for in the current term, if any. - voted_for: Option, + pub(super) voted_for: Option, // Local client requests that have been forwarded to the leader. These are // aborted on leader/term changes. pub(super) forwarded: HashSet, @@ -43,12 +43,12 @@ impl RawNode { id: NodeID, peers: HashSet, mut log: Log, + state: Box, node_tx: mpsc::UnboundedSender, - state_tx: mpsc::UnboundedSender, ) -> Result { let (term, voted_for) = log.get_term()?; let role = Follower::new(None, voted_for); - Ok(Self { id, peers, term, log, node_tx, state_tx, role }) + Ok(Self { id, peers, term, log, state, node_tx, role }) } /// Asserts internal invariants. @@ -150,20 +150,17 @@ impl RawNode { None => self = self.into_follower(Some(from), msg.term)?, } - // Advance commit index and apply entries if possible. + // Respond to the heartbeat. + // + // TODO: this should return the last index and term. let has_committed = self.log.has(commit_index, commit_term)?; - let (old_commit_index, _) = self.log.get_commit_index(); - if has_committed && commit_index > old_commit_index { + self.send(msg.from, Event::ConfirmLeader { has_committed, read_seq })?; + + // Advance commit index and apply entries. + if has_committed && commit_index > self.log.get_commit_index().0 { self.log.commit(commit_index)?; - let mut scan = self.log.scan((old_commit_index + 1)..=commit_index)?; - while let Some(entry) = scan.next().transpose()? { - self.state_tx.send(Instruction::Apply { entry })?; - } + self.maybe_apply()?; } - self.send( - msg.from, - Event::ConfirmLeader { commit_index, has_committed, read_seq }, - )?; } // Replicate entries from the leader. If we don't have a leader in @@ -281,7 +278,9 @@ impl RawNode { } #[cfg(test)] +#[cfg(never)] // TODO pub mod tests { + use super::super::super::state::tests::TestState; use super::super::super::{Entry, Log, Request}; use super::super::tests::{assert_messages, assert_node}; use super::*; @@ -289,14 +288,6 @@ pub mod tests { use crate::storage; use tokio::sync::mpsc; - pub fn follower_leader(node: &RawNode) -> Option { - node.role.leader - } - - pub fn follower_voted_for(node: &RawNode) -> Option { - node.role.voted_for - } - #[allow(clippy::type_complexity)] fn setup() -> Result<( RawNode, @@ -624,7 +615,6 @@ pub mod tests { fn step_appendentries_base0() -> Result<()> { // TODO: Move this into a setup function. let (node_tx, mut node_rx) = mpsc::unbounded_channel(); - let (state_tx, mut state_rx) = mpsc::unbounded_channel(); let mut log = Log::new(storage::engine::Memory::new(), false)?; log.append(1, Some(vec![0x01]))?; log.append(1, Some(vec![0x02]))?; @@ -636,8 +626,8 @@ pub mod tests { peers: HashSet::from([2, 3, 4, 5]), term: 1, log, + state: Box::new(TestState::new(0)), node_tx, - state_tx, role: Follower::new(Some(2), None), }; @@ -667,7 +657,6 @@ pub mod tests { event: Event::AcceptEntries { last_index: 2 }, }], ); - assert_messages(&mut state_rx, vec![]); Ok(()) } diff --git a/src/raft/node/leader.rs b/src/raft/node/leader.rs index ee1ba69dc..5c75dd159 100644 --- a/src/raft/node/leader.rs +++ b/src/raft/node/leader.rs @@ -1,5 +1,5 @@ use super::super::{ - Address, Event, Index, Instruction, Message, ReadSequence, Request, RequestID, Status, + Address, Event, Index, Message, ReadSequence, Request, RequestID, Response, Status, }; use super::{Follower, Node, NodeID, RawNode, Role, Term, Ticks, HEARTBEAT_INTERVAL}; use crate::error::{Error, Result}; @@ -52,8 +52,6 @@ pub struct Leader { /// /// If the leader loses leadership, all pending write requests are aborted /// by returning Error::Abort. - /// - /// TODO: Actually return responses when applied. writes: HashMap, /// Keeps track of pending read requests. To guarantee linearizability, read /// requests are assigned a sequence number and registered here when @@ -110,7 +108,6 @@ impl RawNode { info!("Discovered new term {}", term); // Cancel in-flight requests. - self.state_tx.send(Instruction::Abort)?; for write in std::mem::take(&mut self.role.writes).into_values() { self.send( write.from, @@ -157,16 +154,10 @@ impl RawNode { // are its leader. If it doesn't have the commit index in its local // log, replicate the log to it. If the peer's read sequence number // increased, process any pending reads. - Event::ConfirmLeader { commit_index, has_committed, read_seq } => { + Event::ConfirmLeader { has_committed, read_seq } => { assert!(read_seq <= self.role.read_seq, "Future read sequence number"); let from = msg.from.unwrap(); - self.state_tx.send(Instruction::Vote { - term: msg.term, - index: commit_index, - address: msg.from, - })?; - let progress = self.role.progress.get_mut(&from).unwrap(); if read_seq > progress.read_seq { progress.read_seq = read_seq; @@ -191,7 +182,7 @@ impl RawNode { if last_index > progress.last { progress.last = last_index; progress.next = last_index + 1; - self.maybe_commit()?; + self.maybe_commit_and_apply()?; } } @@ -220,23 +211,9 @@ impl RawNode { self.role.reads.push_back(Read { seq: self.role.read_seq, from: msg.from, - id: id.clone(), - command: command.clone(), - }); - let (commit_index, _) = self.log.get_commit_index(); - self.state_tx.send(Instruction::Query { id, - address: msg.from, command, - term: self.term, - index: commit_index, - quorum: self.quorum_size(), - })?; - self.state_tx.send(Instruction::Vote { - term: self.term, - index: commit_index, - address: Address::Node(self.id), - })?; + }); if self.peers.is_empty() { self.maybe_read()?; } @@ -248,15 +225,14 @@ impl RawNode { Event::ClientRequest { id, request: Request::Mutate(command) } => { let index = self.propose(Some(command))?; self.role.writes.insert(index, Write { from: msg.from, id: id.clone() }); - self.state_tx.send(Instruction::Notify { id, address: msg.from, index })?; if self.peers.is_empty() { - self.maybe_commit()?; + self.maybe_commit_and_apply()?; } } Event::ClientRequest { id, request: Request::Status } => { let engine_status = self.log.status()?; - let status = Box::new(Status { + let status = Status { server: self.id, leader: self.id, term: self.term, @@ -268,11 +244,14 @@ impl RawNode { .chain(std::iter::once((self.id, self.log.get_last_index().0))) .collect(), commit_index: self.log.get_commit_index().0, - apply_index: 0, + apply_index: self.state.get_applied_index(), storage: engine_status.name.clone(), storage_size: engine_status.size, - }); - self.state_tx.send(Instruction::Status { id, address: msg.from, status })? + }; + self.send( + msg.from, + Event::ClientResponse { id, response: Ok(Response::Status(status)) }, + )?; } // Votes can come in after we won the election, ignore them. @@ -319,11 +298,10 @@ impl RawNode { Ok(index) } - /// Commits any new log entries that have been replicated to a quorum, - /// and schedules them for state machine application. - fn maybe_commit(&mut self) -> Result { - // Determine the new commit index, i.e. the last index replicated to a - // quorum of peers. + /// Commits any new log entries that have been replicated to a quorum, and + /// applies them to the state machine. + fn maybe_commit_and_apply(&mut self) -> Result { + // Determine the new commit index. let commit_index = self.quorum_value( self.role .progress @@ -333,39 +311,41 @@ impl RawNode { .collect(), ); - // A 0 commit index means we haven't committed anything yet. - if commit_index == 0 { + // Make sure the commit index advances. + let prev_commit_index = self.log.get_commit_index().0; + assert!(commit_index >= prev_commit_index, "Commit index regression"); + if commit_index == prev_commit_index { return Ok(commit_index); } - // Make sure the commit index does not regress. - let (prev_commit_index, _) = self.log.get_commit_index(); - assert!( - commit_index >= prev_commit_index, - "Commit index regression {} -> {}", - prev_commit_index, - commit_index - ); - - // We can only safely commit up to an entry from our own term, see - // figure 8 in Raft paper. + // We can only safely commit an entry from our own term (see figure 8 in + // Raft paper). match self.log.get(commit_index)? { Some(entry) if entry.term == self.term => {} Some(_) => return Ok(prev_commit_index), None => panic!("Commit index {} missing", commit_index), }; - // Commit and apply the new entries. - if commit_index > prev_commit_index { - self.log.commit(commit_index)?; - // TODO: Move application elsewhere, but needs access to applied index. - let mut scan = self.log.scan((prev_commit_index + 1)..=commit_index)?; - while let Some(entry) = scan.next().transpose()? { - // TODO: Send response. - self.role.writes.remove(&entry.index); - self.state_tx.send(Instruction::Apply { entry })?; + // Commit the new entries. + self.log.commit(commit_index)?; + + // Apply entries and respond to client writers. + Self::maybe_apply_with(&mut self.log, &mut self.state, |index, result| -> Result<()> { + if let Some(write) = self.role.writes.remove(&index) { + // TODO: use self.send() or something. + self.node_tx.send(Message { + from: Address::Node(self.id), + to: write.from, + term: self.term, + event: Event::ClientResponse { + id: write.id, + response: result.map(Response::Mutate), + }, + })?; } - } + Ok(()) + })?; + Ok(commit_index) } @@ -391,8 +371,12 @@ impl RawNode { if read.seq > read_seq { break; } - self.role.reads.pop_front().unwrap(); - // TODO: Actually execute the reads. + let read = self.role.reads.pop_front().unwrap(); + let result = self.state.query(read.command); + self.send( + read.from, + Event::ClientResponse { id: read.id, response: result.map(Response::Query) }, + )?; } Ok(()) @@ -417,6 +401,7 @@ impl RawNode { } #[cfg(test)] +#[cfg(never)] // TODO mod tests { use super::super::super::{Entry, Log}; use super::super::tests::{assert_messages, assert_node}; diff --git a/src/raft/node/mod.rs b/src/raft/node/mod.rs index 52dc005dd..ab0b836cc 100644 --- a/src/raft/node/mod.rs +++ b/src/raft/node/mod.rs @@ -2,8 +2,8 @@ mod candidate; mod follower; mod leader; -use super::{Address, Driver, Event, Index, Instruction, Log, Message, State}; -use crate::error::Result; +use super::{Address, Entry, Event, Index, Log, Message, State}; +use crate::error::{Error, Result}; use candidate::Candidate; use follower::Follower; use leader::Leader; @@ -68,16 +68,11 @@ impl Node { pub async fn new( id: NodeID, peers: HashSet, - mut log: Log, - mut state: Box, + log: Log, + state: Box, node_tx: mpsc::UnboundedSender, ) -> Result { - let (state_tx, state_rx) = mpsc::unbounded_channel(); - let mut driver = Driver::new(id, state_rx, node_tx.clone()); - driver.apply_log(&mut *state, &mut log)?; - tokio::spawn(driver.drive(state)); - - let node = RawNode::new(id, peers, log, node_tx, state_tx)?; + let node = RawNode::new(id, peers, log, state, node_tx)?; if node.peers.is_empty() { // If there are no peers, become leader immediately. return Ok(node.into_candidate()?.into_leader()?.into()); @@ -144,8 +139,8 @@ pub struct RawNode { peers: HashSet, term: Term, log: Log, + state: Box, node_tx: mpsc::UnboundedSender, - state_tx: mpsc::UnboundedSender, role: R, } @@ -157,12 +152,42 @@ impl RawNode { peers: self.peers, term: self.term, log: self.log, + state: self.state, node_tx: self.node_tx, - state_tx: self.state_tx, role, } } + /// Applies any pending, committed entries to the state machine. + fn maybe_apply(&mut self) -> Result<()> { + Self::maybe_apply_with(&mut self.log, &mut self.state, |_, _| Ok(())) + } + + /// Like maybe_apply(), but calls the given closure with the result of every + /// applied command. Not a method, so that the closure can mutate the node. + fn maybe_apply_with(log: &mut Log, state: &mut Box, mut on_apply: F) -> Result<()> + where + F: FnMut(Index, Result>) -> Result<()>, + { + let applied_index = state.get_applied_index(); + let commit_index = log.get_commit_index().0; + assert!(commit_index >= applied_index, "Commit index below applied index"); + if applied_index >= commit_index { + return Ok(()); + } + + let mut scan = log.scan((applied_index + 1)..=commit_index)?; + while let Some(entry) = scan.next().transpose()? { + let index = entry.index; + debug!("Applying {:?}", entry); + match state.apply(entry) { + Err(error @ Error::Internal(_)) => return Err(error), + result => on_apply(index, result)?, + } + } + Ok(()) + } + /// Returns the size of the cluster. fn cluster_size(&self) -> u8 { self.peers.len() as u8 + 1 @@ -247,7 +272,6 @@ fn quorum_value(mut values: Vec) -> T { mod tests { pub use super::super::state::tests::TestState; use super::super::{Entry, RequestID}; - use super::follower::tests::{follower_leader, follower_voted_for}; use super::*; use crate::storage; use pretty_assertions::assert_eq; @@ -335,7 +359,7 @@ mod tests { leader, match self.node { Node::Candidate(_) => None, - Node::Follower(n) => follower_leader(n), + Node::Follower(n) => n.role.leader, Node::Leader(_) => None, }, "Unexpected leader", @@ -371,7 +395,7 @@ mod tests { saved_voted_for, match self.node { Node::Candidate(n) => Some(n.id), - Node::Follower(n) => follower_voted_for(n), + Node::Follower(n) => n.role.voted_for, Node::Leader(n) => Some(n.id), }, "Incorrect voted_for stored in log" @@ -384,7 +408,7 @@ mod tests { voted_for, match self.node { Node::Candidate(_) => None, - Node::Follower(n) => follower_voted_for(n), + Node::Follower(n) => n.role.voted_for, Node::Leader(_) => None, }, "Unexpected voted_for" @@ -407,15 +431,14 @@ mod tests { peers: Vec, ) -> Result<(RawNode, mpsc::UnboundedReceiver)> { let (node_tx, node_rx) = mpsc::unbounded_channel(); - let (state_tx, _) = mpsc::unbounded_channel(); let node = RawNode { role: Follower::new(None, None), id: 1, peers: HashSet::from_iter(peers), term: 1, log: Log::new(storage::engine::Memory::new(), false)?, + state: Box::new(TestState::new(0)), node_tx, - state_tx, }; Ok((node, node_rx)) } diff --git a/src/raft/state.rs b/src/raft/state.rs index dfdc20d47..a7bbf35f3 100644 --- a/src/raft/state.rs +++ b/src/raft/state.rs @@ -1,11 +1,5 @@ -use super::{Address, Entry, Event, Index, Log, Message, NodeID, Response, Status, Term}; -use crate::error::{Error, Result}; - -use log::{debug, error}; -use std::collections::{BTreeMap, HashMap, HashSet}; -use tokio::sync::mpsc; -use tokio_stream::wrappers::UnboundedReceiverStream; -use tokio_stream::StreamExt as _; +use super::{Entry, Index}; +use crate::error::Result; /// A Raft-managed state machine. pub trait State: Send { @@ -27,241 +21,9 @@ pub trait State: Send { fn query(&self, command: Vec) -> Result>; } -#[derive(Debug, PartialEq)] -/// A driver instruction. -pub enum Instruction { - /// Abort all pending operations, e.g. due to leader change. - Abort, - /// Apply a log entry. - Apply { entry: Entry }, - /// Notify the given address with the result of applying the entry at the given index. - Notify { id: Vec, address: Address, index: Index }, - /// Query the state machine when the given term and index has been confirmed by vote. - Query { id: Vec, address: Address, command: Vec, term: Term, index: Index, quorum: u8 }, - /// Extend the given server status and return it to the given address. - Status { id: Vec, address: Address, status: Box }, - /// Votes for queries at the given term and commit index. - Vote { term: Term, index: Index, address: Address }, -} - -/// A driver query. -struct Query { - id: Vec, - term: Term, - address: Address, - command: Vec, - quorum: u8, - votes: HashSet
, -} - -/// Drives a state machine, taking operations from state_rx and sending results via node_tx. -pub struct Driver { - node_id: NodeID, - state_rx: UnboundedReceiverStream, - node_tx: mpsc::UnboundedSender, - /// Notify clients when their mutation is applied. - notify: HashMap)>, - /// Execute client queries when they receive a quorum. > - queries: BTreeMap, Query>>, -} - -impl Driver { - /// Creates a new state machine driver. - pub fn new( - node_id: NodeID, - state_rx: mpsc::UnboundedReceiver, - node_tx: mpsc::UnboundedSender, - ) -> Self { - Self { - node_id, - state_rx: UnboundedReceiverStream::new(state_rx), - node_tx, - notify: HashMap::new(), - queries: BTreeMap::new(), - } - } - - /// Drives a state machine. - pub async fn drive(mut self, mut state: Box) -> Result<()> { - debug!("Starting state machine driver at applied index {}", state.get_applied_index()); - while let Some(instruction) = self.state_rx.next().await { - if let Err(error) = self.execute(instruction, &mut *state) { - error!("Halting state machine due to error: {}", error); - return Err(error); - } - } - debug!("Stopping state machine driver"); - Ok(()) - } - - /// Applies committed log entries to the state machine. - pub fn apply_log(&mut self, state: &mut dyn State, log: &mut Log) -> Result { - let applied_index = state.get_applied_index(); - let (commit_index, _) = log.get_commit_index(); - assert!(applied_index <= commit_index, "applied index above commit index"); - - if applied_index < commit_index { - let mut scan = log.scan((applied_index + 1)..=commit_index)?; - while let Some(entry) = scan.next().transpose()? { - self.apply(state, entry)?; - } - } - Ok(state.get_applied_index()) - } - - /// Applies an entry to the state machine. - pub fn apply(&mut self, state: &mut dyn State, entry: Entry) -> Result { - // Apply the command. - debug!("Applying {:?}", entry); - match state.apply(entry) { - Err(error @ Error::Internal(_)) => return Err(error), - result => self.notify_applied(state.get_applied_index(), result)?, - }; - // Try to execute any pending queries, since they may have been submitted for a - // commit_index which hadn't been applied yet. - self.query_execute(state)?; - Ok(state.get_applied_index()) - } - - /// Executes a state machine instruction. - fn execute(&mut self, i: Instruction, state: &mut dyn State) -> Result<()> { - debug!("Executing {:?}", i); - match i { - Instruction::Abort => { - self.notify_abort()?; - self.query_abort()?; - } - - Instruction::Apply { entry } => { - self.apply(state, entry)?; - } - - Instruction::Notify { id, address, index } => { - if index > state.get_applied_index() { - self.notify.insert(index, (address, id)); - } else { - self.send(address, Event::ClientResponse { id, response: Err(Error::Abort) })?; - } - } - - Instruction::Query { id, address, command, index, term, quorum } => { - self.queries.entry(index).or_default().insert( - id.clone(), - Query { id, term, address, command, quorum, votes: HashSet::new() }, - ); - } - - Instruction::Status { id, address, mut status } => { - status.apply_index = state.get_applied_index(); - self.send( - address, - Event::ClientResponse { id, response: Ok(Response::Status(*status)) }, - )?; - } - - Instruction::Vote { term, index, address } => { - self.query_vote(term, index, address); - self.query_execute(state)?; - } - } - Ok(()) - } - - /// Aborts all pending notifications. - fn notify_abort(&mut self) -> Result<()> { - for (_, (address, id)) in std::mem::take(&mut self.notify) { - self.send(address, Event::ClientResponse { id, response: Err(Error::Abort) })?; - } - Ok(()) - } - - /// Notifies a client about an applied log entry, if any. - fn notify_applied(&mut self, index: Index, result: Result>) -> Result<()> { - if let Some((to, id)) = self.notify.remove(&index) { - self.send(to, Event::ClientResponse { id, response: result.map(Response::Mutate) })?; - } - Ok(()) - } - - /// Aborts all pending queries. - fn query_abort(&mut self) -> Result<()> { - for (_, queries) in std::mem::take(&mut self.queries) { - for (id, query) in queries { - self.send( - query.address, - Event::ClientResponse { id, response: Err(Error::Abort) }, - )?; - } - } - Ok(()) - } - - /// Executes any queries that are ready. - fn query_execute(&mut self, state: &mut dyn State) -> Result<()> { - for query in self.query_ready(state.get_applied_index()) { - debug!("Executing query {:?}", query.command); - let result = state.query(query.command); - if let Err(error @ Error::Internal(_)) = result { - return Err(error); - } - self.send( - query.address, - Event::ClientResponse { id: query.id, response: result.map(Response::Query) }, - )? - } - Ok(()) - } - - /// Fetches and removes any ready queries, where index <= applied_index. - fn query_ready(&mut self, applied_index: Index) -> Vec { - let mut ready = Vec::new(); - let mut empty = Vec::new(); - for (index, queries) in self.queries.range_mut(..=applied_index) { - let mut ready_ids = Vec::new(); - for (id, query) in queries.iter_mut() { - if query.votes.len() as u8 >= query.quorum { - ready_ids.push(id.clone()); - } - } - for id in ready_ids { - if let Some(query) = queries.remove(&id) { - ready.push(query) - } - } - if queries.is_empty() { - empty.push(*index) - } - } - for index in empty { - self.queries.remove(&index); - } - ready - } - - /// Votes for queries up to and including a given commit index for a term by an address. - fn query_vote(&mut self, term: Term, commit_index: Index, address: Address) { - for (_, queries) in self.queries.range_mut(..=commit_index) { - for (_, query) in queries.iter_mut() { - if term >= query.term { - query.votes.insert(address); - } - } - } - } - - /// Sends a message. - fn send(&self, to: Address, event: Event) -> Result<()> { - // TODO: This needs to use the correct term. - let msg = Message { from: Address::Node(self.node_id), to, term: 0, event }; - debug!("Sending {:?}", msg); - Ok(self.node_tx.send(msg)?) - } -} - #[cfg(test)] pub mod tests { use super::*; - use pretty_assertions::assert_eq; use std::sync::{Arc, Mutex}; #[derive(Clone, Debug)] @@ -303,180 +65,4 @@ pub mod tests { Ok(command) } } - - async fn setup() -> Result<( - Box, - mpsc::UnboundedSender, - mpsc::UnboundedReceiver, - )> { - let state = Box::new(TestState::new(0)); - let (state_tx, state_rx) = mpsc::unbounded_channel(); - let (node_tx, node_rx) = mpsc::unbounded_channel(); - tokio::spawn(Driver::new(1, state_rx, node_tx).drive(state.clone())); - Ok((state, state_tx, node_rx)) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn driver_abort() -> Result<()> { - let (state, state_tx, node_rx) = setup().await?; - - state_tx.send(Instruction::Notify { - id: vec![0x01], - index: 1, - address: Address::Node(1), - })?; - state_tx.send(Instruction::Query { - id: vec![0x02], - address: Address::Client, - command: vec![0xf0], - term: 1, - index: 1, - quorum: 2, - })?; - state_tx.send(Instruction::Vote { term: 1, index: 1, address: Address::Node(1) })?; - state_tx.send(Instruction::Abort)?; - std::mem::drop(state_tx); - - let node_rx = UnboundedReceiverStream::new(node_rx); - assert_eq!( - node_rx.collect::>().await, - vec![ - Message { - from: Address::Node(1), - to: Address::Node(1), - term: 0, - event: Event::ClientResponse { id: vec![0x01], response: Err(Error::Abort) } - }, - Message { - from: Address::Node(1), - to: Address::Client, - term: 0, - event: Event::ClientResponse { id: vec![0x02], response: Err(Error::Abort) } - } - ] - ); - assert_eq!(state.list(), Vec::>::new()); - assert_eq!(state.get_applied_index(), 0); - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn driver_apply() -> Result<()> { - let (state, state_tx, node_rx) = setup().await?; - - state_tx.send(Instruction::Notify { - id: vec![0x01], - index: 2, - address: Address::Client, - })?; - state_tx.send(Instruction::Apply { entry: Entry { index: 1, term: 1, command: None } })?; - state_tx.send(Instruction::Apply { - entry: Entry { index: 2, term: 1, command: Some(vec![0xaf]) }, - })?; - std::mem::drop(state_tx); - - let node_rx = UnboundedReceiverStream::new(node_rx); - assert_eq!( - node_rx.collect::>().await, - vec![Message { - from: Address::Node(1), - to: Address::Client, - term: 0, - event: Event::ClientResponse { - id: vec![0x01], - response: Ok(Response::Mutate(vec![0xaf])) - } - }] - ); - assert_eq!(state.list(), vec![vec![0xaf]]); - assert_eq!(state.get_applied_index(), 2); - - Ok(()) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn driver_query() -> Result<()> { - let (_, state_tx, node_rx) = setup().await?; - - state_tx.send(Instruction::Query { - id: vec![0x01], - address: Address::Client, - command: vec![0xf0], - term: 2, - index: 1, - quorum: 2, - })?; - state_tx.send(Instruction::Apply { - entry: Entry { index: 1, term: 2, command: Some(vec![0xaf]) }, - })?; - state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Node(1) })?; - state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Node(2) })?; - std::mem::drop(state_tx); - - let node_rx = UnboundedReceiverStream::new(node_rx); - assert_eq!( - node_rx.collect::>().await, - vec![Message { - from: Address::Node(1), - to: Address::Client, - term: 0, - event: Event::ClientResponse { - id: vec![0x01], - response: Ok(Response::Query(vec![0xf0])) - } - }] - ); - - Ok(()) - } - - // A query for an index submitted in a given term cannot be satisfied by votes below that term. - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn driver_query_noterm() -> Result<()> { - let (_, state_tx, node_rx) = setup().await?; - - state_tx.send(Instruction::Query { - id: vec![0x01], - address: Address::Client, - command: vec![0xf0], - term: 2, - index: 1, - quorum: 2, - })?; - state_tx.send(Instruction::Apply { - entry: Entry { index: 1, term: 1, command: Some(vec![0xaf]) }, - })?; - state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Node(1) })?; - state_tx.send(Instruction::Vote { term: 1, index: 1, address: Address::Node(1) })?; - std::mem::drop(state_tx); - - let node_rx = UnboundedReceiverStream::new(node_rx); - assert_eq!(node_rx.collect::>().await, vec![]); - Ok(()) - } - - #[tokio::test(flavor = "multi_thread", worker_threads = 2)] - async fn driver_query_noquorum() -> Result<()> { - let (_, state_tx, node_rx) = setup().await?; - - state_tx.send(Instruction::Query { - id: vec![0x01], - address: Address::Client, - command: vec![0xf0], - term: 1, - index: 1, - quorum: 2, - })?; - state_tx.send(Instruction::Apply { - entry: Entry { index: 1, term: 1, command: Some(vec![0xaf]) }, - })?; - state_tx.send(Instruction::Vote { term: 1, index: 1, address: Address::Node(1) })?; - std::mem::drop(state_tx); - - let node_rx = UnboundedReceiverStream::new(node_rx); - assert_eq!(node_rx.collect::>().await, vec![]); - - Ok(()) - } }