diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..d35fffd --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,7 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "concurrent-non-blocking-queue" +version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..2f00287 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,6 @@ +[package] +name = "concurrent-non-blocking-queue" +version = "0.1.0" +edition = "2021" + +[dependencies] diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..b4e213c --- /dev/null +++ b/src/main.rs @@ -0,0 +1,138 @@ +use std::ptr; +use std::sync::atomic::{AtomicPtr, Ordering}; + +#[derive(Debug)] +struct Node { + data: T, + next: AtomicPtr>, +} + +#[derive(Debug)] +pub struct LockFreeQueue { + head: AtomicPtr>, + tail: AtomicPtr> +} + +impl LockFreeQueue { + + pub fn new() -> Self { + let dummy_node = Box::into_raw(Box::new(Node { + data: Default::default(), + next: AtomicPtr::new(ptr::null_mut()), + })); + + LockFreeQueue { + head: AtomicPtr::new(dummy_node), + tail: AtomicPtr::new(dummy_node), + } + } + + pub fn offer(&self, data: T) { + let new_node = Box::into_raw(Box::new(Node { + data, + next: AtomicPtr::new(ptr::null_mut()), + })); + let mut tail = self.tail.load(Ordering::Relaxed); + let mut next; + loop { + unsafe { + next = (*tail).next.load(Ordering::Relaxed); + + if next.is_null() { + if (*tail) + .next + .compare_exchange(next, new_node, Ordering::Release, Ordering::Relaxed) + .unwrap() + == next + { + break; + } + } else { + self.tail.compare_exchange(tail, next, Ordering::Release, Ordering::Relaxed); + tail = self.tail.load(Ordering::Relaxed); + } + } + } + self.tail.compare_exchange(tail, new_node, Ordering::Release, Ordering::Relaxed); + } + + pub fn take(&self) -> Option { + let mut head = self.head.load(Ordering::Relaxed); + let mut next; + loop { + unsafe { + next = (*head).next.load(Ordering::Relaxed); + + if next.is_null() { + return None; + } + + if self + .head + .compare_and_swap(head, next, Ordering::Relaxed) + == head + { + let node = Box::from_raw(head); + return Some(node.data); + } + + head = self.head.load(Ordering::Relaxed); + } + } + } + + pub fn is_empty(&self) -> bool { + let head = self.head.load(Ordering::Relaxed); + let next = unsafe { (*head).next.load(Ordering::Relaxed) }; + next.is_null() + } +} + +impl Drop for LockFreeQueue { + + fn drop(&mut self) { + let mut node = self.head.load(Ordering::Relaxed); + while node != ptr::null_mut() { + let n = unsafe { Box::from_raw(node) }; + node = n.next.load(Ordering::Relaxed); + } + } +} + + +#[cfg(test)] +mod tests { + use crate::LockFreeQueue; + + #[test] + fn test_is_empty() { + let queue = LockFreeQueue::new(); + + assert!(queue.is_empty()); + + queue.offer(1); + assert!(!queue.is_empty()); + + queue.take(); + assert!(queue.is_empty()); + } + + #[test] + fn test_take_from_empty_queue() { + let queue: LockFreeQueue = LockFreeQueue::new(); + assert_eq!(queue.take(), None); + } + + #[test] + fn test_offer_take() { + let queue = LockFreeQueue::new(); + + queue.offer(1); + queue.offer(2); + + assert_eq!(queue.take(), Some(0)); // TODO this dummy element + assert_eq!(queue.take(), Some(1)); + assert_eq!(queue.take(), Some(2)); + assert_eq!(queue.take(), None); + } +} \ No newline at end of file