Skip to content

Commit

Permalink
Track reads on Raft leader
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jan 7, 2024
1 parent dac84b7 commit a8d92ad
Show file tree
Hide file tree
Showing 6 changed files with 149 additions and 44 deletions.
9 changes: 9 additions & 0 deletions src/raft/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,21 @@ pub enum Event {
commit_index: Index,
/// The term of the leader's last committed log entry.
commit_term: Term,
/// The latest read sequence number of the leader.
read_seq: ReadSequence,
},
/// Followers confirm loyalty to leader after heartbeats.
ConfirmLeader {
/// The commit_index of the original leader heartbeat, to confirm
/// read requests.
///
/// TODO: remove these when migrated to read_seq.
commit_index: Index,
/// If false, the follower does not have the entry at commit_index
/// and would like the leader to replicate it.
has_committed: bool,
/// The read sequence number of the heartbeat we're responding to.
read_seq: ReadSequence,
},

/// Candidates solicit votes from all peers when campaigning for leadership.
Expand Down Expand Up @@ -113,6 +119,9 @@ pub enum Event {
/// A client request ID.
pub type RequestID = Vec<u8>;

/// A read sequence number, used to confirm leadership for linearizable reads.
pub type ReadSequence = u64;

/// A client request.
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub enum Request {
Expand Down
2 changes: 1 addition & 1 deletion src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod server;
mod state;

pub use self::log::{Entry, Index, Log};
pub use message::{Address, Event, Message, Request, RequestID, Response};
pub use message::{Address, Event, Message, ReadSequence, Request, RequestID, Response};
pub use node::{Node, NodeID, Status, Term};
pub use server::Server;
pub use state::{Driver, Instruction, State};
12 changes: 6 additions & 6 deletions src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ mod tests {
from: Address::Node(2),
to: Address::Node(1),
term: 3,
event: Event::Heartbeat { commit_index: 2, commit_term: 1 },
event: Event::Heartbeat { commit_index: 2, commit_term: 1, read_seq: 7 },
})?;
assert_node(&mut node).is_follower().term(3);
assert_messages(
Expand All @@ -219,7 +219,7 @@ mod tests {
from: Address::Node(1),
to: Address::Node(2),
term: 3,
event: Event::ConfirmLeader { commit_index: 2, has_committed: true },
event: Event::ConfirmLeader { commit_index: 2, has_committed: true, read_seq: 7 },
}],
);
assert_messages(&mut state_rx, vec![]);
Expand All @@ -235,7 +235,7 @@ mod tests {
from: Address::Node(2),
to: Address::Node(1),
term: 4,
event: Event::Heartbeat { commit_index: 2, commit_term: 1 },
event: Event::Heartbeat { commit_index: 2, commit_term: 1, read_seq: 7 },
})?;
assert_node(&mut node).is_follower().term(4);
assert_messages(
Expand All @@ -244,7 +244,7 @@ mod tests {
from: Address::Node(1),
to: Address::Node(2),
term: 4,
event: Event::ConfirmLeader { commit_index: 2, has_committed: true },
event: Event::ConfirmLeader { commit_index: 2, has_committed: true, read_seq: 7 },
}],
);
assert_messages(&mut state_rx, vec![]);
Expand All @@ -259,7 +259,7 @@ mod tests {
from: Address::Node(2),
to: Address::Node(1),
term: 2,
event: Event::Heartbeat { commit_index: 1, commit_term: 1 },
event: Event::Heartbeat { commit_index: 1, commit_term: 1, read_seq: 7 },
})?;
assert_node(&mut node).is_candidate().term(3);
assert_messages(&mut node_rx, vec![]);
Expand Down Expand Up @@ -299,7 +299,7 @@ mod tests {
from: Address::Node(1),
to: Address::Broadcast,
term: 3,
event: Event::Heartbeat { commit_index: 2, commit_term: 1 },
event: Event::Heartbeat { commit_index: 2, commit_term: 1, read_seq: 0 },
},
);

