Do a multithreaded walk when fingerprinting directories #67

Open. Wants to merge 5 commits into master.
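In brief: fingerprinting a directory previously did a serial, breadth-first os.scandir() walk inside cache.py; this PR vendors TensorFlow's multi-threaded walker as fscacher/fastio.py and exposes the pool size through a new walk_threads parameter (default 60). A minimal usage sketch, assuming PersistentCache remains importable from the package root as in the released API, and with "my-app" as a placeholder cache name:

    from fscacher import PersistentCache

    # walk_threads is the knob added by this PR; it falls back to
    # DEFAULT_THREADS (60) when omitted or None.
    cache = PersistentCache(name="my-app", walk_threads=16)

    @cache.memoize_path
    def count_files(path):
        import os
        # Recomputed only when the directory fingerprint changes.
        return sum(len(files) for _, _, files in os.walk(path))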
104 changes: 16 additions & 88 deletions src/fscacher/cache.py
@@ -1,18 +1,18 @@
from collections import deque, namedtuple
from functools import wraps
from hashlib import md5
from inspect import Parameter, signature
import logging
import os
import os.path as op
import shutil
import sys
import time
import appdirs
import joblib
from .fastio import walk
from .util import DirFingerprint, FileFingerprint

lgr = logging.getLogger(__name__)

DEFAULT_THREADS = 60


class PersistentCache(object):
"""Persistent cache providing @memoize and @memoize_path decorators
@@ -23,7 +23,9 @@ class PersistentCache(object):

_cache_var_values = (None, "", "clear", "ignore")

def __init__(self, name=None, *, path=None, tokens=None, envvar=None):
def __init__(
self, name=None, *, path=None, tokens=None, envvar=None, walk_threads=None
):
"""
Parameters
----------
@@ -42,6 +44,8 @@ def __init__(self, name=None, *, path=None, tokens=None, envvar=None):
envvar: str, optional
Name of the environment variable to query for cache settings; if not
set, `FSCACHER_CACHE` is used
walk_threads: int, optional
Number of threads to use when traversing directory hierarchies
"""
if path is None:
dirs = appdirs.AppDirs("fscacher")
@@ -65,6 +69,7 @@ def __init__(self, name=None, *, path=None, tokens=None, envvar=None):
self.clear()
self._ignore_cache = cntrl_value == "ignore"
self._tokens = tokens
self._walk_threads = walk_threads or DEFAULT_THREADS

def clear(self):
try:
@@ -128,7 +133,7 @@ def fingerprinter(*args, **kwargs):
if op.isdir(path):
fprint = self._get_dir_fingerprint(path)
else:
fprint = self._get_file_fingerprint(path)
fprint = FileFingerprint.for_file(path)
if fprint is None:
lgr.debug("Calling %s directly since no fingerprint for %r", f, path)
# just call the function -- we have no fingerprint,
@@ -156,85 +161,8 @@ def fingerprinter(*args, **kwargs):
# and we memoize actually that function
return fingerprinter

@staticmethod
def _get_file_fingerprint(path):
"""Simplistic generic file fingerprinting based on ctime, mtime, and size
"""
try:
# we can't take everything, since atime can change, etc.
# So let's take some
s = os.stat(path, follow_symlinks=True)
fprint = FileFingerprint.from_stat(s)
lgr.log(5, "Fingerprint for %s: %s", path, fprint)
return fprint
except Exception as exc:
lgr.debug(f"Cannot fingerprint {path}: {exc}")

@staticmethod
def _get_dir_fingerprint(path):
fprint = DirFingerprint()
dirqueue = deque([path])
try:
while dirqueue:
d = dirqueue.popleft()
with os.scandir(d) as entries:
for e in entries:
if e.is_dir(follow_symlinks=True):
dirqueue.append(e.path)
else:
s = e.stat(follow_symlinks=True)
fprint.add_file(e.path, FileFingerprint.from_stat(s))
except Exception as exc:
lgr.debug(f"Cannot fingerprint {path}: {exc}")
return None
else:
return fprint


class FileFingerprint(namedtuple("FileFingerprint", "mtime_ns ctime_ns size inode")):
@classmethod
def from_stat(cls, s):
return cls(s.st_mtime_ns, s.st_ctime_ns, s.st_size, s.st_ino)

def modified_in_window(self, min_dtime):
return abs(time.time() - self.mtime_ns * 1e-9) < min_dtime

def to_tuple(self):
return tuple(self)


class DirFingerprint:
def __init__(self):
self.last_modified = None
self.hash = None

def add_file(self, path, fprint: FileFingerprint):
fprint_hash = md5(
ascii((str(path), fprint.to_tuple())).encode("us-ascii")
).digest()
if self.hash is None:
self.hash = fprint_hash
self.last_modified = fprint.mtime_ns
else:
self.hash = xor_bytes(self.hash, fprint_hash)
if self.last_modified < fprint.mtime_ns:
self.last_modified = fprint.mtime_ns

def modified_in_window(self, min_dtime):
if self.last_modified is None:
return False
else:
return abs(time.time() - self.last_modified * 1e-9) < min_dtime

def to_tuple(self):
if self.hash is None:
return (None,)
else:
return (self.hash.hex(),)


def xor_bytes(b1: bytes, b2: bytes) -> bytes:
length = max(len(b1), len(b2))
i1 = int.from_bytes(b1, sys.byteorder)
i2 = int.from_bytes(b2, sys.byteorder)
return (i1 ^ i2).to_bytes(length, sys.byteorder)
def _get_dir_fingerprint(self, dirpath):
dprint = DirFingerprint()
for path, fprint in walk(dirpath, threads=self._walk_threads):
dprint.add_file(path, fprint)
return dprint
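What makes the unordered, multi-threaded walk a safe replacement for the old breadth-first scandir loop is that DirFingerprint combines per-file hashes with XOR, which is commutative, so the resulting fingerprint does not depend on the order in which files are reported. A quick sketch, assuming FileFingerprint and DirFingerprint moved to fscacher/util.py unchanged, as the new imports suggest:

    from fscacher.util import DirFingerprint, FileFingerprint

    # Two synthetic fingerprints (mtime_ns, ctime_ns, size, inode).
    fp_a = FileFingerprint(1, 1, 10, 100)
    fp_b = FileFingerprint(2, 2, 20, 200)

    d1 = DirFingerprint()
    d1.add_file("a.txt", fp_a)
    d1.add_file("b.txt", fp_b)

    d2 = DirFingerprint()
    d2.add_file("b.txt", fp_b)  # same files, opposite order
    d2.add_file("a.txt", fp_a)

    assert d1.to_tuple() == d2.to_tuple()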
103 changes: 103 additions & 0 deletions src/fscacher/fastio.py
@@ -0,0 +1,103 @@
# Copyright 2016 The TensorFlow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================
"""Routines for multi-threaded i/o."""

import logging
import os
import threading
from .util import FileFingerprint

lgr = logging.getLogger(__name__)


def walk(top, threads=60):
"""
Multi-threaded version of os.walk().

    This routine provides a performance improvement of multiple orders of
    magnitude when top is mapped to a network filesystem where i/o operations
    are slow but parallelism is unlimited. For spinning disks it should still
    run faster regardless of thread count, because it uses a LIFO scheduler
    that guarantees locality. For SSDs it will go tolerably slower.

    The more exotic generator features of os.walk() cannot be supported, such
    as the ability to selectively inhibit recursion by mutating the list of
    subdirectories.

Args:
top: Path of parent directory to search recursively.
threads: Size of fixed thread pool.

Yields:
A (path, fingerprint) pair for each file within top. These pairs come in
no particular order.
"""
if not os.path.isdir(top):
return
lock = threading.Lock()
on_input = threading.Condition(lock)
on_output = threading.Condition(lock)
tasks = 1
paths = [top]
output = []

def worker():
nonlocal tasks
while True:
with lock:
while True:
if not tasks:
output.append(None)
on_output.notify()
return
if not paths:
on_input.wait()
continue
path = paths.pop()
break
try:
for item in os.listdir(path):
subpath = os.path.join(path, item)
if os.path.isdir(subpath):
with lock:
tasks += 1
paths.append(subpath)
on_input.notify()
else:
with lock:
output.append((subpath, FileFingerprint.for_file(subpath)))
on_output.notify()
except OSError:
lgr.exception("Error scanning directory %s", path)
finally:
with lock:
tasks -= 1
if not tasks:
on_input.notify_all()

workers = [
threading.Thread(target=worker, name=f"fastio.walk {i} {top}", daemon=True)
for i in range(threads)
]
for w in workers:
w.start()
while threads or output: # TODO(jart): Why is 'or output' necessary?
with lock:
while not output:
on_output.wait()
item = output.pop()
if item:
yield item
else:
threads -= 1
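For reference, the vendored helper can also be exercised on its own. A small sketch ("/srv/data" is a placeholder path; the None guard matters because, per the None check in cache.py, FileFingerprint.for_file() returns None for paths it cannot stat, e.g. files deleted mid-walk):

    from fscacher.fastio import walk

    for path, fprint in walk("/srv/data", threads=8):
        if fprint is not None:  # skip files that vanished during the walk
            print(path, fprint.size)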
74 changes: 74 additions & 0 deletions src/fscacher/tests/test_cache.py
@@ -1,5 +1,6 @@
import os
import os.path as op
from pathlib import Path
import platform
import shutil
import subprocess
@@ -239,6 +240,79 @@ def check_new_memoread(arg, content, expect_new=False):
check_new_memoread(1, 14)


def test_memoize_path_recursive_dir(cache, tmp_path):
calls = []

@cache.memoize_path
def memoread(path: Path):
calls.append(path)
file_qty = 0
for p in sorted(path.iterdir()):
if p.is_dir():
file_qty += memoread(p)
else:
file_qty += 1
return file_qty

(tmp_path / "file1.txt").touch()
(tmp_path / "file2.txt").touch()
sub1 = tmp_path / "sub1"
sub1.mkdir()
(sub1 / "file3.txt").touch()
(sub1 / "file4.txt").touch()
(sub1 / "file5.txt").touch()
subsub = sub1 / "subsub"
subsub.mkdir()
(subsub / "file6.txt").touch()
sub2 = tmp_path / "sub2"
sub2.mkdir()
(sub2 / "file7.txt").touch()
(sub2 / "file8.txt").touch()

time.sleep(cache._min_dtime * 1.1)

assert memoread(tmp_path) == 8
assert calls == [tmp_path, sub1, subsub, sub2]
assert memoread(tmp_path) == 8
assert calls == [tmp_path, sub1, subsub, sub2]

(sub2 / "file8.txt").touch()
time.sleep(cache._min_dtime * 1.1)

assert memoread(tmp_path) == 8
assert calls == [tmp_path, sub1, subsub, sub2, tmp_path, sub2]
assert memoread(tmp_path) == 8
assert calls == [tmp_path, sub1, subsub, sub2, tmp_path, sub2]

(subsub / "file9.txt").touch()
time.sleep(cache._min_dtime * 1.1)

assert memoread(tmp_path) == 9
assert calls == [
tmp_path,
sub1,
subsub,
sub2,
tmp_path,
sub2,
tmp_path,
sub1,
subsub,
]
assert memoread(tmp_path) == 9
assert calls == [
tmp_path,
sub1,
subsub,
sub2,
tmp_path,
sub2,
tmp_path,
sub1,
subsub,
]


def test_memoize_path_persist(tmp_path):
from subprocess import PIPE, run

2 changes: 1 addition & 1 deletion src/fscacher/tests/test_util.py
@@ -1,5 +1,5 @@
import pytest
from ..cache import xor_bytes
from ..util import xor_bytes


@pytest.mark.parametrize(