Skip to content

Commit 0b28f04

Browse files
authored
Merge pull request #581 from martindurant/no_nesting
WIP: Remove nested async by copying code
2 parents f8399bf + 8ebdb4f commit 0b28f04

File tree

3 files changed

+221
-57
lines changed

3 files changed

+221
-57
lines changed

fsspec/asyn.py

+213 -35
Original file line number | Diff line number | Diff line change
@@ -4,6 +4,7 @@
44
import os
55
import re
66
import threading
7+
from glob import has_magic
78

89
from .spec import AbstractFileSystem
910
from .utils import is_exception, other_paths
@@ -12,35 +13,13 @@
1213
lock = threading.Lock()
1314

1415

15-
def _run_until_done(loop, coro):
16-
"""execute coroutine, when already in the event loop"""
17-
# raise Nested
18-
with lock:
19-
task = asyncio.current_task(loop=loop)
20-
if task:
21-
asyncio.tasks._unregister_task(task)
22-
asyncio.tasks._current_tasks.pop(loop, None)
23-
runner = loop.create_task(coro)
24-
try:
25-
while not runner.done():
26-
try:
27-
loop._run_once()
28-
except (IndexError, RuntimeError):
29-
pass
30-
finally:
31-
if task:
32-
with lock:
33-
asyncio.tasks._current_tasks[loop] = task
34-
return runner.result()
35-
36-
3716
def sync(loop, func, *args, callback_timeout=None, **kwargs):
3817
"""
3918
Make loop run coroutine until it returns. Runs in this thread
4019
"""
4120
coro = func(*args, **kwargs)
4221
if loop.is_running():
43-
result = _run_until_done(loop, coro)
22+
raise NotImplementedError
4423
else:
4524
result = loop.run_until_complete(coro)
4625
return result
@@ -60,12 +39,8 @@ def maybe_sync(func, self, *args, **kwargs):
6039
except RuntimeError:
6140
loop0 = None
6241
if loop0 is not None and loop0.is_running():
63-
if inspect.iscoroutinefunction(func):
64-
# run coroutine while pausing this one (because we are within async)
65-
return _run_until_done(loop, func(*args, **kwargs))
66-
else:
67-
# make awaitable which then calls the blocking function
68-
raise NotImplementedError()
42+
# TEMPORARY - to be removed
43+
raise NotImplementedError()
6944
else:
7045
if inspect.iscoroutinefunction(func):
7146
# run the awaitable on the loop
@@ -119,11 +94,6 @@ def get_loop():
11994
"_rm_file",
12095
"_cp_file",
12196
"_pipe_file",
122-
]
123-
# these methods could be overridden, but have default sync versions which rely on _ls
124-
# the sync methods below all call expand_path, which in turn may call walk or glob
125-
# (if passed paths with glob characters, or for recursive=True, respectively)
126-
default_async_methods = [
12797
"_expand_path",
12898
"_info",
12999
"_isfile",
@@ -133,6 +103,7 @@ def get_loop():
133103
"_glob",
134104
"_find",
135105
"_du",
106+
"_size",
136107
]
137108

138109

@@ -167,6 +138,9 @@ def loop(self):
167138
self._loop.loop = get_loop()
168139
return self._loop.loop
169140

141+
async def _rm_file(self, path, **kwargs):
142+
raise NotImplementedError
143+
170144
async def _rm(self, path, recursive=False, **kwargs):
171145
await asyncio.gather(*[self._rm_file(p, **kwargs) for p in path])
172146

