From 812b8517b8166ff063587d90b0c4eb904d200657 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 23 Mar 2021 15:39:49 -0400 Subject: [PATCH 1/3] Remove nested async by copying code --- fsspec/asyn.py | 248 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 213 insertions(+), 35 deletions(-) diff --git a/fsspec/asyn.py b/fsspec/asyn.py index 931edfe05..7625b2001 100644 --- a/fsspec/asyn.py +++ b/fsspec/asyn.py @@ -4,6 +4,7 @@ import os import re import threading +from glob import has_magic from .spec import AbstractFileSystem from .utils import is_exception, other_paths @@ -12,35 +13,13 @@ lock = threading.Lock() -def _run_until_done(loop, coro): - """execute coroutine, when already in the event loop""" - # raise Nested - with lock: - task = asyncio.current_task(loop=loop) - if task: - asyncio.tasks._unregister_task(task) - asyncio.tasks._current_tasks.pop(loop, None) - runner = loop.create_task(coro) - try: - while not runner.done(): - try: - loop._run_once() - except (IndexError, RuntimeError): - pass - finally: - if task: - with lock: - asyncio.tasks._current_tasks[loop] = task - return runner.result() - - def sync(loop, func, *args, callback_timeout=None, **kwargs): """ Make loop run coroutine until it returns. 
Runs in this thread """ coro = func(*args, **kwargs) if loop.is_running(): - result = _run_until_done(loop, coro) + raise NotImplementedError else: result = loop.run_until_complete(coro) return result @@ -60,12 +39,8 @@ def maybe_sync(func, self, *args, **kwargs): except RuntimeError: loop0 = None if loop0 is not None and loop0.is_running(): - if inspect.iscoroutinefunction(func): - # run coroutine while pausing this one (because we are within async) - return _run_until_done(loop, func(*args, **kwargs)) - else: - # make awaitable which then calls the blocking function - raise NotImplementedError() + # TEMPORARY - to be removed + raise NotImplementedError() else: if inspect.iscoroutinefunction(func): # run the awaitable on the loop @@ -119,11 +94,6 @@ def get_loop(): "_rm_file", "_cp_file", "_pipe_file", -] -# these methods could be overridden, but have default sync versions which rely on _ls -# the sync methods below all call expand_path, which in turn may call walk or glob -# (if passed paths with glob characters, or for recursive=True, respectively) -default_async_methods = [ "_expand_path", "_info", "_isfile", @@ -133,6 +103,7 @@ def get_loop(): "_glob", "_find", "_du", + "_size", ] @@ -167,6 +138,9 @@ def loop(self): self._loop.loop = get_loop() return self._loop.loop + async def _rm_file(self, path, **kwargs): + raise NotImplementedError + async def _rm(self, path, recursive=False, **kwargs): await asyncio.gather(*[self._rm_file(p, **kwargs) for p in path]) @@ -269,6 +243,210 @@ def get(self, rpath, lpath, recursive=False, **kwargs): [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths] return sync(self.loop, self._get, rpaths, lpaths) + async def _info(self, path): + raise NotImplementedError + + async def _isfile(self, path): + try: + return (await self._info(path))["type"] == "file" + except: # noqa: E722 + return False + + async def _isdir(self, path): + try: + return (await self._info(path))["type"] == "directory" + except IOError: + return 
False + + async def _size(self, path): + return (await self._info(path)).get("size", None) + + async def _exists(self, path): + try: + await self._info(path) + return True + except FileNotFoundError: + return False + + async def _ls(self, path, **kwargs): + raise NotImplementedError + + async def _walk(self, path, maxdepth=None, **kwargs): + path = self._strip_protocol(path) + full_dirs = {} + dirs = {} + files = {} + + detail = kwargs.pop("detail", False) + try: + listing = await self._ls(path, detail=True, **kwargs) + except (FileNotFoundError, IOError): + yield [], [], [] + return + + for info in listing: + # each info name must be at least [path]/part, but here + # we check also for names like [path]/part/ + pathname = info["name"].rstrip("/") + name = pathname.rsplit("/", 1)[-1] + if info["type"] == "directory" and pathname != path: + # do not include "self" path + full_dirs[pathname] = info + dirs[name] = info + elif pathname == path: + # file-like with same name as given path + files[""] = info + else: + files[name] = info + + if detail: + yield path, dirs, files + else: + yield path, list(dirs), list(files) + + if maxdepth is not None: + maxdepth -= 1 + if maxdepth < 1: + return + + for d in full_dirs: + async for _ in self._walk(d, maxdepth=maxdepth, detail=detail, **kwargs): + yield _ + + async def _glob(self, path, **kwargs): + import re + + ends = path.endswith("/") + path = self._strip_protocol(path) + indstar = path.find("*") if path.find("*") >= 0 else len(path) + indques = path.find("?") if path.find("?") >= 0 else len(path) + indbrace = path.find("[") if path.find("[") >= 0 else len(path) + + ind = min(indstar, indques, indbrace) + + detail = kwargs.pop("detail", False) + + if not has_magic(path): + root = path + depth = 1 + if ends: + path += "/*" + elif await self._exists(path): + if not detail: + return [path] + else: + return {path: await self._info(path)} + else: + if not detail: + return [] # glob of non-existent returns empty + else: +
return {} + elif "/" in path[:ind]: + ind2 = path[:ind].rindex("/") + root = path[: ind2 + 1] + depth = None if "**" in path else path[ind2 + 1 :].count("/") + 1 + else: + root = "" + depth = None if "**" in path else path[ind + 1 :].count("/") + 1 + + allpaths = await self._find( + root, maxdepth=depth, withdirs=True, detail=True, **kwargs + ) + # Escape characters special to python regex, leaving our supported + # special characters in place. + # See https://www.gnu.org/software/bash/manual/html_node/Pattern-Matching.html + # for shell globbing details. + pattern = ( + "^" + + ( + path.replace("\\", r"\\") + .replace(".", r"\.") + .replace("+", r"\+") + .replace("//", "/") + .replace("(", r"\(") + .replace(")", r"\)") + .replace("|", r"\|") + .replace("^", r"\^") + .replace("$", r"\$") + .replace("{", r"\{") + .replace("}", r"\}") + .rstrip("/") + .replace("?", ".") + ) + + "$" + ) + pattern = re.sub("[*]{2}", "=PLACEHOLDER=", pattern) + pattern = re.sub("[*]", "[^/]*", pattern) + pattern = re.compile(pattern.replace("=PLACEHOLDER=", ".*")) + out = { + p: allpaths[p] + for p in sorted(allpaths) + if pattern.match(p.replace("//", "/").rstrip("/")) + } + if detail: + return out + else: + return list(out) + + async def _du(self, path, total=True, maxdepth=None, **kwargs): + sizes = {} + # async for? + for f in await self._find(path, maxdepth=maxdepth, **kwargs): + info = await self._info(f) + sizes[info["name"]] = info["size"] + if total: + return sum(sizes.values()) + else: + return sizes + + async def _find(self, path, maxdepth=None, withdirs=False, **kwargs): + path = self._strip_protocol(path) + out = dict() + detail = kwargs.pop("detail", False) + # async for? 
+ async for _, dirs, files in self._walk(path, maxdepth, detail=True, **kwargs): + if withdirs: + files.update(dirs) + out.update({info["name"]: info for name, info in files.items()}) + if (await self._isfile(path)) and path not in out: + # walk works on directories, but find should also return [path] + # when path happens to be a file + out[path] = {} + names = sorted(out) + if not detail: + return names + else: + return {name: out[name] for name in names} + + async def _expand_path(self, path, recursive=False, maxdepth=None): + if isinstance(path, str): + out = await self._expand_path([path], recursive, maxdepth) + else: + # reduce depth on each recursion level unless None or 0 + maxdepth = maxdepth if not maxdepth else maxdepth - 1 + out = set() + path = [self._strip_protocol(p) for p in path] + for p in path: # can gather here + if has_magic(p): + bit = set(await self._glob(p)) + out |= bit + if recursive: + out |= set( + await self._expand_path( + list(bit), recursive=recursive, maxdepth=maxdepth + ) + ) + continue + elif recursive: + rec = set(await self._find(p, maxdepth=maxdepth, withdirs=True)) + out |= rec + if p not in out and (recursive is False or (await self._exists(p))): + # should only check once, for the root + out.add(p) + if not out: + raise FileNotFoundError(path) + return list(sorted(out)) + def mirror_sync_methods(obj): """Populate sync and async methods for obj @@ -285,7 +463,7 @@ def mirror_sync_methods(obj): """ from fsspec import AbstractFileSystem - for method in async_methods + default_async_methods + dir(AsyncFileSystem): + for method in async_methods + dir(AsyncFileSystem): if not method.startswith("_"): continue smethod = method[1:] From 742b1eb8f0176be6f1202795da34c0c8f885adc9 Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 23 Mar 2021 16:13:31 -0400 Subject: [PATCH 2/3] Async test --- fsspec/tests/test_async.py | 28 +++++++--------------------- 1 file changed, 7 insertions(+), 21 deletions(-) diff --git 
a/fsspec/tests/test_async.py b/fsspec/tests/test_async.py index bb995f065..7a83cb609 100644 --- a/fsspec/tests/test_async.py +++ b/fsspec/tests/test_async.py @@ -1,24 +1,10 @@ -import asyncio -import sys +import inspect -import pytest +import fsspec.asyn -from fsspec.asyn import _run_until_done - -async def inner(): - await asyncio.sleep(1) - return True - - -async def outer(): - await asyncio.sleep(1) - return _run_until_done(asyncio.get_running_loop(), inner()) - - -@pytest.mark.skipif(sys.version_info < (3, 7), reason="Async fails on py36") -def test_runtildone(): - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - assert loop.run_until_complete(outer()) - loop.close() +def test_sync_methods(): + inst = fsspec.asyn.AsyncFileSystem() + assert inspect.iscoroutinefunction(inst._info) + assert hasattr(inst, "info") + assert not inspect.iscoroutinefunction(inst.info) From 8ebdb4f8ebfec27acb07315d57f7debe16f48a2d Mon Sep 17 00:00:00 2001 From: Martin Durant Date: Tue, 23 Mar 2021 16:26:39 -0400 Subject: [PATCH 3/3] fix test --- fsspec/implementations/reference.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fsspec/implementations/reference.py b/fsspec/implementations/reference.py index 2ac555b9c..4ca958569 100644 --- a/fsspec/implementations/reference.py +++ b/fsspec/implementations/reference.py @@ -178,7 +178,7 @@ def _dircache_from_items(self): self.dircache[par].append({"name": path, "type": "file", "size": size}) - def ls(self, path, detail=True, **kwargs): + async def _ls(self, path, detail=True, **kwargs): path = self._strip_protocol(path) out = self._ls_from_cache(path) if detail: