Skip to content

Commit d469ce2

Browse files
committed
wip
1 parent 48037d0 commit d469ce2

File tree

7 files changed

+91
-493
lines changed

7 files changed

+91
-493
lines changed

Diff for: src/raft/message.rs

-5
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,6 @@ pub enum Event {
5454
},
5555
/// Followers confirm loyalty to leader after heartbeats.
5656
ConfirmLeader {
57-
/// The commit_index of the original leader heartbeat, to confirm
58-
/// read requests.
59-
///
60-
/// TODO: remove these when migrated to read_seq.
61-
commit_index: Index,
6257
/// If false, the follower does not have the entry at commit_index
6358
/// and would like the leader to replicate it.
6459
has_committed: bool,

Diff for: src/raft/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ pub use self::log::{Entry, Index, Log};
88
pub use message::{Address, Event, Message, ReadSequence, Request, RequestID, Response};
99
pub use node::{Node, NodeID, Status, Term};
1010
pub use server::Server;
11-
pub use state::{Driver, Instruction, State};
11+
pub use state::State;

Diff for: src/raft/node/candidate.rs

+5-1
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,9 @@ impl RawNode<Candidate> {
167167
}
168168

169169
#[cfg(test)]
170+
#[cfg(never)] // TODO
170171
mod tests {
172+
use super::super::super::state::tests::TestState;
171173
use super::super::super::{Entry, Instruction, Log, Request};
172174
use super::super::tests::{assert_messages, assert_node};
173175
use super::*;
@@ -182,7 +184,9 @@ mod tests {
182184
)> {
183185
let (node_tx, node_rx) = mpsc::unbounded_channel();
184186
let (state_tx, state_rx) = mpsc::unbounded_channel();
187+
let state = Box::new(TestState::new(0));
185188
let mut log = Log::new(storage::engine::Memory::new(), false)?;
189+
186190
log.append(1, Some(vec![0x01]))?;
187191
log.append(1, Some(vec![0x02]))?;
188192
log.append(2, Some(vec![0x03]))?;
@@ -194,8 +198,8 @@ mod tests {
194198
peers: HashSet::from([2, 3, 4, 5]),
195199
term: 3,
196200
log,
201+
state,
197202
node_tx,
198-
state_tx,
199203
role: Candidate::new(),
200204
};
201205
node.role.votes.insert(1);

Diff for: src/raft/node/follower.rs

+17-13
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::super::{Address, Event, Instruction, Log, Message, RequestID, Response};
1+
use super::super::{Address, Event, Log, Message, RequestID, Response, State};
22
use super::{rand_election_timeout, Candidate, Node, NodeID, RawNode, Role, Term, Ticks};
33
use crate::error::{Error, Result};
44

@@ -43,12 +43,12 @@ impl RawNode<Follower> {
4343
id: NodeID,
4444
peers: HashSet<NodeID>,
4545
mut log: Log,
46+
state: Box<dyn State>,
4647
node_tx: mpsc::UnboundedSender<Message>,
47-
state_tx: mpsc::UnboundedSender<Instruction>,
4848
) -> Result<Self> {
4949
let (term, voted_for) = log.get_term()?;
5050
let role = Follower::new(None, voted_for);
51-
Ok(Self { id, peers, term, log, node_tx, state_tx, role })
51+
Ok(Self { id, peers, term, log, state, node_tx, role })
5252
}
5353

5454
/// Asserts internal invariants.
@@ -150,20 +150,24 @@ impl RawNode<Follower> {
150150
None => self = self.into_follower(Some(from), msg.term)?,
151151
}
152152

153-
// Advance commit index and apply entries if possible.
153+
// Respond to the heartbeat.
154154
let has_committed = self.log.has(commit_index, commit_term)?;
155+
self.send(msg.from, Event::ConfirmLeader { has_committed, read_seq })?;
156+
157+
// Advance commit index.
155158
let (old_commit_index, _) = self.log.get_commit_index();
156159
if has_committed && commit_index > old_commit_index {
157160
self.log.commit(commit_index)?;
158-
let mut scan = self.log.scan((old_commit_index + 1)..=commit_index)?;
161+
}
162+
163+
// Apply entries.
164+
let applied_index = self.state.get_applied_index();
165+
if applied_index < commit_index {
166+
let mut scan = self.log.scan((applied_index + 1)..=commit_index)?;
159167
while let Some(entry) = scan.next().transpose()? {
160-
self.state_tx.send(Instruction::Apply { entry })?;
168+
Self::apply(&mut self.state, entry)?.ok();
161169
}
162170
}
163-
self.send(
164-
msg.from,
165-
Event::ConfirmLeader { commit_index, has_committed, read_seq },
166-
)?;
167171
}
168172

169173
// Replicate entries from the leader. If we don't have a leader in
@@ -281,7 +285,9 @@ impl RawNode<Follower> {
281285
}
282286

283287
#[cfg(test)]
288+
#[cfg(never)] // TODO
284289
pub mod tests {
290+
use super::super::super::state::tests::TestState;
285291
use super::super::super::{Entry, Log, Request};
286292
use super::super::tests::{assert_messages, assert_node};
287293
use super::*;
@@ -624,7 +630,6 @@ pub mod tests {
624630
fn step_appendentries_base0() -> Result<()> {
625631
// TODO: Move this into a setup function.
626632
let (node_tx, mut node_rx) = mpsc::unbounded_channel();
627-
let (state_tx, mut state_rx) = mpsc::unbounded_channel();
628633
let mut log = Log::new(storage::engine::Memory::new(), false)?;
629634
log.append(1, Some(vec![0x01]))?;
630635
log.append(1, Some(vec![0x02]))?;
@@ -636,8 +641,8 @@ pub mod tests {
636641
peers: HashSet::from([2, 3, 4, 5]),
637642
term: 1,
638643
log,
644+
state: Box::new(TestState::new(0)),
639645
node_tx,
640-
state_tx,
641646
role: Follower::new(Some(2), None),
642647
};
643648

@@ -667,7 +672,6 @@ pub mod tests {
667672
event: Event::AcceptEntries { last_index: 2 },
668673
}],
669674
);
670-
assert_messages(&mut state_rx, vec![]);
671675
Ok(())
672676
}
673677

Diff for: src/raft/node/leader.rs

+38-43
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::super::{
2-
Address, Event, Index, Instruction, Message, ReadSequence, Request, RequestID, Status,
2+
Address, Event, Index, Message, ReadSequence, Request, RequestID, Response, Status,
33
};
44
use super::{Follower, Node, NodeID, RawNode, Role, Term, Ticks, HEARTBEAT_INTERVAL};
55
use crate::error::{Error, Result};
@@ -30,8 +30,6 @@ pub struct Leader {
3030
///
3131
/// If we lose leadership before the command is processed, all pending write
3232
/// requests are aborted by returning Error::Abort.
33-
///
34-
/// TODO: Actually return responses when applied.
3533
writes: HashMap<Index, (Address, RequestID)>,
3634
/// Keeps track of pending read requests. To guarantee linearizability, read
3735
/// requests are assigned a sequence number and registered here when
@@ -88,7 +86,6 @@ impl RawNode<Leader> {
8886
info!("Discovered new term {}", term);
8987

9088
// Cancel in-flight requests.
91-
self.state_tx.send(Instruction::Abort)?;
9289
for (address, id) in std::mem::take(&mut self.role.writes).into_values() {
9390
self.send(address, Event::ClientResponse { id, response: Err(Error::Abort) })?;
9491
}
@@ -129,16 +126,10 @@ impl RawNode<Leader> {
129126
// are its leader. If it doesn't have the commit index in its local
130127
// log, replicate the log to it. If the peer's read sequence number
131128
// increased, process any pending reads.
132-
Event::ConfirmLeader { commit_index, has_committed, read_seq } => {
129+
Event::ConfirmLeader { has_committed, read_seq } => {
133130
assert!(read_seq <= self.role.read_seq, "Future read sequence number");
134131

135132
let from = msg.from.unwrap();
136-
self.state_tx.send(Instruction::Vote {
137-
term: msg.term,
138-
index: commit_index,
139-
address: msg.from,
140-
})?;
141-
142133
let mut read_seq_advanced = false;
143134
self.role.progress.entry(from).and_modify(|p| {
144135
if read_seq > p.read_seq {
@@ -167,7 +158,7 @@ impl RawNode<Leader> {
167158
p.last = last_index;
168159
p.next = last_index + 1;
169160
});
170-
self.maybe_commit()?;
161+
self.maybe_commit_and_apply()?;
171162
}
172163

173164
// A follower rejected log entries we sent it, typically because it
@@ -198,20 +189,6 @@ impl RawNode<Leader> {
198189
id.clone(),
199190
command.clone(),
200191
));
201-
let (commit_index, _) = self.log.get_commit_index();
202-
self.state_tx.send(Instruction::Query {
203-
id,
204-
address: msg.from,
205-
command,
206-
term: self.term,
207-
index: commit_index,
208-
quorum: self.quorum(),
209-
})?;
210-
self.state_tx.send(Instruction::Vote {
211-
term: self.term,
212-
index: commit_index,
213-
address: Address::Node(self.id),
214-
})?;
215192
if self.peers.is_empty() {
216193
self.maybe_read()?;
217194
}
@@ -223,15 +200,14 @@ impl RawNode<Leader> {
223200
Event::ClientRequest { id, request: Request::Mutate(command) } => {
224201
let index = self.propose(Some(command))?;
225202
self.role.writes.insert(index, (msg.from, id.clone()));
226-
self.state_tx.send(Instruction::Notify { id, address: msg.from, index })?;
227203
if self.peers.is_empty() {
228-
self.maybe_commit()?;
204+
self.maybe_commit_and_apply()?;
229205
}
230206
}
231207

232208
Event::ClientRequest { id, request: Request::Status } => {
233209
let engine_status = self.log.status()?;
234-
let status = Box::new(Status {
210+
let status = Status {
235211
server: self.id,
236212
leader: self.id,
237213
term: self.term,
@@ -243,11 +219,14 @@ impl RawNode<Leader> {
243219
.chain(std::iter::once((self.id, self.log.get_last_index().0)))
244220
.collect(),
245221
commit_index: self.log.get_commit_index().0,
246-
apply_index: 0,
222+
apply_index: self.state.get_applied_index(),
247223
storage: engine_status.name.clone(),
248224
storage_size: engine_status.size,
249-
});
250-
self.state_tx.send(Instruction::Status { id, address: msg.from, status })?
225+
};
226+
self.send(
227+
msg.from,
228+
Event::ClientResponse { id, response: Ok(Response::Status(status)) },
229+
)?;
251230
}
252231

253232
// Votes can come in after we won the election, ignore them.
@@ -294,9 +273,9 @@ impl RawNode<Leader> {
294273
Ok(index)
295274
}
296275

297-
/// Commits any new log entries that have been replicated to a quorum,
298-
/// and schedules them for state machine application.
299-
fn maybe_commit(&mut self) -> Result<Index> {
276+
/// Commits any new log entries that have been replicated to a quorum, and
277+
/// applies them to the state machine.
278+
fn maybe_commit_and_apply(&mut self) -> Result<Index> {
300279
// Determine the new commit index, i.e. the last index replicated to a
301280
// quorum of peers.
302281
let mut last_indexes = self
@@ -332,15 +311,28 @@ impl RawNode<Leader> {
332311
None => panic!("Commit index {} missing", commit_index),
333312
};
334313

335-
// Commit and apply the new entries.
314+
// Commit the new entries.
336315
if commit_index > prev_commit_index {
337316
self.log.commit(commit_index)?;
338-
// TODO: Move application elsewhere, but needs access to applied index.
339-
let mut scan = self.log.scan((prev_commit_index + 1)..=commit_index)?;
317+
}
318+
319+
// Apply entries.
320+
let applied_index = self.state.get_applied_index();
321+
if applied_index < commit_index {
322+
let mut scan = self.log.scan((applied_index + 1)..=commit_index)?;
340323
while let Some(entry) = scan.next().transpose()? {
341-
// TODO: Send response.
342-
self.role.writes.remove(&entry.index);
343-
self.state_tx.send(Instruction::Apply { entry })?;
324+
let index = entry.index;
325+
let result = Self::apply(&mut self.state, entry)?;
326+
if let Some((from, id)) = self.role.writes.remove(&index) {
327+
let response = result.map(Response::Mutate);
328+
// TODO: use self.send() or something.
329+
self.node_tx.send(Message {
330+
from: Address::Node(self.id),
331+
to: from,
332+
term: self.term,
333+
event: Event::ClientResponse { id, response },
334+
})?;
335+
}
344336
}
345337
}
346338
Ok(commit_index)
@@ -366,8 +358,10 @@ impl RawNode<Leader> {
366358
if *rs > read_seq {
367359
break;
368360
}
369-
self.role.reads.pop_front().unwrap();
370-
// TODO: Actually execute the reads.
361+
let (_, from, id, command) = self.role.reads.pop_front().unwrap();
362+
let result = self.state.query(command);
363+
let response = result.map(Response::Query);
364+
self.send(from, Event::ClientResponse { id, response })?;
371365
}
372366

373367
Ok(read_seq)
@@ -392,6 +386,7 @@ impl RawNode<Leader> {
392386
}
393387

394388
#[cfg(test)]
389+
#[cfg(never)] // TODO
395390
mod tests {
396391
use super::super::super::{Entry, Log};
397392
use super::super::tests::{assert_messages, assert_node};

0 commit comments

Comments
 (0)