@@ -269,6 +243,210 @@ def get(self, rpath, lpath, recursive=False, **kwargs):
269243
[os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
270244
return sync(self.loop, self._get, rpaths, lpaths)
271245

246+
async def _info(self, path):
247+
raise NotImplementedError
248+
249+
async def _isfile(self, path):
250+
try:
251+
return (await self._info(path))["type"] == "file"
252+
except: # noqa: E722
253+
return False
254+
255+
async def _isdir(self, path):
256+
try:
257+
return (await self._info(path))["type"] == "directory"
258+
except IOError:
259+
return False
260+
261+
async def _size(self, path):
262+
return (await self._info(path)).get("size", None)
263+
264+
async def _exists(self, path):
265+
try:
266+
await self._info(path)
267+
return True
268+
except FileNotFoundError:
269+
return False
270+
271+
async def _ls(self, path, **kwargs):
272+
raise NotImplementedError
273+
274+
async def _walk(self, path, maxdepth=None, **kwargs):
275+
path = self._strip_protocol(path)
276+
full_dirs = {}
277+
dirs = {}
278+
files = {}
279+
280+
detail = kwargs.pop("detail", False)
281+
try:
282+
listing = await self._ls(path, detail=True, **kwargs)
283+
except (FileNotFoundError, IOError):
284+
yield [], [], []
285+
return
286+
287+
for info in listing:
288+
# each info name must be at least [path]/part, but here
289+
# we check also for names like [path]/part/
290+
pathname = info["name"].rstrip("/")
291+
name = pathname.rsplit("/", 1)[-1]
292+
if info["type"] == "directory" and pathname != path:
293+
# do not include "self" path
294+
full_dirs[pathname] = info
295+
dirs[name] = info
296+
elif pathname == path:
297+
# file-like with same name as given path
298+
files[""] = info
299+
else:
300+
files[name] = info
301+
302+
if detail:
303+
yield path, dirs, files
304+
else:
305+
yield path, list(dirs), list(files)
306+
307+
if maxdepth is not None:
308+
maxdepth -= 1
309+
if maxdepth < 1:
310+
return
311+
312+
for d in full_dirs:
313+
async for _ in self._walk(d, maxdepth=maxdepth, detail=detail, **kwargs):
314+
yield _
315+
316+
async def _glob(self, path, **kwargs):
317+
import re
318+
319+
ends = path.endswith("/")
320+
path = self._strip_protocol(path)
321+
indstar = path.find("*") if path.find("*") >= 0 else len(path)
322+
indques = path.find("?") if path.find("?") >= 0 else len(path)
323+
indbrace = path.find("[") if path.find("[") >= 0 else len(path)
324+
325+
ind = min(indstar, indques, indbrace)
326+
327+
detail = kwargs.pop("detail", False)
328+
329+
if not has_magic(path):
330+
root = path
331+
depth = 1
332+
if ends:
333+
path += "/*"
334+
elif await self._exists(path):
335+
if not detail:
336+
return [path]
337+
else:
338+
return {path: await self._info(path)}
339+
else:
340+
if not detail:
341+
return [] # glob of non-existent returns empty
342+
else:
343+
return {}
344+
elif "/" in path[:ind]:
345+
ind2 = path[:ind].rindex("/")
346+
root = path[: ind2 + 1]
347+
depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1
348+
else:
349+
root = ""
350+
depth = None if "**" in path else path[ind + 1 :].count("/") + 1
351+
352+
allpaths = await self._find(
353+
root, maxdepth=depth, withdirs=True, detail=True, **kwargs
354+
)
355+
# Escape characters special to python regex, leaving our supported
356+
# special characters in place.
357+
# See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html
358+
# for shell globbing details.
359+
pattern = (
360+
"^"
361+
+ (
362+
path.replace("\\", r"\\")
363+
.replace(".", r"\.")
364+
.replace("+", r"\+")
365+
.replace("//", "/")
366+
.replace("(", r"\(")
367+
.replace(")", r"\)")
368+
.replace("|", r"\|")
369+
.replace("^", r"\^")
370+
.replace("$", r"\$")
371+
.replace("{", r"\{")
372+
.replace("}", r"\}")
373+
.rstrip("/")
374+
.replace("?", ".")
375+
)
376+
+ "$"
377+
)
378+
pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern)
379+
pattern = re.sub("[*]", "[^/]*", pattern)
380+
pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*"))
381+
out = {
382+
p: allpaths[p]
383+
for p in sorted(allpaths)
384+
if pattern.match(p.replace("//", "/").rstrip("/"))
385+
}
386+
if detail:
387+
return out
388+
else:
389+
return list(out)
390+
391+
async def _du(self, path, total=True, maxdepth=None, **kwargs):
392+
sizes = {}
393+
# async for?
394+
for f in await self._find(path, maxdepth=maxdepth, **kwargs):
395+
info = await self._info(f)
396+
sizes[info["name"]] = info["size"]
397+
if total:
398+
return sum(sizes.values())
399+
else:
400+
return sizes
401+
402+
async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
403+
path = self._strip_protocol(path)
404+
out = dict()
405+
detail = kwargs.pop("detail", False)
406+
# async for?
407+
async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
408+
if withdirs:
409+
files.update(dirs)
410+
out.update({info["name"]: info for name, info in files.items()})
411+
if (await self._isfile(path)) and path not in out:
412+
# walk works on directories, but find should also return [path]
413+
# when path happens to be a file
414+
out[path] = {}
415+
names = sorted(out)
416+
if not detail:
417+
return names
418+
else:
419+
return {name: out[name] for name in names}
420+
421+
async def _expand_path(self, path, recursive=False, maxdepth=None):
422+
if isinstance(path, str):
423+
out = await self._expand_path([path], recursive, maxdepth)
424+
else:
425+
# reduce depth on each recursion level unless None or 0
426+
maxdepth = maxdepth if not maxdepth else maxdepth - 1
427+
out = set()
428+
path = [self._strip_protocol(p) for p in path]
429+
for p in path: # can gather here
430+
if has_magic(p):
431+
bit = set(await self._glob(p))
432+
out |= bit
433+
if recursive:
434+
out |= set(
435+
await self._expand_path(
436+
list(bit), recursive=recursive, maxdepth=maxdepth
437+
)
438+
)
439+
continue
440+
elif recursive:
441+
rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
442+
out |= rec
443+
if p not in out and (recursive is False or (await self._exists(p))):
444+
# should only check once, for the root
445+
out.add(p)
446+
if not out:
447+
raise FileNotFoundError(path)
448+
return list(sorted(out))
449+
272450