Expand Down
51 changes: 31 additions & 20 deletions src/raft/node/follower.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ impl RawNode<Follower> {
// The leader will send periodic heartbeats. If we don't have a
// leader in this term yet, follow it. If the commit_index advances,
// apply state transitions.
Event::Heartbeat { commit_index, commit_term } => {
Event::Heartbeat { commit_index, commit_term, read_seq } => {
// Check that the heartbeat is from our leader.
let from = msg.from.unwrap();
match self.role.leader {
Expand All @@ -160,7 +160,10 @@ impl RawNode<Follower> {
self.state_tx.send(Instruction::Apply { entry })?;
}
}
self.send(msg.from, Event::ConfirmLeader { commit_index, has_committed })?;
self.send(
msg.from,
Event::ConfirmLeader { commit_index, has_committed, read_seq },
)?;
}

// Replicate entries from the leader. If we don't have a leader in
Expand Down Expand Up @@ -329,7 +332,7 @@ pub mod tests {
from: Address::Node(2),
to: Address::Node(1),
term: 3,
event: Event::Heartbeat { commit_index: 3, commit_term: 2 },
event: Event::Heartbeat { commit_index: 3, commit_term: 2, read_seq: 7 },
})?;
assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None).committed(3);
assert_messages(
Expand All @@ -338,7 +341,7 @@ pub mod tests {
from: Address::Node(1),
to: Address::Node(2),
term: 3,
event: Event::ConfirmLeader { commit_index: 3, has_committed: true },
event: Event::ConfirmLeader { commit_index: 3, has_committed: true, read_seq: 7 },
}],
);
assert_messages(
Expand All @@ -358,7 +361,7 @@ pub mod tests {
from: Address::Node(2),
to: Address::Node(1),
term: 3,
event: Event::Heartbeat { commit_index: 3, commit_term: 3 },
event: Event::Heartbeat { commit_index: 3, commit_term: 3, read_seq: 7 },
})?;
assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None).committed(2);
assert_messages(
Expand All @@ -367,7 +370,7 @@ pub mod tests {
from: Address::Node(1),
to: Address::Node(2),
term: 3,
event: Event::ConfirmLeader { commit_index: 3, has_committed: false },
event: Event::ConfirmLeader { commit_index: 3, has_committed: false, read_seq: 7 },
}],
);
assert_messages(&mut state_rx, vec![]);
Expand All @@ -382,7 +385,7 @@ pub mod tests {
from: Address::Node(2),
to: Address::Node(1),
term: 3,
event: Event::Heartbeat { commit_index: 5, commit_term: 3 },
event: Event::Heartbeat { commit_index: 5, commit_term: 3, read_seq: 7 },
})?;
assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None).committed(2);
assert_messages(
Expand All @@ -391,7 +394,7 @@ pub mod tests {
from: Address::Node(1),
to: Address::Node(2),
term: 3,
event: Event::ConfirmLeader { commit_index: 5, has_committed: false },
event: Event::ConfirmLeader { commit_index: 5, has_committed: false, read_seq: 7 },
}],
);
assert_messages(&mut state_rx, vec![]);
Expand All @@ -408,7 +411,7 @@ pub mod tests {
from: Address::Node(3),
to: Address::Node(1),
term: 3,
event: Event::Heartbeat { commit_index: 5, commit_term: 3 },
event: Event::Heartbeat { commit_index: 5, commit_term: 3, read_seq: 7 },
})
.unwrap();
}
Expand All @@ -422,7 +425,7 @@ pub mod tests {
from: Address::Node(3),
to: Address::Node(1),
term: 3,
event: Event::Heartbeat { commit_index: 3, commit_term: 2 },
event: Event::Heartbeat { commit_index: 3, commit_term: 2, read_seq: 7 },
})?;
assert_node(&mut node).is_follower().term(3).leader(Some(3)).voted_for(None).committed(3);
assert_messages(
Expand All @@ -431,7 +434,7 @@ pub mod tests {
from: Address::Node(1),
to: Address::Node(3),
term: 3,
event: Event::ConfirmLeader { commit_index: 3, has_committed: true },
event: Event::ConfirmLeader { commit_index: 3, has_committed: true, read_seq: 7 },
}],
);
assert_messages(
Expand All @@ -451,7 +454,7 @@ pub mod tests {
from: Address::Node(2),
to: Address::Node(1),
term: 3,
event: Event::Heartbeat { commit_index: 1, commit_term: 1 },
event: Event::Heartbeat { commit_index: 1, commit_term: 1, read_seq: 7 },
})?;
assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None).committed(2);
assert_messages(
Expand All @@ -460,7 +463,7 @@ pub mod tests {
from: Address::Node(1),
to: Address::Node(2),
term: 3,
event: Event::ConfirmLeader { commit_index: 1, has_committed: true },
event: Event::ConfirmLeader { commit_index: 1, has_committed: true, read_seq: 7 },
}],
);
assert_messages(&mut state_rx, vec![]);
Expand All @@ -475,7 +478,7 @@ pub mod tests {
from: Address::Node(3),
to: Address::Node(1),
term: 4,
event: Event::Heartbeat { commit_index: 3, commit_term: 2 },
event: Event::Heartbeat { commit_index: 3, commit_term: 2, read_seq: 7 },
})?;
assert_node(&mut node).is_follower().term(4).leader(Some(3)).voted_for(None);
assert_messages(
Expand All @@ -484,7 +487,7 @@ pub mod tests {
from: Address::Node(1),
to: Address::Node(3),
term: 4,
event: Event::ConfirmLeader { commit_index: 3, has_committed: true },
event: Event::ConfirmLeader { commit_index: 3, has_committed: true, read_seq: 7 },
}],
);
assert_messages(
Expand All @@ -504,7 +507,7 @@ pub mod tests {
from: Address::Node(2),
to: Address::Node(1),
term: 2,
event: Event::Heartbeat { commit_index: 3, commit_term: 2 },
event: Event::Heartbeat { commit_index: 3, commit_term: 2, read_seq: 7 },
})?;
assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None).committed(2);
assert_messages(&mut node_rx, vec![]);
Expand Down Expand Up @@ -989,7 +992,7 @@ pub mod tests {
from: Address::Node(3),
to: Address::Node(1),
term: 4,
event: Event::Heartbeat { commit_index: 3, commit_term: 2 },
event: Event::Heartbeat { commit_index: 3, commit_term: 2, read_seq: 7 },
})?;
assert_node(&mut node).is_follower().term(4).leader(Some(3)).forwarded(vec![]);
assert_messages(
Expand All @@ -1005,7 +1008,11 @@ pub mod tests {
from: Address::Node(1),
to: Address::Node(3),
term: 4,
event: Event::ConfirmLeader { commit_index: 3, has_committed: true },
event: Event::ConfirmLeader {
commit_index: 3,
has_committed: true,
read_seq: 7,
},
},
],
);
Expand Down Expand Up @@ -1033,15 +1040,19 @@ pub mod tests {
from: Address::Node(2),
to: Address::Node(1),
term: 3,
event: Event::Heartbeat { commit_index: 2, commit_term: 1 },
event: Event::Heartbeat { commit_index: 2, commit_term: 1, read_seq: 7 },
})?;
assert_messages(
&mut node_rx,
vec![Message {
from: Address::Node(1),
to: Address::Node(2),
term: 3,
event: Event::ConfirmLeader { commit_index: 2, has_committed: true },
event: Event::ConfirmLeader {
commit_index: 2,
has_committed: true,
read_seq: 7,
},
}],
)
}
Expand Down
Loading

0 comments on commit a8d92ad

Please sign in to comment.