Skip to content

Commit

Permalink
Move all algorithms to algos module
Browse files Browse the repository at this point in the history
  • Loading branch information
DrChat committed Jan 18, 2025
1 parent c045e02 commit e52792b
Showing 1 changed file with 151 additions and 128 deletions.
279 changes: 151 additions & 128 deletions atrium-repo/src/mst.rs
Original file line number Diff line number Diff line change
Expand Up @@ -325,116 +325,38 @@ mod algos {

Ok((left, right))
}
}

/// Length, in bytes, of the longest common prefix shared by `xs` and `ys`.
///
/// Thin wrapper around `prefix_chunks` that fixes the chunk width at 128 bytes.
///
/// Approach taken from:
/// https://users.rust-lang.org/t/how-to-find-common-prefix-of-two-byte-slices-effectively/25815/3
fn prefix(xs: &[u8], ys: &[u8]) -> usize {
    const CHUNK_WIDTH: usize = 128;
    prefix_chunks::<CHUNK_WIDTH>(xs, ys)
}

/// Length, in bytes, of the longest common prefix of `xs` and `ys`.
///
/// The comparison runs in two phases: whole `N`-byte chunks first, then a
/// byte-by-byte scan starting at the first chunk that did not match (or at the
/// leftover tail when every full chunk matched).
fn prefix_chunks<const N: usize>(xs: &[u8], ys: &[u8]) -> usize {
    // N.B: Exact-size chunks are used to entice the compiler to autovectorize
    // the bulk comparison.
    let equal_chunks = xs
        .chunks_exact(N)
        .zip(ys.chunks_exact(N))
        .take_while(|(lhs, rhs)| lhs == rhs)
        .count();
    let bulk = equal_chunks * N;

    // Finish byte-wise from where the chunked comparison stopped.
    let tail = xs[bulk..]
        .iter()
        .zip(&ys[bulk..])
        .take_while(|(lhs, rhs)| lhs == rhs)
        .count();

    bulk + tail
}

/// Count the leading zeroes of the SHA-256 digest of `key`, in 2-bit units.
///
/// Every fully-zero pair of leading bits contributes one to the result, so a
/// zero byte contributes four. Counting stops at the first nonzero byte.
///
/// Reference: https://github.com/bluesky-social/atproto/blob/13636ba963225407f63c20253b983a92dcfe1bfa/packages/repo/src/mst/util.ts#L8-L23
fn leading_zeroes(key: &[u8]) -> usize {
    let digest = sha2::Sha256::digest(key);
    let mut pairs = 0;

    for &byte in digest.iter() {
        // Halving the leading-zero bit count yields the number of leading
        // zero bit pairs in this byte (0..=4).
        pairs += (byte.leading_zeros() / 2) as usize;

        if byte != 0 {
            // A nonzero byte ends the run of leading zeroes.
            break;
        }
    }

    pairs
}

/// A merkle search tree (MST) handle, backed by storage implementing
/// [`AsyncBlockStoreRead`] and optionally [`AsyncBlockStoreWrite`].
///
/// This data structure is merely a convenience structure that implements
/// algorithms that handle certain common operations one may want to perform
/// against an MST.
///
/// The structure does not actually load the merkle search tree into memory
/// or perform any deep copies. The tree itself lives entirely inside of the
/// provided backing storage. This also carries the implication that any operation
/// performed against the tree will have performance that reflects that of accesses
/// to the backing storage.
///
/// If your backing storage is implemented by a cloud service, such as a
/// database or block storage service, you will likely want to insert a
/// caching layer in your block storage to ensure that performance remains
/// fast.
///
/// ---
///
/// There are two factors that determine the placement of nodes inside of
/// a merkle search tree:
/// - The number of leading zeroes in the SHA256 hash of the key
/// - The key's lexicographic position inside of a layer
///
/// # Reference
/// * Official documentation: <https://atproto.com/guides/data-repos>
/// * Useful reading: <https://interjectedfuture.com/crdts-turned-inside-out/>
pub struct Tree<S> {
/// Backing block storage in which the tree's nodes live.
storage: S,
/// CID of the tree's current root node.
root: Cid,
}

