Skip to content

Commit f83c73c

Browse files
committed
Apply noop commands to state machine.
This simplifies applied_index tracking.
1 parent 6abf3f0 commit f83c73c

File tree

3 files changed

+53
-61
lines changed

3 files changed

+53
-61
lines changed

Diff for: src/raft/node/mod.rs

+9-14
Original file line numberDiff line numberDiff line change
@@ -479,23 +479,18 @@ mod tests {
479479
}
480480

481481
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
482-
async fn new_state_apply_missing() -> Result<()> {
482+
#[should_panic(expected = "applied index above commit index")]
483+
async fn new_state_apply_missing() {
483484
let (node_tx, _) = mpsc::unbounded_channel();
484-
let mut log = Log::new(Box::new(storage::engine::Memory::new()), false)?;
485-
log.append(1, Some(vec![0x01]))?;
486-
log.append(2, None)?;
487-
log.append(2, Some(vec![0x02]))?;
488-
log.commit(3)?;
489-
log.append(2, Some(vec![0x03]))?;
485+
let mut log = Log::new(Box::new(storage::engine::Memory::new()), false).unwrap();
486+
log.append(1, Some(vec![0x01])).unwrap();
487+
log.append(2, None).unwrap();
488+
log.append(2, Some(vec![0x02])).unwrap();
489+
log.commit(3).unwrap();
490+
log.append(2, Some(vec![0x03])).unwrap();
490491
let state = Box::new(TestState::new(4));
491492

492-
assert_eq!(
493-
Node::new(1, vec![2, 3], log, state.clone(), node_tx).await.err(),
494-
Some(Error::Internal(
495-
"State machine applied index 4 greater than log commit index 3".into()
496-
))
497-
);
498-
Ok(())
493+
Node::new(1, vec![2, 3], log, state.clone(), node_tx).await.unwrap();
499494
}
500495

501496
#[tokio::test]

Diff for: src/raft/state.rs

+28-33
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,16 @@ pub trait State: Send {
1212
/// Returns the last applied index from the state machine.
1313
fn get_applied_index(&self) -> Index;
1414

15-
/// Mutates the state machine. If the state machine returns Error::Internal, the Raft node
16-
/// halts. For any other error, the state is applied and the error propagated to the caller.
17-
fn mutate(&mut self, index: Index, command: Vec<u8>) -> Result<Vec<u8>>;
15+
/// Applies a log entry to the state machine. If it returns Error::Internal,
16+
/// the Raft node halts. Any other error is considered applied and returned
17+
/// to the caller.
18+
///
19+
/// The entry may contain a noop command, which is committed by Raft during
20+
/// leader changes. This still needs to be applied to the state machine to
21+
/// properly track the applied index, and returns an empty result.
22+
///
23+
/// TODO: consider using runtime assertions instead of Error::Internal.
24+
fn apply(&mut self, entry: Entry) -> Result<Vec<u8>>;
1825

1926
/// Queries the state machine. All errors are propagated to the caller.
2027
fn query(&self, command: Vec<u8>) -> Result<Vec<u8>>;
@@ -51,7 +58,6 @@ struct Query {
5158
pub struct Driver {
5259
state_rx: UnboundedReceiverStream<Instruction>,
5360
node_tx: mpsc::UnboundedSender<Message>,
54-
applied_index: Index,
5561
/// Notify clients when their mutation is applied. <index, (client, id)>
5662
notify: HashMap<Index, (Address, Vec<u8>)>,
5763
/// Execute client queries when they receive a quorum. <index, <id, query>>
@@ -67,16 +73,14 @@ impl Driver {
6773
Self {
6874
state_rx: UnboundedReceiverStream::new(state_rx),
6975
node_tx,
70-
applied_index: 0,
7176
notify: HashMap::new(),
7277
queries: BTreeMap::new(),
7378
}
7479
}
7580

7681
/// Drives a state machine.
7782
pub async fn drive(mut self, mut state: Box<dyn State>) -> Result<()> {
78-
self.applied_index = state.get_applied_index();
79-
debug!("Starting state machine driver at applied index {}", self.applied_index);
83+
debug!("Starting state machine driver at applied index {}", state.get_applied_index());
8084
while let Some(instruction) = self.state_rx.next().await {
8185
if let Err(error) = self.execute(instruction, &mut *state).await {
8286
error!("Halting state machine due to error: {}", error);
@@ -91,40 +95,29 @@ impl Driver {
9195
pub fn apply_log(&mut self, state: &mut dyn State, log: &mut Log) -> Result<Index> {
9296
let applied_index = state.get_applied_index();
9397
let (commit_index, _) = log.get_commit_index();
94-
if applied_index > commit_index {
95-
return Err(Error::Internal(format!(
96-
"State machine applied index {} greater than log commit index {}",
97-
applied_index, commit_index
98-
)));
99-
}
98+
assert!(applied_index <= commit_index, "applied index above commit index");
99+
100100
if applied_index < commit_index {
101101
let mut scan = log.scan((applied_index + 1)..=commit_index)?;
102102
while let Some(entry) = scan.next().transpose()? {
103103
self.apply(state, entry)?;
104104
}
105105
}
106-
Ok(self.applied_index)
106+
Ok(state.get_applied_index())
107107
}
108108

109109
/// Applies an entry to the state machine.
110110
pub fn apply(&mut self, state: &mut dyn State, entry: Entry) -> Result<Index> {
111-
// Apply the command, unless it's a noop.
111+
// Apply the command.
112112
debug!("Applying {:?}", entry);
113-
if let Some(command) = entry.command {
114-
match state.mutate(entry.index, command) {
115-
Err(error @ Error::Internal(_)) => return Err(error),
116-
result => self.notify_applied(entry.index, result)?,
117-
};
118-
}
119-
// We have to track applied_index here, separately from the state machine, because
120-
// no-op log entries are significant for whether a query should be executed.
121-
//
122-
// TODO: track noop commands in the state machine.
123-
self.applied_index = entry.index;
113+
match state.apply(entry) {
114+
Err(error @ Error::Internal(_)) => return Err(error),
115+
result => self.notify_applied(state.get_applied_index(), result)?,
116+
};
124117
// Try to execute any pending queries, since they may have been submitted for a
125118
// commit_index which hadn't been applied yet.
126119
self.query_execute(state)?;
127-
Ok(self.applied_index)
120+
Ok(state.get_applied_index())
128121
}
129122

130123
/// Executes a state machine instruction.
@@ -202,7 +195,7 @@ impl Driver {
202195

203196
/// Executes any queries that are ready.
204197
fn query_execute(&mut self, state: &mut dyn State) -> Result<()> {
205-
for query in self.query_ready(self.applied_index) {
198+
for query in self.query_ready(state.get_applied_index()) {
206199
debug!("Executing query {:?}", query.command);
207200
let result = state.query(query.command);
208201
if let Err(error @ Error::Internal(_)) = result {
@@ -291,11 +284,13 @@ pub mod tests {
291284
*self.applied_index.lock().unwrap()
292285
}
293286

294-
// Appends the command to the internal commands list.
295-
fn mutate(&mut self, index: Index, command: Vec<u8>) -> Result<Vec<u8>> {
296-
self.commands.lock()?.push(command.clone());
297-
*self.applied_index.lock()? = index;
298-
Ok(command)
287+
// Appends the entry to the internal command list.
288+
fn apply(&mut self, entry: Entry) -> Result<Vec<u8>> {
289+
if let Some(command) = &entry.command {
290+
self.commands.lock()?.push(command.clone());
291+
}
292+
*self.applied_index.lock()? = entry.index;
293+
Ok(entry.command.unwrap_or_default())
299294
}
300295

301296
// Appends the command to the internal commands list.

Diff for: src/sql/engine/raft.rs

+16-14
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use super::super::schema::{Catalog, Table, Tables};
22
use super::super::types::{Expression, Row, Value};
33
use super::{Engine as _, IndexScan, Scan, Transaction as _};
44
use crate::error::{Error, Result};
5-
use crate::raft;
5+
use crate::raft::{self, Entry};
66
use crate::storage::{self, bincode, mvcc::TransactionState};
77

88
use serde::{de::DeserializeOwned, Deserialize, Serialize};
@@ -294,8 +294,8 @@ impl<E: storage::engine::Engine> State<E> {
294294
Ok(State { engine, applied_index })
295295
}
296296

297-
/// Applies a state machine mutation
298-
fn apply(&mut self, mutation: Mutation) -> Result<Vec<u8>> {
297+
/// Mutates the state machine.
298+
fn mutate(&mut self, mutation: Mutation) -> Result<Vec<u8>> {
299299
match mutation {
300300
Mutation::Begin { read_only, as_of } => {
301301
let txn = if !read_only {
@@ -335,17 +335,19 @@ impl<E: storage::engine::Engine> raft::State for State<E> {
335335
self.applied_index
336336
}
337337

338-
fn mutate(&mut self, index: u64, command: Vec<u8>) -> Result<Vec<u8>> {
339-
// We don't check that index == applied_index + 1, since the Raft log commits no-op
340-
// entries during leader election which we need to ignore.
341-
match self.apply(bincode::deserialize(&command)?) {
342-
error @ Err(Error::Internal(_)) => error,
343-
result => {
344-
self.engine.set_metadata(b"applied_index", bincode::serialize(&(index))?)?;
345-
self.applied_index = index;
346-
result
347-
}
348-
}
338+
fn apply(&mut self, entry: Entry) -> Result<Vec<u8>> {
339+
assert_eq!(entry.index, self.applied_index + 1, "entry index not after applied index");
340+
341+
let result = match &entry.command {
342+
Some(command) => match self.mutate(bincode::deserialize(command)?) {
343+
error @ Err(Error::Internal(_)) => return error, // don't record as applied
344+
result => result,
345+
},
346+
None => Ok(Vec::new()),
347+
};
348+
self.applied_index = entry.index;
349+
self.engine.set_metadata(b"applied_index", bincode::serialize(&entry.index)?)?;
350+
result
349351
}
350352

351353
fn query(&self, command: Vec<u8>) -> Result<Vec<u8>> {

0 commit comments

Comments
 (0)