Skip to content

Commit 2faf4e4

Browse files
committed
raft: replace Node with BoxedNode
1 parent 00e803b commit 2faf4e4

File tree

4 files changed

+195
-311
lines changed

4 files changed

+195
-311
lines changed

src/raft/node/candidate.rs

+33-26
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
use super::super::{Address, Event, Message};
2-
use super::{rand_election_timeout, Follower, Leader, Node, NodeID, RoleNode, Term, Ticks};
2+
use super::{
3+
rand_election_timeout, BoxedNode, DynNode, Follower, Leader, NodeID, RoleNode, Term, Ticks,
4+
};
35
use crate::error::{Error, Result};
46

57
use ::log::{debug, info, warn};
@@ -81,22 +83,42 @@ impl RoleNode<Candidate> {
8183
Ok(node)
8284
}
8385

84-
/// Processes a message.
85-
pub fn step(mut self, msg: Message) -> Result<Node> {
86+
/// Campaign for leadership by increasing the term, voting for ourself, and
87+
/// soliciting votes from all peers.
88+
pub(super) fn campaign(&mut self) -> Result<()> {
89+
let term = self.term + 1;
90+
info!("Starting new election for term {}", term);
91+
self.role = Candidate::new();
92+
self.role.votes.insert(self.id); // vote for ourself
93+
self.term = term;
94+
self.log.set_term(term, Some(self.id))?;
95+
96+
let (last_index, last_term) = self.log.get_last_index();
97+
self.send(Address::Broadcast, Event::SolicitVote { last_index, last_term })?;
98+
Ok(())
99+
}
100+
}
101+
102+
impl DynNode for RoleNode<Candidate> {
103+
fn id(&self) -> NodeID {
104+
self.id
105+
}
106+
107+
fn step(mut self: Box<Self>, msg: Message) -> Result<BoxedNode> {
86108
self.assert()?;
87109
self.assert_step(&msg);
88110

89111
// Drop messages from past terms.
90112
if msg.term < self.term && msg.term > 0 {
91113
debug!("Dropping message from past term ({:?})", msg);
92-
return Ok(self.into());
114+
return Ok(self);
93115
}
94116

95117
// If we receive a message for a future term, become a leaderless
96118
// follower in it and step the message. If the message is a Heartbeat or
97119
// AppendEntries from the leader, stepping it will follow the leader.
98120
if msg.term > self.term {
99-
return self.become_follower(msg.term, None)?.step(msg);
121+
return Box::new(self.become_follower(msg.term, None)?).step(msg);
100122
}
101123

102124
match msg.event {
@@ -108,14 +130,15 @@ impl RoleNode<Candidate> {
108130
Event::GrantVote => {
109131
self.role.votes.insert(msg.from.unwrap());
110132
if self.role.votes.len() as u64 >= self.quorum() {
111-
return Ok(self.become_leader()?.into());
133+
return Ok(Box::new(self.become_leader()?));
112134
}
113135
}
114136

115137
// If we receive a heartbeat or entries in this term, we lost the
116138
// election and have a new leader. Follow it and step the message.
117139
Event::Heartbeat { .. } | Event::AppendEntries { .. } => {
118-
return self.become_follower(msg.term, Some(msg.from.unwrap()))?.step(msg);
140+
return Box::new(self.become_follower(msg.term, Some(msg.from.unwrap()))?)
141+
.step(msg);
119142
}
120143

121144
// Abort any inbound client requests while candidate.
@@ -130,32 +153,16 @@ impl RoleNode<Candidate> {
130153
| Event::RejectEntries { .. }
131154
| Event::ClientResponse { .. } => warn!("Received unexpected message {:?}", msg),
132155
}
133-
Ok(self.into())
156+
Ok(self)
134157
}
135158

136-
/// Processes a logical clock tick.
137-
pub fn tick(mut self) -> Result<Node> {
159+
fn tick(mut self: Box<Self>) -> Result<BoxedNode> {
138160
self.assert()?;
139161

140162
self.role.election_duration += 1;
141163
if self.role.election_duration >= self.role.election_timeout {
142164
self.campaign()?;
143165
}
144-
Ok(self.into())
145-
}
146-
147-
/// Campaign for leadership by increasing the term, voting for ourself, and
148-
/// soliciting votes from all peers.
149-
pub(super) fn campaign(&mut self) -> Result<()> {
150-
let term = self.term + 1;
151-
info!("Starting new election for term {}", term);
152-
self.role = Candidate::new();
153-
self.role.votes.insert(self.id); // vote for ourself
154-
self.term = term;
155-
self.log.set_term(term, Some(self.id))?;
156-
157-
let (last_index, last_term) = self.log.get_last_index();
158-
self.send(Address::Broadcast, Event::SolicitVote { last_index, last_term })?;
159-
Ok(())
166+
Ok(self)
160167
}
161168
}

src/raft/node/follower.rs

+38-34
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use super::super::{Address, Event, Instruction, Message, RequestID, Response};
2-
use super::{rand_election_timeout, Candidate, Node, NodeID, RoleNode, Term, Ticks};
2+
use super::{rand_election_timeout, BoxedNode, Candidate, DynNode, NodeID, RoleNode, Term, Ticks};
33
use crate::error::{Error, Result};
44

55
use ::log::{debug, error, info, warn};
@@ -94,22 +94,46 @@ impl RoleNode<Follower> {
9494
Ok(self)
9595
}
9696

97-
/// Processes a message.
98-
pub fn step(mut self, msg: Message) -> Result<Node> {
97+
/// Aborts all forwarded requests.
98+
fn abort_forwarded(&mut self) -> Result<()> {
99+
for id in std::mem::take(&mut self.role.forwarded) {
100+
debug!("Aborting forwarded request {:x?}", id);
101+
self.send(Address::Client, Event::ClientResponse { id, response: Err(Error::Abort) })?;
102+
}
103+
Ok(())
104+
}
105+
106+
/// Checks if an address is the current leader.
107+
fn is_leader(&self, from: &Address) -> bool {
108+
if let Some(leader) = &self.role.leader {
109+
if let Address::Node(from) = from {
110+
return leader == from;
111+
}
112+
}
113+
false
114+
}
115+
}
116+
117+
impl DynNode for RoleNode<Follower> {
118+
fn id(&self) -> NodeID {
119+
self.id
120+
}
121+
122+
fn step(mut self: Box<Self>, msg: Message) -> Result<BoxedNode> {
99123
self.assert()?;
100124
self.assert_step(&msg);
101125

102126
// Drop messages from past terms.
103127
if msg.term < self.term && msg.term > 0 {
104128
debug!("Dropping message from past term ({:?})", msg);
105-
return Ok(self.into());
129+
return Ok(self);
106130
}
107131

108132
// If we receive a message for a future term, become a leaderless
109133
// follower in it and step the message. If the message is a Heartbeat or
110134
// AppendEntries from the leader, stepping it will follow the leader.
111135
if msg.term > self.term {
112-
return self.become_follower(None, msg.term)?.step(msg);
136+
return Box::new(self.become_follower(None, msg.term)?).step(msg);
113137
}
114138

115139
// Record when we last saw a message from the leader (if any).
@@ -126,7 +150,7 @@ impl RoleNode<Follower> {
126150
let from = msg.from.unwrap();
127151
match self.role.leader {
128152
Some(leader) => assert_eq!(from, leader, "Multiple leaders in term"),
129-
None => self = self.become_follower(Some(from), msg.term)?,
153+
None => self = Box::new(self.become_follower(Some(from), msg.term)?),
130154
}
131155

132156
// Advance commit index and apply entries if possible.
@@ -149,7 +173,7 @@ impl RoleNode<Follower> {
149173
let from = msg.from.unwrap();
150174
match self.role.leader {
151175
Some(leader) => assert_eq!(from, leader, "Multiple leaders in term"),
152-
None => self = self.become_follower(Some(from), msg.term)?,
176+
None => self = Box::new(self.become_follower(Some(from), msg.term)?),
153177
}
154178

155179
// Append the entries, if possible.
@@ -169,7 +193,7 @@ impl RoleNode<Follower> {
169193
// If we already voted for someone else in this term, ignore it.
170194
if let Some(voted_for) = self.role.voted_for {
171195
if from != voted_for {
172-
return Ok(self.into());
196+
return Ok(self);
173197
}
174198
}
175199

@@ -189,7 +213,7 @@ impl RoleNode<Follower> {
189213
Event::ClientRequest { ref id, .. } => {
190214
if msg.from != Address::Client {
191215
error!("Received client request from non-client {:?}", msg.from);
192-
return Ok(self.into());
216+
return Ok(self);
193217
}
194218

195219
let id = id.clone();
@@ -206,7 +230,7 @@ impl RoleNode<Follower> {
206230
Event::ClientResponse { id, mut response } => {
207231
if !self.is_leader(&msg.from) {
208232
error!("Received client response from non-leader {:?}", msg.from);
209-
return Ok(self.into());
233+
return Ok(self);
210234
}
211235

212236
// TODO: Get rid of this field, it should be returned at the RPC
@@ -225,36 +249,16 @@ impl RoleNode<Follower> {
225249
| Event::RejectEntries { .. }
226250
| Event::GrantVote { .. } => warn!("Received unexpected message {:?}", msg),
227251
};
228-
Ok(self.into())
252+
Ok(self)
229253
}
230254

231-
/// Processes a logical clock tick.
232-
pub fn tick(mut self) -> Result<Node> {
255+
fn tick(mut self: Box<Self>) -> Result<BoxedNode> {
233256
self.assert()?;
234257

235258
self.role.leader_seen += 1;
236259
if self.role.leader_seen >= self.role.election_timeout {
237-
return Ok(self.become_candidate()?.into());
260+
return Ok(Box::new(self.become_candidate()?));
238261
}
239-
Ok(self.into())
240-
}
241-
242-
/// Aborts all forwarded requests.
243-
fn abort_forwarded(&mut self) -> Result<()> {
244-
for id in std::mem::take(&mut self.role.forwarded) {
245-
debug!("Aborting forwarded request {:x?}", id);
246-
self.send(Address::Client, Event::ClientResponse { id, response: Err(Error::Abort) })?;
247-
}
248-
Ok(())
249-
}
250-
251-
/// Checks if an address is the current leader.
252-
fn is_leader(&self, from: &Address) -> bool {
253-
if let Some(leader) = &self.role.leader {
254-
if let Address::Node(from) = from {
255-
return leader == from;
256-
}
257-
}
258-
false
262+
Ok(self)
259263
}
260264
}

0 commit comments

Comments
 (0)