Skip to content

Commit

Permalink
Fix Raft term changes.
Browse files Browse the repository at this point in the history
This patch improves Raft term changes, by becoming a leaderless follower
for new terms, and tracking voted_for properly for candidates and
leaders.
  • Loading branch information
erikgrinaker committed Nov 19, 2023
1 parent d2b1a79 commit 30986a5
Show file tree
Hide file tree
Showing 6 changed files with 231 additions and 133 deletions.
13 changes: 12 additions & 1 deletion src/raft/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,21 @@ pub enum Address {
/// A node with the specified node ID (local or remote). Valid both as
/// sender and recipient.
Node(NodeID),
/// A local client.
/// A local client. Can only send ClientRequest messages, and receive
/// ClientResponse messages.
Client,
}

impl Address {
/// Unwraps the node ID, or panics if address is not of kind Node.
pub fn unwrap(&self) -> NodeID {
match self {
Self::Node(id) => *id,
_ => panic!("unwrap called on non-Node address {:?}", self),
}
}
}

/// A message passed between Raft nodes.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct Message {
Expand Down
64 changes: 45 additions & 19 deletions src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use super::{
};
use crate::error::{Error, Result};

use ::log::{debug, info, warn};
use ::log::{debug, error, info, warn};
use rand::Rng as _;

/// A candidate is campaigning to become a leader.
Expand All @@ -31,12 +31,30 @@ impl Candidate {
}