// N.B: It's trivial to clone the tree if it's trivial to clone the backing storage,
// so implement clone if the storage also implements it.
impl<S: Clone> Clone for Tree<S> {
    /// Produce a second handle onto the same tree.
    ///
    /// The backing storage is cloned. The root CID is `Copy` (it is already
    /// passed by value out of `&mut self` elsewhere in this file), so it is
    /// copied directly rather than going through a redundant `.clone()`.
    fn clone(&self) -> Self {
        Self { storage: self.storage.clone(), root: self.root }
    }
}

impl<S> std::fmt::Debug for Tree<S> {
    /// Render only the root CID; the backing storage is omitted, which is why
    /// the output is marked as non-exhaustive and `S` needs no `Debug` bound.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Tree");
        out.field("root", &self.root);
        out.finish_non_exhaustive()
    }
}

impl<S: AsyncBlockStoreRead + AsyncBlockStoreWrite> Tree<S> {
/// Create a new MST with an empty root node
pub async fn create(mut storage: S) -> Result<Self, Error> {
let node = Node { entries: vec![] };
let cid = node.serialize_into(&mut storage).await?;

Ok(Self { storage, root: cid })
/// Prune entries that contain a single nested tree entry from the root.
///
/// Runs `algos::traverse` starting at `root`, driven by
/// `algos::traverse_prune()`, and returns the CID that traversal yields. The
/// node path returned alongside it is not needed and is discarded.
///
/// # Errors
/// Propagates any error produced by the traversal.
pub async fn prune(
mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
root: Cid,
) -> Result<Cid, Error> {
let (_node_path, cid) = algos::traverse(&mut bs, root, algos::traverse_prune()).await?;
Ok(cid)
}

pub async fn add(&mut self, key: &str, value: Cid) -> Result<(), Error> {
pub async fn add(
mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
root: Cid,
key: &str,
value: Cid,
) -> Result<Cid, Error> {
// Compute the layer where this node should be added.
let target_layer = leading_zeroes(key.as_bytes());

// Now traverse to the node containing the target layer.
let mut node_path = vec![];
let mut node_cid = self.root.clone();
let mut node_cid = root.clone();

// There are three cases we need to handle:
// 1) The target layer is above the tree (and our entire tree needs to be pushed down).
// 2) The target layer is contained within the tree (and we will traverse to find it).
// 3) The tree is currently empty (trivial).
let mut node = match self.depth(None).await {
let mut node = match compute_depth(&mut bs, root).await {
Ok(Some(layer)) => {
match layer.cmp(&target_layer) {
// The new key can be inserted into the root node.
Ordering::Equal => Node::read_from(&mut self.storage, node_cid).await?,
Ordering::Equal => Node::read_from(&mut bs, node_cid).await?,
// The entire tree needs to be shifted down.
Ordering::Less => {
let mut layer = layer + 1;
Expand All @@ -443,7 +365,7 @@ impl<S: AsyncBlockStoreRead + AsyncBlockStoreWrite> Tree<S> {
let node = Node { entries: vec![NodeEntry::Tree(node_cid)] };

if layer < target_layer {
node_cid = node.serialize_into(&mut self.storage).await?;
node_cid = node.serialize_into(&mut bs).await?;
layer += 1;
} else {
break node;
Expand All @@ -456,7 +378,7 @@ impl<S: AsyncBlockStoreRead + AsyncBlockStoreWrite> Tree<S> {

// Traverse to the lowest possible layer in the tree.
let (path, (mut node, partition)) =
algos::traverse(&mut self.storage, node_cid, |node, _cid| {
algos::traverse(&mut bs, node_cid, |node, _cid| {
if layer == target_layer {
Ok(algos::TraverseAction::Stop((node, 0)))
} else {
Expand Down Expand Up @@ -530,8 +452,7 @@ impl<S: AsyncBlockStoreRead + AsyncBlockStoreWrite> Tree<S> {
}
Some(NodeEntry::Tree(e)) => {
// Need to split the subtree into two based on the node's key.
let (left, right) =
algos::split_subtree(&mut self.storage, e.clone(), key).await?;
let (left, right) = algos::split_subtree(&mut bs, e.clone(), key).await?;

// Insert the new node in between the two subtrees.
let right_subvec = node.entries.split_off(partition + 1);
Expand Down Expand Up @@ -559,42 +480,47 @@ impl<S: AsyncBlockStoreRead + AsyncBlockStoreWrite> Tree<S> {
node.entries.push(NodeEntry::Leaf(TreeEntry { key: key.to_string(), value }));
}

let mut cid = node.serialize_into(&mut self.storage).await?;
let mut cid = node.serialize_into(&mut bs).await?;

// Now walk back up the node path chain and update parent entries to point to the new node's CID.
for (mut parent, i) in node_path.into_iter().rev() {
parent.entries[i] = NodeEntry::Tree(cid);
cid = parent.serialize_into(&mut self.storage).await?;
cid = parent.serialize_into(&mut bs).await?;
}

self.root = cid;
Ok(())
Ok(cid)
}

/// Update an existing key with a new value.
pub async fn update(&mut self, key: &str, value: Cid) -> Result<(), Error> {
pub async fn update(
mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
root: Cid,
key: &str,
value: Cid,
) -> Result<Cid, Error> {
let (node_path, (mut node, index)) =
algos::traverse(&mut self.storage, self.root, algos::traverse_find(key)).await?;
algos::traverse(&mut bs, root, algos::traverse_find(key)).await?;

// Update the value.
node.entries[index] = NodeEntry::Leaf(TreeEntry { key: key.to_string(), value });

let mut cid = node.serialize_into(&mut self.storage).await?;
let mut cid = node.serialize_into(&mut bs).await?;

// Now walk up the node path chain and update parent entries to point to the new node's CID.
for (mut parent, i) in node_path.into_iter().rev() {
parent.entries[i] = NodeEntry::Tree(cid);
cid = parent.serialize_into(&mut self.storage).await?;
cid = parent.serialize_into(&mut bs).await?;
}

self.root = cid;
Ok(())
Ok(cid)
}

/// Delete a key from the tree.
pub async fn delete(&mut self, key: &str) -> Result<(), Error> {
pub async fn delete(
mut bs: impl AsyncBlockStoreRead + AsyncBlockStoreWrite,
root: Cid,
key: &str,
) -> Result<Cid, Error> {
let (node_path, (mut node, index)) =
algos::traverse(&mut self.storage, self.root, algos::traverse_find(key)).await?;
algos::traverse(&mut bs, root, algos::traverse_find(key)).await?;

// Remove the key.
node.entries.remove(index);
Expand All @@ -604,7 +530,7 @@ impl<S: AsyncBlockStoreRead + AsyncBlockStoreWrite> Tree<S> {
if let (Some(NodeEntry::Tree(lc)), Some(NodeEntry::Tree(rc))) =
(node.entries.get(index), node.entries.get(index + 1))
{
let cid = algos::merge_subtrees(&mut self.storage, lc.clone(), rc.clone()).await?;
let cid = algos::merge_subtrees(&mut bs, lc.clone(), rc.clone()).await?;
node.entries[index] = NodeEntry::Tree(cid);
node.entries.remove(index + 1);
}
Expand All @@ -613,17 +539,14 @@ impl<S: AsyncBlockStoreRead + AsyncBlockStoreWrite> Tree<S> {
// Option-alize the node depending on whether or not it is empty.
let node = (!node.entries.is_empty()).then_some(node);

let mut cid = if let Some(node) = node {
Some(node.serialize_into(&mut self.storage).await?)
} else {
None
};
let mut cid =
if let Some(node) = node { Some(node.serialize_into(&mut bs).await?) } else { None };

// Now walk back up the node path chain and update parent entries to point to the new node's CID.
for (mut parent, i) in node_path.into_iter().rev() {
if let Some(cid) = cid.as_mut() {
parent.entries[i] = NodeEntry::Tree(cid.clone());
*cid = parent.serialize_into(&mut self.storage).await?;
*cid = parent.serialize_into(&mut bs).await?;
} else {
// The node ended up becoming empty, so it will be orphaned.
// Note that we can safely delete this entry from the parent because it's guaranteed that
Expand All @@ -634,7 +557,7 @@ impl<S: AsyncBlockStoreRead + AsyncBlockStoreWrite> Tree<S> {
cid = if parent.entries.is_empty() {
None
} else {
Some(parent.serialize_into(&mut self.storage).await?)
Some(parent.serialize_into(&mut bs).await?)
};
}
}
Expand All @@ -644,20 +567,120 @@ impl<S: AsyncBlockStoreRead + AsyncBlockStoreWrite> Tree<S> {
} else {
// The tree is now empty. Create a new empty node.
let node = Node { entries: vec![] };
node.serialize_into(&mut self.storage).await?
node.serialize_into(&mut bs).await?
};

self.root = cid;
self.prune().await?;
let cid = prune(&mut bs, cid).await?;
Ok(cid)
}
}

/// Length, in bytes, of the longest common prefix shared by `xs` and `ys`.
///
/// Thin wrapper around `prefix_chunks` that fixes the chunk width at 128 bytes.
///
/// Approach taken from:
/// https://users.rust-lang.org/t/how-to-find-common-prefix-of-two-byte-slices-effectively/25815/3
fn prefix(xs: &[u8], ys: &[u8]) -> usize {
    const CHUNK_WIDTH: usize = 128;
    prefix_chunks::<CHUNK_WIDTH>(xs, ys)
}

/// Length, in bytes, of the longest common prefix of `xs` and `ys`.
///
/// The comparison runs in two phases: whole `N`-byte chunks first, then a
/// byte-by-byte scan starting at the first chunk that did not match (or at the
/// leftover tail when every full chunk matched).
fn prefix_chunks<const N: usize>(xs: &[u8], ys: &[u8]) -> usize {
    // N.B: Exact-size chunks are used to entice the compiler to autovectorize
    // the bulk comparison.
    let equal_chunks = xs
        .chunks_exact(N)
        .zip(ys.chunks_exact(N))
        .take_while(|(lhs, rhs)| lhs == rhs)
        .count();
    let bulk = equal_chunks * N;

    // Finish byte-wise from where the chunked comparison stopped.
    let tail = xs[bulk..]
        .iter()
        .zip(&ys[bulk..])
        .take_while(|(lhs, rhs)| lhs == rhs)
        .count();

    bulk + tail
}

/// Count the leading zeroes of the SHA-256 digest of `key`, in 2-bit units.
///
/// Every fully-zero pair of leading bits contributes one to the result, so a
/// zero byte contributes four. Counting stops at the first nonzero byte.
///
/// Reference: https://github.com/bluesky-social/atproto/blob/13636ba963225407f63c20253b983a92dcfe1bfa/packages/repo/src/mst/util.ts#L8-L23
fn leading_zeroes(key: &[u8]) -> usize {
    let digest = sha2::Sha256::digest(key);
    let mut pairs = 0;

    for &byte in digest.iter() {
        // Halving the leading-zero bit count yields the number of leading
        // zero bit pairs in this byte (0..=4).
        pairs += (byte.leading_zeros() / 2) as usize;

        if byte != 0 {
            // A nonzero byte ends the run of leading zeroes.
            break;
        }
    }

    pairs
}

/// A merkle search tree (MST) handle, backed by storage implementing
/// [`AsyncBlockStoreRead`] and optionally [`AsyncBlockStoreWrite`].
///
/// This data structure is merely a convenience structure that implements
/// algorithms that handle certain common operations one may want to perform
/// against an MST.
///
/// The structure does not actually load the merkle search tree into memory
/// or perform any deep copies. The tree itself lives entirely inside of the
/// provided backing storage. This also carries the implication that any operation
/// performed against the tree will have performance that reflects that of accesses
/// to the backing storage.
///
/// If your backing storage is implemented by a cloud service, such as a
/// database or block storage service, you will likely want to insert a
/// caching layer in your block storage to ensure that performance remains
/// fast.
///
/// ---
///
/// There are two factors that determine the placement of nodes inside of
/// a merkle search tree:
/// - The number of leading zeroes in the SHA256 hash of the key
/// - The key's lexicographic position inside of a layer
///
/// # Reference
/// * Official documentation: <https://atproto.com/guides/data-repos>
/// * Useful reading: <https://interjectedfuture.com/crdts-turned-inside-out/>
pub struct Tree<S> {
/// Backing block storage in which the tree's nodes live.
storage: S,
/// CID of the tree's current root node; replaced after each successful
/// `add`, `update`, or `delete`.
root: Cid,
}

// N.B: It's trivial to clone the tree if it's trivial to clone the backing storage,
// so implement clone if the storage also implements it.
impl<S: Clone> Clone for Tree<S> {
    /// Produce a second handle onto the same tree.
    ///
    /// The backing storage is cloned. The root CID is `Copy` (it is already
    /// passed by value out of `&mut self` elsewhere in this file), so it is
    /// copied directly rather than going through a redundant `.clone()`.
    fn clone(&self) -> Self {
        Self { storage: self.storage.clone(), root: self.root }
    }
}

impl<S> std::fmt::Debug for Tree<S> {
    /// Render only the root CID; the backing storage is omitted, which is why
    /// the output is marked as non-exhaustive and `S` needs no `Debug` bound.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("Tree");
        out.field("root", &self.root);
        out.finish_non_exhaustive()
    }
}

impl<S: AsyncBlockStoreRead + AsyncBlockStoreWrite> Tree<S> {
/// Create a new MST consisting of a single, empty root node.
///
/// The empty node is serialized into `storage`, and the CID returned by that
/// write becomes the root of the new tree.
///
/// # Errors
/// Propagates any error raised while serializing the node into `storage`.
pub async fn create(mut storage: S) -> Result<Self, Error> {
    let empty_root = Node { entries: Vec::new() };
    let root = empty_root.serialize_into(&mut storage).await?;

    Ok(Self { storage, root })
}

/// Insert `key` with the given `value` into the tree.
///
/// Delegates to `algos::add` and, on success, adopts the root CID it returns.
///
/// # Errors
/// Propagates any error from `algos::add`; the stored root is left untouched
/// in that case.
pub async fn add(&mut self, key: &str, value: Cid) -> Result<(), Error> {
    let new_root = algos::add(&mut self.storage, self.root, key, value).await?;
    self.root = new_root;
    Ok(())
}

/// Prune entries that contain a single nested tree entry from the root.
async fn prune(&mut self) -> Result<(), Error> {
let (_node_path, cid) =
algos::traverse(&mut self.storage, self.root, algos::traverse_prune()).await?;
/// Replace the value stored under an existing `key`.
///
/// Delegates to `algos::update` and, on success, adopts the root CID it
/// returns.
///
/// # Errors
/// Propagates any error from `algos::update`; the stored root is left
/// untouched in that case.
pub async fn update(&mut self, key: &str, value: Cid) -> Result<(), Error> {
    let new_root = algos::update(&mut self.storage, self.root, key, value).await?;
    self.root = new_root;
    Ok(())
}

self.root = cid;
/// Remove `key` from the tree.
///
/// Delegates to `algos::delete` and, on success, adopts the root CID it
/// returns.
///
/// # Errors
/// Propagates any error from `algos::delete`; the stored root is left
/// untouched in that case.
pub async fn delete(&mut self, key: &str) -> Result<(), Error> {
    let new_root = algos::delete(&mut self.storage, self.root, key).await?;
    self.root = new_root;
    Ok(())
}
}
Expand Down

0 comments on commit e52792b

Please sign in to comment.