Skip to content

Commit 48f7c9a

Browse files
carshadijhammandcherian
authored
Feat: improves delete_dir for s3fs-backed FsspecStore (#2661)
* Implement asynchronous directory deletion in FsspecStore
  - Override the default Store.delete_dir method, which deletes keys one by one, to support bulk deletion for fsspec implementations that accept a list of paths in the fs._rm method.
  - This can greatly reduce the number of requests to S3, which reduces the likelihood of running into throttling errors and improves delete performance.
  - Currently, only s3fs is supported.
* Use async batched _rm() for FsspecStore.delete_dir()
* Suppress allowed exceptions instead of try-except-pass
* Adds note on possibly redundant condition in FsspecStore.delete_dir()
* Fix: unpack allowed arguments list
* Adds tests for FsspecStore.delete_dir
* Update src/zarr/storage/_fsspec.py (Co-authored-by: Joe Hamman <[email protected]>)
* Remove supports_listing condition from FsspecStore.delete_dir
* use f-string for url formatting
* assert `store.fs.asynchronous` instead of `store.fs.async_impl`
* updates release notes
* remove unused import
* Explicitly construct wrapped local filesystem for test

---------

Co-authored-by: Joe Hamman <[email protected]>
Co-authored-by: Joe Hamman <[email protected]>
Co-authored-by: Deepak Cherian <[email protected]>
1 parent 23abb5b commit 48f7c9a

File tree

3 files changed

+48
-0
lines changed

3 files changed

+48
-0
lines changed

changes/2661.feature.1.rst

+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Improves performance of FsspecStore.delete_dir for remote filesystems supporting concurrent/batched deletes, e.g., s3fs.

src/zarr/storage/_fsspec.py

+14
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from __future__ import annotations
22

33
import warnings
4+
from contextlib import suppress
45
from typing import TYPE_CHECKING, Any
56

67
from zarr.abc.store import (
@@ -286,6 +287,19 @@ async def delete(self, key: str) -> None:
286287
except self.allowed_exceptions:
287288
pass
288289

290+
async def delete_dir(self, prefix: str) -> None:
    # docstring inherited
    #
    # Issues a single recursive ``fs._rm`` on the dereferenced prefix instead
    # of removing keys individually.
    if not self.supports_deletes:
        message = "This method is only available for stores that support deletes."
        raise NotImplementedError(message)
    # The writability check runs only after the deletes-supported check, so a
    # store without delete support always reports NotImplementedError first.
    self._check_writable()

    # Resolved outside the ``suppress`` block on purpose: only errors raised
    # by the removal itself are swallowed.
    target = _dereference_path(self.path, prefix)

    # Exceptions listed in ``allowed_exceptions`` are ignored here.
    with suppress(*self.allowed_exceptions):
        await self.fs._rm(target, recursive=True)
302+
289303
async def exists(self, key: str) -> bool:
290304
# docstring inherited
291305
path = _dereference_path(self.path, key)

tests/test_store/test_fsspec.py

+33
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,14 @@ async def test_empty_nonexistent_path(self, store_kwargs) -> None:
217217
store = await self.store_cls.open(**store_kwargs)
218218
assert await store.is_empty("")
219219

220+
async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None:
    """delete_dir must raise NotImplementedError when deletes are disabled."""
    expected_message = "This method is only available for stores that support deletes."

    # Flip the capability flag on the store under test before calling.
    store.supports_deletes = False

    with pytest.raises(NotImplementedError, match=expected_message):
        await store.delete_dir("test_prefix")
227+
220228

221229
@pytest.mark.skipif(
222230
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
@@ -244,3 +252,28 @@ def test_no_wrap_async_filesystem():
244252

245253
assert not isinstance(store.fs, AsyncFileSystemWrapper)
246254
assert store.fs.async_impl
255+
256+
257+
@pytest.mark.skipif(
    parse_version(fsspec.__version__) < parse_version("2024.12.0"),
    reason="No AsyncFileSystemWrapper",
)
async def test_delete_dir_wrapped_filesystem(tmpdir) -> None:
    """delete_dir on a wrapped local filesystem removes only the given prefix."""
    from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
    from fsspec.implementations.local import LocalFileSystem

    # Build the async wrapper explicitly around a synchronous local filesystem.
    local_fs = LocalFileSystem(auto_mkdir=True)
    store = FsspecStore(
        AsyncFileSystemWrapper(local_fs),
        read_only=False,
        path=f"{tmpdir}/test/path",
    )

    assert isinstance(store.fs, AsyncFileSystemWrapper)
    assert store.fs.asynchronous

    # Insertion order matches the order the keys are written to the store.
    payloads = {
        "zarr.json": b"root",
        "foo-bar/zarr.json": b"root",
        "foo/zarr.json": b"bar",
        "foo/c/0": b"chunk",
    }
    for key, raw in payloads.items():
        await store.set(key, cpu.Buffer.from_bytes(raw))

    await store.delete_dir("foo")

    # "foo-bar" shares the leading characters of "foo" but is a sibling, so it
    # must survive along with the root document.
    for surviving_key in ("zarr.json", "foo-bar/zarr.json"):
        assert await store.exists(surviving_key)

    # Everything stored under "foo/" must be gone.
    for removed_key in ("foo/zarr.json", "foo/c/0"):
        assert not await store.exists(removed_key)

0 commit comments

Comments
 (0)