impl RoleNode<Candidate> {
/// Transition to follower role.
fn become_follower(mut self, term: Term, leader: NodeID) -> Result<RoleNode<Follower>> {
info!("Discovered leader {} for term {}, following", leader, term);
self.term = term;
self.log.set_term(term, None)?;
let mut node = self.become_role(Follower::new(Some(leader), None))?;
/// Transforms the node into a follower. We either lost the election
/// and follow the winner, or we discovered a new term in which case
/// we step into it as a leaderless follower.
fn become_follower(mut self, term: Term, leader: Option<NodeID>) -> Result<RoleNode<Follower>> {
assert!(term >= self.term, "Term regression {} -> {}", self.term, term);

let mut node = if let Some(leader) = leader {
// We lost the election, follow the winner.
assert_eq!(term, self.term, "Can't follow leader in different term");
info!("Lost election, following leader {} in term {}", leader, term);
let voted_for = Some(self.id); // by definition
self.become_role(Follower::new(Some(leader), voted_for))
} else {
// We found a new term, but we don't necessarily know who the leader
// is yet. We'll find out when we step a message from it.
assert_ne!(term, self.term, "Can't become leaderless follower in current term");
info!("Discovered new term {}", term);
self.term = term;
self.log.set_term(term, None)?;
self.become_role(Follower::new(None, None))
};
// Abort any proxied requests.
//
// TODO: Candidates shouldn't proxy requests.
node.abort_proxied()?;
Ok(node)
}
Expand All @@ -47,7 +65,7 @@ impl RoleNode<Candidate> {
let peers = self.peers.clone();
let (last_index, _) = self.log.get_last_index();
let (commit_index, commit_term) = self.log.get_commit_index();
let mut node = self.become_role(Leader::new(peers, last_index))?;
let mut node = self.become_role(Leader::new(peers, last_index));
node.send(Address::Broadcast, Event::Heartbeat { commit_index, commit_term })?;
node.append(None)?;
node.abort_proxied()?;
Expand All @@ -56,21 +74,29 @@ impl RoleNode<Candidate> {

/// Processes a message.
pub fn step(mut self, msg: Message) -> Result<Node> {
// Drop invalid messages and messages from past terms.
if let Err(err) = self.validate(&msg) {
warn!("Ignoring invalid message: {}", err);
error!("Invalid message: {} ({:?})", err, msg);
return Ok(self.into());
}
if msg.term < self.term && msg.term > 0 {
debug!("Dropping message from past term ({:?})", msg);
return Ok(self.into());
}

// If we receive a message for a future term, become a leaderless
// follower in it and step the message. If the message is a Heartbeat or
// ReplicateEntries from the leader, stepping it will follow the leader.
if msg.term > self.term {
if let Address::Node(from) = msg.from {
return self.become_follower(msg.term, from)?.step(msg);
}
return self.become_follower(msg.term, None)?.step(msg);
}

match msg.event {
Event::Heartbeat { .. } => {
if let Address::Node(from) = msg.from {
return self.become_follower(msg.term, from)?.step(msg);
}
// If we receive a heartbeat or replicated entries in this term, we
// lost the election and have a new leader. Follow it and process
// the message.
Event::Heartbeat { .. } | Event::ReplicateEntries { .. } => {
return self.become_follower(msg.term, Some(msg.from.unwrap()))?.step(msg);
}

Event::GrantVote => {
Expand All @@ -97,8 +123,8 @@ impl RoleNode<Candidate> {
// Ignore other candidates when we're also campaigning
Event::SolicitVote { .. } => {}

// We're not a leader in this term, so we shoudn't see these.
Event::ConfirmLeader { .. }
| Event::ReplicateEntries { .. }
| Event::AcceptEntries { .. }
| Event::RejectEntries { .. } => warn!("Received unexpected message {:?}", msg),
}
Expand All @@ -113,7 +139,7 @@ impl RoleNode<Candidate> {
info!("Election timed out, starting new election for term {}", self.term + 1);
let (last_index, last_term) = self.log.get_last_index();
self.term += 1;
self.log.set_term(self.term, None)?;
self.log.set_term(self.term, Some(self.id))?;
self.role = Candidate::new();
self.send(Address::Broadcast, Event::SolicitVote { last_index, last_term })?;
}
Expand Down Expand Up @@ -143,7 +169,7 @@ mod tests {
log.append(1, Some(vec![0x02]))?;
log.append(2, Some(vec![0x03]))?;
log.commit(2)?;
log.set_term(3, None)?;
log.set_term(3, Some(1))?;

let node = RoleNode {
id: 1,
Expand Down
153 changes: 98 additions & 55 deletions src/raft/node/follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::super::{Address, Event, Instruction, Message, Response};
use super::{Candidate, Node, NodeID, RoleNode, Term, ELECTION_TIMEOUT_MAX, ELECTION_TIMEOUT_MIN};
use crate::error::{Error, Result};

use ::log::{debug, info, warn};
use ::log::{debug, error, info, warn};
use rand::Rng as _;

// A follower replicates state from a leader.
Expand Down Expand Up @@ -36,62 +36,117 @@ impl RoleNode<Follower> {
fn become_candidate(self) -> Result<RoleNode<Candidate>> {
info!("Starting election for term {}", self.term + 1);
let (last_index, last_term) = self.log.get_last_index();
let mut node = self.become_role(Candidate::new())?;
let mut node = self.become_role(Candidate::new());
node.term += 1;
node.log.set_term(node.term, None)?;
node.log.set_term(node.term, Some(node.id))?;
node.send(Address::Broadcast, Event::SolicitVote { last_index, last_term })?;
Ok(node)
}

/// Transforms the node into a follower for a new leader.
fn become_follower(mut self, leader: NodeID, term: Term) -> Result<RoleNode<Follower>> {
let mut voted_for = None;
if term > self.term {
info!("Discovered new term {}, following leader {}", term, leader);
/// Transforms the node into a follower, either a leaderless follower in a
/// new term or following a leader in the current term.
fn become_follower(mut self, leader: Option<NodeID>, term: Term) -> Result<RoleNode<Follower>> {
assert!(term >= self.term, "Term regression {} -> {}", self.term, term);

if let Some(leader) = leader {
// We found a leader in the current term.
assert_eq!(self.role.leader, None, "Already have leader in term");
assert_eq!(term, self.term, "Can't follow leader in different term");
info!("Following leader {} in term {}", leader, term);
self.role = Follower::new(Some(leader), self.role.voted_for);
} else {
// We found a new term, but we don't necessarily know who the leader
// is yet. We'll find out when we step a message from it.
assert_ne!(term, self.term, "Can't become leaderless follower in current term");
info!("Discovered new term {}", term);
self.term = term;
self.log.set_term(term, None)?;
} else {
info!("Discovered leader {}, following", leader);
voted_for = self.role.voted_for;
};
self.role = Follower::new(Some(leader), voted_for);
self.role = Follower::new(None, None);
}
// Abort any proxied requests.
//
// TODO: Move this into the new term branch, and assert that there are
// no proxied requests in the new leader branch.
self.abort_proxied()?;
Ok(self)
}

/// Checks if an address is the current leader
/// Checks if an address is the current leader.
fn is_leader(&self, from: &Address) -> bool {
matches!((&self.role.leader, from), (Some(leader), Address::Node(from)) if leader == from)
if let Some(leader) = &self.role.leader {
if let Address::Node(from) = from {
return leader == from;
}
}
false
}

/// Processes a message.
pub fn step(mut self, msg: Message) -> Result<Node> {
// Drop invalid messages and messages from past terms.
if let Err(err) = self.validate(&msg) {
warn!("Ignoring invalid message: {}", err);
error!("Invalid message: {} ({:?})", err, msg);
return Ok(self.into());
}
if let Address::Node(from) = msg.from {
if msg.term > self.term || self.role.leader.is_none() {
return self.become_follower(from, msg.term)?.step(msg);
}
if msg.term < self.term && msg.term > 0 {
debug!("Dropping message from past term ({:?})", msg);
return Ok(self.into());
}

// If we receive a message for a future term, become a leaderless
// follower in it and step the message. If the message is a Heartbeat or
// ReplicateEntries from the leader, stepping it will follow the leader.
if msg.term > self.term {
return self.become_follower(None, msg.term)?.step(msg);
}

// Record when we last saw a message from the leader (if any).
if self.is_leader(&msg.from) {
self.role.leader_seen_ticks = 0
}

match msg.event {
// The leader will send periodic heartbeats. If we don't have a
// leader in this term yet, follow it. If the commit_index advances,
// apply state transitions.
Event::Heartbeat { commit_index, commit_term } => {
if self.is_leader(&msg.from) {
let has_committed = self.log.has(commit_index, commit_term)?;
let (old_commit_index, _) = self.log.get_commit_index();
if has_committed && commit_index > old_commit_index {
self.log.commit(commit_index)?;
let mut scan = self.log.scan((old_commit_index + 1)..=commit_index)?;
while let Some(entry) = scan.next().transpose()? {
self.state_tx.send(Instruction::Apply { entry })?;
}
// Check that the heartbeat is from our leader.
let from = msg.from.unwrap();
match self.role.leader {
Some(leader) => assert_eq!(from, leader, "Multiple leaders in term"),
None => self = self.become_follower(Some(from), msg.term)?,
}

// Advance commit index and apply entries if possible.
let has_committed = self.log.has(commit_index, commit_term)?;
let (old_commit_index, _) = self.log.get_commit_index();
if has_committed && commit_index > old_commit_index {
self.log.commit(commit_index)?;
let mut scan = self.log.scan((old_commit_index + 1)..=commit_index)?;
while let Some(entry) = scan.next().transpose()? {
self.state_tx.send(Instruction::Apply { entry })?;
}
self.send(msg.from, Event::ConfirmLeader { commit_index, has_committed })?;
}
self.send(msg.from, Event::ConfirmLeader { commit_index, has_committed })?;
}

// Replicate entries from the leader. If we don't have a leader in
// this term yet, follow it.
Event::ReplicateEntries { base_index, base_term, entries } => {
// Check that the entries are from our leader.
let from = msg.from.unwrap();
match self.role.leader {
Some(leader) => assert_eq!(from, leader, "Multiple leaders in term"),
None => self = self.become_follower(Some(from), msg.term)?,
}

// Append the entries, if possible.
if base_index > 0 && !self.log.has(base_index, base_term)? {
debug!("Rejecting log entries at base {}", base_index);
self.send(msg.from, Event::RejectEntries)?
} else {
let last_index = self.log.splice(entries)?;
self.send(msg.from, Event::AcceptEntries { last_index })?
}
}

Expand All @@ -116,18 +171,6 @@ impl RoleNode<Follower> {
}
}

Event::ReplicateEntries { base_index, base_term, entries } => {
if self.is_leader(&msg.from) {
if base_index > 0 && !self.log.has(base_index, base_term)? {
debug!("Rejecting log entries at base {}", base_index);
self.send(msg.from, Event::RejectEntries)?
} else {
let last_index = self.log.splice(entries)?;
self.send(msg.from, Event::AcceptEntries { last_index })?
}
}
}

// Forward requests to the leader, or abort them if there is none.
Event::ClientRequest { ref id, .. } => {
let id = id.clone();
Expand All @@ -150,6 +193,7 @@ impl RoleNode<Follower> {
// Ignore votes which are usually strays from the previous election that we lost.
Event::GrantVote => {}

// We're not a leader in this term, so we shoudn't see these.
Event::ConfirmLeader { .. }
| Event::AcceptEntries { .. }
| Event::RejectEntries { .. } => warn!("Received unexpected message {:?}", msg),
Expand Down Expand Up @@ -292,19 +336,18 @@ pub mod tests {
}

#[test]
// Heartbeat from fake leader
fn step_heartbeat_fake_leader() -> Result<()> {
let (follower, mut node_rx, mut state_rx) = setup()?;
let mut node = follower.step(Message {
from: Address::Node(3),
to: Address::Node(1),
term: 3,
event: Event::Heartbeat { commit_index: 5, commit_term: 3 },
})?;
assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None).committed(2);
assert_messages(&mut node_rx, vec![]);
assert_messages(&mut state_rx, vec![]);
Ok(())
#[should_panic(expected = "Multiple leaders in term")]
// Heartbeat from other leader should panic.
fn step_heartbeat_fake_leader() {
let (follower, _, _) = setup().unwrap();
follower
.step(Message {
from: Address::Node(3),
to: Address::Node(1),
term: 3,
event: Event::Heartbeat { commit_index: 5, commit_term: 3 },
})
.unwrap();
}

#[test]
Expand Down
Loading

0 comments on commit 30986a5

Please sign in to comment.