Skip to content

Commit 26ede8c

Browse files
committed
raft: replace Node with BoxedNode
1 parent ac805b6 commit 26ede8c

File tree

4 files changed

+224
-296
lines changed

4 files changed

+224
-296
lines changed

src/raft/node/candidate.rs

+39-32
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use super::super::{Address, Event, Message};
2-
use super::{rand_election_timeout, Follower, Leader, Node, NodeID, RoleNode, Term, Ticks};
2+
use super::{
3+
rand_election_timeout, BoxedNode, DynNode, Follower, Leader, NodeID, RoleNode, Term, Ticks,
4+
};
35
use crate::error::{Error, Result};
46

57
use ::log::{debug, info, warn};
@@ -81,22 +83,42 @@ impl RoleNode<Candidate> {
8183
Ok(node)
8284
}
8385

84-
/// Processes a message.
85-
pub fn step(mut self, msg: Message) -> Result<Node> {
86+
/// Campaign for leadership by increasing the term, voting for ourself, and
87+
/// soliciting votes from all peers.
88+
pub(super) fn campaign(&mut self) -> Result<()> {
89+
let term = self.term + 1;
90+
info!("Starting new election for term {}", term);
91+
self.role = Candidate::new();
92+
self.role.votes.insert(self.id); // vote for ourself
93+
self.term = term;
94+
self.log.set_term(term, Some(self.id))?;
95+
96+
let (last_index, last_term) = self.log.get_last_index();
97+
self.send(Address::Broadcast, Event::SolicitVote { last_index, last_term })?;
98+
Ok(())
99+
}
100+
}
101+
102+
impl DynNode for RoleNode<Candidate> {
103+
fn id(&self) -> NodeID {
104+
self.id
105+
}
106+
107+
fn step(mut self: Box<Self>, msg: Message) -> Result<BoxedNode> {
86108
self.assert()?;
87109
self.assert_step(&msg);
88110

89111
// Drop messages from past terms.
90112
if msg.term < self.term && msg.term > 0 {
91113
debug!("Dropping message from past term ({:?})", msg);
92-
return Ok(self.into());
114+
return Ok(self);
93115
}
94116

95117
// If we receive a message for a future term, become a leaderless
96118
// follower in it and step the message. If the message is a Heartbeat or
97119
// AppendEntries from the leader, stepping it will follow the leader.
98120
if msg.term > self.term {
99-
return self.become_follower(msg.term, None)?.step(msg);
121+
return Box::new(self.become_follower(msg.term, None)?).step(msg);
100122
}
101123

102124
match msg.event {
@@ -108,14 +130,15 @@ impl RoleNode<Candidate> {
108130
Event::GrantVote => {
109131
self.role.votes.insert(msg.from.unwrap());
110132
if self.role.votes.len() as u64 >= self.quorum() {
111-
return Ok(self.become_leader()?.into());
133+
return Ok(Box::new(self.become_leader()?));
112134
}
113135
}
114136

115137
// If we receive a heartbeat or entries in this term, we lost the
116138
// election and have a new leader. Follow it and step the message.
117139
Event::Heartbeat { .. } | Event::AppendEntries { .. } => {
118-
return self.become_follower(msg.term, Some(msg.from.unwrap()))?.step(msg);
140+
return Box::new(self.become_follower(msg.term, Some(msg.from.unwrap()))?)
141+
.step(msg);
119142
}
120143

121144
// Abort any inbound client requests while candidate.
@@ -130,33 +153,17 @@ impl RoleNode<Candidate> {
130153
| Event::RejectEntries { .. }
131154
| Event::ClientResponse { .. } => warn!("Received unexpected message {:?}", msg),
132155
}
133-
Ok(self.into())
156+
Ok(self)
134157
}
135158

136-
/// Processes a logical clock tick.
137-
pub fn tick(mut self) -> Result<Node> {
159+
fn tick(mut self: Box<Self>) -> Result<BoxedNode> {
138160
self.assert()?;
139161

140162
self.role.election_duration += 1;
141163
if self.role.election_duration >= self.role.election_timeout {
142164
self.campaign()?;
143165
}
144-
Ok(self.into())
145-
}
146-
147-
/// Campaign for leadership by increasing the term, voting for ourself, and
148-
/// soliciting votes from all peers.
149-
pub(super) fn campaign(&mut self) -> Result<()> {
150-
let term = self.term + 1;
151-
info!("Starting new election for term {}", term);
152-
self.role = Candidate::new();
153-
self.role.votes.insert(self.id); // vote for ourself
154-
self.term = term;
155-
self.log.set_term(term, Some(self.id))?;
156-
157-
let (last_index, last_term) = self.log.get_last_index();
158-
self.send(Address::Broadcast, Event::SolicitVote { last_index, last_term })?;
159-
Ok(())
166+
Ok(self)
160167
}
161168
}
162169

@@ -170,7 +177,7 @@ mod tests {
170177

171178
#[allow(clippy::type_complexity)]
172179
fn setup() -> Result<(
173-
RoleNode<Candidate>,
180+
Box<RoleNode<Candidate>>,
174181
mpsc::UnboundedReceiver<Message>,
175182
mpsc::UnboundedReceiver<Instruction>,
176183
)> {
@@ -193,14 +200,14 @@ mod tests {
193200
role: Candidate::new(),
194201
};
195202
node.role.votes.insert(1);
196-
Ok((node, node_rx, state_rx))
203+
Ok((Box::new(node), node_rx, state_rx))
197204
}
198205

199206
#[test]
200207
// Heartbeat for current term converts to follower and emits ConfirmLeader.
201208
fn step_heartbeat_current_term() -> Result<()> {
202209
let (candidate, mut node_rx, mut state_rx) = setup()?;
203-
let mut node = candidate.step(Message {
210+
let mut node = Box::new(candidate).step(Message {
204211
from: Address::Node(2),
205212
to: Address::Node(1),
206213
term: 3,
@@ -265,7 +272,7 @@ mod tests {
265272
fn step_grantvote() -> Result<()> {
266273
let (candidate, mut node_rx, mut state_rx) = setup()?;
267274
let peers = candidate.peers.clone();
268-
let mut node = Node::Candidate(candidate);
275+
let mut node: BoxedNode = candidate;
269276

270277
// The first vote is not sufficient for a quorum (3 votes including self)
271278
node = node.step(Message {
@@ -322,7 +329,7 @@ mod tests {
322329
// ClientRequest returns Error::Abort.
323330
fn step_clientrequest() -> Result<()> {
324331
let (candidate, mut node_rx, mut state_rx) = setup()?;
325-
let mut node = Node::Candidate(candidate);
332+
let mut node: BoxedNode = candidate;
326333

327334
node = node.step(Message {
328335
from: Address::Client,
@@ -348,7 +355,7 @@ mod tests {
348355
fn tick() -> Result<()> {
349356
let (candidate, mut node_rx, mut state_rx) = setup()?;
350357
let timeout = candidate.role.election_timeout;
351-
let mut node = Node::Candidate(candidate);
358+
let mut node: BoxedNode = candidate;
352359

353360
assert!(timeout > 0);
354361
for _ in 0..timeout {

src/raft/node/follower.rs

+45-41
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::super::{Address, Event, Instruction, Message, RequestID, Response};
2-
use super::{rand_election_timeout, Candidate, Node, NodeID, RoleNode, Term, Ticks};
2+
use super::{rand_election_timeout, BoxedNode, Candidate, DynNode, NodeID, RoleNode, Term, Ticks};
33
use crate::error::{Error, Result};
44

55
use ::log::{debug, error, info, warn};
@@ -94,22 +94,46 @@ impl RoleNode<Follower> {
9494
Ok(self)
9595
}
9696

97-
/// Processes a message.
98-
pub fn step(mut self, msg: Message) -> Result<Node> {
97+
/// Aborts all forwarded requests.
98+
fn abort_forwarded(&mut self) -> Result<()> {
99+
for id in std::mem::take(&mut self.role.forwarded) {
100+
debug!("Aborting forwarded request {:x?}", id);
101+
self.send(Address::Client, Event::ClientResponse { id, response: Err(Error::Abort) })?;
102+
}
103+
Ok(())
104+
}
105+
106+
/// Checks if an address is the current leader.
107+
fn is_leader(&self, from: &Address) -> bool {
108+
if let Some(leader) = &self.role.leader {
109+
if let Address::Node(from) = from {
110+
return leader == from;
111+
}
112+
}
113+
false
114+
}
115+
}
116+
117+
impl DynNode for RoleNode<Follower> {
118+
fn id(&self) -> NodeID {
119+
self.id
120+
}
121+
122+
fn step(mut self: Box<Self>, msg: Message) -> Result<BoxedNode> {
99123
self.assert()?;
100124
self.assert_step(&msg);
101125

102126
// Drop messages from past terms.
103127
if msg.term < self.term && msg.term > 0 {
104128
debug!("Dropping message from past term ({:?})", msg);
105-
return Ok(self.into());
129+
return Ok(self);
106130
}
107131

108132
// If we receive a message for a future term, become a leaderless
109133
// follower in it and step the message. If the message is a Heartbeat or
110134
// AppendEntries from the leader, stepping it will follow the leader.
111135
if msg.term > self.term {
112-
return self.become_follower(None, msg.term)?.step(msg);
136+
return Box::new(self.become_follower(None, msg.term)?).step(msg);
113137
}
114138

115139
// Record when we last saw a message from the leader (if any).
@@ -126,7 +150,7 @@ impl RoleNode<Follower> {
126150
let from = msg.from.unwrap();
127151
match self.role.leader {
128152
Some(leader) => assert_eq!(from, leader, "Multiple leaders in term"),
129-
None => self = self.become_follower(Some(from), msg.term)?,
153+
None => self = Box::new(self.become_follower(Some(from), msg.term)?),
130154
}
131155

132156
// Advance commit index and apply entries if possible.
@@ -149,7 +173,7 @@ impl RoleNode<Follower> {
149173
let from = msg.from.unwrap();
150174
match self.role.leader {
151175
Some(leader) => assert_eq!(from, leader, "Multiple leaders in term"),
152-
None => self = self.become_follower(Some(from), msg.term)?,
176+
None => self = Box::new(self.become_follower(Some(from), msg.term)?),
153177
}
154178

155179
// Append the entries, if possible.
@@ -169,7 +193,7 @@ impl RoleNode<Follower> {
169193
// If we already voted for someone else in this term, ignore it.
170194
if let Some(voted_for) = self.role.voted_for {
171195
if from != voted_for {
172-
return Ok(self.into());
196+
return Ok(self);
173197
}
174198
}
175199

@@ -189,7 +213,7 @@ impl RoleNode<Follower> {
189213
Event::ClientRequest { ref id, .. } => {
190214
if msg.from != Address::Client {
191215
error!("Received client request from non-client {:?}", msg.from);
192-
return Ok(self.into());
216+
return Ok(self);
193217
}
194218

195219
let id = id.clone();
@@ -206,7 +230,7 @@ impl RoleNode<Follower> {
206230
Event::ClientResponse { id, mut response } => {
207231
if !self.is_leader(&msg.from) {
208232
error!("Received client response from non-leader {:?}", msg.from);
209-
return Ok(self.into());
233+
return Ok(self);
210234
}
211235

212236
// TODO: Get rid of this field, it should be returned at the RPC
@@ -225,37 +249,17 @@ impl RoleNode<Follower> {
225249
| Event::RejectEntries { .. }
226250
| Event::GrantVote { .. } => warn!("Received unexpected message {:?}", msg),
227251
};
228-
Ok(self.into())
252+
Ok(self)
229253
}
230254

231-
/// Processes a logical clock tick.
232-
pub fn tick(mut self) -> Result<Node> {
255+
fn tick(mut self: Box<Self>) -> Result<BoxedNode> {
233256
self.assert()?;
234257

235258
self.role.leader_seen += 1;
236259
if self.role.leader_seen >= self.role.election_timeout {
237-
return Ok(self.become_candidate()?.into());
238-
}
239-
Ok(self.into())
240-
}
241-
242-
/// Aborts all forwarded requests.
243-
fn abort_forwarded(&mut self) -> Result<()> {
244-
for id in std::mem::take(&mut self.role.forwarded) {
245-
debug!("Aborting forwarded request {:x?}", id);
246-
self.send(Address::Client, Event::ClientResponse { id, response: Err(Error::Abort) })?;
247-
}
248-
Ok(())
249-
}
250-
251-
/// Checks if an address is the current leader.
252-
fn is_leader(&self, from: &Address) -> bool {
253-
if let Some(leader) = &self.role.leader {
254-
if let Address::Node(from) = from {
255-
return leader == from;
256-
}
260+
return Ok(Box::new(self.become_candidate()?));
257261
}
258-
false
262+
Ok(self)
259263
}
260264
}
261265

@@ -278,7 +282,7 @@ pub mod tests {
278282

279283
#[allow(clippy::type_complexity)]
280284
fn setup() -> Result<(
281-
RoleNode<Follower>,
285+
Box<RoleNode<Follower>>,
282286
mpsc::UnboundedReceiver<Message>,
283287
mpsc::UnboundedReceiver<Instruction>,
284288
)> {
@@ -300,7 +304,7 @@ pub mod tests {
300304
state_tx,
301305
role: Follower::new(Some(2), None),
302306
};
303-
Ok((node, node_rx, state_rx))
307+
Ok((Box::new(node), node_rx, state_rx))
304308
}
305309

306310
#[test]
@@ -620,7 +624,7 @@ pub mod tests {
620624
role: Follower::new(Some(2), None),
621625
};
622626

623-
let mut node = follower.step(Message {
627+
let mut node = Box::new(follower).step(Message {
624628
from: Address::Node(2),
625629
to: Address::Node(1),
626630
term: 3,
@@ -863,7 +867,7 @@ pub mod tests {
863867
// ClientRequest is forwarded, as is the response.
864868
fn step_clientrequest_clientresponse() -> Result<()> {
865869
let (follower, mut node_rx, mut state_rx) = setup()?;
866-
let mut node = Node::Follower(follower);
870+
let mut node: BoxedNode = follower;
867871

868872
node = node.step(Message {
869873
from: Address::Client,
@@ -917,7 +921,7 @@ pub mod tests {
917921
fn step_clientrequest_no_leader() -> Result<()> {
918922
let (mut follower, mut node_rx, mut state_rx) = setup()?;
919923
follower.role = Follower::new(None, None);
920-
let mut node = Node::Follower(follower);
924+
let mut node: BoxedNode = follower;
921925

922926
node = node.step(Message {
923927
from: Address::Client,
@@ -943,7 +947,7 @@ pub mod tests {
943947
#[test]
944948
fn step_clientrequest_aborted() -> Result<()> {
945949
let (follower, mut node_rx, mut state_rx) = setup()?;
946-
let mut node = Node::Follower(follower);
950+
let mut node: BoxedNode = follower;
947951

948952
node = node.step(Message {
949953
from: Address::Client,
@@ -1004,7 +1008,7 @@ pub mod tests {
10041008
fn tick() -> Result<()> {
10051009
let (follower, mut node_rx, mut state_rx) = setup()?;
10061010
let timeout = follower.role.election_timeout;
1007-
let mut node = Node::Follower(follower);
1011+
let mut node: BoxedNode = follower;
10081012

10091013
// Make sure heartbeats reset election timeout
10101014
assert!(timeout > 0);

0 commit comments

Comments
 (0)