From 09e4978d1de38906efb43c93d8d9373857b63658 Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Fri, 14 Feb 2025 19:17:10 +0530 Subject: [PATCH 01/13] update --- src/litdata/streaming/reader.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 0dfcfcbc..60e73fa6 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -439,10 +439,13 @@ def _get_folder_size(path: str) -> int: """ size = 0 + _SUPPORTED_EXTENSIONS = (".bin", ".json", ".parquet", ".npy") + for dirpath, _, filenames in os.walk(str(path)): for filename in filenames: - with contextlib.suppress(FileNotFoundError): - size += os.stat(os.path.join(dirpath, filename)).st_size + if filename.endswith(_SUPPORTED_EXTENSIONS): + with contextlib.suppress(FileNotFoundError): + size += os.stat(os.path.join(dirpath, filename)).st_size return size From 712b2a7765cfb7ce1a483203ea9cdd5c623d838e Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Fri, 14 Feb 2025 19:30:16 +0530 Subject: [PATCH 02/13] use config --- src/litdata/streaming/config.py | 3 +++ src/litdata/streaming/reader.py | 15 ++++++++------- tests/streaming/test_reader.py | 9 +-------- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/src/litdata/streaming/config.py b/src/litdata/streaming/config.py index af96ede6..f2e29f37 100644 --- a/src/litdata/streaming/config.py +++ b/src/litdata/streaming/config.py @@ -98,6 +98,9 @@ def __init__( self._skip_chunk_indexes_deletion: Optional[List[int]] = None self.zero_based_roi: Optional[List[Tuple[int, int]]] = None + self.filename_to_size_map: Dict[str, int] = {} + for cnk in _original_chunks: + self.filename_to_size_map[cnk["filename"]] = cnk["chunk_bytes"] def can_delete(self, chunk_index: int) -> bool: if self._skip_chunk_indexes_deletion is None: diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 60e73fa6..bd1528d0 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -182,7 +182,10 @@ def _maybe_delete_chunks(self) -> None: def _can_delete_chunk(self) -> bool: if self._delete_chunks_when_processed: return self._pre_download_counter >= self._max_pre_download - 1 - return self._max_cache_size is not None and _get_folder_size(self._parent_cache_dir) >= self._max_cache_size + return ( + self._max_cache_size is not None + and _get_folder_size(self._parent_cache_dir, self._config) >= self._max_cache_size + ) def _pre_load_chunk(self, chunk_index: int) -> None: chunk_filepath, _, _ = self._config[ChunkedIndex(index=-1, chunk_index=chunk_index)] @@ -432,20 +435,18 @@ def __del__(self) -> None: self._prepare_thread = None -def _get_folder_size(path: str) -> int: +def _get_folder_size(path: str, config: ChunksConfig) -> int: """Collect the size of each files within a folder. 
This method is robust to file deletion races """ size = 0 - _SUPPORTED_EXTENSIONS = (".bin", ".json", ".parquet", ".npy") - - for dirpath, _, filenames in os.walk(str(path)): + for dirpath, _, filenames in os.walk(path): for filename in filenames: - if filename.endswith(_SUPPORTED_EXTENSIONS): + if filename in config.filename_to_size_map: with contextlib.suppress(FileNotFoundError): - size += os.stat(os.path.join(dirpath, filename)).st_size + size += config.filename_to_size_map[filename] return size diff --git a/tests/streaming/test_reader.py b/tests/streaming/test_reader.py index a6354b4c..1d741052 100644 --- a/tests/streaming/test_reader.py +++ b/tests/streaming/test_reader.py @@ -8,7 +8,7 @@ from litdata.streaming.cache import Cache from litdata.streaming.config import ChunkedIndex from litdata.streaming.item_loader import PyTreeLoader -from litdata.streaming.reader import _END_TOKEN, PrepareChunksThread, _get_folder_size +from litdata.streaming.reader import _END_TOKEN, PrepareChunksThread from litdata.streaming.resolver import Dir from litdata.utilities.env import _DistributedEnv from tests.streaming.utils import filter_lock_files, get_lock_files @@ -86,13 +86,6 @@ def test_reader_chunk_removal_compressed(tmpdir): assert len(filter_lock_files(os.listdir(cache_dir))) in [2, 3] -def test_get_folder_size(tmpdir): - array = np.zeros((10, 10)) - - np.save(os.path.join(tmpdir, "array_1.npy"), array) - np.save(os.path.join(tmpdir, "array_2.npy"), array) - - assert _get_folder_size(tmpdir) == 928 * 2 def test_prepare_chunks_thread_eviction(tmpdir, monkeypatch): From 0c5c84968b694d65c6d5c96941956dbe0966b802 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 14 Feb 2025 14:00:35 +0000 Subject: [PATCH 03/13] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- tests/streaming/test_reader.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/streaming/test_reader.py b/tests/streaming/test_reader.py index 1d741052..75e2e81a 100644 --- a/tests/streaming/test_reader.py +++ b/tests/streaming/test_reader.py @@ -2,8 +2,6 @@ import shutil from time import sleep -import numpy as np - from litdata.streaming import reader from litdata.streaming.cache import Cache from litdata.streaming.config import ChunkedIndex @@ -86,8 +84,6 @@ def test_reader_chunk_removal_compressed(tmpdir): assert len(filter_lock_files(os.listdir(cache_dir))) in [2, 3] - - def test_prepare_chunks_thread_eviction(tmpdir, monkeypatch): monkeypatch.setattr(reader, "_LONG_DEFAULT_TIMEOUT", 0.1) From cc957377207132b0a94a653f1e9e9b433259c795 Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Fri, 14 Feb 2025 19:38:38 +0530 Subject: [PATCH 04/13] update --- src/litdata/streaming/reader.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index bd1528d0..e65f1da6 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -442,11 +442,10 @@ def _get_folder_size(path: str, config: ChunksConfig) -> int: """ size = 0 - for dirpath, _, filenames in os.walk(path): - for filename in filenames: - if filename in config.filename_to_size_map: - with contextlib.suppress(FileNotFoundError): - size += config.filename_to_size_map[filename] + for filename in os.listdir(path): + if filename in config.filename_to_size_map: + with contextlib.suppress(FileNotFoundError): + size += config.filename_to_size_map[filename] 
return size From 012be9ff04839c833db2e8e0c0baf21d77ec18ed Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Tue, 18 Feb 2025 00:40:17 +0530 Subject: [PATCH 05/13] need to do quick fixing --- src/litdata/streaming/reader.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index e65f1da6..3b5362ef 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -182,6 +182,9 @@ def _maybe_delete_chunks(self) -> None: def _can_delete_chunk(self) -> bool: if self._delete_chunks_when_processed: return self._pre_download_counter >= self._max_pre_download - 1 + print( + f"Current cache size: {_get_folder_size(self._parent_cache_dir, self._config)}; and max cache size: {self._max_cache_size}" + ) return ( self._max_cache_size is not None and _get_folder_size(self._parent_cache_dir, self._config) >= self._max_cache_size @@ -442,10 +445,12 @@ def _get_folder_size(path: str, config: ChunksConfig) -> int: """ size = 0 - for filename in os.listdir(path): + for filename in os.listdir(os.path.join(path, "cache_dir")): if filename in config.filename_to_size_map: with contextlib.suppress(FileNotFoundError): - size += config.filename_to_size_map[filename] + # size += config.filename_to_size_map[filename] + print(f"File: {filename}") + size += os.path.getsize(os.path.join(path, "cache_dir", filename)) return size From 08e00152b0b9a68182929148c23df502faf263bb Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Thu, 20 Feb 2025 23:40:53 +0530 Subject: [PATCH 06/13] fixed failing tests --- src/litdata/streaming/reader.py | 8 ++------ tests/streaming/test_reader.py | 19 ++++++++++++++++--- 2 files changed, 18 insertions(+), 9 deletions(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 3b5362ef..0120a888 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -182,9 +182,6 @@ def _maybe_delete_chunks(self) -> None: def _can_delete_chunk(self) -> bool: if self._delete_chunks_when_processed: return self._pre_download_counter >= self._max_pre_download - 1 - print( - f"Current cache size: {_get_folder_size(self._parent_cache_dir, self._config)}; and max cache size: {self._max_cache_size}" - ) return ( self._max_cache_size is not None and _get_folder_size(self._parent_cache_dir, self._config) >= self._max_cache_size @@ -448,9 +445,8 @@ def _get_folder_size(path: str, config: ChunksConfig) -> int: for filename in os.listdir(os.path.join(path, "cache_dir")): if filename in config.filename_to_size_map: with contextlib.suppress(FileNotFoundError): - # size += config.filename_to_size_map[filename] - print(f"File: {filename}") - size += os.path.getsize(os.path.join(path, "cache_dir", filename)) + size += config.filename_to_size_map[filename] + print(f"File: {filename}; size: {config.filename_to_size_map[filename]}") return size diff --git a/tests/streaming/test_reader.py b/tests/streaming/test_reader.py index 75e2e81a..6d1a3dea 100644 --- a/tests/streaming/test_reader.py +++ b/tests/streaming/test_reader.py @@ -16,6 +16,7 @@ def test_reader_chunk_removal(tmpdir): cache_dir = os.path.join(tmpdir, "cache_dir") remote_dir = os.path.join(tmpdir, "remote_dir") os.makedirs(cache_dir, exist_ok=True) + # we don't care about the max cache size here (so very large number) cache = Cache(input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=28020) for i in range(25): @@ -35,12 +36,18 @@ def test_reader_chunk_removal(tmpdir): assert 
len(filter_lock_files(os.listdir(cache_dir))) == 14 assert len(get_lock_files(os.listdir(cache_dir))) == 0 - cache = Cache(input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=2800) + # Let's test if cache actually respects the max cache size + # each chunk is 40 bytes if it has 2 items + # a chunk with only 1 item is 24 bytes (values determined by checking actual chunk sizes) + cache = Cache(input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=90) shutil.rmtree(cache_dir) os.makedirs(cache_dir, exist_ok=True) for i in range(25): + # we expect at max 3 files to be present (2 chunks and 1 index file) + # why 2 chunks? Bcoz max cache size is 90 bytes and each chunk is 40 bytes or 24 bytes (1 item) + # So any additional chunk will go over the max cache size assert len(filter_lock_files(os.listdir(cache_dir))) <= 3 index = ChunkedIndex(*cache._get_chunk_index_from_index(i), is_last_index=i == 24) assert cache[index] == i @@ -52,6 +59,7 @@ def test_reader_chunk_removal_compressed(tmpdir): cache_dir = os.path.join(tmpdir, "cache_dir") remote_dir = os.path.join(tmpdir, "remote_dir") os.makedirs(cache_dir, exist_ok=True) + # we don't care about the max cache size here (so very large number) cache = Cache(input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=28020, compression="zstd") for i in range(25): @@ -70,13 +78,18 @@ def test_reader_chunk_removal_compressed(tmpdir): assert len(filter_lock_files(os.listdir(cache_dir))) == 14 assert len(get_lock_files(os.listdir(cache_dir))) == 0 - - cache = Cache(input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=2800, compression="zstd") + # Let's test if cache actually respects the max cache size + # each chunk is 40 bytes if it has 2 items + # a chunk with only 1 item is 24 bytes (values determined by checking actual chunk sizes) + cache = Cache(input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=90, compression="zstd") shutil.rmtree(cache_dir) os.makedirs(cache_dir, exist_ok=True) for i in range(25): + # we expect at max 3 files to be present (2 chunks and 1 index file) + # why 2 chunks? 
Bcoz max cache size is 90 bytes and each chunk is 40 bytes or 24 bytes (1 item) + # So any additional chunk will go over the max cache size assert len(filter_lock_files(os.listdir(cache_dir))) <= 3 index = ChunkedIndex(*cache._get_chunk_index_from_index(i), is_last_index=i == 24) assert cache[index] == i From fef180be74d0438d9e8428b0b5a23e88a24beccc Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Thu, 20 Feb 2025 23:43:59 +0530 Subject: [PATCH 07/13] update --- src/litdata/streaming/reader.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 0120a888..6301bef3 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -446,7 +446,6 @@ def _get_folder_size(path: str, config: ChunksConfig) -> int: if filename in config.filename_to_size_map: with contextlib.suppress(FileNotFoundError): size += config.filename_to_size_map[filename] - print(f"File: {filename}; size: {config.filename_to_size_map[filename]}") return size From a039ce38e38657b4667751ff5eca0cc85f216a0f Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Fri, 21 Feb 2025 11:03:22 +0530 Subject: [PATCH 08/13] tests passing --- src/litdata/streaming/reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 6301bef3..e65f1da6 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -442,7 +442,7 @@ def _get_folder_size(path: str, config: ChunksConfig) -> int: """ size = 0 - for filename in os.listdir(os.path.join(path, "cache_dir")): + for filename in os.listdir(path): if filename in config.filename_to_size_map: with contextlib.suppress(FileNotFoundError): size += config.filename_to_size_map[filename] From c49a631207adfba684af2321e65b883456e3e574 Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Fri, 21 Feb 2025 11:20:03 +0530 Subject: [PATCH 09/13] update --- src/litdata/streaming/reader.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index e65f1da6..6b423f8e 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -184,7 +184,7 @@ def _can_delete_chunk(self) -> bool: return self._pre_download_counter >= self._max_pre_download - 1 return ( self._max_cache_size is not None - and _get_folder_size(self._parent_cache_dir, self._config) >= self._max_cache_size + and _get_folder_size(self._config._cache_dir, self._config) >= self._max_cache_size ) def _pre_load_chunk(self, chunk_index: int) -> None: From 7cfe82f1a9a53095b834668cb4b6a3efc697c433 Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Fri, 21 Feb 2025 11:32:11 +0530 Subject: [PATCH 10/13] also handle compressed chunks --- src/litdata/streaming/config.py | 4 +++- tests/streaming/test_reader.py | 20 +++++++++++++++++--- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/src/litdata/streaming/config.py b/src/litdata/streaming/config.py index acd93fca..b417ac46 100644 --- a/src/litdata/streaming/config.py +++ b/src/litdata/streaming/config.py @@ -100,7 +100,9 @@ def __init__( self.zero_based_roi: Optional[List[Tuple[int, int]]] = None self.filename_to_size_map: Dict[str, int] = {} for cnk in _original_chunks: - self.filename_to_size_map[cnk["filename"]] = cnk["chunk_bytes"] + # since files downloaded while reading will be decompressed, we need to store the size of the original file + filename_without_compression = 
cnk["filename"].replace(f".{self._compressor_name}", "") + self.filename_to_size_map[filename_without_compression] = cnk["chunk_bytes"] def can_delete(self, chunk_index: int) -> bool: if self._skip_chunk_indexes_deletion is None: diff --git a/tests/streaming/test_reader.py b/tests/streaming/test_reader.py index 6d1a3dea..1da224e3 100644 --- a/tests/streaming/test_reader.py +++ b/tests/streaming/test_reader.py @@ -2,6 +2,9 @@ import shutil from time import sleep +import pytest + +from litdata.constants import _ZSTD_AVAILABLE from litdata.streaming import reader from litdata.streaming.cache import Cache from litdata.streaming.config import ChunkedIndex @@ -12,12 +15,21 @@ from tests.streaming.utils import filter_lock_files, get_lock_files -def test_reader_chunk_removal(tmpdir): +@pytest.mark.parametrize( + "compression", + [ + pytest.param(None), + pytest.param("zstd", marks=pytest.mark.skipif(condition=not _ZSTD_AVAILABLE, reason="Requires: ['zstd']")), + ], +) +def test_reader_chunk_removal(tmpdir, compression): cache_dir = os.path.join(tmpdir, "cache_dir") remote_dir = os.path.join(tmpdir, "remote_dir") os.makedirs(cache_dir, exist_ok=True) # we don't care about the max cache size here (so very large number) - cache = Cache(input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=28020) + cache = Cache( + input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=28020, compression=compression + ) for i in range(25): cache[i] = i @@ -39,7 +51,9 @@ def test_reader_chunk_removal(tmpdir): # Let's test if cache actually respects the max cache size # each chunk is 40 bytes if it has 2 items # a chunk with only 1 item is 24 bytes (values determined by checking actual chunk sizes) - cache = Cache(input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=90) + cache = Cache( + input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=90, compression=compression + ) shutil.rmtree(cache_dir) os.makedirs(cache_dir, exist_ok=True) From f9b82ac0afc95783e9c18e9acab17c1e9f6db8b0 Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Fri, 21 Feb 2025 11:34:15 +0530 Subject: [PATCH 11/13] update --- src/litdata/streaming/config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/litdata/streaming/config.py b/src/litdata/streaming/config.py index b417ac46..3babdb1b 100644 --- a/src/litdata/streaming/config.py +++ b/src/litdata/streaming/config.py @@ -100,7 +100,7 @@ def __init__( self.zero_based_roi: Optional[List[Tuple[int, int]]] = None self.filename_to_size_map: Dict[str, int] = {} for cnk in _original_chunks: - # since files downloaded while reading will be decompressed, we need to store the size of the original file + # since files downloaded while reading will be decompressed, we need to store the name without compression filename_without_compression = cnk["filename"].replace(f".{self._compressor_name}", "") self.filename_to_size_map[filename_without_compression] = cnk["chunk_bytes"] From d27c455db0a882b8a33dfe985965407af991c5c1 Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Fri, 21 Feb 2025 11:39:28 +0530 Subject: [PATCH 12/13] my bad, there's already test for compressed --- tests/streaming/test_reader.py | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/tests/streaming/test_reader.py b/tests/streaming/test_reader.py index 1da224e3..a48db614 100644 --- a/tests/streaming/test_reader.py +++ b/tests/streaming/test_reader.py @@ -2,9 +2,6 @@ import shutil from time import sleep 
-import pytest - -from litdata.constants import _ZSTD_AVAILABLE from litdata.streaming import reader from litdata.streaming.cache import Cache from litdata.streaming.config import ChunkedIndex @@ -15,21 +12,12 @@ from tests.streaming.utils import filter_lock_files, get_lock_files -@pytest.mark.parametrize( - "compression", - [ - pytest.param(None), - pytest.param("zstd", marks=pytest.mark.skipif(condition=not _ZSTD_AVAILABLE, reason="Requires: ['zstd']")), - ], -) -def test_reader_chunk_removal(tmpdir, compression): +def test_reader_chunk_removal(tmpdir): cache_dir = os.path.join(tmpdir, "cache_dir") remote_dir = os.path.join(tmpdir, "remote_dir") os.makedirs(cache_dir, exist_ok=True) # we don't care about the max cache size here (so very large number) - cache = Cache( - input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=28020, compression=compression - ) + cache = Cache(input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=28020, compression="zstd") for i in range(25): cache[i] = i @@ -51,9 +39,7 @@ def test_reader_chunk_removal(tmpdir, compression): # Let's test if cache actually respects the max cache size # each chunk is 40 bytes if it has 2 items # a chunk with only 1 item is 24 bytes (values determined by checking actual chunk sizes) - cache = Cache( - input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=90, compression=compression - ) + cache = Cache(input_dir=Dir(path=cache_dir, url=remote_dir), chunk_size=2, max_cache_size=90) shutil.rmtree(cache_dir) os.makedirs(cache_dir, exist_ok=True) From 4c59221ad09c0cbea7762d9d3d24239079e2d7d0 Mon Sep 17 00:00:00 2001 From: Deependu Jha Date: Fri, 21 Feb 2025 13:12:34 +0530 Subject: [PATCH 13/13] update --- src/litdata/streaming/reader.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/litdata/streaming/reader.py b/src/litdata/streaming/reader.py index 6b423f8e..e3f76b71 100644 --- a/src/litdata/streaming/reader.py +++ b/src/litdata/streaming/reader.py @@ -446,6 +446,8 @@ def _get_folder_size(path: str, config: ChunksConfig) -> int: if filename in config.filename_to_size_map: with contextlib.suppress(FileNotFoundError): size += config.filename_to_size_map[filename] + elif not filename.endswith((".cnt", ".lock")): + logger.warning(f"File {filename} is not a valid chunk file. It will be ignored.") return size
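
Taken together, the series replaces the os.walk/os.stat based folder-size computation with a lookup against the chunk sizes recorded in the index metadata, stripping the compression suffix so decompressed chunks on disk still match. The snippet below is a minimal standalone sketch of that idea, for illustration only: the function names (build_filename_to_size_map, get_folder_size), the chunk dicts, and the chunk filenames/sizes are placeholders and not the litdata API.

import contextlib
import logging
import os
import tempfile

logger = logging.getLogger(__name__)


def build_filename_to_size_map(chunks, compressor_name=None):
    """Map each chunk's on-disk (decompressed) filename to its size in bytes."""
    sizes = {}
    for chunk in chunks:
        filename = chunk["filename"]
        if compressor_name:
            # chunks are decompressed after download, so strip the compression suffix
            filename = filename.replace(f".{compressor_name}", "")
        sizes[filename] = chunk["chunk_bytes"]
    return sizes


def get_folder_size(path, filename_to_size_map):
    """Sum the recorded sizes of the chunk files currently present in the cache dir."""
    size = 0
    for filename in os.listdir(path):
        if filename in filename_to_size_map:
            with contextlib.suppress(FileNotFoundError):
                size += filename_to_size_map[filename]
        elif not filename.endswith((".cnt", ".lock")):
            logger.warning(f"File {filename} is not a valid chunk file. It will be ignored.")
    return size


if __name__ == "__main__":
    chunks = [
        {"filename": "chunk-0-0.bin.zstd", "chunk_bytes": 40},
        {"filename": "chunk-0-1.bin.zstd", "chunk_bytes": 24},
    ]
    size_map = build_filename_to_size_map(chunks, compressor_name="zstd")
    with tempfile.TemporaryDirectory() as cache_dir:
        # pretend one decompressed chunk and its lock file have been downloaded
        for name in ("chunk-0-0.bin", "chunk-0-0.bin.lock"):
            open(os.path.join(cache_dir, name), "w").close()
        print(get_folder_size(cache_dir, size_map))  # -> 40: only the known chunk counts

This also explains the arithmetic in the updated tests: with max_cache_size=90 and chunks of 40 bytes (2 items) or 24 bytes (1 item), two chunks fit under the limit but a third pushes the computed size past it, so at most three files (two chunks plus the index file) are expected in the cache directory at any time.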