WIP: Remove nested async by copying code #581

Merged: 3 commits, Mar 25, 2021
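
For context, a minimal sketch of the nested-async pattern this PR removes; the coroutine some_coro and the surrounding script are illustrative and not part of the diff. Calling the blocking sync() helper from code that is already inside a running event loop used to fall back to _run_until_done(), which drove the loop by hand; after this change the same call raises NotImplementedError instead.

import asyncio

from fsspec.asyn import sync


async def uses_sync_api_from_async_code():
    async def some_coro():
        return 42

    # Before this PR: sync() detected the running loop and delegated to
    # _run_until_done(), pumping the loop until the coroutine finished.
    # After this PR: the running-loop branch raises NotImplementedError.
    return sync(asyncio.get_running_loop(), some_coro)


# asyncio.run(uses_sync_api_from_async_code())  # raises NotImplementedError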
248 changes: 213 additions & 35 deletions fsspec/asyn.py
@@ -4,6 +4,7 @@
import os
import re
import threading
from glob import has_magic

from .spec import AbstractFileSystem
from .utils import is_exception, other_paths
@@ -12,35 +13,13 @@
lock = threading.Lock()


def _run_until_done(loop, coro):
"""execute coroutine, when already in the event loop"""
# raise Nested
with lock:
task = asyncio.current_task(loop=loop)
if task:
asyncio.tasks._unregister_task(task)
asyncio.tasks._current_tasks.pop(loop, None)
runner = loop.create_task(coro)
try:
while not runner.done():
try:
loop._run_once()
except (IndexError, RuntimeError):
pass
finally:
if task:
with lock:
asyncio.tasks._current_tasks[loop] = task
return runner.result()


def sync(loop, func, *args, callback_timeout=None, **kwargs):
"""
Make loop run coroutine until it returns. Runs in this thread
"""
coro = func(*args, **kwargs)
if loop.is_running():
result = _run_until_done(loop, coro)
raise NotImplementedError
else:
result = loop.run_until_complete(coro)
return result
@@ -60,12 +39,8 @@ def maybe_sync(func, self, *args, **kwargs):
except RuntimeError:
loop0 = None
if loop0 is not None and loop0.is_running():
if inspect.iscoroutinefunction(func):
# run coroutine while pausing this one (because we are within async)
return _run_until_done(loop, func(*args, **kwargs))
else:
# make awaitable which then calls the blocking function
raise NotImplementedError()
# TEMPORARY - to be removed
raise NotImplementedError()
else:
if inspect.iscoroutinefunction(func):
# run the awaitable on the loop
@@ -119,11 +94,6 @@ def get_loop():
"_rm_file",
"_cp_file",
"_pipe_file",
]
# these methods could be overridden, but have default sync versions which rely on _ls
# the sync methods below all call expand_path, which in turn may call walk or glob
# (if passed paths with glob characters, or for recursive=True, respectively)
default_async_methods = [
"_expand_path",
"_info",
"_isfile",
@@ -133,6 +103,7 @@ def get_loop():
"_glob",
"_find",
"_du",
"_size",
]


@@ -167,6 +138,9 @@ def loop(self):
self._loop.loop = get_loop()
return self._loop.loop

async def _rm_file(self, path, **kwargs):
raise NotImplementedError

async def _rm(self, path, recursive=False, **kwargs):
await asyncio.gather(*[self._rm_file(p, **kwargs) for p in path])

@@ -269,6 +243,210 @@ def get(self, rpath, lpath, recursive=False, **kwargs):
[os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths]
return sync(self.loop, self._get, rpaths, lpaths)

async def _info(self, path):
raise NotImplementedError

async def _isfile(self, path):
try:
return (await self._info(path))["type"] == "file"
except: # noqa: E722
return False

async def _isdir(self, path):
try:
return (await self._info(path))["type"] == "directory"
except IOError:
return False

async def _size(self, path):
return (await self._info(path)).get("size", None)

async def _exists(self, path):
try:
await self._info(path)
return True
except FileNotFoundError:
return False

async def _ls(self, path, **kwargs):
raise NotImplementedError

async def _walk(self, path, maxdepth=None, **kwargs):
path = self._strip_protocol(path)
full_dirs = {}
dirs = {}
files = {}

detail = kwargs.pop("detail", False)
try:
listing = await self._ls(path, detail=True, **kwargs)
except (FileNotFoundError, IOError):
yield [], [], []
return

for info in listing:
# each info name must be at least [path]/part, but here
# we check also for names like [path]/part/
pathname = info["name"].rstrip("/")
name = pathname.rsplit("/", 1)[-1]
if info["type"] == "directory" and pathname != path:
# do not include "self" path
full_dirs[pathname] = info
dirs[name] = info
elif pathname == path:
# file-like with same name as given path
files[""] = info
else:
files[name] = info

if detail:
yield path, dirs, files
else:
yield path, list(dirs), list(files)

if maxdepth is not None:
maxdepth -= 1
if maxdepth < 1:
return

for d in full_dirs:
async for _ in self._walk(d, maxdepth=maxdepth, detail=detail, **kwargs):
yield _

async def _glob(self, path, **kwargs):
import re

ends = path.endswith("/")
path = self._strip_protocol(path)
indstar = path.find("*") if path.find("*") >= 0 else len(path)
indques = path.find("?") if path.find("?") >= 0 else len(path)
indbrace = path.find("[") if path.find("[") >= 0 else len(path)

ind = min(indstar, indques, indbrace)

detail = kwargs.pop("detail", False)

if not has_magic(path):
root = path
depth = 1
if ends:
path += "/*"
elif await self._exists(path):
if not detail:
return [path]
else:
return {path: await self._info(path)}
else:
if not detail:
return [] # glob of non-existent returns empty
else:
return {}
elif "/" in path[:ind]:
ind2 = path[:ind].rindex("/")
root = path[: ind2 + 1]
depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1
else:
root = ""
depth = None if "**" in path else path[ind + 1 :].count("/") + 1

allpaths = await self._find(
root, maxdepth=depth, withdirs=True, detail=True, **kwargs
)
# Escape characters special to python regex, leaving our supported
# special characters in place.
# See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html
# for shell globbing details.
pattern = (
"^"
+ (
path.replace("\\", r"\\")
.replace(".", r"\.")
.replace("+", r"\+")
.replace("//", "/")
.replace("(", r"\(")
.replace(")", r"\)")
.replace("|", r"\|")
.replace("^", r"\^")
.replace("$", r"\$")
.replace("{", r"\{")
.replace("}", r"\}")
.rstrip("/")
.replace("?", ".")
)
+ "$"
)
pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern)
pattern = re.sub("[*]", "[^/]*", pattern)
pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*"))
out = {
p: allpaths[p]
for p in sorted(allpaths)
if pattern.match(p.replace("//", "/").rstrip("/"))
}
if detail:
return out
else:
return list(out)

async def _du(self, path, total=True, maxdepth=None, **kwargs):
sizes = {}
# async for?
for f in await self._find(path, maxdepth=maxdepth, **kwargs):
info = await self._info(f)
sizes[info["name"]] = info["size"]
if total:
return sum(sizes.values())
else:
return sizes

async def _find(self, path, maxdepth=None, withdirs=False, **kwargs):
path = self._strip_protocol(path)
out = dict()
detail = kwargs.pop("detail", False)
# async for?
async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs):
if withdirs:
files.update(dirs)
out.update({info["name"]: info for name, info in files.items()})
if (await self._isfile(path)) and path not in out:
# walk works on directories, but find should also return [path]
# when path happens to be a file
out[path] = {}
names = sorted(out)
if not detail:
return names
else:
return {name: out[name] for name in names}

async def _expand_path(self, path, recursive=False, maxdepth=None):
if isinstance(path, str):
out = await self._expand_path([path], recursive, maxdepth)
else:
# reduce depth on each recursion level unless None or 0
maxdepth = maxdepth if not maxdepth else maxdepth - 1
out = set()
path = [self._strip_protocol(p) for p in path]
for p in path: # can gather here
if has_magic(p):
bit = set(await self._glob(p))
out |= bit
if recursive:
out |= set(
await self._expand_path(
list(bit), recursive=recursive, maxdepth=maxdepth
)
)
continue
elif recursive:
rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True))
out |= rec
if p not in out and (recursive is False or (await self._exists(p))):
# should only check once, for the root
out.add(p)
if not out:
raise FileNotFoundError(path)
return list(sorted(out))


def mirror_sync_methods(obj):
"""Populate sync and async methods for obj
@@ -285,7 +463,7 @@ def mirror_sync_methods(obj):
"""
from fsspec import AbstractFileSystem

for method in async_methods + default_async_methods + dir(AsyncFileSystem):
for method in async_methods + dir(AsyncFileSystem):
if not method.startswith("_"):
continue
smethod = method[1:]
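
The new default coroutines above are all built on top of _info and _ls, so a backend only has to provide those two to inherit working _glob, _find, _walk, _du, _exists and friends. The following sketch illustrates that layering; DictListingFS, its in-memory _files mapping, and the directory-inference logic are invented here for illustration and are not part of fsspec or of this PR.

import asyncio

from fsspec.asyn import AsyncFileSystem


class DictListingFS(AsyncFileSystem):
    # Hypothetical backend: a flat mapping of file path -> size.
    _files = {"data/a.csv": 3, "data/b.csv": 5, "data/sub/c.csv": 7}

    async def _info(self, path, **kwargs):
        path = self._strip_protocol(path)
        if path in self._files:
            return {"name": path, "size": self._files[path], "type": "file"}
        if any(f.startswith(path + "/") for f in self._files):
            return {"name": path, "size": 0, "type": "directory"}
        raise FileNotFoundError(path)

    async def _ls(self, path, detail=True, **kwargs):
        path = self._strip_protocol(path)
        # direct children: files in this directory plus one level of sub-directories
        files = [
            {"name": f, "size": s, "type": "file"}
            for f, s in self._files.items()
            if f.rsplit("/", 1)[0] == path
        ]
        subdirs = {
            f[: len(path) + 1 + f[len(path) + 1 :].index("/")]
            for f in self._files
            if f.startswith(path + "/") and "/" in f[len(path) + 1 :]
        }
        dirs = [{"name": d, "size": 0, "type": "directory"} for d in sorted(subdirs)]
        if not files and not dirs:
            raise FileNotFoundError(path)
        return files + dirs if detail else [o["name"] for o in files + dirs]


async def main():
    fs = DictListingFS()
    print(await fs._glob("data/*.csv"))  # ['data/a.csv', 'data/b.csv']
    print(await fs._du("data"))          # 15, via the default _find/_walk/_info


asyncio.run(main())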
2 changes: 1 addition & 1 deletion fsspec/implementations/reference.py
@@ -178,7 +178,7 @@ def _dircache_from_items(self):

self.dircache[par].append({"name": path, "type": "file", "size": size})

def ls(self, path, detail=True, **kwargs):
async def _ls(self, path, detail=True, **kwargs):
path = self._strip_protocol(path)
out = self._ls_from_cache(path)
if detail:
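
With the listing rewritten as the coroutine _ls, the blocking ls entry point for this backend is expected to come from the sync-mirroring machinery in fsspec.asyn rather than being written by hand. A quick check in the spirit of the new test in fsspec/tests/test_async.py (illustrative only, not part of the diff):

import inspect

from fsspec.implementations.reference import ReferenceFileSystem

# the listing implementation is now a coroutine
assert inspect.iscoroutinefunction(ReferenceFileSystem._ls)
# a blocking ls is still reachable (inherited/mirrored), just no longer
# hand-written in reference.py
assert hasattr(ReferenceFileSystem, "ls")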
28 changes: 7 additions & 21 deletions fsspec/tests/test_async.py
@@ -1,24 +1,10 @@
import asyncio
import sys
import inspect

import pytest
import fsspec.asyn

from fsspec.asyn import _run_until_done


async def inner():
await asyncio.sleep(1)
return True


async def outer():
await asyncio.sleep(1)
return _run_until_done(asyncio.get_running_loop(), inner())


@pytest.mark.skipif(sys.version_info < (3, 7), reason="Async fails on py36")
def test_runtildone():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
assert loop.run_until_complete(outer())
loop.close()
def test_sync_methods():
inst = fsspec.asyn.AsyncFileSystem()
assert inspect.iscoroutinefunction(inst._info)
assert hasattr(inst, "info")
assert not inspect.iscoroutinefunction(inst.info)