Skip to content

Commit 32595ad

Browse files
committed
Add and use quorum_value() for Raft commits
1 parent 8a9a633 commit 32595ad

File tree

2 files changed

+38
-15
lines changed

2 files changed

+38
-15
lines changed

src/raft/node/leader.rs

Lines changed: 14 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,12 @@ impl RawNode<Leader> {
107107
);
108108

109109
let from = msg.from.unwrap();
110-
self.role.progress.entry(from).and_modify(|p| {
111-
p.last = last_index;
112-
p.next = last_index + 1;
113-
});
114-
self.maybe_commit()?;
110+
let progress = self.role.progress.get_mut(&from).unwrap();
111+
if last_index > progress.last {
112+
progress.last = last_index;
113+
progress.next = last_index + 1;
114+
self.maybe_commit()?;
115+
}
115116
}
116117

117118
// A follower rejected log entries we sent it, typically because it
@@ -225,16 +226,14 @@ impl RawNode<Leader> {
225226
fn maybe_commit(&mut self) -> Result<Index> {
226227
// Determine the new commit index, i.e. the last index replicated to a
227228
// quorum of peers.
228-
let mut last_indexes = self
229-
.role
230-
.progress
231-
.values()
232-
.map(|p| p.last)
233-
.chain(std::iter::once(self.log.get_last_index().0))
234-
.collect::<Vec<_>>();
235-
last_indexes.sort_unstable();
236-
last_indexes.reverse();
237-
let commit_index = last_indexes[self.quorum_size() as usize - 1];
229+
let commit_index = self.quorum_value(
230+
self.role
231+
.progress
232+
.values()
233+
.map(|p| p.last)
234+
.chain(std::iter::once(self.log.get_last_index().0))
235+
.collect(),
236+
);
238237

239238
// A 0 commit index means we haven't committed anything yet.
240239
if commit_index == 0 {

src/raft/node/mod.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,13 @@ impl<R: Role> RawNode<R> {
173173
quorum_size(self.cluster_size())
174174
}
175175

176+
/// Returns the quorum value of the given unsorted slice, in descending
177+
/// order. The slice must have the same size as the cluster.
178+
fn quorum_value<T: Ord + Copy>(&self, values: Vec<T>) -> T {
179+
assert!(values.len() == self.cluster_size() as usize, "values must match cluster size");
180+
quorum_value(values)
181+
}
182+
176183
/// Sends an event
177184
fn send(&self, to: Address, event: Event) -> Result<()> {
178185
let msg = Message { term: self.term, from: Address::Node(self.id), to, event };
@@ -228,6 +235,14 @@ fn quorum_size(size: u8) -> u8 {
228235
size / 2 + 1
229236
}
230237

238+
/// Returns the quorum (median) value of the given unsorted slice, in descending
239+
/// order. The slice cannot be empty.
240+
fn quorum_value<T: Ord + Copy>(mut values: Vec<T>) -> T {
241+
assert!(!values.is_empty(), "no values provided");
242+
let index = quorum_size(values.len() as u8) as usize - 1;
243+
*values.select_nth_unstable_by(index, |a, b: &T| a.cmp(b).reverse()).1
244+
}
245+
231246
#[cfg(test)]
232247
mod tests {
233248
pub use super::super::state::tests::TestState;
@@ -534,4 +549,13 @@ mod tests {
534549
assert_eq!(super::quorum_size(size), quorum);
535550
}
536551
}
552+
553+
#[test]
554+
fn quorum_value() {
555+
assert_eq!(super::quorum_value(vec![1]), 1);
556+
assert_eq!(super::quorum_value(vec![1, 3, 2]), 2);
557+
assert_eq!(super::quorum_value(vec![4, 1, 3, 2]), 2);
558+
assert_eq!(super::quorum_value(vec![1, 1, 1, 2, 2]), 1);
559+
assert_eq!(super::quorum_value(vec![1, 1, 2, 2, 2]), 2);
560+
}
537561
}

0 commit comments

Comments
 (0)