Skip to content

Test indexed_gzip code with GzipStream #108

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 112 additions & 3 deletions indexed_gzip/tests/ctest_indexed_gzip.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,113 @@ from . import tempdir
from . import touch
from . import compress

from collections import deque

from libc.stdio cimport (SEEK_SET,
SEEK_CUR,
SEEK_END)


class BytesBuffer(BytesIO):
"""
A class for a buffer of bytes. Unlike io.BytesIO(), this class
keeps track of the buffer's size (in bytes).
"""

def __init__(self):
self.__buf = deque()
self.__size = 0
self.__pos = 0

def __len__(self):
return self.__size

def write(self, data):
self.__buf.append(data)
self.__size += len(data)
# print(f"In BytesBuffer write, self.__size: {self.__size}, len(data): {len(data)}")

def read(self, size = None):
if size is None:
size = self.__size
ret_list = []
while size > 0 and len(self.__buf):
s = self.__buf.popleft()
size -= len(s)
# print(f"In BytesBUffer read, current size to read: {size}")
ret_list.append(s)
if size < 0:
# print(f"Before correct size, ret list[-1]: {len(ret_list[-1])}")
ret_list[-1], remainder = ret_list[-1][:size], ret_list[-1][size:]
self.__buf.appendleft(remainder)

ret = b''.join(ret_list)
self.__size -= len(ret)
# print(f"After correct size, ret list[-1]: {len(ret_list[-1])}, len(reminder): {len(remainder)}, len(ret) : {len(ret)}, __size: {self.__size}")
self.__pos += len(ret)
return ret

def peek(self, size: int):
b = bytearray()
for i in range(0, min(size, len(self.__buf))):
b.extend(self.__buf[i])
return bytes(b)[:size]

def flush(self):
pass

def close(self):
pass

def tell(self):
return self.__pos

def __bool__(self):
return True


class GzipStream(BytesIO):
"""A stream that gzips a file in chunks.
"""

def __init__(self, fileobj):
self.__input = fileobj
self.__buffer = BytesBuffer()
self.__gzip = gzip.GzipFile(None, mode='wb', fileobj=self.__buffer)
self.__size = 0

def _fill_buf_bytes(self, num_bytes=None):
while num_bytes is None or len(self.__buffer) < num_bytes:
s = self.__input.read(num_bytes)
print(f"In GzipStream _fill_buf_bytes, num_bytes = {num_bytes}, read in length = {len(s)}, length of buffer = {len(self.__buffer)}")
if not s:
self.__gzip.close()
break
self.__gzip.write(s) # gzip the current file

def read(self, num_bytes=None):
self._fill_buf_bytes(num_bytes)
# print(f"In GzipStream read(). num_bytes = {num_bytes}")
data = self.__buffer.read(num_bytes)
# print(f"In GzipStream read(). num_bytes = {num_bytes}, read out data from GzipStream length = {len(data)}")
self.__size += len(data)
print(f"Size changed to {self.__size}, num_bytes was {num_bytes}, read out {len(data)}")
return data

def close(self):
self.__input.close()

def peek(self, num_bytes):
self._fill_buf_bytes(num_bytes)
return self.__buffer.peek(num_bytes)

def tell(self):
print("In GzipStream, tell() is called")
return self.__size

def seekable(self):
return False

def error_fn(*args, **kwargs):
raise Exception("Error")

Expand Down Expand Up @@ -918,7 +1020,7 @@ def test_build_index_from_unseekable():
idxfname = op.join(td, 'test.gzidx')

# make a test file
data = np.arange(524288, dtype=np.uint64)
data = np.arange(5242, dtype=np.uint64)
with gzip.open(fname, 'wb') as f:
f.write(data.tostring())

Expand All @@ -928,20 +1030,27 @@ def test_build_index_from_unseekable():
b = f.read()
fileobj = BytesIO(b)

def new_seek(*args, **kwargs):
fileobj = GzipStream(BytesIO(data.tostring()))

"""def new_seek(*args, **kwargs):
raise OSError()
def new_tell(*args, **kwargs):
raise OSError()
old_seek = fileobj.seek
old_tell = fileobj.tell
fileobj.seekable = lambda: False
fileobj.seek = new_seek
fileobj.tell = new_tell
fileobj.tell = new_tell"""



# generate an index file
with igzip._IndexedGzipFile(fileobj, spacing=131072) as f:
f.build_full_index()
f.export_index(idxfname)
points = list(f.seek_points())

return
fileobj.seek = old_seek
fileobj.tell = old_tell
fileobj.seekable = lambda: True
Expand Down
Loading