Skip to content

Feat: Implement Circular Buffer in pydatastructs #672

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pydatastructs/linear_data_structures/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
_extensions
)

from .circular_buffer import CircularBuffer # Add the CircularBuffer class to the import list
__all__.append('CircularBuffer') # Add CircularBuffer to the __all__ list

from .arrays import (
OneDimensionalArray,
DynamicOneDimensionalArray,
Expand Down
96 changes: 96 additions & 0 deletions pydatastructs/linear_data_structures/circular_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""
Module implements Circular buffer.
"""

__all__ = ['CircularBuffer']

from pydatastructs.utils.misc_util import _check_type

class CircularBuffer(object):

__slots__ = ['size', 'buffer', 'rear', 'front', 'dtype', 'count']

def __new__(cls, size, dtype):
obj = object.__new__(cls)
obj.size = size
obj.buffer = [None] * size
obj.rear = obj.front = -1
obj.dtype = dtype
obj.count = 0
return obj

def __str__(self):
return ' -> '.join([str(i) for i in self.buffer])

def __repr__(self):
return self.__str__()

def enqueue(self, data):
"""
Adds data to the buffer.

Parameters
==========

data
Data to be added to the buffer.
"""
_check_type(data, self.dtype)
if self.is_full():
raise OverflowError("Circular buffer is full")
if self.front == -1:
self.front = 0
self.rear = (self.rear + 1) % self.size
self.buffer[self.rear] = self.dtype(data)
self.count += 1

def dequeue(self):
"""
Removes and returns the data from the buffer.
"""
if self.is_empty():
raise ValueError("Circular buffer is empty")
data = self.buffer[self.front]
self.buffer[self.front] = None
if self.front == self.rear:
self.front = self.rear = -1
else:
self.front = (self.front + 1) % self.size
self.count -= 1
return data

def peek(self):
"""
Returns the data at the front of the buffer without removing it.
"""
if self.is_empty():
raise ValueError("Circular buffer is empty")
return self.buffer[self.front]

def get(self, index):
"""
Get the data at the index.

Parameters
==========

index: int
The index of the data to be fetched.
"""
if self.is_empty():
raise IndexError("The buffer is empty.")
if index < 0 or index >= self.size:
raise IndexError("Index out of bounds.")
return self.buffer[index]

def is_empty(self):
"""
Checks if the buffer is empty.
"""
return self.count == 0

def is_full(self):
"""
Checks if the buffer is full.
"""
return self.count == self.size
43 changes: 43 additions & 0 deletions pydatastructs/linear_data_structures/tests/test_circular_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import unittest
from pydatastructs.linear_data_structures.circular_buffer import CircularBuffer

class TestCircularBuffer(unittest.TestCase):

def setUp(self):
self.buffer = CircularBuffer(3, dtype=int) # Added dtype argument

def test_enqueue_dequeue(self):
self.buffer.enqueue(1)
self.buffer.enqueue(2)
self.assertEqual(self.buffer.dequeue(), 1)
self.assertEqual(self.buffer.dequeue(), 2)

def test_is_empty(self):
self.assertTrue(self.buffer.is_empty())
self.buffer.enqueue(1)
self.assertFalse(self.buffer.is_empty())

def test_is_full(self):
self.buffer.enqueue(1)
self.buffer.enqueue(2)
self.buffer.enqueue(3)
self.assertTrue(self.buffer.is_full())

def test_peek(self):
self.buffer.enqueue(1)
self.buffer.enqueue(2)
self.assertEqual(self.buffer.peek(), 1)

def test_enqueue_overflow(self):
self.buffer.enqueue(1)
self.buffer.enqueue(2)
self.buffer.enqueue(3)
with self.assertRaises(OverflowError):
self.buffer.enqueue(4)

def test_dequeue_underflow(self):
with self.assertRaises(ValueError):
self.buffer.dequeue()

if __name__ == '__main__':
unittest.main()
Loading