diff --git a/.gitignore b/.gitignore index 8a9d93265..223f36ece 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,4 @@ -/clusters/*/toydb-?/data +/clusters/*/toydb*/data /data /target .vscode/ diff --git a/clusters/docker/docker-compose.yml b/clusters/docker/docker-compose.yml index 79328663e..4b1d78ef9 100644 --- a/clusters/docker/docker-compose.yml +++ b/clusters/docker/docker-compose.yml @@ -6,62 +6,62 @@ networks: ipam: driver: default config: - - subnet: 172.20.0.0/24 + - subnet: 172.20.0.0/24 services: - toydb-a: &toydb + toydb1: &toydb build: .. environment: - - TOYDB_LOG_LEVEL + - TOYDB_LOG_LEVEL volumes: - - ./toydb-a/toydb.yaml:/etc/toydb.yaml - - ./toydb-a/data:/var/lib/toydb + - ./toydb1/toydb.yaml:/etc/toydb.yaml + - ./toydb1/data:/var/lib/toydb networks: toydb: ipv4_address: 172.20.0.101 ports: - - 9601:9605 + - 9601:9605 - toydb-b: + toydb2: <<: *toydb volumes: - - ./toydb-b/toydb.yaml:/etc/toydb.yaml - - ./toydb-b/data:/var/lib/toydb + - ./toydb2/toydb.yaml:/etc/toydb.yaml + - ./toydb2/data:/var/lib/toydb networks: toydb: ipv4_address: 172.20.0.102 ports: - - 9602:9605 + - 9602:9605 - toydb-c: + toydb3: <<: *toydb volumes: - - ./toydb-c/toydb.yaml:/etc/toydb.yaml - - ./toydb-c/data:/var/lib/toydb + - ./toydb3/toydb.yaml:/etc/toydb.yaml + - ./toydb3/data:/var/lib/toydb networks: toydb: ipv4_address: 172.20.0.103 ports: - - 9603:9605 + - 9603:9605 - toydb-d: + toydb4: <<: *toydb volumes: - - ./toydb-d/toydb.yaml:/etc/toydb.yaml - - ./toydb-d/data:/var/lib/toydb + - ./toydb4/toydb.yaml:/etc/toydb.yaml + - ./toydb4/data:/var/lib/toydb networks: toydb: ipv4_address: 172.20.0.104 ports: - - 9604:9605 + - 9604:9605 - toydb-e: + toydb5: <<: *toydb volumes: - - ./toydb-e/toydb.yaml:/etc/toydb.yaml - - ./toydb-e/data:/var/lib/toydb + - ./toydb5/toydb.yaml:/etc/toydb.yaml + - ./toydb5/data:/var/lib/toydb networks: toydb: ipv4_address: 172.20.0.105 ports: - - 9605:9605 + - 9605:9605 diff --git a/clusters/docker/toydb-a/toydb.yaml b/clusters/docker/toydb-a/toydb.yaml deleted file mode 100644 index f5788479f..000000000 --- a/clusters/docker/toydb-a/toydb.yaml +++ /dev/null @@ -1,7 +0,0 @@ -id: toydb-a -data_dir: /var/lib/toydb -peers: - toydb-b: 172.20.0.102 - toydb-c: 172.20.0.103 - toydb-d: 172.20.0.104 - toydb-e: 172.20.0.105 \ No newline at end of file diff --git a/clusters/docker/toydb-b/toydb.yaml b/clusters/docker/toydb-b/toydb.yaml deleted file mode 100644 index 6b3d8709f..000000000 --- a/clusters/docker/toydb-b/toydb.yaml +++ /dev/null @@ -1,7 +0,0 @@ -id: toydb-b -data_dir: /var/lib/toydb -peers: - toydb-a: 172.20.0.101 - toydb-c: 172.20.0.103 - toydb-d: 172.20.0.104 - toydb-e: 172.20.0.105 diff --git a/clusters/docker/toydb-c/toydb.yaml b/clusters/docker/toydb-c/toydb.yaml deleted file mode 100644 index 6f73a1e90..000000000 --- a/clusters/docker/toydb-c/toydb.yaml +++ /dev/null @@ -1,7 +0,0 @@ -id: toydb-c -data_dir: /var/lib/toydb -peers: - toydb-a: 172.20.0.101 - toydb-b: 172.20.0.102 - toydb-d: 172.20.0.104 - toydb-e: 172.20.0.105 diff --git a/clusters/docker/toydb-d/toydb.yaml b/clusters/docker/toydb-d/toydb.yaml deleted file mode 100644 index 91934a18a..000000000 --- a/clusters/docker/toydb-d/toydb.yaml +++ /dev/null @@ -1,7 +0,0 @@ -id: toydb-d -data_dir: /var/lib/toydb -peers: - toydb-a: 172.20.0.101 - toydb-b: 172.20.0.102 - toydb-c: 172.20.0.103 - toydb-e: 172.20.0.105 diff --git a/clusters/docker/toydb-e/toydb.yaml b/clusters/docker/toydb-e/toydb.yaml deleted file mode 100644 index 8288798ef..000000000 --- a/clusters/docker/toydb-e/toydb.yaml +++ /dev/null @@ -1,7 +0,0 @@ -id: toydb-e -data_dir: /var/lib/toydb -peers: - toydb-a: 172.20.0.101 - toydb-b: 172.20.0.102 - toydb-c: 172.20.0.103 - toydb-d: 172.20.0.104 diff --git a/clusters/docker/toydb-a/data/.gitkeep b/clusters/docker/toydb1/data/.gitkeep similarity index 100% rename from clusters/docker/toydb-a/data/.gitkeep rename to clusters/docker/toydb1/data/.gitkeep diff --git a/clusters/docker/toydb1/toydb.yaml b/clusters/docker/toydb1/toydb.yaml new file mode 100644 index 000000000..d2163b89b --- /dev/null +++ b/clusters/docker/toydb1/toydb.yaml @@ -0,0 +1,7 @@ +id: 1 +data_dir: /var/lib/toydb +peers: + 2: 172.20.0.102 + 3: 172.20.0.103 + 4: 172.20.0.104 + 5: 172.20.0.105 \ No newline at end of file diff --git a/clusters/docker/toydb-b/data/.gitkeep b/clusters/docker/toydb2/data/.gitkeep similarity index 100% rename from clusters/docker/toydb-b/data/.gitkeep rename to clusters/docker/toydb2/data/.gitkeep diff --git a/clusters/docker/toydb2/toydb.yaml b/clusters/docker/toydb2/toydb.yaml new file mode 100644 index 000000000..344336a72 --- /dev/null +++ b/clusters/docker/toydb2/toydb.yaml @@ -0,0 +1,7 @@ +id: 2 +data_dir: /var/lib/toydb +peers: + 1: 172.20.0.101 + 3: 172.20.0.103 + 4: 172.20.0.104 + 5: 172.20.0.105 diff --git a/clusters/docker/toydb-c/data/.gitkeep b/clusters/docker/toydb3/data/.gitkeep similarity index 100% rename from clusters/docker/toydb-c/data/.gitkeep rename to clusters/docker/toydb3/data/.gitkeep diff --git a/clusters/docker/toydb3/toydb.yaml b/clusters/docker/toydb3/toydb.yaml new file mode 100644 index 000000000..4aac1b338 --- /dev/null +++ b/clusters/docker/toydb3/toydb.yaml @@ -0,0 +1,7 @@ +3: toydb-c +data_dir: /var/lib/toydb +peers: + 1: 172.20.0.101 + 2: 172.20.0.102 + 4: 172.20.0.104 + 5: 172.20.0.105 diff --git a/clusters/docker/toydb-d/data/.gitkeep b/clusters/docker/toydb4/data/.gitkeep similarity index 100% rename from clusters/docker/toydb-d/data/.gitkeep rename to clusters/docker/toydb4/data/.gitkeep diff --git a/clusters/docker/toydb4/toydb.yaml b/clusters/docker/toydb4/toydb.yaml new file mode 100644 index 000000000..499cc23c0 --- /dev/null +++ b/clusters/docker/toydb4/toydb.yaml @@ -0,0 +1,7 @@ +4: toydb-d +data_dir: /var/lib/toydb +peers: + 1: 172.20.0.101 + 2: 172.20.0.102 + 3: 172.20.0.103 + 5: 172.20.0.105 diff --git a/clusters/docker/toydb-e/data/.gitkeep b/clusters/docker/toydb5/data/.gitkeep similarity index 100% rename from clusters/docker/toydb-e/data/.gitkeep rename to clusters/docker/toydb5/data/.gitkeep diff --git a/clusters/docker/toydb5/toydb.yaml b/clusters/docker/toydb5/toydb.yaml new file mode 100644 index 000000000..05e590caa --- /dev/null +++ b/clusters/docker/toydb5/toydb.yaml @@ -0,0 +1,7 @@ +5: toydb-e +data_dir: /var/lib/toydb +peers: + 1: 172.20.0.101 + 2: 172.20.0.102 + 3: 172.20.0.103 + 4: 172.20.0.104 diff --git a/clusters/local/run.sh b/clusters/local/run.sh index 051c8cf16..4ec23a990 100755 --- a/clusters/local/run.sh +++ b/clusters/local/run.sh @@ -4,8 +4,8 @@ set -euo pipefail cargo build --release --bin toydb -for ID in a b c d e; do - (cargo run -q --release -- -c toydb-$ID/toydb.yaml 2>&1 | sed -e "s/\\(.*\\)/toydb-$ID \\1/g") & +for ID in 1 2 3 4 5; do + (cargo run -q --release -- -c toydb$ID/toydb.yaml 2>&1 | sed -e "s/\\(.*\\)/toydb$ID \\1/g") & done trap 'kill $(jobs -p)' EXIT diff --git a/clusters/local/toydb-a/toydb.yaml b/clusters/local/toydb-a/toydb.yaml deleted file mode 100644 index 325abbc72..000000000 --- a/clusters/local/toydb-a/toydb.yaml +++ /dev/null @@ -1,10 +0,0 @@ -id: toydb-a -data_dir: toydb-a/data -sync: false -listen_sql: 0.0.0.0:9601 -listen_raft: 0.0.0.0:9701 -peers: - toydb-b: 127.0.0.1:9702 - toydb-c: 127.0.0.1:9703 - toydb-d: 127.0.0.1:9704 - toydb-e: 127.0.0.1:9705 \ No newline at end of file diff --git a/clusters/local/toydb-b/toydb.yaml b/clusters/local/toydb-b/toydb.yaml deleted file mode 100644 index c8961c107..000000000 --- a/clusters/local/toydb-b/toydb.yaml +++ /dev/null @@ -1,10 +0,0 @@ -id: toydb-b -data_dir: toydb-b/data -sync: false -listen_sql: 0.0.0.0:9602 -listen_raft: 0.0.0.0:9702 -peers: - toydb-a: 127.0.0.1:9701 - toydb-c: 127.0.0.1:9703 - toydb-d: 127.0.0.1:9704 - toydb-e: 127.0.0.1:9705 \ No newline at end of file diff --git a/clusters/local/toydb-c/toydb.yaml b/clusters/local/toydb-c/toydb.yaml deleted file mode 100644 index d6645bab8..000000000 --- a/clusters/local/toydb-c/toydb.yaml +++ /dev/null @@ -1,10 +0,0 @@ -id: toydb-c -data_dir: toydb-c/data -sync: false -listen_sql: 0.0.0.0:9603 -listen_raft: 0.0.0.0:9703 -peers: - toydb-a: 127.0.0.1:9701 - toydb-b: 127.0.0.1:9702 - toydb-d: 127.0.0.1:9704 - toydb-e: 127.0.0.1:9705 \ No newline at end of file diff --git a/clusters/local/toydb-d/toydb.yaml b/clusters/local/toydb-d/toydb.yaml deleted file mode 100644 index 11c6ce3ce..000000000 --- a/clusters/local/toydb-d/toydb.yaml +++ /dev/null @@ -1,10 +0,0 @@ -id: toydb-d -data_dir: toydb-d/data -sync: false -listen_sql: 0.0.0.0:9604 -listen_raft: 0.0.0.0:9704 -peers: - toydb-a: 127.0.0.1:9701 - toydb-b: 127.0.0.1:9702 - toydb-c: 127.0.0.1:9703 - toydb-e: 127.0.0.1:9705 \ No newline at end of file diff --git a/clusters/local/toydb-e/toydb.yaml b/clusters/local/toydb-e/toydb.yaml deleted file mode 100644 index 47d48d84b..000000000 --- a/clusters/local/toydb-e/toydb.yaml +++ /dev/null @@ -1,10 +0,0 @@ -id: toydb-e -data_dir: toydb-e/data -sync: false -listen_sql: 0.0.0.0:9605 -listen_raft: 0.0.0.0:9705 -peers: - toydb-a: 127.0.0.1:9701 - toydb-b: 127.0.0.1:9702 - toydb-c: 127.0.0.1:9703 - toydb-d: 127.0.0.1:9704 \ No newline at end of file diff --git a/clusters/local/toydb-a/data/.gitkeep b/clusters/local/toydb1/data/.gitkeep similarity index 100% rename from clusters/local/toydb-a/data/.gitkeep rename to clusters/local/toydb1/data/.gitkeep diff --git a/clusters/local/toydb1/data/log b/clusters/local/toydb1/data/log new file mode 100644 index 000000000..cb542e548 Binary files /dev/null and b/clusters/local/toydb1/data/log differ diff --git a/clusters/local/toydb1/data/state b/clusters/local/toydb1/data/state new file mode 100644 index 000000000..033a480cc Binary files /dev/null and b/clusters/local/toydb1/data/state differ diff --git a/clusters/local/toydb1/toydb.yaml b/clusters/local/toydb1/toydb.yaml new file mode 100644 index 000000000..a00912fe8 --- /dev/null +++ b/clusters/local/toydb1/toydb.yaml @@ -0,0 +1,10 @@ +id: 1 +data_dir: toydb1/data +sync: false +listen_sql: 0.0.0.0:9601 +listen_raft: 0.0.0.0:9701 +peers: + 2: 127.0.0.1:9702 + 3: 127.0.0.1:9703 + 4: 127.0.0.1:9704 + 5: 127.0.0.1:9705 \ No newline at end of file diff --git a/clusters/local/toydb-b/data/.gitkeep b/clusters/local/toydb2/data/.gitkeep similarity index 100% rename from clusters/local/toydb-b/data/.gitkeep rename to clusters/local/toydb2/data/.gitkeep diff --git a/clusters/local/toydb2/data/log b/clusters/local/toydb2/data/log new file mode 100644 index 000000000..fa8c39041 Binary files /dev/null and b/clusters/local/toydb2/data/log differ diff --git a/clusters/local/toydb2/data/state b/clusters/local/toydb2/data/state new file mode 100644 index 000000000..033a480cc Binary files /dev/null and b/clusters/local/toydb2/data/state differ diff --git a/clusters/local/toydb2/toydb.yaml b/clusters/local/toydb2/toydb.yaml new file mode 100644 index 000000000..0273ec44f --- /dev/null +++ b/clusters/local/toydb2/toydb.yaml @@ -0,0 +1,10 @@ +id: 2 +data_dir: toydb2/data +sync: false +listen_sql: 0.0.0.0:9602 +listen_raft: 0.0.0.0:9702 +peers: + 1: 127.0.0.1:9701 + 3: 127.0.0.1:9703 + 4: 127.0.0.1:9704 + 5: 127.0.0.1:9705 \ No newline at end of file diff --git a/clusters/local/toydb-c/data/.gitkeep b/clusters/local/toydb3/data/.gitkeep similarity index 100% rename from clusters/local/toydb-c/data/.gitkeep rename to clusters/local/toydb3/data/.gitkeep diff --git a/clusters/local/toydb3/data/log b/clusters/local/toydb3/data/log new file mode 100644 index 000000000..b905d3499 Binary files /dev/null and b/clusters/local/toydb3/data/log differ diff --git a/clusters/local/toydb3/data/state b/clusters/local/toydb3/data/state new file mode 100644 index 000000000..033a480cc Binary files /dev/null and b/clusters/local/toydb3/data/state differ diff --git a/clusters/local/toydb3/toydb.yaml b/clusters/local/toydb3/toydb.yaml new file mode 100644 index 000000000..5ad3a8589 --- /dev/null +++ b/clusters/local/toydb3/toydb.yaml @@ -0,0 +1,10 @@ +id: 3 +data_dir: toydb-c/data +sync: false +listen_sql: 0.0.0.0:9603 +listen_raft: 0.0.0.0:9703 +peers: + 1: 127.0.0.1:9701 + 2: 127.0.0.1:9702 + 4: 127.0.0.1:9704 + 5: 127.0.0.1:9705 \ No newline at end of file diff --git a/clusters/local/toydb-d/data/.gitkeep b/clusters/local/toydb4/data/.gitkeep similarity index 100% rename from clusters/local/toydb-d/data/.gitkeep rename to clusters/local/toydb4/data/.gitkeep diff --git a/clusters/local/toydb4/data/log b/clusters/local/toydb4/data/log new file mode 100644 index 000000000..9923f853c Binary files /dev/null and b/clusters/local/toydb4/data/log differ diff --git a/clusters/local/toydb4/data/state b/clusters/local/toydb4/data/state new file mode 100644 index 000000000..033a480cc Binary files /dev/null and b/clusters/local/toydb4/data/state differ diff --git a/clusters/local/toydb4/toydb.yaml b/clusters/local/toydb4/toydb.yaml new file mode 100644 index 000000000..9930f726b --- /dev/null +++ b/clusters/local/toydb4/toydb.yaml @@ -0,0 +1,10 @@ +id: 4 +data_dir: toydb-d/data +sync: false +listen_sql: 0.0.0.0:9604 +listen_raft: 0.0.0.0:9704 +peers: + 1: 127.0.0.1:9701 + 2: 127.0.0.1:9702 + 3: 127.0.0.1:9703 + 5: 127.0.0.1:9705 \ No newline at end of file diff --git a/clusters/local/toydb-e/data/.gitkeep b/clusters/local/toydb5/data/.gitkeep similarity index 100% rename from clusters/local/toydb-e/data/.gitkeep rename to clusters/local/toydb5/data/.gitkeep diff --git a/clusters/local/toydb5/data/log b/clusters/local/toydb5/data/log new file mode 100644 index 000000000..ea7eb8ecc Binary files /dev/null and b/clusters/local/toydb5/data/log differ diff --git a/clusters/local/toydb5/data/state b/clusters/local/toydb5/data/state new file mode 100644 index 000000000..033a480cc Binary files /dev/null and b/clusters/local/toydb5/data/state differ diff --git a/clusters/local/toydb5/toydb.yaml b/clusters/local/toydb5/toydb.yaml new file mode 100644 index 000000000..b2e4b1606 --- /dev/null +++ b/clusters/local/toydb5/toydb.yaml @@ -0,0 +1,10 @@ +id: 5 +data_dir: toydb-e/data +sync: false +listen_sql: 0.0.0.0:9605 +listen_raft: 0.0.0.0:9705 +peers: + 1: 127.0.0.1:9701 + 2: 127.0.0.1:9702 + 3: 127.0.0.1:9703 + 4: 127.0.0.1:9704 \ No newline at end of file diff --git a/config/toydb.yaml b/config/toydb.yaml index 37c3a05b5..a29507254 100644 --- a/config/toydb.yaml +++ b/config/toydb.yaml @@ -1,5 +1,5 @@ # The node ID, peer ID/address map (empty for single node), and log level. -id: toydb +id: 1 peers: {} log_level: INFO diff --git a/docs/examples.md b/docs/examples.md index c2d520e11..293eb8641 100644 --- a/docs/examples.md +++ b/docs/examples.md @@ -21,13 +21,13 @@ To start a five-node cluster on the local machine (requires a working ``` $ (cd clusters/local && ./run.sh) -toydb-b 19:06:28 [ INFO] Listening on 0.0.0.0:9602 (SQL) and 0.0.0.0:9702 (Raft) -toydb-b 19:06:28 [ERROR] Failed connecting to Raft peer 127.0.0.1:9705: Connection refused -toydb-e 19:06:28 [ INFO] Listening on 0.0.0.0:9605 (SQL) and 0.0.0.0:9705 (Raft) +toydb2 19:06:28 [ INFO] Listening on 0.0.0.0:9602 (SQL) and 0.0.0.0:9702 (Raft) +toydb2 19:06:28 [ERROR] Failed connecting to Raft peer 127.0.0.1:9705: Connection refused +toydb5 19:06:28 [ INFO] Listening on 0.0.0.0:9605 (SQL) and 0.0.0.0:9705 (Raft) [...] -toydb-e 19:06:29 [ INFO] Voting for toydb-d in term 1 election -toydb-c 19:06:29 [ INFO] Voting for toydb-d in term 1 election -toydb-d 19:06:29 [ INFO] Won election for term 1, becoming leader +toydb5 19:06:29 [ INFO] Voting for toydb-d in term 1 election +toydb3 19:06:29 [ INFO] Voting for toydb-d in term 1 election +toydb4 19:06:29 [ INFO] Won election for term 1, becoming leader ``` In a separate terminal, start a `toysql` client and check the server status: @@ -37,9 +37,9 @@ $ cargo run --release --bin toysql Connected to toyDB node "toydb-e". Enter !help for instructions. toydb> !status -Server: toydb-e (leader toydb-d in term 1 with 5 nodes) +Server: 5 (leader 4 in term 1 with 5 nodes) Raft log: 1 committed, 0 applied, 0.000 MB (hybrid storage) -Node logs: toydb-a:1 toydb-b:1 toydb-c:1 toydb-d:1 toydb-e:1 +Node logs: 1:1 2:1 3:1 4:1 5:1 SQL txns: 0 active, 0 total (bitcask storage) ``` diff --git a/src/bin/toydb.rs b/src/bin/toydb.rs index 2bca88784..f6b2b49c2 100644 --- a/src/bin/toydb.rs +++ b/src/bin/toydb.rs @@ -59,7 +59,7 @@ async fn main() -> Result<()> { name => return Err(Error::Config(format!("Unknown SQL storage engine {}", name))), }; - Server::new(&cfg.id, cfg.peers, raft_log, raft_state) + Server::new(cfg.id, cfg.peers, raft_log, raft_state) .await? .listen(&cfg.listen_sql, &cfg.listen_raft) .await? @@ -69,8 +69,8 @@ async fn main() -> Result<()> { #[derive(Debug, Deserialize)] struct Config { - id: String, - peers: HashMap, + id: raft::NodeID, + peers: HashMap, listen_sql: String, listen_raft: String, log_level: String, diff --git a/src/raft/log.rs b/src/raft/log.rs index 95d1370a7..fcfdd65cf 100644 --- a/src/raft/log.rs +++ b/src/raft/log.rs @@ -4,6 +4,8 @@ use crate::storage::{self, bincode, keycode}; use ::log::debug; use serde::{Deserialize, Serialize}; +use super::NodeID; + /// A log index. pub type Index = u64; @@ -125,7 +127,7 @@ impl Log { } /// Returns the last known term (0 if none), and cast vote (if any). - pub fn get_term(&mut self) -> Result<(Term, Option)> { + pub fn get_term(&mut self) -> Result<(Term, Option)> { let (term, voted_for) = self .engine .get(&Key::TermVote.encode()?)? @@ -137,7 +139,7 @@ impl Log { } /// Sets the most recent term, and cast vote (if any). - pub fn set_term(&mut self, term: Term, voted_for: Option<&str>) -> Result<()> { + pub fn set_term(&mut self, term: Term, voted_for: Option) -> Result<()> { self.engine.set(&Key::TermVote.encode()?, bincode::serialize(&(term, voted_for))?)?; self.maybe_flush() } @@ -458,8 +460,8 @@ mod tests { let mut l = setup(); assert_eq!(l.get_term()?, (0, None)); - l.set_term(1, Some("a"))?; - assert_eq!(l.get_term()?, (1, Some("a".into()))); + l.set_term(1, Some(1))?; + assert_eq!(l.get_term()?, (1, Some(1))); l.set_term(0, None)?; assert_eq!(l.get_term()?, (0, None)); diff --git a/src/raft/message.rs b/src/raft/message.rs index f37904694..75b9d1303 100644 --- a/src/raft/message.rs +++ b/src/raft/message.rs @@ -1,4 +1,4 @@ -use super::{Entry, Status}; +use super::{Entry, NodeID, Status}; use crate::error::Result; use serde_derive::{Deserialize, Serialize}; @@ -9,7 +9,7 @@ pub enum Address { /// Broadcast to all peers. Peers, /// A remote peer. - Peer(String), + Peer(NodeID), /// The local node. Local, /// A local client. diff --git a/src/raft/mod.rs b/src/raft/mod.rs index bf77d9448..d9eda8eb9 100644 --- a/src/raft/mod.rs +++ b/src/raft/mod.rs @@ -6,6 +6,6 @@ mod state; pub use self::log::{Engine, Entry, Log, Scan}; pub use message::{Address, Event, Message, Request, Response}; -pub use node::{Node, Status}; +pub use node::{Node, NodeID, Status}; pub use server::Server; pub use state::{Driver, Instruction, State}; diff --git a/src/raft/node/candidate.rs b/src/raft/node/candidate.rs index f549d0b55..105c55740 100644 --- a/src/raft/node/candidate.rs +++ b/src/raft/node/candidate.rs @@ -1,5 +1,5 @@ use super::super::{Address, Event, Message, Response}; -use super::{Follower, Leader, Node, RoleNode, ELECTION_TIMEOUT_MAX, ELECTION_TIMEOUT_MIN}; +use super::{Follower, Leader, Node, NodeID, RoleNode, ELECTION_TIMEOUT_MAX, ELECTION_TIMEOUT_MIN}; use crate::error::Result; use ::log::{debug, info, warn}; @@ -30,13 +30,13 @@ impl Candidate { impl RoleNode { /// Transition to follower role. - fn become_follower(mut self, term: u64, leader: &str) -> Result> { + fn become_follower(mut self, term: u64, leader: NodeID) -> Result> { info!("Discovered leader {} for term {}, following", leader, term); self.term = term; self.log.set_term(term, None)?; let mut node = self.become_role(Follower::new(Some(leader), None))?; node.abort_proxied()?; - node.forward_queued(Address::Peer(leader.to_string()))?; + node.forward_queued(Address::Peer(leader))?; Ok(node) } @@ -60,14 +60,14 @@ impl RoleNode { return Ok(self.into()); } if msg.term > self.term { - if let Address::Peer(from) = &msg.from { + if let Address::Peer(from) = msg.from { return self.become_follower(msg.term, from)?.step(msg); } } match msg.event { Event::Heartbeat { .. } => { - if let Address::Peer(from) = &msg.from { + if let Address::Peer(from) = msg.from { return self.become_follower(msg.term, from)?.step(msg); } } @@ -89,7 +89,7 @@ impl RoleNode { Event::ClientResponse { id, mut response } => { if let Ok(Response::Status(ref mut status)) = response { - status.server = self.id.clone(); + status.server = self.id; } self.proxied_reqs.remove(&id); self.send(Address::Client, Event::ClientResponse { id, response })?; @@ -147,8 +147,8 @@ mod tests { log.set_term(3, None)?; let mut node = RoleNode { - id: "a".into(), - peers: vec!["b".into(), "c".into(), "d".into(), "e".into()], + id: 1, + peers: vec![2, 3, 4, 5], term: 3, log, node_tx, @@ -176,8 +176,8 @@ mod tests { fn step_heartbeat_current_term() -> Result<()> { let (candidate, mut node_rx, mut state_rx) = setup()?; let mut node = candidate.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::Heartbeat { commit_index: 2, commit_term: 1 }, })?; @@ -187,7 +187,7 @@ mod tests { vec![ Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 0, event: Event::ClientRequest { id: vec![0xaf], @@ -196,7 +196,7 @@ mod tests { }, Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::ConfirmLeader { commit_index: 2, has_committed: true }, }, @@ -212,8 +212,8 @@ mod tests { fn step_heartbeat_future_term() -> Result<()> { let (candidate, mut node_rx, mut state_rx) = setup()?; let mut node = candidate.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 4, event: Event::Heartbeat { commit_index: 2, commit_term: 1 }, })?; @@ -223,7 +223,7 @@ mod tests { vec![ Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 0, event: Event::ClientRequest { id: vec![0xaf], @@ -232,7 +232,7 @@ mod tests { }, Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 4, event: Event::ConfirmLeader { commit_index: 2, has_committed: true }, }, @@ -247,8 +247,8 @@ mod tests { fn step_heartbeat_past_term() -> Result<()> { let (candidate, mut node_rx, mut state_rx) = setup()?; let mut node = candidate.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 2, event: Event::Heartbeat { commit_index: 1, commit_term: 1 }, })?; @@ -266,8 +266,8 @@ mod tests { // The first vote is not sufficient for a quorum (3 votes including self) node = node.step(Message { - from: Address::Peer("c".into()), - to: Address::Peer("a".into()), + from: Address::Peer(3), + to: Address::Peer(1), term: 3, event: Event::GrantVote, })?; @@ -277,8 +277,8 @@ mod tests { // However, the second external vote makes us leader node = node.step(Message { - from: Address::Peer("e".into()), - to: Address::Peer("a".into()), + from: Address::Peer(5), + to: Address::Peer(1), term: 3, event: Event::GrantVote, })?; diff --git a/src/raft/node/follower.rs b/src/raft/node/follower.rs index 55f1d901c..80e6ed80b 100644 --- a/src/raft/node/follower.rs +++ b/src/raft/node/follower.rs @@ -1,5 +1,5 @@ use super::super::{Address, Event, Instruction, Message, Response}; -use super::{Candidate, Node, RoleNode, ELECTION_TIMEOUT_MAX, ELECTION_TIMEOUT_MIN}; +use super::{Candidate, Node, NodeID, RoleNode, ELECTION_TIMEOUT_MAX, ELECTION_TIMEOUT_MIN}; use crate::error::Result; use ::log::{debug, info, warn}; @@ -9,21 +9,21 @@ use rand::Rng as _; #[derive(Debug)] pub struct Follower { /// The leader, or None if just initialized. - leader: Option, + leader: Option, /// The number of ticks since the last message from the leader. leader_seen_ticks: u64, /// The timeout before triggering an election. leader_seen_timeout: u64, /// The node we voted for in the current term, if any. - voted_for: Option, + voted_for: Option, } impl Follower { /// Creates a new follower role. - pub fn new(leader: Option<&str>, voted_for: Option<&str>) -> Self { + pub fn new(leader: Option, voted_for: Option) -> Self { Self { - leader: leader.map(String::from), - voted_for: voted_for.map(String::from), + leader, + voted_for, leader_seen_ticks: 0, leader_seen_timeout: rand::thread_rng() .gen_range(ELECTION_TIMEOUT_MIN..=ELECTION_TIMEOUT_MAX), @@ -44,7 +44,7 @@ impl RoleNode { } /// Transforms the node into a follower for a new leader. - fn become_follower(mut self, leader: &str, term: u64) -> Result> { + fn become_follower(mut self, leader: NodeID, term: u64) -> Result> { let mut voted_for = None; if term > self.term { info!("Discovered new term {}, following leader {}", term, leader); @@ -54,9 +54,9 @@ impl RoleNode { info!("Discovered leader {}, following", leader); voted_for = self.role.voted_for; }; - self.role = Follower::new(Some(leader), voted_for.as_deref()); + self.role = Follower::new(Some(leader), voted_for); self.abort_proxied()?; - self.forward_queued(Address::Peer(leader.to_string()))?; + self.forward_queued(Address::Peer(leader))?; Ok(self) } @@ -71,7 +71,7 @@ impl RoleNode { warn!("Ignoring invalid message: {}", err); return Ok(self.into()); } - if let Address::Peer(from) = &msg.from { + if let Address::Peer(from) = msg.from { if msg.term > self.term || self.role.leader.is_none() { return self.become_follower(from, msg.term)?.step(msg); } @@ -97,8 +97,8 @@ impl RoleNode { } Event::SolicitVote { last_index, last_term } => { - if let Some(voted_for) = &self.role.voted_for { - if msg.from != Address::Peer(voted_for.clone()) { + if let Some(voted_for) = self.role.voted_for { + if msg.from != Address::Peer(voted_for) { return Ok(self.into()); } } @@ -111,8 +111,8 @@ impl RoleNode { } if let Address::Peer(from) = msg.from { info!("Voting for {} in term {} election", from, self.term); - self.send(Address::Peer(from.clone()), Event::GrantVote)?; - self.log.set_term(self.term, Some(&from))?; + self.send(Address::Peer(from), Event::GrantVote)?; + self.log.set_term(self.term, Some(from))?; self.role.voted_for = Some(from); } } @@ -130,9 +130,9 @@ impl RoleNode { } Event::ClientRequest { ref id, .. } => { - if let Some(leader) = self.role.leader.as_deref() { + if let Some(leader) = self.role.leader { self.proxied_reqs.insert(id.clone(), msg.from); - self.send(Address::Peer(leader.to_string()), msg.event)? + self.send(Address::Peer(leader), msg.event)? } else { self.queued_reqs.push((msg.from, msg.event)); } @@ -140,7 +140,7 @@ impl RoleNode { Event::ClientResponse { id, mut response } => { if let Ok(Response::Status(ref mut status)) = response { - status.server = self.id.clone(); + status.server = self.id; } self.proxied_reqs.remove(&id); self.send(Address::Client, Event::ClientResponse { id, response })?; @@ -177,12 +177,12 @@ pub mod tests { use std::collections::HashMap; use tokio::sync::mpsc; - pub fn follower_leader(node: &RoleNode) -> Option { - node.role.leader.clone() + pub fn follower_leader(node: &RoleNode) -> Option { + node.role.leader } - pub fn follower_voted_for(node: &RoleNode) -> Option { - node.role.voted_for.clone() + pub fn follower_voted_for(node: &RoleNode) -> Option { + node.role.voted_for } #[allow(clippy::type_complexity)] @@ -201,15 +201,15 @@ pub mod tests { log.set_term(3, None)?; let node = RoleNode { - id: "a".into(), - peers: vec!["b".into(), "c".into(), "d".into(), "e".into()], + id: 1, + peers: vec![2, 3, 4, 5], term: 3, log, node_tx, state_tx, proxied_reqs: HashMap::new(), queued_reqs: Vec::new(), - role: Follower::new(Some("b"), None), + role: Follower::new(Some(2), None), }; Ok((node, node_rx, state_rx)) } @@ -219,17 +219,17 @@ pub mod tests { fn step_heartbeat() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::Heartbeat { commit_index: 3, commit_term: 2 }, })?; - assert_node(&mut node).is_follower().term(3).leader(Some("b")).voted_for(None).committed(3); + assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None).committed(3); assert_messages( &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::ConfirmLeader { commit_index: 3, has_committed: true }, }], @@ -248,17 +248,17 @@ pub mod tests { fn step_heartbeat_conflict_commit_term() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::Heartbeat { commit_index: 3, commit_term: 3 }, })?; - assert_node(&mut node).is_follower().term(3).leader(Some("b")).voted_for(None).committed(2); + assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None).committed(2); assert_messages( &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::ConfirmLeader { commit_index: 3, has_committed: false }, }], @@ -272,17 +272,17 @@ pub mod tests { fn step_heartbeat_missing_commit_entry() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::Heartbeat { commit_index: 5, commit_term: 3 }, })?; - assert_node(&mut node).is_follower().term(3).leader(Some("b")).voted_for(None).committed(2); + assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None).committed(2); assert_messages( &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::ConfirmLeader { commit_index: 5, has_committed: false }, }], @@ -296,12 +296,12 @@ pub mod tests { fn step_heartbeat_fake_leader() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("c".into()), - to: Address::Peer("a".into()), + from: Address::Peer(3), + to: Address::Peer(1), term: 3, event: Event::Heartbeat { commit_index: 5, commit_term: 3 }, })?; - assert_node(&mut node).is_follower().term(3).leader(Some("b")).voted_for(None).committed(2); + assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None).committed(2); assert_messages(&mut node_rx, vec![]); assert_messages(&mut state_rx, vec![]); Ok(()) @@ -313,17 +313,17 @@ pub mod tests { let (mut follower, mut node_rx, mut state_rx) = setup()?; follower.role = Follower::new(None, None); let mut node = follower.step(Message { - from: Address::Peer("c".into()), - to: Address::Peer("a".into()), + from: Address::Peer(3), + to: Address::Peer(1), term: 3, event: Event::Heartbeat { commit_index: 3, commit_term: 2 }, })?; - assert_node(&mut node).is_follower().term(3).leader(Some("c")).voted_for(None).committed(3); + assert_node(&mut node).is_follower().term(3).leader(Some(3)).voted_for(None).committed(3); assert_messages( &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("c".into()), + to: Address::Peer(3), term: 3, event: Event::ConfirmLeader { commit_index: 3, has_committed: true }, }], @@ -342,17 +342,17 @@ pub mod tests { fn step_heartbeat_old_commit_index() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::Heartbeat { commit_index: 1, commit_term: 1 }, })?; - assert_node(&mut node).is_follower().term(3).leader(Some("b")).voted_for(None).committed(2); + assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None).committed(2); assert_messages( &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::ConfirmLeader { commit_index: 1, has_committed: true }, }], @@ -366,17 +366,17 @@ pub mod tests { fn step_heartbeat_future_term() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("c".into()), - to: Address::Peer("a".into()), + from: Address::Peer(3), + to: Address::Peer(1), term: 4, event: Event::Heartbeat { commit_index: 3, commit_term: 2 }, })?; - assert_node(&mut node).is_follower().term(4).leader(Some("c")).voted_for(None); + assert_node(&mut node).is_follower().term(4).leader(Some(3)).voted_for(None); assert_messages( &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("c".into()), + to: Address::Peer(3), term: 4, event: Event::ConfirmLeader { commit_index: 3, has_committed: true }, }], @@ -395,12 +395,12 @@ pub mod tests { fn step_heartbeat_past_term() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 2, event: Event::Heartbeat { commit_index: 3, commit_term: 2 }, })?; - assert_node(&mut node).is_follower().term(3).leader(Some("b")).voted_for(None).committed(2); + assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None).committed(2); assert_messages(&mut node_rx, vec![]); assert_messages(&mut state_rx, vec![]); Ok(()) @@ -413,17 +413,17 @@ pub mod tests { // The first vote request in this term yields a vote response. let mut node = follower.step(Message { - from: Address::Peer("c".into()), - to: Address::Peer("a".into()), + from: Address::Peer(3), + to: Address::Peer(1), term: 3, event: Event::SolicitVote { last_index: 3, last_term: 2 }, })?; - assert_node(&mut node).is_follower().term(3).leader(Some("b")).voted_for(Some("c")); + assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(Some(3)); assert_messages( &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("c".into()), + to: Address::Peer(3), term: 3, event: Event::GrantVote, }], @@ -432,17 +432,17 @@ pub mod tests { // Another vote request from the same sender is granted. node = node.step(Message { - from: Address::Peer("c".into()), - to: Address::Peer("a".into()), + from: Address::Peer(3), + to: Address::Peer(1), term: 3, event: Event::SolicitVote { last_index: 3, last_term: 2 }, })?; - assert_node(&mut node).is_follower().term(3).leader(Some("b")).voted_for(Some("c")); + assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(Some(3)); assert_messages( &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("c".into()), + to: Address::Peer(3), term: 3, event: Event::GrantVote, }], @@ -451,12 +451,12 @@ pub mod tests { // But a vote request from a different node is ignored. node = node.step(Message { - from: Address::Peer("d".into()), - to: Address::Peer("a".into()), + from: Address::Peer(4), + to: Address::Peer(1), term: 3, event: Event::SolicitVote { last_index: 3, last_term: 2 }, })?; - assert_node(&mut node).is_follower().term(3).leader(Some("b")).voted_for(Some("c")); + assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(Some(3)); assert_messages(&mut node_rx, vec![]); assert_messages(&mut state_rx, vec![]); Ok(()) @@ -467,12 +467,12 @@ pub mod tests { fn step_grantvote_noop() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::GrantVote, })?; - assert_node(&mut node).is_follower().term(3).leader(Some("b")); + assert_node(&mut node).is_follower().term(3).leader(Some(2)); assert_messages(&mut node_rx, vec![]); assert_messages(&mut state_rx, vec![]); Ok(()) @@ -483,12 +483,12 @@ pub mod tests { fn step_solicitvote_last_index_outdated() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("c".into()), - to: Address::Peer("a".into()), + from: Address::Peer(3), + to: Address::Peer(1), term: 3, event: Event::SolicitVote { last_index: 2, last_term: 2 }, })?; - assert_node(&mut node).is_follower().term(3).leader(Some("b")).voted_for(None); + assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None); assert_messages(&mut node_rx, vec![]); assert_messages(&mut state_rx, vec![]); Ok(()) @@ -499,12 +499,12 @@ pub mod tests { fn step_solicitvote_last_term_outdated() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("c".into()), - to: Address::Peer("a".into()), + from: Address::Peer(3), + to: Address::Peer(1), term: 3, event: Event::SolicitVote { last_index: 3, last_term: 1 }, })?; - assert_node(&mut node).is_follower().term(3).leader(Some("b")).voted_for(None); + assert_node(&mut node).is_follower().term(3).leader(Some(2)).voted_for(None); assert_messages(&mut node_rx, vec![]); assert_messages(&mut state_rx, vec![]); Ok(()) @@ -522,20 +522,20 @@ pub mod tests { log.append(2, Some(vec![0x03]))?; let follower = RoleNode { - id: "a".into(), - peers: vec!["b".into(), "c".into(), "d".into(), "e".into()], + id: 1, + peers: vec![2, 3, 4, 5], term: 0, log, node_tx, state_tx, proxied_reqs: HashMap::new(), queued_reqs: Vec::new(), - role: Follower::new(Some("b"), None), + role: Follower::new(Some(2), None), }; let mut node = follower.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::ReplicateEntries { base_index: 0, @@ -554,7 +554,7 @@ pub mod tests { &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::AcceptEntries { last_index: 2 }, }], @@ -568,8 +568,8 @@ pub mod tests { fn step_replicateentries_append() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::ReplicateEntries { base_index: 3, @@ -591,7 +591,7 @@ pub mod tests { &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::AcceptEntries { last_index: 5 }, }], @@ -605,8 +605,8 @@ pub mod tests { fn step_replicateentries_partial_overlap() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::ReplicateEntries { base_index: 1, @@ -627,7 +627,7 @@ pub mod tests { &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::AcceptEntries { last_index: 4 }, }], @@ -641,8 +641,8 @@ pub mod tests { fn step_replicateentries_replace() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::ReplicateEntries { base_index: 2, @@ -663,7 +663,7 @@ pub mod tests { &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::AcceptEntries { last_index: 4 }, }], @@ -677,8 +677,8 @@ pub mod tests { fn step_replicateentries_replace_partial() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::ReplicateEntries { base_index: 2, @@ -699,7 +699,7 @@ pub mod tests { &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::AcceptEntries { last_index: 4 }, }], @@ -713,8 +713,8 @@ pub mod tests { fn step_replicateentries_reject_missing_base_index() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::ReplicateEntries { base_index: 5, @@ -731,7 +731,7 @@ pub mod tests { &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::RejectEntries, }], @@ -745,8 +745,8 @@ pub mod tests { fn step_replicateentries_reject_missing_base_term() -> Result<()> { let (follower, mut node_rx, mut state_rx) = setup()?; let mut node = follower.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::ReplicateEntries { base_index: 1, @@ -763,7 +763,7 @@ pub mod tests { &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::RejectEntries, }], @@ -787,14 +787,14 @@ pub mod tests { assert_node(&mut node) .is_follower() .term(3) - .leader(Some("b")) + .leader(Some(2)) .proxied(vec![(vec![0x01], Address::Client)]) .queued(vec![]); assert_messages( &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::ClientRequest { id: vec![0x01], @@ -805,20 +805,15 @@ pub mod tests { assert_messages(&mut state_rx, vec![]); node = node.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::ClientResponse { id: vec![0x01], response: Ok(Response::State(vec![0xaf])), }, })?; - assert_node(&mut node) - .is_follower() - .term(3) - .leader(Some("b")) - .proxied(vec![]) - .queued(vec![]); + assert_node(&mut node).is_follower().term(3).leader(Some(2)).proxied(vec![]).queued(vec![]); assert_messages( &mut node_rx, vec![Message { @@ -857,15 +852,15 @@ pub mod tests { // When a leader appears, we will proxy the queued request to them. node = node.step(Message { - from: Address::Peer("c".into()), - to: Address::Peer("a".into()), + from: Address::Peer(3), + to: Address::Peer(1), term: 3, event: Event::Heartbeat { commit_index: 3, commit_term: 2 }, })?; assert_node(&mut node) .is_follower() .term(3) - .leader(Some("c")) + .leader(Some(3)) .proxied(vec![(vec![0x01], Address::Client)]) .queued(vec![]); assert_messages( @@ -873,7 +868,7 @@ pub mod tests { vec![ Message { from: Address::Local, - to: Address::Peer("c".into()), + to: Address::Peer(3), term: 0, event: Event::ClientRequest { id: vec![0x01], @@ -882,7 +877,7 @@ pub mod tests { }, Message { from: Address::Local, - to: Address::Peer("c".into()), + to: Address::Peer(3), term: 3, event: Event::ConfirmLeader { commit_index: 3, has_committed: true }, }, @@ -912,14 +907,14 @@ pub mod tests { assert_node(&mut node) .is_follower() .term(3) - .leader(Some("b")) + .leader(Some(2)) .proxied(vec![(vec![0x01], Address::Client)]) .queued(vec![]); assert_messages( &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::ClientRequest { id: vec![0x01], @@ -931,17 +926,12 @@ pub mod tests { // When a new leader appears, the proxied request is aborted. node = node.step(Message { - from: Address::Peer("c".into()), - to: Address::Peer("a".into()), + from: Address::Peer(3), + to: Address::Peer(1), term: 4, event: Event::Heartbeat { commit_index: 3, commit_term: 2 }, })?; - assert_node(&mut node) - .is_follower() - .term(4) - .leader(Some("c")) - .proxied(vec![]) - .queued(vec![]); + assert_node(&mut node).is_follower().term(4).leader(Some(3)).proxied(vec![]).queued(vec![]); assert_messages( &mut node_rx, vec![ @@ -953,7 +943,7 @@ pub mod tests { }, Message { from: Address::Local, - to: Address::Peer("c".into()), + to: Address::Peer(3), term: 4, event: Event::ConfirmLeader { commit_index: 3, has_committed: true }, }, @@ -977,11 +967,11 @@ pub mod tests { // Make sure heartbeats reset election timeout assert!(timeout > 0); for _ in 0..(3 * timeout) { - assert_node(&mut node).is_follower().term(3).leader(Some("b")); + assert_node(&mut node).is_follower().term(3).leader(Some(2)); node = node.tick()?; node = node.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::Heartbeat { commit_index: 2, commit_term: 1 }, })?; @@ -989,7 +979,7 @@ pub mod tests { &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::ConfirmLeader { commit_index: 2, has_committed: true }, }], @@ -997,7 +987,7 @@ pub mod tests { } for _ in 0..timeout { - assert_node(&mut node).is_follower().term(3).leader(Some("b")); + assert_node(&mut node).is_follower().term(3).leader(Some(2)); node = node.tick()?; } assert_node(&mut node).is_candidate().term(4); diff --git a/src/raft/node/leader.rs b/src/raft/node/leader.rs index ee1538324..d5131ab63 100644 --- a/src/raft/node/leader.rs +++ b/src/raft/node/leader.rs @@ -1,5 +1,5 @@ use super::super::{Address, Event, Instruction, Message, Request, Response, Status}; -use super::{Follower, Node, RoleNode, HEARTBEAT_INTERVAL}; +use super::{Follower, Node, NodeID, RoleNode, HEARTBEAT_INTERVAL}; use crate::error::{Error, Result}; use ::log::{debug, info, warn}; @@ -11,22 +11,22 @@ pub struct Leader { /// Number of ticks since last heartbeat. heartbeat_ticks: u64, /// The next index to replicate to a peer. - peer_next_index: HashMap, + peer_next_index: HashMap, /// The last index known to be replicated on a peer. - peer_last_index: HashMap, + peer_last_index: HashMap, } impl Leader { /// Creates a new leader role. - pub fn new(peers: Vec, last_index: u64) -> Self { + pub fn new(peers: Vec, last_index: u64) -> Self { let mut leader = Self { heartbeat_ticks: 0, peer_next_index: HashMap::new(), peer_last_index: HashMap::new(), }; for peer in peers { - leader.peer_next_index.insert(peer.clone(), last_index + 1); - leader.peer_last_index.insert(peer.clone(), 0); + leader.peer_next_index.insert(peer, last_index + 1); + leader.peer_last_index.insert(peer, 0); } leader } @@ -34,7 +34,7 @@ impl Leader { impl RoleNode { /// Transforms the leader into a follower - fn become_follower(mut self, term: u64, leader: &str) -> Result> { + fn become_follower(mut self, term: u64, leader: NodeID) -> Result> { info!("Discovered new leader {} for term {}, following", leader, term); self.term = term; self.log.set_term(term, None)?; @@ -46,7 +46,7 @@ impl RoleNode { pub fn append(&mut self, command: Option>) -> Result { let index = self.log.append(self.term, command)?; for peer in self.peers.clone() { - self.replicate(&peer)?; + self.replicate(peer)?; } Ok(index) } @@ -76,11 +76,11 @@ impl RoleNode { } /// Replicates the log to a peer. - fn replicate(&mut self, peer: &str) -> Result<()> { + fn replicate(&mut self, peer: NodeID) -> Result<()> { let peer_next = self .role .peer_next_index - .get(peer) + .get(&peer) .cloned() .ok_or_else(|| Error::Internal(format!("Unknown peer {}", peer)))?; let base_index = if peer_next > 0 { peer_next - 1 } else { 0 }; @@ -91,10 +91,7 @@ impl RoleNode { }; let entries = self.log.scan(peer_next..)?.collect::>>()?; debug!("Replicating {} entries at base {} to {}", entries.len(), base_index, peer); - self.send( - Address::Peer(peer.to_string()), - Event::ReplicateEntries { base_index, base_term, entries }, - )?; + self.send(Address::Peer(peer), Event::ReplicateEntries { base_index, base_term, entries })?; Ok(()) } @@ -105,28 +102,28 @@ impl RoleNode { return Ok(self.into()); } if msg.term > self.term { - if let Address::Peer(from) = &msg.from { + if let Address::Peer(from) = msg.from { return self.become_follower(msg.term, from)?.step(msg); } } match msg.event { Event::ConfirmLeader { commit_index, has_committed } => { - if let Address::Peer(from) = msg.from.clone() { + if let Address::Peer(from) = msg.from { self.state_tx.send(Instruction::Vote { term: msg.term, index: commit_index, address: msg.from, })?; if !has_committed { - self.replicate(&from)?; + self.replicate(from)?; } } } Event::AcceptEntries { last_index } => { if let Address::Peer(from) = msg.from { - self.role.peer_last_index.insert(from.clone(), last_index); + self.role.peer_last_index.insert(from, last_index); self.role.peer_next_index.insert(from, last_index + 1); } self.commit()?; @@ -134,12 +131,12 @@ impl RoleNode { Event::RejectEntries => { if let Address::Peer(from) = msg.from { - self.role.peer_next_index.entry(from.clone()).and_modify(|i| { + self.role.peer_next_index.entry(from).and_modify(|i| { if *i > 1 { *i -= 1 } }); - self.replicate(&from)?; + self.replicate(from)?; } } @@ -174,8 +171,8 @@ impl RoleNode { Event::ClientRequest { id, request: Request::Status } => { let engine_status = self.log.status()?; let mut status = Box::new(Status { - server: self.id.clone(), - leader: self.id.clone(), + server: self.id, + leader: self.id, term: self.term, node_last_index: self.role.peer_last_index.clone(), commit_index: self.log.get_commit_index().0, @@ -183,13 +180,13 @@ impl RoleNode { storage: engine_status.name.clone(), storage_size: engine_status.size, }); - status.node_last_index.insert(self.id.clone(), self.log.get_last_index().0); + status.node_last_index.insert(self.id, self.log.get_last_index().0); self.state_tx.send(Instruction::Status { id, address: msg.from, status })? } Event::ClientResponse { id, mut response } => { if let Ok(Response::Status(ref mut status)) = response { - status.server = self.id.clone(); + status.server = self.id; } self.send(Address::Client, Event::ClientResponse { id, response })?; } @@ -237,7 +234,7 @@ mod tests { )> { let (node_tx, node_rx) = mpsc::unbounded_channel(); let (state_tx, state_rx) = mpsc::unbounded_channel(); - let peers = vec!["b".into(), "c".into(), "d".into(), "e".into()]; + let peers = vec![2, 3, 4, 5]; let mut log = Log::new(Box::new(storage::engine::Memory::new()), false)?; log.append(1, Some(vec![0x01]))?; log.append(1, Some(vec![0x02]))?; @@ -248,7 +245,7 @@ mod tests { log.set_term(3, None)?; let node = RoleNode { - id: "a".into(), + id: 1, peers: peers.clone(), term: 3, role: Leader::new(peers, log.get_last_index().0), @@ -268,8 +265,8 @@ mod tests { let mut node: Node = leader.into(); node = node.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::ConfirmLeader { commit_index: 2, has_committed: true }, })?; @@ -277,7 +274,7 @@ mod tests { assert_messages(&mut node_rx, vec![]); assert_messages( &mut state_rx, - vec![Instruction::Vote { term: 3, index: 2, address: Address::Peer("b".into()) }], + vec![Instruction::Vote { term: 3, index: 2, address: Address::Peer(2) }], ); Ok(()) } @@ -289,8 +286,8 @@ mod tests { let mut node: Node = leader.into(); node = node.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::ConfirmLeader { commit_index: 2, has_committed: false }, })?; @@ -299,14 +296,14 @@ mod tests { &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::ReplicateEntries { base_index: 5, base_term: 3, entries: vec![] }, }], ); assert_messages( &mut state_rx, - vec![Instruction::Vote { term: 3, index: 2, address: Address::Peer("b".into()) }], + vec![Instruction::Vote { term: 3, index: 2, address: Address::Peer(2) }], ); Ok(()) } @@ -318,8 +315,8 @@ mod tests { let mut node: Node = leader.into(); node = node.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::Heartbeat { commit_index: 5, commit_term: 3 }, })?; @@ -336,17 +333,17 @@ mod tests { let mut node: Node = leader.into(); node = node.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 4, event: Event::Heartbeat { commit_index: 7, commit_term: 4 }, })?; - assert_node(&mut node).is_follower().term(4).leader(Some("b")).committed(2); + assert_node(&mut node).is_follower().term(4).leader(Some(2)).committed(2); assert_messages( &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 4, event: Event::ConfirmLeader { commit_index: 7, has_committed: false }, }], @@ -362,8 +359,8 @@ mod tests { let mut node: Node = leader.into(); node = node.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 2, event: Event::Heartbeat { commit_index: 3, commit_term: 2 }, })?; @@ -379,8 +376,8 @@ mod tests { let mut node: Node = leader.into(); node = node.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::AcceptEntries { last_index: 4 }, })?; @@ -389,8 +386,8 @@ mod tests { assert_messages(&mut state_rx, vec![]); node = node.step(Message { - from: Address::Peer("c".into()), - to: Address::Peer("a".into()), + from: Address::Peer(3), + to: Address::Peer(1), term: 3, event: Event::AcceptEntries { last_index: 5 }, })?; @@ -409,8 +406,8 @@ mod tests { ); node = node.step(Message { - from: Address::Peer("d".into()), - to: Address::Peer("a".into()), + from: Address::Peer(4), + to: Address::Peer(1), term: 3, event: Event::AcceptEntries { last_index: 5 }, })?; @@ -435,8 +432,8 @@ mod tests { for _ in 0..5 { node = node.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::AcceptEntries { last_index: 5 }, })?; @@ -457,7 +454,7 @@ mod tests { for peer in peers.into_iter() { node = node.step(Message { from: Address::Peer(peer), - to: Address::Peer("a".into()), + to: Address::Peer(1), term: 3, event: Event::AcceptEntries { last_index: 3 }, })?; @@ -478,7 +475,7 @@ mod tests { for (i, peer) in peers.into_iter().enumerate() { node = node.step(Message { from: Address::Peer(peer), - to: Address::Peer("a".into()), + to: Address::Peer(1), term: 3, event: Event::AcceptEntries { last_index: 7 }, })?; @@ -517,8 +514,8 @@ mod tests { for i in 0..(entries.len() + 3) { node = node.step(Message { - from: Address::Peer("b".into()), - to: Address::Peer("a".into()), + from: Address::Peer(2), + to: Address::Peer(1), term: 3, event: Event::RejectEntries, })?; @@ -529,7 +526,7 @@ mod tests { &mut node_rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 3, event: Event::ReplicateEntries { base_index: index as u64, @@ -649,18 +646,12 @@ mod tests { id: vec![0x01], address: Address::Client, status: Box::new(Status { - server: "a".into(), - leader: "a".into(), + server: 1, + leader: 1, term: 3, - node_last_index: vec![ - ("a".into(), 5), - ("b".into(), 0), - ("c".into(), 0), - ("d".into(), 0), - ("e".into(), 0), - ] - .into_iter() - .collect(), + node_last_index: vec![(1, 5), (2, 0), (3, 0), (4, 0), (5, 0)] + .into_iter() + .collect(), commit_index: 2, apply_index: 0, storage: "memory".into(), diff --git a/src/raft/node/mod.rs b/src/raft/node/mod.rs index 510ed3560..b9f4c17b8 100644 --- a/src/raft/node/mod.rs +++ b/src/raft/node/mod.rs @@ -13,6 +13,9 @@ use serde_derive::{Deserialize, Serialize}; use std::collections::HashMap; use tokio::sync::mpsc; +/// A node ID. +pub type NodeID = u8; + /// The interval between leader heartbeats, in ticks. const HEARTBEAT_INTERVAL: u64 = 1; @@ -25,10 +28,10 @@ const ELECTION_TIMEOUT_MAX: u64 = 15 * HEARTBEAT_INTERVAL; /// Node status #[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] pub struct Status { - pub server: String, - pub leader: String, + pub server: NodeID, + pub leader: NodeID, pub term: u64, - pub node_last_index: HashMap, + pub node_last_index: HashMap, pub commit_index: u64, pub apply_index: u64, pub storage: String, @@ -45,8 +48,8 @@ pub enum Node { impl Node { /// Creates a new Raft node, starting as a follower, or leader if no peers. pub async fn new( - id: &str, - peers: Vec, + id: NodeID, + peers: Vec, mut log: Log, mut state: Box, node_tx: mpsc::UnboundedSender, @@ -70,7 +73,7 @@ impl Node { let (term, voted_for) = log.get_term()?; let node = RoleNode { - id: id.to_owned(), + id, peers, term, log, @@ -78,7 +81,7 @@ impl Node { state_tx, queued_reqs: Vec::new(), proxied_reqs: HashMap::new(), - role: Follower::new(None, voted_for.as_deref()), + role: Follower::new(None, voted_for), }; if node.peers.is_empty() { info!("No peers specified, starting as leader"); @@ -90,11 +93,11 @@ impl Node { } /// Returns the node ID. - pub fn id(&self) -> String { + pub fn id(&self) -> NodeID { match self { - Node::Candidate(n) => n.id.clone(), - Node::Follower(n) => n.id.clone(), - Node::Leader(n) => n.id.clone(), + Node::Candidate(n) => n.id, + Node::Follower(n) => n.id, + Node::Leader(n) => n.id, } } @@ -138,8 +141,8 @@ impl From> for Node { // A Raft node with role R pub struct RoleNode { - id: String, - peers: Vec, + id: NodeID, + peers: Vec, term: u64, log: Log, node_tx: mpsc::UnboundedSender, @@ -224,8 +227,8 @@ impl RoleNode { return Err(Error::Internal(format!("Message from past term {}", msg.term))); } - match &msg.to { - Address::Peer(id) if id == &self.id => Ok(()), + match msg.to { + Address::Peer(id) if id == self.id => Ok(()), Address::Local => Ok(()), Address::Peers => Ok(()), Address::Peer(id) => { @@ -322,9 +325,9 @@ mod tests { } } - pub fn leader(self, leader: Option<&str>) -> Self { + pub fn leader(self, leader: Option) -> Self { assert_eq!( - leader.map(str::to_owned), + leader, match self.node { Node::Candidate(_) => None, Node::Follower(n) => follower_leader(n), @@ -383,9 +386,9 @@ mod tests { self } - pub fn voted_for(mut self, voted_for: Option<&str>) -> Self { + pub fn voted_for(mut self, voted_for: Option) -> Self { assert_eq!( - voted_for.map(str::to_owned), + voted_for, match self.node { Node::Candidate(_) => None, Node::Follower(n) => follower_voted_for(n), @@ -394,7 +397,7 @@ mod tests { "Unexpected voted_for" ); let (_, saved_voted_for) = self.log().get_term().unwrap(); - assert_eq!(saved_voted_for.as_deref(), voted_for, "Unexpected voted_for saved in log"); + assert_eq!(saved_voted_for, voted_for, "Unexpected voted_for saved in log"); self } } @@ -404,17 +407,17 @@ mod tests { } fn setup_rolenode() -> Result<(RoleNode<()>, mpsc::UnboundedReceiver)> { - setup_rolenode_peers(vec!["b".into(), "c".into()]) + setup_rolenode_peers(vec![2, 3]) } fn setup_rolenode_peers( - peers: Vec, + peers: Vec, ) -> Result<(RoleNode<()>, mpsc::UnboundedReceiver)> { let (node_tx, node_rx) = mpsc::unbounded_channel(); let (state_tx, _) = mpsc::unbounded_channel(); let node = RoleNode { role: (), - id: "a".into(), + id: 1, peers, term: 1, log: Log::new(Box::new(storage::engine::Memory::new()), false)?, @@ -430,8 +433,8 @@ mod tests { async fn new() -> Result<()> { let (node_tx, _) = mpsc::unbounded_channel(); let node = Node::new( - "a", - vec!["b".into(), "c".into()], + 1, + vec![2, 3], Log::new(Box::new(storage::engine::Memory::new()), false)?, Box::new(TestState::new(0)), node_tx, @@ -439,9 +442,9 @@ mod tests { .await?; match node { Node::Follower(rolenode) => { - assert_eq!(rolenode.id, "a".to_owned()); + assert_eq!(rolenode.id, 1); assert_eq!(rolenode.term, 0); - assert_eq!(rolenode.peers, vec!["b".to_owned(), "c".to_owned()]); + assert_eq!(rolenode.peers, vec![2, 3]); } _ => panic!("Expected node to start as follower"), } @@ -459,7 +462,7 @@ mod tests { log.append(2, Some(vec![0x03]))?; let state = Box::new(TestState::new(0)); - Node::new("a", vec!["b".into(), "c".into()], log, state.clone(), node_tx).await?; + Node::new(1, vec![2, 3], log, state.clone(), node_tx).await?; tokio::time::sleep(std::time::Duration::from_millis(100)).await; assert_eq!(state.list(), vec![vec![0x01], vec![0x02]]); assert_eq!(state.applied_index(), 3); @@ -477,7 +480,7 @@ mod tests { log.append(2, Some(vec![0x03]))?; let state = Box::new(TestState::new(2)); - Node::new("a", vec!["b".into(), "c".into()], log, state.clone(), node_tx).await?; + Node::new(1, vec![2, 3], log, state.clone(), node_tx).await?; tokio::time::sleep(std::time::Duration::from_millis(100)).await; assert_eq!(state.list(), vec![vec![0x02]]); assert_eq!(state.applied_index(), 3); @@ -496,7 +499,7 @@ mod tests { let state = Box::new(TestState::new(4)); assert_eq!( - Node::new("a", vec!["b".into(), "c".into()], log, state.clone(), node_tx).await.err(), + Node::new(1, vec![2, 3], log, state.clone(), node_tx).await.err(), Some(Error::Internal( "State machine applied index 4 greater than log committed index 3".into() )) @@ -508,7 +511,7 @@ mod tests { async fn new_single() -> Result<()> { let (node_tx, _) = mpsc::unbounded_channel(); let node = Node::new( - "a", + 1, vec![], Log::new(Box::new(storage::engine::Memory::new()), false)?, Box::new(TestState::new(0)), @@ -517,7 +520,7 @@ mod tests { .await?; match node { Node::Leader(rolenode) => { - assert_eq!(rolenode.id, "a".to_owned()); + assert_eq!(rolenode.id, 1); assert_eq!(rolenode.term, 0); assert!(rolenode.peers.is_empty()); } @@ -530,9 +533,9 @@ mod tests { fn become_role() -> Result<()> { let (node, _) = setup_rolenode()?; let new = node.become_role("role")?; - assert_eq!(new.id, "a".to_owned()); + assert_eq!(new.id, 1); assert_eq!(new.term, 1); - assert_eq!(new.peers, vec!["b".to_owned(), "c".to_owned()]); + assert_eq!(new.peers, vec![2, 3]); assert_eq!(new.role, "role"); Ok(()) } @@ -541,8 +544,7 @@ mod tests { fn quorum() -> Result<()> { let quorums = vec![(1, 1), (2, 2), (3, 2), (4, 3), (5, 3), (6, 4), (7, 4), (8, 5)]; for (size, quorum) in quorums.into_iter() { - let peers: Vec = - (0..(size as u8 - 1)).map(|i| (i as char).to_string()).collect(); + let peers: Vec = (1..(size as u8)).collect(); assert_eq!(peers.len(), size as usize - 1); let (node, _) = setup_rolenode_peers(peers)?; assert_eq!(node.quorum(), quorum); @@ -553,12 +555,12 @@ mod tests { #[test] fn send() -> Result<()> { let (node, mut rx) = setup_rolenode()?; - node.send(Address::Peer("b".into()), Event::Heartbeat { commit_index: 1, commit_term: 1 })?; + node.send(Address::Peer(2), Event::Heartbeat { commit_index: 1, commit_term: 1 })?; assert_messages( &mut rx, vec![Message { from: Address::Local, - to: Address::Peer("b".into()), + to: Address::Peer(2), term: 1, event: Event::Heartbeat { commit_index: 1, commit_term: 1 }, }], diff --git a/src/raft/server.rs b/src/raft/server.rs index b898ecb4b..d91ba2eb4 100644 --- a/src/raft/server.rs +++ b/src/raft/server.rs @@ -1,4 +1,4 @@ -use super::{Address, Event, Log, Message, Node, Request, Response, State}; +use super::{Address, Event, Log, Message, Node, NodeID, Request, Response, State}; use crate::error::{Error, Result}; use ::log::{debug, error}; @@ -18,22 +18,21 @@ const TICK: Duration = Duration::from_millis(100); /// A Raft server. pub struct Server { node: Node, - peers: HashMap, + peers: HashMap, node_rx: mpsc::UnboundedReceiver, } impl Server { /// Creates a new Raft cluster pub async fn new( - id: &str, - peers: HashMap, + id: NodeID, + peers: HashMap, log: Log, state: Box, ) -> Result { let (node_tx, node_rx) = mpsc::unbounded_channel(); Ok(Self { - node: Node::new(id, peers.keys().map(|k| k.to_string()).collect(), log, state, node_tx) - .await?, + node: Node::new(id, peers.keys().copied().collect(), log, state, node_tx).await?, peers, node_rx, }) @@ -147,12 +146,12 @@ impl Server { /// Sends outbound messages to peers via TCP. async fn tcp_send( - node_id: String, - peers: HashMap, + node_id: NodeID, + peers: HashMap, out_rx: mpsc::UnboundedReceiver, ) -> Result<()> { let mut out_rx = UnboundedReceiverStream::new(out_rx); - let mut peer_txs: HashMap> = HashMap::new(); + let mut peer_txs: HashMap> = HashMap::new(); for (id, addr) in peers.into_iter() { let (tx, rx) = mpsc::channel::(1000); @@ -162,11 +161,11 @@ impl Server { while let Some(mut message) = out_rx.next().await { if message.from == Address::Local { - message.from = Address::Peer(node_id.clone()) + message.from = Address::Peer(node_id) } - let to = match &message.to { - Address::Peers => peer_txs.keys().cloned().collect(), - Address::Peer(peer) => vec![peer.to_string()], + let to = match message.to { + Address::Peers => peer_txs.keys().copied().collect(), + Address::Peer(peer) => vec![peer], addr => { error!("Received outbound message for non-TCP address {:?}", addr); continue; diff --git a/src/raft/state.rs b/src/raft/state.rs index c376be751..1dbf94e2c 100644 --- a/src/raft/state.rs +++ b/src/raft/state.rs @@ -310,7 +310,7 @@ pub mod tests { state_tx.send(Instruction::Notify { id: vec![0x01], index: 1, - address: Address::Peer("a".into()), + address: Address::Peer(1), })?; state_tx.send(Instruction::Query { id: vec![0x02], @@ -330,7 +330,7 @@ pub mod tests { vec![ Message { from: Address::Local, - to: Address::Peer("a".into()), + to: Address::Peer(1), term: 0, event: Event::ClientResponse { id: vec![0x01], response: Err(Error::Abort) } }, @@ -398,11 +398,7 @@ pub mod tests { entry: Entry { index: 1, term: 2, command: Some(vec![0xaf]) }, })?; state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Local })?; - state_tx.send(Instruction::Vote { - term: 2, - index: 1, - address: Address::Peer("a".into()), - })?; + state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Peer(1) })?; std::mem::drop(state_tx); let node_rx = UnboundedReceiverStream::new(node_rx); @@ -439,11 +435,7 @@ pub mod tests { entry: Entry { index: 1, term: 1, command: Some(vec![0xaf]) }, })?; state_tx.send(Instruction::Vote { term: 2, index: 1, address: Address::Local })?; - state_tx.send(Instruction::Vote { - term: 1, - index: 1, - address: Address::Peer("a".into()), - })?; + state_tx.send(Instruction::Vote { term: 1, index: 1, address: Address::Peer(1) })?; std::mem::drop(state_tx); let node_rx = UnboundedReceiverStream::new(node_rx); diff --git a/src/server.rs b/src/server.rs index 10e35f7e0..d0d54175b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -26,8 +26,8 @@ pub struct Server { impl Server { /// Creates a new toyDB server. pub async fn new( - id: &str, - peers: HashMap, + id: raft::NodeID, + peers: HashMap, raft_log: raft::Log, raft_state: Box, ) -> Result { diff --git a/tests/client/mod.rs b/tests/client/mod.rs index f2c805582..762425f35 100644 --- a/tests/client/mod.rs +++ b/tests/client/mod.rs @@ -122,10 +122,10 @@ async fn status() -> Result<()> { c.status().await?, Status { raft: raft::Status { - server: "test".into(), - leader: "test".into(), + server: 1, + leader: 1, term: 0, - node_last_index: vec![("test".to_string(), 26)].into_iter().collect(), + node_last_index: vec![(1, 26)].into_iter().collect(), commit_index: 26, apply_index: 26, storage: "bitcask".into(), diff --git a/tests/client/pool.rs b/tests/client/pool.rs index 5d3804b1f..a663dd5ae 100644 --- a/tests/client/pool.rs +++ b/tests/client/pool.rs @@ -28,10 +28,7 @@ async fn get() -> Result<()> { servers.insert(client.status().await?.raft.server); ids.insert(client.id()); } - assert_eq!( - servers, - HashSet::from_iter(vec!["toydb0".into(), "toydb1".into(), "toydb2".into()]) - ); + assert_eq!(servers, HashSet::from_iter(vec![1, 2, 3])); assert_eq!(ids, HashSet::from_iter(vec![0, 1, 2, 3, 4])); // Further clients won't be ready diff --git a/tests/setup.rs b/tests/setup.rs index 11600ed4a..d4a9301d3 100644 --- a/tests/setup.rs +++ b/tests/setup.rs @@ -69,10 +69,10 @@ pub fn simple() -> Vec<&'static str> { /// Sets up a test server pub async fn server( - id: &str, + id: raft::NodeID, addr_sql: &str, addr_raft: &str, - peers: HashMap, + peers: HashMap, ) -> Result { let dir = TempDir::new("toydb")?; let mut srv = Server::new( @@ -95,7 +95,7 @@ pub async fn server( /// Sets up a server with a client pub async fn server_with_client(queries: Vec<&str>) -> Result<(Client, Teardown)> { - let teardown = server("test", "127.0.0.1:9605", "127.0.0.1:9705", HashMap::new()).await?; + let teardown = server(1, "127.0.0.1:9605", "127.0.0.1:9705", HashMap::new()).await?; let client = Client::new("127.0.0.1:9605").await?; if !queries.is_empty() { client.execute("BEGIN").await?; @@ -108,29 +108,26 @@ pub async fn server_with_client(queries: Vec<&str>) -> Result<(Client, Teardown) } /// Sets up a server cluster -pub async fn cluster(nodes: HashMap) -> Result { +pub async fn cluster(nodes: HashMap) -> Result { let mut teardown = Teardown::empty(); for (id, (addr_sql, addr_raft)) in nodes.iter() { let peers = nodes .iter() .filter(|(i, _)| i != &id) - .map(|(id, (_, raft))| (id.clone(), raft.clone())) + .map(|(id, (_, raft))| (*id, raft.clone())) .collect(); - teardown.merge(server(id, addr_sql, addr_raft, peers).await?); + teardown.merge(server(*id, addr_sql, addr_raft, peers).await?); } Ok(teardown) } /// Sets up a server cluster with clients -pub async fn cluster_with_clients( - size: u64, - queries: Vec<&str>, -) -> Result<(Vec, Teardown)> { +pub async fn cluster_with_clients(size: u8, queries: Vec<&str>) -> Result<(Vec, Teardown)> { let mut nodes = HashMap::new(); - for i in 0..size { + for i in 1..=size { nodes.insert( - format!("toydb{}", i), - (format!("127.0.0.1:{}", 9605 + i), format!("127.0.0.1:{}", 9705 + i)), + i, + (format!("127.0.0.1:{}", 9605 + i as u64), format!("127.0.0.1:{}", 9705 + i as u64)), ); } let teardown = cluster(nodes.clone()).await?; @@ -156,15 +153,15 @@ pub async fn cluster_with_clients( /// Sets up a server cluster with a client pool pub async fn cluster_with_pool( - cluster_size: u64, + cluster_size: u8, pool_size: u64, queries: Vec<&str>, ) -> Result<(Pool, Teardown)> { let mut nodes = HashMap::new(); - for i in 0..cluster_size { + for i in 1..=cluster_size { nodes.insert( - format!("toydb{}", i), - (format!("127.0.0.1:{}", 9605 + i), format!("127.0.0.1:{}", 9705 + i)), + i, + (format!("127.0.0.1:{}", 9605 + i as u64), format!("127.0.0.1:{}", 9705 + i as u64)), ); } let teardown = cluster(nodes.clone()).await?;