Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
erikgrinaker committed Jan 7, 2024
1 parent a8d92ad commit bfc53b8
Show file tree
Hide file tree
Showing 7 changed files with 91 additions and 493 deletions.
5 changes: 0 additions & 5 deletions src/raft/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,6 @@ pub enum Event {
},
/// Followers confirm loyalty to leader after heartbeats.
ConfirmLeader {
/// The commit_index of the original leader heartbeat, to confirm
/// read requests.
///
/// TODO: remove these when migrated to read_seq.
commit_index: Index,
/// If false, the follower does not have the entry at commit_index
/// and would like the leader to replicate it.
has_committed: bool,
Expand Down
2 changes: 1 addition & 1 deletion src/raft/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ pub use self::log::{Entry, Index, Log};
pub use message::{Address, Event, Message, ReadSequence, Request, RequestID, Response};
pub use node::{Node, NodeID, Status, Term};
pub use server::Server;
pub use state::{Driver, Instruction, State};
pub use state::State;
6 changes: 5 additions & 1 deletion src/raft/node/candidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,9 @@ impl RawNode<Candidate> {
}

#[cfg(test)]
#[cfg(never)] // TODO
mod tests {
use super::super::super::state::tests::TestState;
use super::super::super::{Entry, Instruction, Log, Request};
use super::super::tests::{assert_messages, assert_node};
use super::*;
Expand All @@ -182,7 +184,9 @@ mod tests {
)> {
let (node_tx, node_rx) = mpsc::unbounded_channel();
let (state_tx, state_rx) = mpsc::unbounded_channel();
let state = Box::new(TestState::new(0));
let mut log = Log::new(storage::engine::Memory::new(), false)?;

log.append(1, Some(vec![0x01]))?;
log.append(1, Some(vec![0x02]))?;
log.append(2, Some(vec![0x03]))?;
Expand All @@ -194,8 +198,8 @@ mod tests {
peers: HashSet::from([2, 3, 4, 5]),
term: 3,
log,
state,
node_tx,
state_tx,
role: Candidate::new(),
};
node.role.votes.insert(1);
Expand Down
30 changes: 17 additions & 13 deletions src/raft/node/follower.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use super::super::{Address, Event, Instruction, Log, Message, RequestID, Response};
use super::super::{Address, Event, Log, Message, RequestID, Response, State};
use super::{rand_election_timeout, Candidate, Node, NodeID, RawNode, Role, Term, Ticks};
use crate::error::{Error, Result};

Expand Down Expand Up @@ -43,12 +43,12 @@ impl RawNode<Follower> {
id: NodeID,
peers: HashSet<NodeID>,
mut log: Log,
state: Box<dyn State>,
node_tx: mpsc::UnboundedSender<Message>,
state_tx: mpsc::UnboundedSender<Instruction>,
) -> Result<Self> {
let (term, voted_for) = log.get_term()?;
let role = Follower::new(None, voted_for);
Ok(Self { id, peers, term, log, node_tx, state_tx, role })
Ok(Self { id, peers, term, log, state, node_tx, role })
}

/// Asserts internal invariants.
Expand Down Expand Up @@ -150,20 +150,24 @@ impl RawNode<Follower> {
None => self = self.into_follower(Some(from), msg.term)?,
}

// Advance commit index and apply entries if possible.
// Respond to the heartbeat.
let has_committed = self.log.has(commit_index, commit_term)?;
self.send(msg.from, Event::ConfirmLeader { has_committed, read_seq })?;

// Advance commit index.
let (old_commit_index, _) = self.log.get_commit_index();
if has_committed && commit_index > old_commit_index {
self.log.commit(commit_index)?;
let mut scan = self.log.scan((old_commit_index + 1)..=commit_index)?;
}

// Apply entries.
let applied_index = self.state.get_applied_index();
if applied_index < commit_index {
let mut scan = self.log.scan((applied_index + 1)..=commit_index)?;
while let Some(entry) = scan.next().transpose()? {
self.state_tx.send(Instruction::Apply { entry })?;
Self::apply(&mut self.state, entry)?.ok();
}
}
self.send(
msg.from,
Event::ConfirmLeader { commit_index, has_committed, read_seq },
)?;
}

// Replicate entries from the leader. If we don't have a leader in
Expand Down Expand Up @@ -281,7 +285,9 @@ impl RawNode<Follower> {
}

#[cfg(test)]
#[cfg(never)] // TODO
pub mod tests {
use super::super::super::state::tests::TestState;
use super::super::super::{Entry, Log, Request};
use super::super::tests::{assert_messages, assert_node};
use super::*;
Expand Down Expand Up @@ -624,7 +630,6 @@ pub mod tests {
fn step_appendentries_base0() -> Result<()> {
// TODO: Move this into a setup function.
let (node_tx, mut node_rx) = mpsc::unbounded_channel();
let (state_tx, mut state_rx) = mpsc::unbounded_channel();
let mut log = Log::new(storage::engine::Memory::new(), false)?;
log.append(1, Some(vec![0x01]))?;
log.append(1, Some(vec![0x02]))?;
Expand All @@ -636,8 +641,8 @@ pub mod tests {
peers: HashSet::from([2, 3, 4, 5]),
term: 1,
log,
state: Box::new(TestState::new(0)),
node_tx,
state_tx,
role: Follower::new(Some(2), None),
};

Expand Down Expand Up @@ -667,7 +672,6 @@ pub mod tests {
event: Event::AcceptEntries { last_index: 2 },
}],
);
assert_messages(&mut state_rx, vec![]);
Ok(())
}

Expand Down
81 changes: 38 additions & 43 deletions src/raft/node/leader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use super::super::{
Address, Event, Index, Instruction, Message, ReadSequence, Request, RequestID, Status,
Address, Event, Index, Message, ReadSequence, Request, RequestID, Response, Status,
};
use super::{Follower, Node, NodeID, RawNode, Role, Term, Ticks, HEARTBEAT_INTERVAL};
use crate::error::{Error, Result};
Expand Down Expand Up @@ -39,8 +39,6 @@ pub struct Leader {
///
/// If the leader loses leadership, all pending write requests are aborted
/// by returning Error::Abort.
///
/// TODO: Actually return responses when applied.
writes: HashMap<Index, Write>,
/// Keeps track of pending read requests. To guarantee linearizability, read
/// requests are assigned a sequence number and registered here when
Expand Down Expand Up @@ -97,7 +95,6 @@ impl RawNode<Leader> {
info!("Discovered new term {}", term);

// Cancel in-flight requests.
self.state_tx.send(Instruction::Abort)?;
for write in std::mem::take(&mut self.role.writes).into_values() {
self.send(
write.from,
Expand Down Expand Up @@ -141,16 +138,10 @@ impl RawNode<Leader> {
// are its leader. If it doesn't have the commit index in its local
// log, replicate the log to it. If the peer's read sequence number
// increased, process any pending reads.
Event::ConfirmLeader { commit_index, has_committed, read_seq } => {
Event::ConfirmLeader { has_committed, read_seq } => {
assert!(read_seq <= self.role.read_seq, "Future read sequence number");

let from = msg.from.unwrap();
self.state_tx.send(Instruction::Vote {
term: msg.term,
index: commit_index,
address: msg.from,
})?;

let mut read_seq_advanced = false;
self.role.progress.entry(from).and_modify(|p| {
if read_seq > p.read_seq {
Expand Down Expand Up @@ -179,7 +170,7 @@ impl RawNode<Leader> {
p.last = last_index;
p.next = last_index + 1;
});
self.maybe_commit()?;
self.maybe_commit_and_apply()?;
}

// A follower rejected log entries we sent it, typically because it
Expand Down Expand Up @@ -210,20 +201,6 @@ impl RawNode<Leader> {
id.clone(),
command.clone(),
));
let (commit_index, _) = self.log.get_commit_index();
self.state_tx.send(Instruction::Query {
id,
address: msg.from,
command,
term: self.term,
index: commit_index,
quorum: self.quorum(),
})?;
self.state_tx.send(Instruction::Vote {
term: self.term,
index: commit_index,
address: Address::Node(self.id),
})?;
if self.peers.is_empty() {
self.maybe_read()?;
}
Expand All @@ -235,15 +212,14 @@ impl RawNode<Leader> {
Event::ClientRequest { id, request: Request::Mutate(command) } => {
let index = self.propose(Some(command))?;
self.role.writes.insert(index, Write { from: msg.from, id: id.clone() });
self.state_tx.send(Instruction::Notify { id, address: msg.from, index })?;
if self.peers.is_empty() {
self.maybe_commit()?;
self.maybe_commit_and_apply()?;
}
}

Event::ClientRequest { id, request: Request::Status } => {
let engine_status = self.log.status()?;
let status = Box::new(Status {
let status = Status {
server: self.id,
leader: self.id,
term: self.term,
Expand All @@ -255,11 +231,14 @@ impl RawNode<Leader> {
.chain(std::iter::once((self.id, self.log.get_last_index().0)))
.collect(),
commit_index: self.log.get_commit_index().0,
apply_index: 0,
apply_index: self.state.get_applied_index(),
storage: engine_status.name.clone(),
storage_size: engine_status.size,
});
self.state_tx.send(Instruction::Status { id, address: msg.from, status })?
};
self.send(
msg.from,
Event::ClientResponse { id, response: Ok(Response::Status(status)) },
)?;
}

// Votes can come in after we won the election, ignore them.
Expand Down Expand Up @@ -306,9 +285,9 @@ impl RawNode<Leader> {
Ok(index)
}

/// Commits any new log entries that have been replicated to a quorum,
/// and schedules them for state machine application.
fn maybe_commit(&mut self) -> Result<Index> {
/// Commits any new log entries that have been replicated to a quorum, and
/// applies them to the state machine.
fn maybe_commit_and_apply(&mut self) -> Result<Index> {
// Determine the new commit index, i.e. the last index replicated to a
// quorum of peers.
let mut last_indexes = self
Expand Down Expand Up @@ -344,15 +323,28 @@ impl RawNode<Leader> {
None => panic!("Commit index {} missing", commit_index),
};

// Commit and apply the new entries.
// Commit the new entries.
if commit_index > prev_commit_index {
self.log.commit(commit_index)?;
// TODO: Move application elsewhere, but needs access to applied index.
let mut scan = self.log.scan((prev_commit_index + 1)..=commit_index)?;
}

// Apply entries.
let applied_index = self.state.get_applied_index();
if applied_index < commit_index {
let mut scan = self.log.scan((applied_index + 1)..=commit_index)?;
while let Some(entry) = scan.next().transpose()? {
// TODO: Send response.
self.role.writes.remove(&entry.index);
self.state_tx.send(Instruction::Apply { entry })?;
let index = entry.index;
let result = Self::apply(&mut self.state, entry)?;
if let Some(write) = self.role.writes.remove(&index) {
let response = result.map(Response::Mutate);
// TODO: use self.send() or something.
self.node_tx.send(Message {
from: Address::Node(self.id),
to: write.from,
term: self.term,
event: Event::ClientResponse { id: write.id, response },
})?;
}
}
}
Ok(commit_index)
Expand All @@ -378,8 +370,10 @@ impl RawNode<Leader> {
if *rs > read_seq {
break;
}
self.role.reads.pop_front().unwrap();
// TODO: Actually execute the reads.
let (_, from, id, command) = self.role.reads.pop_front().unwrap();
let result = self.state.query(command);
let response = result.map(Response::Query);
self.send(from, Event::ClientResponse { id, response })?;
}

Ok(read_seq)
Expand All @@ -404,6 +398,7 @@ impl RawNode<Leader> {
}

#[cfg(test)]
#[cfg(never)] // TODO
mod tests {
use super::super::super::{Entry, Log};
use super::super::tests::{assert_messages, assert_node};
Expand Down
Loading

0 comments on commit bfc53b8

Please sign in to comment.