Skip to content

Commit

Permalink
Tweak Raft heartbeat logic.
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Nov 19, 2023
1 parent b44b0bf commit d76cb23
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 14 deletions.
3 changes: 1 addition & 2 deletions src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,8 @@ impl RoleNode<Candidate> {
info!("Won election for term {}, becoming leader", self.term);
let peers = self.peers.clone();
let (last_index, _) = self.log.get_last_index();
let (commit_index, commit_term) = self.log.get_commit_index();
let mut node = self.become_role(Leader::new(peers, last_index));
node.send(Address::Broadcast, Event::Heartbeat { commit_index, commit_term })?;
node.heartbeat()?;
node.append(None)?;
Ok(node)
}
Expand Down
28 changes: 16 additions & 12 deletions src/raft/node/leader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use std::collections::HashMap;
// A leader serves requests and replicates the log to followers.
#[derive(Debug)]
pub struct Leader {
/// Number of ticks since last heartbeat.
/// Number of ticks since last periodic heartbeat.
since_heartbeat: Ticks,
/// The next index to replicate to a peer.
peer_next_index: HashMap<NodeID, Index>,
Expand Down Expand Up @@ -157,7 +157,7 @@ impl RoleNode<Leader> {
}

Event::ClientRequest { id, request: Request::Query(command) } => {
let (commit_index, commit_term) = self.log.get_commit_index();
let (commit_index, _) = self.log.get_commit_index();
self.state_tx.send(Instruction::Query {
id,
address: msg.from,
Expand All @@ -171,9 +171,7 @@ impl RoleNode<Leader> {
index: commit_index,
address: Address::Node(self.id),
})?;
if !self.peers.is_empty() {
self.send(Address::Broadcast, Event::Heartbeat { commit_index, commit_term })?;
}
self.heartbeat()?;
}

Event::ClientRequest { id, request: Request::Mutate(command) } => {
Expand Down Expand Up @@ -216,16 +214,22 @@ impl RoleNode<Leader> {

/// Processes a logical clock tick.
pub fn tick(mut self) -> Result<Node> {
if !self.peers.is_empty() {
self.role.since_heartbeat += 1;
if self.role.since_heartbeat >= HEARTBEAT_INTERVAL {
let (commit_index, commit_term) = self.log.get_commit_index();
self.send(Address::Broadcast, Event::Heartbeat { commit_index, commit_term })?;
self.role.since_heartbeat = 0;
}
self.role.since_heartbeat += 1;
if self.role.since_heartbeat >= HEARTBEAT_INTERVAL {
self.heartbeat()?;
self.role.since_heartbeat = 0;
}
Ok(self.into())
}

/// Broadcasts a heartbeat to all peers.
pub(super) fn heartbeat(&mut self) -> Result<()> {
let (commit_index, commit_term) = self.log.get_commit_index();
self.send(Address::Broadcast, Event::Heartbeat { commit_index, commit_term })?;
// NB: We don't reset self.since_heartbeat here, because we want to send
// periodic heartbeats regardless of any on-demand heartbeats.
Ok(())
}
}

#[cfg(test)]
Expand Down

0 comments on commit d76cb23

Please sign in to comment.