Skip to content

Commit b57bc62

Browse files
committed
Track writes on Raft leader
This is in preparation for moving state machine application into the Raft node. For now, it only tracks the writes, without actually responding to clients when applied.
1 parent 2b2e34b commit b57bc62

File tree

1 file changed

+25
-4
lines changed

1 file changed

+25
-4
lines changed

src/raft/node/leader.rs

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
1-
use super::super::{Address, Event, Index, Instruction, Message, Request, Status};
1+
use super::super::{Address, Event, Index, Instruction, Message, Request, RequestID, Status};
22
use super::{Follower, Node, NodeID, RawNode, Role, Term, Ticks, HEARTBEAT_INTERVAL};
3-
use crate::error::Result;
3+
use crate::error::{Error, Result};
44

55
use ::log::{debug, info};
66
use std::collections::{HashMap, HashSet};
@@ -19,6 +19,16 @@ struct Progress {
1919
pub struct Leader {
2020
/// Peer replication progress.
2121
progress: HashMap<NodeID, Progress>,
22+
/// Keeps track of pending write requests. These are added when the write is
23+
/// appended to the leader's log (keyed by log index), and removed when the
24+
/// command is applied to the state machine, sending the command result to
25+
/// the waiting client.
26+
///
27+
/// If we lose leadership before the command is processed, all pending write
28+
/// requests are aborted by returning Error::Abort.
29+
///
30+
/// TODO: Actually return responses when applied.
31+
writes: HashMap<Index, (Address, RequestID)>,
2232
/// Number of ticks since last periodic heartbeat.
2333
since_heartbeat: Ticks,
2434
}
@@ -28,7 +38,7 @@ impl Leader {
2838
pub fn new(peers: HashSet<NodeID>, last_index: Index) -> Self {
2939
let next = last_index + 1;
3040
let progress = peers.into_iter().map(|p| (p, Progress { next, last: 0 })).collect();
31-
Self { progress, since_heartbeat: 0 }
41+
Self { progress, writes: HashMap::new(), since_heartbeat: 0 }
3242
}
3343
}
3444

@@ -53,9 +63,15 @@ impl RawNode<Leader> {
5363
assert!(term > self.term, "Can only become follower in later term");
5464

5565
info!("Discovered new term {}", term);
66+
67+
// Cancel in-flight requests.
68+
self.state_tx.send(Instruction::Abort)?;
69+
for (address, id) in std::mem::take(&mut self.role.writes).into_values() {
70+
self.send(address, Event::ClientResponse { id, response: Err(Error::Abort) })?;
71+
}
72+
5673
self.term = term;
5774
self.log.set_term(term, None)?;
58-
self.state_tx.send(Instruction::Abort)?;
5975
Ok(self.into_role(Follower::new(None, None)))
6076
}
6177

@@ -148,8 +164,11 @@ impl RawNode<Leader> {
148164
self.heartbeat()?;
149165
}
150166

167+
// A client submitted a write command. Propose it, and track it
168+
// until it's applied and the response is returned to the client.
151169
Event::ClientRequest { id, request: Request::Mutate(command) } => {
152170
let index = self.propose(Some(command))?;
171+
self.role.writes.insert(index, (msg.from, id.clone()));
153172
self.state_tx.send(Instruction::Notify { id, address: msg.from, index })?;
154173
if self.peers.is_empty() {
155174
self.maybe_commit()?;
@@ -264,6 +283,8 @@ impl RawNode<Leader> {
264283
// TODO: Move application elsewhere, but needs access to applied index.
265284
let mut scan = self.log.scan((prev_commit_index + 1)..=commit_index)?;
266285
while let Some(entry) = scan.next().transpose()? {
286+
// TODO: Send response.
287+
self.role.writes.remove(&entry.index);
267288
self.state_tx.send(Instruction::Apply { entry })?;
268289
}
269290
}

0 commit comments

Comments
 (0)