Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jan 7, 2024
1 parent 340c0d2 commit 9eeb2e2
Show file tree
Hide file tree
Showing 7 changed files with 123 additions and 554 deletions.
5 changes: 0 additions & 5 deletions src/raft/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ pub enum Event {
},
/// Followers confirm loyalty to leader after heartbeats.
ConfirmLeader {
/// The commit_index of the original leader heartbeat, to confirm
/// read requests.
///
/// TODO: remove this when migrated to read_seq.
commit_index: Index,
/// If false, the follower does not have the entry at commit_index
/// and would like the leader to replicate it.
has_committed: bool,
Expand Down
2 changes: 1 addition & 1 deletion src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ pub use self::log::{Entry, Index, Log};
pub use message::{Address, Event, Message, ReadSequence, Request, RequestID, Response};
pub use node::{Node, NodeID, Status, Term};
pub use server::Server;
pub use state::{Driver, Instruction, State};
pub use state::State;
39 changes: 15 additions & 24 deletions src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,21 +168,19 @@ impl RawNode<Candidate> {

#[cfg(test)]
mod tests {
use super::super::super::{Entry, Instruction, Log, Request};
use super::super::super::state::tests::TestState;
use super::super::super::{Entry, Log, Request};
use super::super::tests::{assert_messages, assert_node};
use super::*;
use crate::storage;
use tokio::sync::mpsc;

#[allow(clippy::type_complexity)]
fn setup() -> Result<(
RawNode<Candidate>,
mpsc::UnboundedReceiver<Message>,
mpsc::UnboundedReceiver<Instruction>,
)> {
fn setup() -> Result<(RawNode<Candidate>, mpsc::UnboundedReceiver<Message>)> {
let (node_tx, node_rx) = mpsc::unbounded_channel();
let (state_tx, state_rx) = mpsc::unbounded_channel();
let state = Box::new(TestState::new(0));
let mut log = Log::new(storage::engine::Memory::new(), false)?;

log.append(1, Some(vec![0x01]))?;
log.append(1, Some(vec![0x02]))?;
log.append(2, Some(vec![0x03]))?;
Expand All @@ -194,18 +192,18 @@ mod tests {
peers: HashSet::from([2, 3, 4, 5]),
term: 3,
log,
state,
node_tx,
state_tx,
role: Candidate::new(),
};
node.role.votes.insert(1);
Ok((node, node_rx, state_rx))
Ok((node, node_rx))
}

#[test]
// Heartbeat for current term converts to follower and emits ConfirmLeader.
fn step_heartbeat_current_term() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let mut node = candidate.step(Message {
from: Address::Node(2),
to: Address::Node(1),
Expand All @@ -219,18 +217,17 @@ mod tests {
from: Address::Node(1),
to: Address::Node(2),
term: 3,
event: Event::ConfirmLeader { commit_index: 2, has_committed: true, read_seq: 7 },
event: Event::ConfirmLeader { has_committed: true, read_seq: 7 },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
// Heartbeat for future term converts to follower and emits ConfirmLeader
// event.
fn step_heartbeat_future_term() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let mut node = candidate.step(Message {
from: Address::Node(2),
to: Address::Node(1),
Expand All @@ -244,17 +241,16 @@ mod tests {
from: Address::Node(1),
to: Address::Node(2),
term: 4,
event: Event::ConfirmLeader { commit_index: 2, has_committed: true, read_seq: 7 },
event: Event::ConfirmLeader { has_committed: true, read_seq: 7 },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
// Heartbeat for past term is ignored
fn step_heartbeat_past_term() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let mut node = candidate.step(Message {
from: Address::Node(2),
to: Address::Node(1),
Expand All @@ -263,13 +259,12 @@ mod tests {
})?;
assert_node(&mut node).is_candidate().term(3);
assert_messages(&mut node_rx, vec![]);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
fn step_grantvote() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let peers = candidate.peers.clone();
let mut node = Node::Candidate(candidate);

Expand All @@ -282,7 +277,6 @@ mod tests {
})?;
assert_node(&mut node).is_candidate().term(3);
assert_messages(&mut node_rx, vec![]);
assert_messages(&mut state_rx, vec![]);

// However, the second external vote makes us leader
node = node.step(Message {
Expand Down Expand Up @@ -320,14 +314,13 @@ mod tests {
}

assert_messages(&mut node_rx, vec![]);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
// ClientRequest returns Error::Abort.
fn step_clientrequest() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let mut node = Node::Candidate(candidate);

node = node.step(Message {
Expand All @@ -346,13 +339,12 @@ mod tests {
event: Event::ClientResponse { id: vec![0x01], response: Err(Error::Abort) },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

#[test]
fn tick() -> Result<()> {
let (candidate, mut node_rx, mut state_rx) = setup()?;
let (candidate, mut node_rx) = setup()?;
let timeout = candidate.role.election_timeout;
let mut node = Node::Candidate(candidate);

Expand All @@ -372,7 +364,6 @@ mod tests {
event: Event::SolicitVote { last_index: 3, last_term: 2 },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}
}
43 changes: 16 additions & 27 deletions src/raft/node/follower.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::super::{Address, Event, Instruction, Log, Message, RequestID, Response};
use super::super::{Address, Event, Log, Message, RequestID, Response, State};
use super::{rand_election_timeout, Candidate, Node, NodeID, RawNode, Role, Term, Ticks};
use crate::error::{Error, Result};

Expand All @@ -10,13 +10,13 @@ use tokio::sync::mpsc;
#[derive(Clone, Debug, PartialEq)]
pub struct Follower {
/// The leader, or None if just initialized.
leader: Option<NodeID>,
pub(super) leader: Option<NodeID>,
/// The number of ticks since the last message from the leader.
leader_seen: Ticks,
/// The leader_seen timeout before triggering an election.
election_timeout: Ticks,
/// The node we voted for in the current term, if any.
voted_for: Option<NodeID>,
pub(super) voted_for: Option<NodeID>,
// Local client requests that have been forwarded to the leader. These are
// aborted on leader/term changes.
pub(super) forwarded: HashSet<RequestID>,
Expand All @@ -43,12 +43,12 @@ impl RawNode<Follower> {
id: NodeID,
peers: HashSet<NodeID>,
mut log: Log,
state: Box<dyn State>,
node_tx: mpsc::UnboundedSender<Message>,
state_tx: mpsc::UnboundedSender<Instruction>,
) -> Result<Self> {
let (term, voted_for) = log.get_term()?;
let role = Follower::new(None, voted_for);
Ok(Self { id, peers, term, log, node_tx, state_tx, role })
Ok(Self { id, peers, term, log, state, node_tx, role })
}

/// Asserts internal invariants.
Expand Down Expand Up @@ -150,20 +150,17 @@ impl RawNode<Follower> {
None => self = self.into_follower(Some(from), msg.term)?,
}

// Advance commit index and apply entries if possible.
// Respond to the heartbeat.
//
// TODO: this should return the last index and term.
let has_committed = self.log.has(commit_index, commit_term)?;
let (old_commit_index, _) = self.log.get_commit_index();
if has_committed && commit_index > old_commit_index {
self.send(msg.from, Event::ConfirmLeader { has_committed, read_seq })?;

// Advance commit index and apply entries.
if has_committed && commit_index > self.log.get_commit_index().0 {
self.log.commit(commit_index)?;
let mut scan = self.log.scan((old_commit_index + 1)..=commit_index)?;
while let Some(entry) = scan.next().transpose()? {
self.state_tx.send(Instruction::Apply { entry })?;
}
self.maybe_apply()?;
}
self.send(
msg.from,
Event::ConfirmLeader { commit_index, has_committed, read_seq },
)?;
}

// Replicate entries from the leader. If we don't have a leader in
Expand Down Expand Up @@ -281,22 +278,16 @@ impl RawNode<Follower> {
}

#[cfg(test)]
#[cfg(never)] // TODO
pub mod tests {
use super::super::super::state::tests::TestState;
use super::super::super::{Entry, Log, Request};
use super::super::tests::{assert_messages, assert_node};
use super::*;
use crate::error::Error;
use crate::storage;
use tokio::sync::mpsc;

pub fn follower_leader(node: &RawNode<Follower>) -> Option<NodeID> {
node.role.leader
}

pub fn follower_voted_for(node: &RawNode<Follower>) -> Option<NodeID> {
node.role.voted_for
}

#[allow(clippy::type_complexity)]
fn setup() -> Result<(
RawNode<Follower>,
Expand Down Expand Up @@ -624,7 +615,6 @@ pub mod tests {
fn step_appendentries_base0() -> Result<()> {
// TODO: Move this into a setup function.
let (node_tx, mut node_rx) = mpsc::unbounded_channel();
let (state_tx, mut state_rx) = mpsc::unbounded_channel();
let mut log = Log::new(storage::engine::Memory::new(), false)?;
log.append(1, Some(vec![0x01]))?;
log.append(1, Some(vec![0x02]))?;
Expand All @@ -636,8 +626,8 @@ pub mod tests {
peers: HashSet::from([2, 3, 4, 5]),
term: 1,
log,
state: Box::new(TestState::new(0)),
node_tx,
state_tx,
role: Follower::new(Some(2), None),
};

Expand Down Expand Up @@ -667,7 +657,6 @@ pub mod tests {
event: Event::AcceptEntries { last_index: 2 },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

Expand Down
Loading

0 comments on commit 9eeb2e2

Please sign in to comment.