Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: change PersistentStorage to KvStorageInterface trait object to allow user choose MemStorage #522

Merged
merged 4 commits into from
Jan 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ docs
.DS_Store

pkg/
temp/
tmp/
data/
nohup.out
Expand Down
111 changes: 42 additions & 69 deletions crates/core/src/dht/chord.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Chord algorithm implement.
#![warn(missing_docs)]
use std::str::FromStr;
use std::sync::Arc;
use std::sync::Mutex;
use std::sync::MutexGuard;
Expand All @@ -25,17 +26,24 @@ use crate::dht::SuccessorReader;
use crate::dht::SuccessorWriter;
use crate::error::Error;
use crate::error::Result;
use crate::storage::KvStorageInterface;
use crate::storage::MemStorage;
use crate::storage::PersistenceStorage;
use crate::storage::PersistenceStorageReadAndWrite;
use crate::storage::PersistenceStorageRemove;

/// `VNodeStorage` is the type accepted by `PeerRing::new_with_storage`.
/// It's used to store [VirtualNode]s in a storage media provided by user.
#[cfg(feature = "wasm")]
pub type VNodeStorage = Box<dyn KvStorageInterface<VirtualNode>>;

/// `VNodeStorage` is the type accepted by `PeerRing::new_with_storage`.
/// It's used to store [VirtualNode]s in a storage media provided by user.
#[cfg(not(feature = "wasm"))]
pub type VNodeStorage = Box<dyn KvStorageInterface<VirtualNode> + Send + Sync>;

