Skip to content

Commit 4cb98ab

Browse files
Add async wrapper for sync FS (#1745)
Co-authored-by: Martin Durant <[email protected]>
1 parent 9a16171 commit 4cb98ab

File tree

4 files changed

+272
-1
lines changed

4 files changed

+272
-1
lines changed

.github/workflows/main.yaml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ jobs:
1414
fail-fast: false
1515
matrix:
1616
PY:
17-
- "3.8"
1817
- "3.9"
1918
- "3.10"
2019
- "3.11"

docs/source/async.rst

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,3 +152,37 @@ available as the attribute ``.loop``.
152152

153153
<script data-goatcounter="https://fsspec.goatcounter.com/count"
154154
async src="//gc.zgo.at/count.js"></script>
155+
156+
AsyncFileSystemWrapper
157+
----------------------
158+
159+
The ``AsyncFileSystemWrapper`` class is an experimental feature that allows you to convert
160+
a synchronous filesystem into an asynchronous one. This is useful for quickly integrating
161+
synchronous filesystems into workflows that expect ``AsyncFileSystem`` instances.
162+
163+
Basic Usage
164+
~~~~~~~~~~~
165+
166+
To use ``AsyncFileSystemWrapper``, pass it any synchronous filesystem instance so that it can be used in an asynchronous context.
167+
In this example, the synchronous ``LocalFileSystem`` is wrapped, creating an ``AsyncFileSystem`` instance
168+
backed by the normal, synchronous methods of ``LocalFileSystem``:
169+
170+
.. code-block:: python
171+
172+
import asyncio
173+
import fsspec
174+
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
175+
176+
async def async_copy_file():
177+
sync_fs = fsspec.filesystem('file') # by-default synchronous, local filesystem
178+
async_fs = AsyncFileSystemWrapper(sync_fs)
179+
return await async_fs._copy('/source/file.txt', '/destination/file.txt')
180+
181+
asyncio.run(async_copy_file())
182+
183+
Limitations
184+
~~~~~~~~~~~
185+
186+
This is experimental. Each call simply runs the synchronous method in a worker thread, so users should not expect this wrapper to magically make things faster.
187+
It is primarily provided to allow usage of synchronous filesystems with interfaces that expect
188+
``AsyncFileSystem`` instances.
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
import asyncio
2+
import inspect
3+
import functools
4+
from fsspec.asyn import AsyncFileSystem
5+
6+
7+
def async_wrapper(func, obj=None):
    """
    Wrap a synchronous callable so that it can be awaited.

    The returned coroutine function runs ``func`` in a worker thread via
    ``asyncio.to_thread``, so the event loop is not blocked while ``func``
    executes. If ``func`` is already a coroutine function it is returned
    unchanged.

    Parameters
    ----------
    func : callable
        The synchronous function to wrap.
    obj : object, optional
        The instance the wrapper is created for. Currently unused; kept so
        that existing callers passing it keep working.

    Returns
    -------
    coroutine function
        An ``async def`` function carrying the metadata of ``func`` that,
        when called and awaited, returns ``func(*args, **kwargs)``.
    """
    if inspect.iscoroutinefunction(func):
        # Already awaitable: pushing it through ``to_thread`` would only
        # create a coroutine object in the worker thread and hand it back
        # un-awaited.
        return func

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        return await asyncio.to_thread(func, *args, **kwargs)

    return wrapper
29+
30+
31+
class AsyncFileSystemWrapper(AsyncFileSystem):
    """
    Expose a synchronous filesystem through the ``AsyncFileSystem`` interface.

    On construction, every public, non-property callable of the wrapped
    filesystem is mirrored on this instance as a coroutine function named
    ``_<method>`` (for example ``fs.cat`` becomes ``await wrapper._cat(...)``).
    The coroutine functions are produced by ``async_wrapper``.

    Parameters
    ----------
    sync_fs : AbstractFileSystem
        The synchronous filesystem instance to wrap.
    *args, **kwargs
        Forwarded unchanged to ``AsyncFileSystem.__init__``.
    """

    # Public method names that must not be mirrored as ``_<name>``.
    # Mirroring ``open`` would install a coroutine function as ``_open`` on
    # the instance, shadowing the inherited synchronous ``_open`` hook that
    # ``open()`` relies on to obtain a file object.
    _excluded_methods = frozenset({"open"})

    def __init__(self, sync_fs, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # NOTE(review): the flag is set after the base initialiser instead of
        # being passed to it; presumably so the base class still prepares
        # what the blocking API needs -- confirm before changing.
        self.asynchronous = True
        self.fs = sync_fs
        self._wrap_all_sync_methods()

    @property
    def fsid(self):
        """Identifier built from the wrapped filesystem's ``fsid``."""
        return f"async_{self.fs.fsid}"

    def _wrap_all_sync_methods(self):
        """
        Mirror the wrapped filesystem's public methods as ``_<name>`` coroutines.

        Private names, names listed in ``_excluded_methods``, properties,
        non-callables and callables that already are coroutine functions
        are left alone.
        """
        for method_name in dir(self.fs):
            if method_name.startswith("_") or method_name in self._excluded_methods:
                continue

            # Static lookup first, so properties are skipped without being
            # evaluated.
            attr = inspect.getattr_static(self.fs, method_name)
            if isinstance(attr, property):
                continue

            method = getattr(self.fs, method_name)
            if callable(method) and not inspect.iscoroutinefunction(method):
                setattr(self, f"_{method_name}", async_wrapper(method, obj=self))

    @classmethod
    def wrap_class(cls, sync_fs_class):
        """
        Create a wrapper class bound to a synchronous filesystem class.

        The synchronous filesystem is only instantiated when the generated
        class itself is instantiated; all constructor arguments are passed
        to ``sync_fs_class``.

        Parameters
        ----------
        sync_fs_class : type
            The class of the synchronous filesystem to wrap.

        Returns
        -------
        type
            A subclass of ``cls`` named ``Async<sync_fs_class name>Wrapper``.
        """

        class GeneratedAsyncFileSystemWrapper(cls):
            def __init__(self, *args, **kwargs):
                sync_fs = sync_fs_class(*args, **kwargs)
                super().__init__(sync_fs)

        generated_name = f"Async{sync_fs_class.__name__}Wrapper"
        GeneratedAsyncFileSystemWrapper.__name__ = generated_name
        # Keep the qualified name in step, otherwise the generated class
        # still shows up as ``...wrap_class.<locals>.Generated...``.
        GeneratedAsyncFileSystemWrapper.__qualname__ = generated_name
        return GeneratedAsyncFileSystemWrapper
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import asyncio
2+
import pytest
3+
import os
4+
5+
import fsspec
6+
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
7+
from fsspec.implementations.local import LocalFileSystem
8+
from .test_local import csv_files, filetexts
9+
10+
11+
def test_is_async():
    """A wrapped local filesystem reports itself as an async implementation."""
    wrapped = AsyncFileSystemWrapper(fsspec.filesystem("file"))
    assert wrapped.async_impl
15+
16+
17+
def test_class_wrapper():
    """``wrap_class`` yields a suitably named class whose instances are async."""
    wrapper_cls = AsyncFileSystemWrapper.wrap_class(LocalFileSystem)
    assert wrapper_cls.__name__ == "AsyncLocalFileSystemWrapper"

    instance = wrapper_cls()
    assert instance.async_impl
23+
24+
25+
@pytest.mark.asyncio
async def test_cats():
    """``_cat`` handles single paths, lists of paths and byte ranges."""
    first = ".test.fakedata.1.csv"
    second = ".test.fakedata.2.csv"
    first_bytes = b"a,b\n1,2\n"
    second_bytes = b"a,b\n3,4\n"

    with filetexts(csv_files, mode="b"):
        wrapped = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        assert await wrapped._cat(first) == first_bytes

        both = await wrapped._cat([first, second])
        assert set(both.values()) == {first_bytes, second_bytes}

        assert await wrapped._cat(first, None, None) == first_bytes
        assert await wrapped._cat(first, start=1, end=6) == first_bytes[1:6]
        assert await wrapped._cat(first, start=-1) == first_bytes[-1:]
        assert await wrapped._cat(first, start=1, end=-2) == first_bytes[1:-2]

        # The blocking API must be usable alongside the awaitable one.
        assert wrapped.cat(first, start=1, end=-2) == first_bytes[1:-2]

        ranged = await wrapped._cat([first, second], start=1, end=-1)
        assert set(ranged.values()) == {first_bytes[1:-1], second_bytes[1:-1]}
65+
66+
67+
@pytest.mark.asyncio
async def test_basic_crud_operations():
    """Create, read, overwrite and delete files through the wrapper."""
    existing = ".test.fakedata.1.csv"
    created = ".test.fakedata.3.csv"

    with filetexts(csv_files, mode="b"):
        wrapped = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        # Create
        await wrapped._touch(created)
        assert await wrapped._exists(created)

        # Read
        assert await wrapped._cat(existing) == b"a,b\n1,2\n"

        # Update
        await wrapped._pipe(existing, b"a,b\n5,6\n")
        assert await wrapped._cat(existing) == b"a,b\n5,6\n"

        # Delete
        await wrapped._rm(existing)
        still_there = await wrapped._exists(existing)
        assert not still_there
85+
86+
87+
@pytest.mark.asyncio
async def test_error_handling():
    """Errors raised by the wrapped filesystem surface through the wrapper."""
    wrapped = AsyncFileSystemWrapper(fsspec.filesystem("file"))
    missing = ".test.non_existent.csv"

    # Reading first, then removing: both must report the missing file.
    for operation in (wrapped._cat, wrapped._rm):
        with pytest.raises(FileNotFoundError):
            await operation(missing)
97+
98+
99+
@pytest.mark.asyncio
async def test_concurrent_operations():
    """Several reads awaited together return their results in request order."""
    requested = (
        ".test.fakedata.1.csv",
        ".test.fakedata.2.csv",
        ".test.fakedata.1.csv",
    )

    with filetexts(csv_files, mode="b"):
        wrapped = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        pending = [wrapped._cat(path) for path in requested]
        results = await asyncio.gather(*pending)

        assert results == [b"a,b\n1,2\n", b"a,b\n3,4\n", b"a,b\n1,2\n"]
115+
116+
117+
@pytest.mark.asyncio
async def test_directory_operations():
    """Directories can be created and listed through the wrapper."""
    with filetexts(csv_files, mode="b"):
        wrapped = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        await wrapped._makedirs("new_directory")
        assert await wrapped._isdir("new_directory")

        listing = await wrapped._ls(".")
        names = list(map(os.path.basename, listing))

        for expected in (
            ".test.fakedata.1.csv",
            ".test.fakedata.2.csv",
            "new_directory",
        ):
            assert expected in names
132+
133+
134+
@pytest.mark.asyncio
async def test_batch_operations():
    """Removing a list of paths deletes every file in the list."""
    targets = [".test.fakedata.1.csv", ".test.fakedata.2.csv"]

    with filetexts(csv_files, mode="b"):
        wrapped = AsyncFileSystemWrapper(fsspec.filesystem("file"))

        await wrapped._rm([".test.fakedata.1.csv", ".test.fakedata.2.csv"])

        for path in targets:
            present = await wrapped._exists(path)
            assert not present

0 commit comments

Comments
 (0)