From d2b1a79f2d0205902f2f01abee61d7bebf50257a Mon Sep 17 00:00:00 2001 From: Erik Grinaker Date: Sat, 18 Nov 2023 21:44:47 +0100 Subject: [PATCH] Remove `raft::Address::Local` --- src/raft/message.rs | 2 -- src/raft/node/candidate.rs | 14 +++++----- src/raft/node/follower.rs | 52 +++++++++++++++++++------------------- src/raft/node/leader.rs | 22 ++++++++-------- src/raft/node/mod.rs | 8 +++--- src/raft/server.rs | 20 ++++++--------- src/raft/state.rs | 25 ++++++++++-------- 7 files changed, 69 insertions(+), 74 deletions(-) diff --git a/src/raft/message.rs b/src/raft/message.rs index a35b82aeb..bef414a40 100644 --- a/src/raft/message.rs +++ b/src/raft/message.rs @@ -11,8 +11,6 @@ pub enum Address { /// A node with the specified node ID (local or remote). Valid both as /// sender and recipient. Node(NodeID), - /// The local node. - Local, /// A local client. Client, } diff --git a/src/raft/node/candidate.rs b/src/raft/node/candidate.rs index 825e7231e..68e465ae9 100644 --- a/src/raft/node/candidate.rs +++ b/src/raft/node/candidate.rs @@ -172,7 +172,7 @@ mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::ConfirmLeader { commit_index: 2, has_committed: true }, @@ -197,7 +197,7 @@ mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 4, event: Event::ConfirmLeader { commit_index: 2, has_committed: true }, @@ -252,7 +252,7 @@ mod tests { assert_eq!( node_rx.try_recv()?, Message { - from: Address::Local, + from: Address::Node(1), to: Address::Broadcast, term: 3, event: Event::Heartbeat { commit_index: 2, commit_term: 1 }, @@ -263,7 +263,7 @@ mod tests { assert_eq!( node_rx.try_recv()?, Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(to), term: 3, event: Event::ReplicateEntries { @@ -288,7 +288,7 @@ mod tests { node = node.step(Message { from: Address::Client, - to: Address::Local, + to: Address::Node(1), term: 0, event: Event::ClientRequest { id: vec![0x01], request: Request::Mutate(vec![0xaf]) }, })?; @@ -296,7 +296,7 @@ mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Client, term: 3, event: Event::ClientResponse { id: vec![0x01], response: Err(Error::Abort) }, @@ -322,7 +322,7 @@ mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Broadcast, term: 4, event: Event::SolicitVote { last_index: 3, last_term: 2 }, diff --git a/src/raft/node/follower.rs b/src/raft/node/follower.rs index 57d415474..60cf2b5f5 100644 --- a/src/raft/node/follower.rs +++ b/src/raft/node/follower.rs @@ -228,7 +228,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::ConfirmLeader { commit_index: 3, has_committed: true }, @@ -257,7 +257,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::ConfirmLeader { commit_index: 3, has_committed: false }, @@ -281,7 +281,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::ConfirmLeader { commit_index: 5, has_committed: false }, @@ -322,7 +322,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(3), term: 3, event: Event::ConfirmLeader { commit_index: 3, has_committed: true }, @@ -351,7 +351,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::ConfirmLeader { commit_index: 1, has_committed: true }, @@ -375,7 +375,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(3), term: 4, event: Event::ConfirmLeader { commit_index: 3, has_committed: true }, @@ -422,7 +422,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(3), term: 3, event: Event::GrantVote, @@ -441,7 +441,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(3), term: 3, event: Event::GrantVote, @@ -552,7 +552,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::AcceptEntries { last_index: 2 }, @@ -589,7 +589,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::AcceptEntries { last_index: 5 }, @@ -625,7 +625,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::AcceptEntries { last_index: 4 }, @@ -661,7 +661,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::AcceptEntries { last_index: 4 }, @@ -697,7 +697,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::AcceptEntries { last_index: 4 }, @@ -729,7 +729,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::RejectEntries, @@ -761,7 +761,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::RejectEntries, @@ -779,7 +779,7 @@ pub mod tests { node = node.step(Message { from: Address::Client, - to: Address::Local, + to: Address::Node(1), term: 0, event: Event::ClientRequest { id: vec![0x01], request: Request::Mutate(vec![0xaf]) }, })?; @@ -791,7 +791,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::ClientRequest { @@ -815,7 +815,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Client, term: 3, event: Event::ClientResponse { @@ -837,7 +837,7 @@ pub mod tests { node = node.step(Message { from: Address::Client, - to: Address::Local, + to: Address::Node(1), term: 0, event: Event::ClientRequest { id: vec![0x01], request: Request::Mutate(vec![0xaf]) }, })?; @@ -845,7 +845,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Client, term: 3, event: Event::ClientResponse { id: vec![0x01], response: Err(Error::Abort) }, @@ -863,7 +863,7 @@ pub mod tests { node = node.step(Message { from: Address::Client, - to: Address::Local, + to: Address::Node(1), term: 0, event: Event::ClientRequest { id: vec![0x01], request: Request::Mutate(vec![0xaf]) }, })?; @@ -875,7 +875,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::ClientRequest { @@ -898,13 +898,13 @@ pub mod tests { &mut node_rx, vec![ Message { - from: Address::Local, + from: Address::Node(1), to: Address::Client, term: 4, event: Event::ClientResponse { id: vec![0x01], response: Err(Error::Abort) }, }, Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(3), term: 4, event: Event::ConfirmLeader { commit_index: 3, has_committed: true }, @@ -940,7 +940,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::ConfirmLeader { commit_index: 2, has_committed: true }, @@ -957,7 +957,7 @@ pub mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Broadcast, term: 4, event: Event::SolicitVote { last_index: 3, last_term: 2 }, diff --git a/src/raft/node/leader.rs b/src/raft/node/leader.rs index 97cc18354..2b4f4bfe5 100644 --- a/src/raft/node/leader.rs +++ b/src/raft/node/leader.rs @@ -153,7 +153,7 @@ impl RoleNode { self.state_tx.send(Instruction::Vote { term: self.term, index: commit_index, - address: Address::Local, + address: Address::Node(self.id), })?; if !self.peers.is_empty() { self.send(Address::Broadcast, Event::Heartbeat { commit_index, commit_term })?; @@ -294,7 +294,7 @@ mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::ReplicateEntries { base_index: 5, base_term: 3, entries: vec![] }, @@ -341,7 +341,7 @@ mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 4, event: Event::ConfirmLeader { commit_index: 7, has_committed: false }, @@ -524,7 +524,7 @@ mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 3, event: Event::ReplicateEntries { @@ -551,7 +551,7 @@ mod tests { let mut node: Node = leader.into(); node = node.step(Message { from: Address::Client, - to: Address::Local, + to: Address::Node(1), term: 0, event: Event::ClientRequest { id: vec![0x01], request: Request::Query(vec![0xaf]) }, })?; @@ -559,7 +559,7 @@ mod tests { assert_messages( &mut node_rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Broadcast, term: 3, event: Event::Heartbeat { commit_index: 2, commit_term: 1 }, @@ -576,7 +576,7 @@ mod tests { index: 2, quorum, }, - Instruction::Vote { term: 3, index: 2, address: Address::Local }, + Instruction::Vote { term: 3, index: 2, address: Address::Node(1) }, ], ); Ok(()) @@ -591,7 +591,7 @@ mod tests { node = node.step(Message { from: Address::Client, - to: Address::Local, + to: Address::Node(1), term: 0, event: Event::ClientRequest { id: vec![0x01], request: Request::Mutate(vec![0xaf]) }, })?; @@ -605,7 +605,7 @@ mod tests { assert_eq!( node_rx.try_recv()?, Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(peer), term: 3, event: Event::ReplicateEntries { @@ -633,7 +633,7 @@ mod tests { node = node.step(Message { from: Address::Client, - to: Address::Local, + to: Address::Node(1), term: 0, event: Event::ClientRequest { id: vec![0x01], request: Request::Status }, })?; @@ -677,7 +677,7 @@ mod tests { assert_eq!( node_rx.try_recv()?, Message { - from: Address::Local, + from: Address::Node(1), to: Address::Broadcast, term: 3, event: Event::Heartbeat { commit_index: 2, commit_term: 1 }, diff --git a/src/raft/node/mod.rs b/src/raft/node/mod.rs index e1ebccf25..1ef9c7209 100644 --- a/src/raft/node/mod.rs +++ b/src/raft/node/mod.rs @@ -58,7 +58,7 @@ impl Node { node_tx: mpsc::UnboundedSender, ) -> Result { let (state_tx, state_rx) = mpsc::unbounded_channel(); - let mut driver = Driver::new(state_rx, node_tx.clone()); + let mut driver = Driver::new(id, state_rx, node_tx.clone()); driver.apply_log(&mut *state, &mut log)?; tokio::spawn(driver.drive(state)); @@ -172,7 +172,7 @@ impl RoleNode { /// Sends an event fn send(&self, to: Address, event: Event) -> Result<()> { - let msg = Message { term: self.term, from: Address::Local, to, event }; + let msg = Message { term: self.term, from: Address::Node(self.id), to, event }; debug!("Sending {:?}", msg); Ok(self.node_tx.send(msg)?) } @@ -183,7 +183,6 @@ impl RoleNode { Address::Broadcast => { return Err(Error::Internal("Message from broadcast address".into())) } - Address::Local => return Err(Error::Internal("Message from local node".into())), Address::Client if !matches!(msg.event, Event::ClientRequest { .. }) => { return Err(Error::Internal("Non-request message from client".into())); } @@ -199,7 +198,6 @@ impl RoleNode { match msg.to { Address::Node(id) if id == self.id => Ok(()), - Address::Local => Ok(()), Address::Broadcast => Ok(()), Address::Node(id) => { Err(Error::Internal(format!("Received message for other node {}", id))) @@ -511,7 +509,7 @@ mod tests { assert_messages( &mut rx, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(2), term: 1, event: Event::Heartbeat { commit_index: 1, commit_term: 1 }, diff --git a/src/raft/server.rs b/src/raft/server.rs index 3720933dd..17e1961ac 100644 --- a/src/raft/server.rs +++ b/src/raft/server.rs @@ -48,8 +48,7 @@ impl Server { let (tcp_out_tx, tcp_out_rx) = mpsc::unbounded_channel::(); let (task, tcp_receiver) = Self::tcp_receive(listener, tcp_in_tx).remote_handle(); tokio::spawn(task); - let (task, tcp_sender) = - Self::tcp_send(self.node.id(), self.peers, tcp_out_rx).remote_handle(); + let (task, tcp_sender) = Self::tcp_send(self.peers, tcp_out_rx).remote_handle(); tokio::spawn(task); let (task, eventloop) = Self::eventloop(self.node, self.node_rx, client_rx, tcp_in_rx, tcp_out_tx) @@ -97,13 +96,14 @@ impl Server { Some((request, response_tx)) = client_rx.next() => { let id = Uuid::new_v4().as_bytes().to_vec(); - requests.insert(id.clone(), response_tx); - node = node.step(Message{ + let msg = Message{ from: Address::Client, - to: Address::Local, + to: Address::Node(node.id()), term: 0, - event: Event::ClientRequest{id, request}, - })?; + event: Event::ClientRequest{id: id.clone(), request}, + }; + node = node.step(msg)?; + requests.insert(id, response_tx); } } } @@ -146,7 +146,6 @@ impl Server { /// Sends outbound messages to peers via TCP. async fn tcp_send( - node_id: NodeID, peers: HashMap, out_rx: mpsc::UnboundedReceiver, ) -> Result<()> { @@ -159,10 +158,7 @@ impl Server { tokio::spawn(Self::tcp_send_peer(addr, rx)); } - while let Some(mut message) = out_rx.next().await { - if message.from == Address::Local { - message.from = Address::Node(node_id) - } + while let Some(message) = out_rx.next().await { let to = match message.to { Address::Broadcast => peer_txs.keys().copied().collect(), Address::Node(peer) => vec![peer], diff --git a/src/raft/state.rs b/src/raft/state.rs index 1337b11b7..145607aed 100644 --- a/src/raft/state.rs +++ b/src/raft/state.rs @@ -1,4 +1,4 @@ -use super::{Address, Entry, Event, Index, Log, Message, Response, Status, Term}; +use super::{Address, Entry, Event, Index, Log, Message, NodeID, Response, Status, Term}; use crate::error::{Error, Result}; use log::{debug, error}; @@ -56,6 +56,7 @@ struct Query { /// Drives a state machine, taking operations from state_rx and sending results via node_tx. pub struct Driver { + node_id: NodeID, state_rx: UnboundedReceiverStream, node_tx: mpsc::UnboundedSender, /// Notify clients when their mutation is applied. @@ -67,10 +68,12 @@ pub struct Driver { impl Driver { /// Creates a new state machine driver. pub fn new( + node_id: NodeID, state_rx: mpsc::UnboundedReceiver, node_tx: mpsc::UnboundedSender, ) -> Self { Self { + node_id, state_rx: UnboundedReceiverStream::new(state_rx), node_tx, notify: HashMap::new(), @@ -248,7 +251,7 @@ impl Driver { /// Sends a message. fn send(&self, to: Address, event: Event) -> Result<()> { - let msg = Message { from: Address::Local, to, term: 0, event }; + let msg = Message { from: Address::Node(self.node_id), to, term: 0, event }; debug!("Sending {:?}", msg); Ok(self.node_tx.send(msg)?) } @@ -308,7 +311,7 @@ pub mod tests { let state = Box::new(TestState::new(0)); let (state_tx, state_rx) = mpsc::unbounded_channel(); let (node_tx, node_rx) = mpsc::unbounded_channel(); - tokio::spawn(Driver::new(state_rx, node_tx).drive(state.clone())); + tokio::spawn(Driver::new(1, state_rx, node_tx).drive(state.clone())); Ok((state, state_tx, node_rx)) } @@ -329,7 +332,7 @@ pub mod tests { index: 1, quorum: 2, })?; - state_tx.send(Instruction::Vote { term: 1, index: 1, address: Address::Local })?; + state_tx.send(Instruction::Vote { term: 1, index: 1, address: Address::Node(1) })?; state_tx.send(Instruction::Abort)?; std::mem::drop(state_tx); @@ -338,13 +341,13 @@ pub mod tests { node_rx.collect::>().await, vec![ Message { - from: Address::Local, + from: Address::Node(1), to: Address::Node(1), term: 0, event: Event::ClientResponse { id: vec![0x01], response: Err(Error::Abort) } }, Message { - from: Address::Local, + from: Address::Node(1), to: Address::Client, term: 0, event: Event::ClientResponse { id: vec![0x02], response: Err(Error::Abort) } @@ -376,7 +379,7 @@ pub mod tests { assert_eq!( node_rx.collect::>().await, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Client, term: 0, event: Event::ClientResponse { @@ -406,15 +409,15 @@ pub mod tests { state_tx.send(Instruction::Apply { entry: Entry { index: 1, term: 2, command: Some(vec![0xaf]) }, })?; - state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Local })?; state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Node(1) })?; + state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Node(2) })?; std::mem::drop(state_tx); let node_rx = UnboundedReceiverStream::new(node_rx); assert_eq!( node_rx.collect::>().await, vec![Message { - from: Address::Local, + from: Address::Node(1), to: Address::Client, term: 0, event: Event::ClientResponse { @@ -443,7 +446,7 @@ pub mod tests { state_tx.send(Instruction::Apply { entry: Entry { index: 1, term: 1, command: Some(vec![0xaf]) }, })?; - state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Local })?; + state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Node(1) })?; state_tx.send(Instruction::Vote { term: 1, index: 1, address: Address::Node(1) })?; std::mem::drop(state_tx); @@ -467,7 +470,7 @@ pub mod tests { state_tx.send(Instruction::Apply { entry: Entry { index: 1, term: 1, command: Some(vec![0xaf]) }, })?; - state_tx.send(Instruction::Vote { term: 1, index: 1, address: Address::Local })?; + state_tx.send(Instruction::Vote { term: 1, index: 1, address: Address::Node(1) })?; std::mem::drop(state_tx); let node_rx = UnboundedReceiverStream::new(node_rx);