/// PeerRing is used to help a node interact with other nodes.
/// All nodes in rings network form a clockwise ring in the order of Did.
/// This struct takes its name from that.
/// PeerRing implemented [Chord] algorithm.
/// PeerRing implemented [ChordStorage] protocol.
#[derive(Clone)]
pub struct PeerRing {
/// The did of current node.
pub did: Did,
Expand All @@ -48,9 +56,9 @@ pub struct PeerRing {
/// The did of previous node on the ring.
pub predecessor: Arc<Mutex<Option<Did>>>,
/// Local storage for [ChordStorage].
pub storage: Arc<PersistenceStorage>,
pub storage: VNodeStorage,
/// Local cache for [ChordStorage].
pub cache: Arc<MemStorage<Did, VirtualNode>>,
pub cache: VNodeStorage,
}

/// Type alias is just for making the code easy to read.
Expand Down Expand Up @@ -184,14 +192,14 @@ impl From<Vec<PeerRingAction>> for PeerRingAction {

impl PeerRing {
/// Same as new with config, but with a given storage.
pub fn new_with_storage(did: Did, succ_max: u8, storage: PersistenceStorage) -> Self {
pub fn new_with_storage(did: Did, succ_max: u8, storage: VNodeStorage) -> Self {
Self {
successor_seq: SuccessorSeq::new(did, succ_max),
predecessor: Arc::new(Mutex::new(None)),
// for Eth address, it's 160
finger: Arc::new(Mutex::new(FingerTable::new(did, 160))),
storage: Arc::new(storage),
cache: Arc::new(MemStorage::<Did, VirtualNode>::new()),
storage,
cache: Box::new(MemStorage::new()),
did,
}
}
Expand Down Expand Up @@ -386,7 +394,7 @@ impl<const REDUNDANT: u16> ChordStorage<PeerRingAction, REDUNDANT> for PeerRing
for vid in vid.rotate_affine(REDUNDANT) {
let maybe_act = match self.find_successor(vid) {
// Resource should be stored in current node.
Ok(PeerRingAction::Some(succ)) => match self.storage.get(&vid).await {
Ok(PeerRingAction::Some(succ)) => match self.storage.get(&vid.to_string()).await {
Ok(Some(v)) => Ok(PeerRingAction::SomeVNode(v)),
Ok(None) => {
tracing::debug!(
Expand Down Expand Up @@ -437,13 +445,13 @@ impl<const REDUNDANT: u16> ChordStorage<PeerRingAction, REDUNDANT> for PeerRing
let maybe_act = match self.find_successor(vid) {
// `vnode` should be on current node.
Ok(PeerRingAction::Some(_)) => {
let this = if let Ok(Some(this)) = self.storage.get(&vid).await {
let this = if let Ok(Some(this)) = self.storage.get(&vid.to_string()).await {
Ok(this)
} else {
op.clone().gen_default_vnode()
}?;
let vnode = this.operate(op.clone())?;
self.storage.put(&vid, &vnode).await?;
self.storage.put(&vid.to_string(), &vnode).await?;
Ok(PeerRingAction::None)
}
// `vnode` should be on other nodes.
Expand Down Expand Up @@ -472,11 +480,13 @@ impl ChordStorageSync<PeerRingAction> for PeerRing {
/// and sync them to the new successor.
async fn sync_vnode_with_successor(&self, new_successor: Did) -> Result<PeerRingAction> {
let mut data = Vec::<VirtualNode>::new();
let all_items: Vec<(Did, VirtualNode)> = self.storage.get_all().await?;
let all_items: Vec<(String, VirtualNode)> = self.storage.get_all().await?;

// Pop out all items that are not between current node and `new_successor`.
for (vid, vnode) in all_items.iter() {
if self.bias(*vid) > self.bias(new_successor) && self.storage.remove(vid).await.is_ok()
for (vid_str, vnode) in all_items.iter() {
let vid = Did::from_str(vid_str)?;
if self.bias(vid) > self.bias(new_successor)
&& self.storage.remove(vid_str).await.is_ok()
{
data.push(vnode.clone());
}
Expand All @@ -497,13 +507,13 @@ impl ChordStorageSync<PeerRingAction> for PeerRing {
#[cfg_attr(not(feature = "wasm"), async_trait)]
impl ChordStorageCache<PeerRingAction> for PeerRing {
/// Cache fetched `vnode` locally.
fn local_cache_set(&self, vnode: VirtualNode) {
self.cache.set(&vnode.did.clone(), vnode);
async fn local_cache_put(&self, vnode: VirtualNode) -> Result<()> {
self.cache.put(&vnode.did.to_string(), &vnode).await
}

/// Get vnode from local cache.
fn local_cache_get(&self, vid: Did) -> Option<VirtualNode> {
self.cache.get(&vid)
async fn local_cache_get(&self, vid: Did) -> Result<Option<VirtualNode>> {
self.cache.get(&vid.to_string()).await
}
}

Expand Down Expand Up @@ -630,20 +640,6 @@ mod tests {

#[tokio::test]
async fn test_chord_finger() -> Result<()> {
let db_path_a = PersistenceStorage::random_path("./tmp");
let db_path_b = PersistenceStorage::random_path("./tmp");
let db_path_c = PersistenceStorage::random_path("./tmp");

let db_1 = PersistenceStorage::new_with_path(db_path_a.as_str())
.await
.unwrap();
let db_2 = PersistenceStorage::new_with_path(db_path_b.as_str())
.await
.unwrap();
let db_3 = PersistenceStorage::new_with_path(db_path_c.as_str())
.await
.unwrap();

// Setup did a, b, c, d in a clockwise order.
let a = Did::from_str("0x00E807fcc88dD319270493fB2e822e388Fe36ab0").unwrap();
let b = Did::from_str("0x119999cf1046e68e36E1aA2E0E07105eDDD1f08E").unwrap();
Expand All @@ -665,7 +661,7 @@ mod tests {
assert_eq!(seq, vec![a, b, c, d]);

// Setup node_a and ensure its successor sequence and finger table is empty.
let node_a = PeerRing::new_with_storage(a, 3, db_1);
let node_a = PeerRing::new_with_storage(a, 3, Box::new(MemStorage::new()));
assert!(node_a.successors().is_empty()?);
assert!(node_a.lock_finger()?.is_empty());

Expand Down Expand Up @@ -745,7 +741,7 @@ mod tests {
);

// Since the test above is clockwise, we need to test anti-clockwise situation.
let node_a = PeerRing::new_with_storage(a, 3, db_2);
let node_a = PeerRing::new_with_storage(a, 3, Box::new(MemStorage::new()));

// Test join ring with node_c.
assert_eq!(
Expand All @@ -767,7 +763,7 @@ mod tests {
assert_eq!(node_a.successors().list()?, vec![b, c]);

// Test join over half ring.
let node_d = PeerRing::new_with_storage(d, 1, db_3);
let node_d = PeerRing::new_with_storage(d, 1, Box::new(MemStorage::new()));
assert_eq!(
node_d.join(a)?,
PeerRingAction::RemoteAction(a, RemoteAction::FindSuccessorForConnect(d))
Expand Down Expand Up @@ -810,7 +806,6 @@ mod tests {
// Because node_a is closer to node_d, and the sequence is full.
assert_eq!(node_d.successors().list()?, vec![a]);

tokio::fs::remove_dir_all("./tmp").await.ok();
Ok(())
}

Expand All @@ -823,16 +818,8 @@ mod tests {
}
let did1: Did = key1.address().into();
let did2: Did = key2.address().into();
let db_path1 = PersistenceStorage::random_path("./tmp");
let db_path2 = PersistenceStorage::random_path("./tmp");
let db_1 = PersistenceStorage::new_with_path(db_path1.as_str())
.await
.unwrap();
let db_2 = PersistenceStorage::new_with_path(db_path2.as_str())
.await
.unwrap();
let node1 = PeerRing::new_with_storage(did1, 3, db_1);
let node2 = PeerRing::new_with_storage(did2, 3, db_2);
let node1 = PeerRing::new_with_storage(did1, 3, Box::new(MemStorage::new()));
let node2 = PeerRing::new_with_storage(did2, 3, Box::new(MemStorage::new()));

node1.join(did2)?;
node2.join(did1)?;
Expand All @@ -851,7 +838,6 @@ mod tests {
did1,
did2
);
tokio::fs::remove_dir_all("./tmp").await.ok();

Ok(())
}
Expand All @@ -863,16 +849,8 @@ mod tests {
let max = Did::from(BigUint::from(2u16).pow(160) - 1u16);
let zero = Did::from(BigUint::from(2u16).pow(160));

let db_path1 = PersistenceStorage::random_path("./tmp");
let db_path2 = PersistenceStorage::random_path("./tmp");
let db_1 = PersistenceStorage::new_with_path(db_path1.as_str())
.await
.unwrap();
let db_2 = PersistenceStorage::new_with_path(db_path2.as_str())
.await
.unwrap();
let node1 = PeerRing::new_with_storage(did1, 3, db_1);
let node2 = PeerRing::new_with_storage(did2, 3, db_2);
let node1 = PeerRing::new_with_storage(did1, 3, Box::new(MemStorage::new()));
let node2 = PeerRing::new_with_storage(did2, 3, Box::new(MemStorage::new()));

node1.join(did2)?;
node2.join(did1)?;
Expand All @@ -897,7 +875,6 @@ mod tests {
did2,
did1
);
tokio::fs::remove_dir_all("./tmp").await.ok();

Ok(())
}
Expand Down Expand Up @@ -926,31 +903,27 @@ mod tests {
}

let dhts = gen_sorted_dht(5).await;
let (n1, n2, n3, n4, n5) = (
dhts[0].clone(),
dhts[1].clone(),
dhts[2].clone(),
dhts[3].clone(),
dhts[4].clone(),
);
let [n1, n2, n3, n4, n5] = dhts.as_slice() else {
panic!("wrong dhts length");
};
// we now have:
// n1 < n2 < n3 < n4

// n1 join n2
n1.join(n2.did).unwrap();
n2.join(n1.did).unwrap();
// for now n1, n2 are `mutual successors`.
check_is_mutual_successors(&n1, &n2);
check_is_mutual_successors(n1, n2);
// n1 join n3

n1.join(n3.did).unwrap();
n1.join(n4.did).unwrap();
// for now n1's successor should include n1 and n3
check_succ_is_including(&n1, vec![n2.did, n3.did, n4.did]);
check_succ_is_including(n1, vec![n2.did, n3.did, n4.did]);

n1.join(n5.did).unwrap();
// n5 is not in n1's successor list
assert!(!assert_successor(&n1, &n5.did));
assert!(!assert_successor(n1, &n5.did));

#[cfg_attr(feature = "wasm", async_trait(?Send))]
#[cfg_attr(not(feature = "wasm"), async_trait)]
Expand Down
28 changes: 15 additions & 13 deletions crates/core/src/dht/mod.rs
Original file line number Diff line number Diff line change
@@ -1,35 +1,37 @@
#![warn(missing_docs)]
//! Implementation of Ring's DHT
//!
//! which is based on CHORD, ref: <https://pdos.csail.mit.edu/papers/ton:chord/paper-ton.pdf>
//! With high probability, the number of nodes that must be contacted to find a successor in an N-node network is O(log N).
pub mod did;
pub use did::Did;

mod chord;
pub use chord::TopoInfo;
pub mod did;
/// Finger table for Rings
pub mod finger;
mod stabilization;
/// Implement Subring with VNode
pub mod subring;
pub mod successor;
pub use successor::SuccessorReader;
pub use successor::SuccessorWriter;
pub mod types;
/// VNode is a special node that only has virtual address
pub mod vnode;

pub use chord::PeerRing;
pub use chord::PeerRingAction;
pub use chord::RemoteAction as PeerRingRemoteAction;
pub use chord::TopoInfo;
pub use chord::VNodeStorage;
pub use did::Did;
pub use finger::FingerTable;
pub use stabilization::Stabilization;
pub use stabilization::TStabilize;
pub use successor::SuccessorReader;
pub use successor::SuccessorWriter;
pub use types::Chord;
pub use types::ChordStorage;
pub use types::ChordStorageCache;
pub use types::ChordStorageSync;
pub use types::CorrectChord;
pub use types::LiveDid;
mod stabilization;
pub use stabilization::Stabilization;
pub use stabilization::TStabilize;
/// Implement Subring with VNode
pub mod subring;
/// VNode is a special node that only has virtual address
pub mod vnode;

#[cfg(test)]
pub mod tests {
Expand Down
6 changes: 3 additions & 3 deletions crates/core/src/dht/stabilization.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,9 @@ pub mod tests {
let key1 = SecretKey::random();
let key2 = SecretKey::random();
let key3 = SecretKey::random();
let (node1, _) = prepare_node(key1).await;
let (node2, _) = prepare_node(key2).await;
let (node3, _) = prepare_node(key3).await;
let node1 = prepare_node(key1).await;
let node2 = prepare_node(key2).await;
let node3 = prepare_node(key3).await;

// Shouldn't listen to message handler here,
// otherwise it will automatically remove disconnected transport.
Expand Down
4 changes: 2 additions & 2 deletions crates/core/src/dht/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ pub trait ChordStorageSync<Action>: Chord<Action> {
#[cfg_attr(not(feature = "wasm"), async_trait)]
pub trait ChordStorageCache<Action>: Chord<Action> {
/// Cache fetched resource locally.
fn local_cache_set(&self, vnode: VirtualNode);
async fn local_cache_put(&self, vnode: VirtualNode) -> Result<()>;
/// Get local cache.
fn local_cache_get(&self, vid: Did) -> Option<VirtualNode>;
async fn local_cache_get(&self, vid: Did) -> Result<Option<VirtualNode>>;
}

/// Chord online correction that inspired by Pamela Zave's work.
Expand Down
Loading