273451
def mirror_sync_methods(obj):
274452
"""Populate sync and async methods for obj
@@ -285,7 +463,7 @@ def mirror_sync_methods(obj):
285463
"""
286464
from fsspec import AbstractFileSystem
287465

288-
for method in async_methods + default_async_methods + dir(AsyncFileSystem):
466+
for method in async_methods + dir(AsyncFileSystem):
289467
if not method.startswith("_"):
290468
continue
291469
smethod = method[1:]

fsspec/implementations/reference.py

+1 -1
Original file line number | Diff line number | Diff line change
@@ -186,7 +186,7 @@ def _dircache_from_items(self):
186186

187187
self.dircache[par].append({"name": path, "type": "file", "size": size})
188188

189-
def ls(self, path, detail=True, **kwargs):
189+
async def _ls(self, path, detail=True, **kwargs):
190190
path = self._strip_protocol(path)
191191
out = self._ls_from_cache(path)
192192
if detail:

fsspec/tests/test_async.py

+7 -21
Original file line number | Diff line number | Diff line change
@@ -1,24 +1,10 @@
1-
import asyncio
2-
import sys
1+
import inspect
32

4-
import pytest
3+
import fsspec.asyn
54

6-
from fsspec.asyn import _run_until_done
75

8-
9-
async def inner():
10-
await asyncio.sleep(1)
11-
return True
12-
13-
14-
async def outer():
15-
await asyncio.sleep(1)
16-
return _run_until_done(asyncio.get_running_loop(), inner())
17-
18-
19-
@pytest.mark.skipif(sys.version_info < (3, 7), reason="Async fails on py36")
20-
def test_runtildone():
21-
loop = asyncio.new_event_loop()
22-
asyncio.set_event_loop(loop)
23-
assert loop.run_until_complete(outer())
24-
loop.close()
6+
def test_sync_methods():
7+
inst = fsspec.asyn.AsyncFileSystem()
8+
assert inspect.iscoroutinefunction(inst._info)
9+
assert hasattr(inst, "info")
10+
assert not inspect.iscoroutinefunction(inst.info)

0 commit comments

Comments
 (0)