Skip to content

Commit a991265

Browse files
committed
wip
1 parent 340c0d2 commit a991265

File tree

7 files changed

+126
-605
lines changed

7 files changed

+126
-605
lines changed

src/raft/message.rs

-5
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,6 @@ pub enum Event {
5454
},
5555
/// Followers confirm loyalty to leader after heartbeats.
5656
ConfirmLeader {
57-
/// The commit_index of the original leader heartbeat, to confirm
58-
/// read requests.
59-
///
60-
/// TODO: remove this when migrated to read_seq.
61-
commit_index: Index,
6257
/// If false, the follower does not have the entry at commit_index
6358
/// and would like the leader to replicate it.
6459
has_committed: bool,

src/raft/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -8,4 +8,4 @@ pub use self::log::{Entry, Index, Log};
88
pub use message::{Address, Event, Message, ReadSequence, Request, RequestID, Response};
99
pub use node::{Node, NodeID, Status, Term};
1010
pub use server::Server;
11-
pub use state::{Driver, Instruction, State};
11+
pub use state::State;

src/raft/node/candidate.rs

+15-24
Original file line numberDiff line numberDiff line change
@@ -168,21 +168,19 @@ impl RawNode<Candidate> {
168168

169169
#[cfg(test)]
170170
mod tests {
171-
use super::super::super::{Entry, Instruction, Log, Request};
171+
use super::super::super::state::tests::TestState;
172+
use super::super::super::{Entry, Log, Request};
172173
use super::super::tests::{assert_messages, assert_node};
173174
use super::*;
174175
use crate::storage;
175176
use tokio::sync::mpsc;
176177

177178
#[allow(clippy::type_complexity)]
178-
fn setup() -> Result<(
179-
RawNode<Candidate>,
180-
mpsc::UnboundedReceiver<Message>,
181-
mpsc::UnboundedReceiver<Instruction>,
182-
)> {
179+
fn setup() -> Result<(RawNode<Candidate>, mpsc::UnboundedReceiver<Message>)> {
183180
let (node_tx, node_rx) = mpsc::unbounded_channel();
184-
let (state_tx, state_rx) = mpsc::unbounded_channel();
181+
let state = Box::new(TestState::new(0));
185182
let mut log = Log::new(storage::engine::Memory::new(), false)?;
183+
186184
log.append(1, Some(vec![0x01]))?;
187185
log.append(1, Some(vec![0x02]))?;
188186
log.append(2, Some(vec![0x03]))?;
@@ -194,18 +192,18 @@ mod tests {
194192
peers: HashSet::from([2, 3, 4, 5]),
195193
term: 3,
196194
log,
195+
state,
197196
node_tx,
198-
state_tx,
199197
role: Candidate::new(),
200198
};
201199
node.role.votes.insert(1);
202-
Ok((node, node_rx, state_rx))
200+
Ok((node, node_rx))
203201
}
204202

205203
#[test]
206204
// Heartbeat for current term converts to follower and emits ConfirmLeader.
207205
fn step_heartbeat_current_term() -> Result<()> {
208-
let (candidate, mut node_rx, mut state_rx) = setup()?;
206+
let (candidate, mut node_rx) = setup()?;
209207
let mut node = candidate.step(Message {
210208
from: Address::Node(2),
211209
to: Address::Node(1),
@@ -219,18 +217,17 @@ mod tests {
219217
from: Address::Node(1),
220218
to: Address::Node(2),
221219
term: 3,
222-
event: Event::ConfirmLeader { commit_index: 2, has_committed: true, read_seq: 7 },
220+
event: Event::ConfirmLeader { has_committed: true, read_seq: 7 },
223221
}],
224222
);
225-
assert_messages(&mut state_rx, vec![]);
226223
Ok(())
227224
}
228225

229226
#[test]
230227
// Heartbeat for future term converts to follower and emits ConfirmLeader
231228
// event.
232229
fn step_heartbeat_future_term() -> Result<()> {
233-
let (candidate, mut node_rx, mut state_rx) = setup()?;
230+
let (candidate, mut node_rx) = setup()?;
234231
let mut node = candidate.step(Message {
235232
from: Address::Node(2),
236233
to: Address::Node(1),
@@ -244,17 +241,16 @@ mod tests {
244241
from: Address::Node(1),
245242
to: Address::Node(2),
246243
term: 4,
247-
event: Event::ConfirmLeader { commit_index: 2, has_committed: true, read_seq: 7 },
244+
event: Event::ConfirmLeader { has_committed: true, read_seq: 7 },
248245
}],
249246
);
250-
assert_messages(&mut state_rx, vec![]);
251247
Ok(())
252248
}
253249

254250
#[test]
255251
// Heartbeat for past term is ignored
256252
fn step_heartbeat_past_term() -> Result<()> {
257-
let (candidate, mut node_rx, mut state_rx) = setup()?;
253+
let (candidate, mut node_rx) = setup()?;
258254
let mut node = candidate.step(Message {
259255
from: Address::Node(2),
260256
to: Address::Node(1),
@@ -263,13 +259,12 @@ mod tests {
263259
})?;
264260
assert_node(&mut node).is_candidate().term(3);
265261
assert_messages(&mut node_rx, vec![]);
266-
assert_messages(&mut state_rx, vec![]);
267262
Ok(())
268263
}
269264

270265
#[test]
271266
fn step_grantvote() -> Result<()> {
272-
let (candidate, mut node_rx, mut state_rx) = setup()?;
267+
let (candidate, mut node_rx) = setup()?;
273268
let peers = candidate.peers.clone();
274269
let mut node = Node::Candidate(candidate);
275270

@@ -282,7 +277,6 @@ mod tests {
282277
})?;
283278
assert_node(&mut node).is_candidate().term(3);
284279
assert_messages(&mut node_rx, vec![]);
285-
assert_messages(&mut state_rx, vec![]);
286280

287281
// However, the second external vote makes us leader
288282
node = node.step(Message {
@@ -320,14 +314,13 @@ mod tests {
320314
}
321315

322316
assert_messages(&mut node_rx, vec![]);
323-
assert_messages(&mut state_rx, vec![]);
324317
Ok(())
325318
}
326319

327320
#[test]
328321
// ClientRequest returns Error::Abort.
329322
fn step_clientrequest() -> Result<()> {
330-
let (candidate, mut node_rx, mut state_rx) = setup()?;
323+
let (candidate, mut node_rx) = setup()?;
331324
let mut node = Node::Candidate(candidate);
332325

333326
node = node.step(Message {
@@ -346,13 +339,12 @@ mod tests {
346339
event: Event::ClientResponse { id: vec![0x01], response: Err(Error::Abort) },
347340
}],
348341
);
349-
assert_messages(&mut state_rx, vec![]);
350342
Ok(())
351343
}
352344

353345
#[test]
354346
fn tick() -> Result<()> {
355-
let (candidate, mut node_rx, mut state_rx) = setup()?;
347+
let (candidate, mut node_rx) = setup()?;
356348
let timeout = candidate.role.election_timeout;
357349
let mut node = Node::Candidate(candidate);
358350

@@ -372,7 +364,6 @@ mod tests {
372364
event: Event::SolicitVote { last_index: 3, last_term: 2 },
373365
}],
374366
);
375-
assert_messages(&mut state_rx, vec![]);
376367
Ok(())
377368
}
378369
}

src/raft/node/follower.rs

+19-27
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use super::super::{Address, Event, Instruction, Log, Message, RequestID, Response};
1+
use super::super::{Address, Event, Log, Message, RequestID, Response, State};
22
use super::{rand_election_timeout, Candidate, Node, NodeID, RawNode, Role, Term, Ticks};
33
use crate::error::{Error, Result};
44

@@ -10,13 +10,13 @@ use tokio::sync::mpsc;
1010
#[derive(Clone, Debug, PartialEq)]
1111
pub struct Follower {
1212
/// The leader, or None if just initialized.
13-
leader: Option<NodeID>,
13+
pub(super) leader: Option<NodeID>,
1414
/// The number of ticks since the last message from the leader.
1515
leader_seen: Ticks,
1616
/// The leader_seen timeout before triggering an election.
1717
election_timeout: Ticks,
1818
/// The node we voted for in the current term, if any.
19-
voted_for: Option<NodeID>,
19+
pub(super) voted_for: Option<NodeID>,
2020
// Local client requests that have been forwarded to the leader. These are
2121
// aborted on leader/term changes.
2222
pub(super) forwarded: HashSet<RequestID>,
@@ -43,12 +43,12 @@ impl RawNode<Follower> {
4343
id: NodeID,
4444
peers: HashSet<NodeID>,
4545
mut log: Log,
46+
state: Box<dyn State>,
4647
node_tx: mpsc::UnboundedSender<Message>,
47-
state_tx: mpsc::UnboundedSender<Instruction>,
4848
) -> Result<Self> {
4949
let (term, voted_for) = log.get_term()?;
5050
let role = Follower::new(None, voted_for);
51-
Ok(Self { id, peers, term, log, node_tx, state_tx, role })
51+
Ok(Self { id, peers, term, log, state, node_tx, role })
5252
}
5353

5454
/// Asserts internal invariants.
@@ -79,6 +79,9 @@ impl RawNode<Follower> {
7979
// Abort any forwarded requests. These must be retried with new leader.
8080
self.abort_forwarded()?;
8181

82+
// Apply any pending log entries, so that we're caught up if we win.
83+
self.maybe_apply()?;
84+
8285
let mut node = self.into_role(Candidate::new());
8386
node.campaign()?;
8487
Ok(node)
@@ -150,20 +153,17 @@ impl RawNode<Follower> {
150153
None => self = self.into_follower(Some(from), msg.term)?,
151154
}
152155

153-
// Advance commit index and apply entries if possible.
156+
// Respond to the heartbeat.
157+
//
158+
// TODO: this should return the last index and term.
154159
let has_committed = self.log.has(commit_index, commit_term)?;
155-
let (old_commit_index, _) = self.log.get_commit_index();
156-
if has_committed && commit_index > old_commit_index {
160+
self.send(msg.from, Event::ConfirmLeader { has_committed, read_seq })?;
161+
162+
// Advance commit index and apply entries.
163+
if has_committed && commit_index > self.log.get_commit_index().0 {
157164
self.log.commit(commit_index)?;
158-
let mut scan = self.log.scan((old_commit_index + 1)..=commit_index)?;
159-
while let Some(entry) = scan.next().transpose()? {
160-
self.state_tx.send(Instruction::Apply { entry })?;
161-
}
165+
self.maybe_apply()?;
162166
}
163-
self.send(
164-
msg.from,
165-
Event::ConfirmLeader { commit_index, has_committed, read_seq },
166-
)?;
167167
}
168168

169169
// Replicate entries from the leader. If we don't have a leader in
@@ -281,22 +281,16 @@ impl RawNode<Follower> {
281281
}
282282

283283
#[cfg(test)]
284+
#[cfg(never)] // TODO
284285
pub mod tests {
286+
use super::super::super::state::tests::TestState;
285287
use super::super::super::{Entry, Log, Request};
286288
use super::super::tests::{assert_messages, assert_node};
287289
use super::*;
288290
use crate::error::Error;
289291
use crate::storage;
290292
use tokio::sync::mpsc;
291293

292-
pub fn follower_leader(node: &RawNode<Follower>) -> Option<NodeID> {
293-
node.role.leader
294-
}
295-
296-
pub fn follower_voted_for(node: &RawNode<Follower>) -> Option<NodeID> {
297-
node.role.voted_for
298-
}
299-
300294
#[allow(clippy::type_complexity)]
301295
fn setup() -> Result<(
302296
RawNode<Follower>,
@@ -624,7 +618,6 @@ pub mod tests {
624618
fn step_appendentries_base0() -> Result<()> {
625619
// TODO: Move this into a setup function.
626620
let (node_tx, mut node_rx) = mpsc::unbounded_channel();
627-
let (state_tx, mut state_rx) = mpsc::unbounded_channel();
628621
let mut log = Log::new(storage::engine::Memory::new(), false)?;
629622
log.append(1, Some(vec![0x01]))?;
630623
log.append(1, Some(vec![0x02]))?;
@@ -636,8 +629,8 @@ pub mod tests {
636629
peers: HashSet::from([2, 3, 4, 5]),
637630
term: 1,
638631
log,
632+
state: Box::new(TestState::new(0)),
639633
node_tx,
640-
state_tx,
641634
role: Follower::new(Some(2), None),
642635
};
643636

@@ -667,7 +660,6 @@ pub mod tests {
667660
event: Event::AcceptEntries { last_index: 2 },
668661
}],
669662
);
670-
assert_messages(&mut state_rx, vec![]);
671663
Ok(())
672664
}
673665

0 commit comments

Comments
 (0)