Skip to content

Commit

Permalink
Adjust for our purposes
Browse files Browse the repository at this point in the history
  • Loading branch information
jwodder committed Feb 14, 2022
1 parent fa0b29e commit 7ec27c1
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 104 deletions.
75 changes: 8 additions & 67 deletions src/fscacher/cache.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from collections import deque, namedtuple
from functools import wraps
from inspect import Parameter, signature
import logging
import os
import os.path as op
import shutil
import time
import appdirs
import joblib
from .fastio import walk
from .util import DirFingerprint, FileFingerprint

lgr = logging.getLogger(__name__)

Expand Down Expand Up @@ -114,7 +114,7 @@ def fingerprinter(*args, **kwargs):
if op.isdir(path):
fprint = self._get_dir_fingerprint(path)
else:
fprint = self._get_file_fingerprint(path)
fprint = FileFingerprint.for_file(path)
if fprint is None:
lgr.debug("Calling %s directly since no fingerprint for %r", f, path)
# just call the function -- we have no fingerprint,
Expand Down Expand Up @@ -143,67 +143,8 @@ def fingerprinter(*args, **kwargs):
return fingerprinter

@staticmethod
def _get_file_fingerprint(path):
    """Return a ``FileFingerprint`` for the file at *path*, or ``None`` on failure.

    The fingerprint is built by ``FileFingerprint.from_stat`` from the result
    of ``os.stat`` (mtime, ctime, size and inode).  Symbolic links are
    followed, so the fingerprint describes the link target.

    Any exception raised while stat'ing the file is logged at DEBUG level and
    turned into a ``None`` return value instead of propagating.
    """
    try:
        # we can't take everything, since atime can change, etc.
        # So let's take some
        s = os.stat(path, follow_symlinks=True)
        fprint = FileFingerprint.from_stat(s)
    except Exception as exc:
        # Deliberately broad: fingerprinting is best effort, and the caller
        # treats None as "no fingerprint available".
        lgr.debug("Cannot fingerprint %s: %s", path, exc)
        return None
    lgr.log(5, "Fingerprint for %s: %s", path, fprint)
    return fprint

@staticmethod
def _get_dir_fingerprint(path):
    """Return a ``DirFingerprint`` for the tree rooted at *path*, or ``None``.

    The tree is traversed breadth-first with ``os.scandir``.  Entries that are
    directories (symlinks followed) are queued for scanning; every other entry
    is stat'ed (symlinks followed) and added to the fingerprint under its
    path.

    If anything raises during the traversal, the error is logged at DEBUG
    level and ``None`` is returned, so a partial fingerprint is never handed
    back.
    """
    fprint = DirFingerprint()
    dirqueue = deque([path])
    try:
        while dirqueue:
            d = dirqueue.popleft()
            with os.scandir(d) as entries:
                for e in entries:
                    if e.is_dir(follow_symlinks=True):
                        dirqueue.append(e.path)
                    else:
                        s = e.stat(follow_symlinks=True)
                        fprint.add_file(e.path, FileFingerprint.from_stat(s))
    except Exception as exc:
        # Deliberately broad: fingerprinting is best effort, and the caller
        # treats None as "no fingerprint available".
        lgr.debug("Cannot fingerprint %s: %s", path, exc)
        return None
    return fprint


class FileFingerprint(namedtuple("FileFingerprint", "mtime_ns ctime_ns size inode")):
    """Immutable record of the stat fields used to detect that a file changed."""

    @classmethod
    def from_stat(cls, s):
        """Build a fingerprint from a stat-like object *s*.

        Reads ``st_mtime_ns``, ``st_ctime_ns``, ``st_size`` and ``st_ino``.
        """
        return cls(
            mtime_ns=s.st_mtime_ns,
            ctime_ns=s.st_ctime_ns,
            size=s.st_size,
            inode=s.st_ino,
        )

    def modified_in_window(self, min_dtime):
        """Tell whether the mtime lies strictly within *min_dtime* seconds of now."""
        # mtime_ns is in nanoseconds; scale it to seconds before comparing.
        offset = time.time() - self.mtime_ns * 1e-9
        return -min_dtime < offset < min_dtime

    def to_tuple(self):
        """Return the four fields as a plain tuple."""
        return (*self,)


class DirFingerprint:
    """Aggregate fingerprint of a directory tree, built from per-file fingerprints."""

    def __init__(self):
        # Largest mtime_ns among the files added so far; None until a file is added.
        self.last_modified = None
        # Maps each added path to the fingerprint recorded for it.
        self.tree_fprints = {}

    def add_file(self, path, fprint: "FileFingerprint"):
        """Record *fprint* for *path* and update the newest modification time.

        Adding the same path again replaces its previous fingerprint.
        """
        self.tree_fprints[path] = fprint
        if self.last_modified is None or self.last_modified < fprint.mtime_ns:
            self.last_modified = fprint.mtime_ns

    def modified_in_window(self, min_dtime):
        """Tell whether the newest file mtime is within *min_dtime* seconds of now.

        Returns ``False`` when no file has been added.
        """
        if self.last_modified is None:
            return False
        # last_modified is in nanoseconds; scale it to seconds before comparing.
        return abs(time.time() - self.last_modified * 1e-9) < min_dtime

    def to_tuple(self):
        """Return a flat tuple ``(path1, fprint1, path2, fprint2, ...)`` sorted by path."""
        # Flatten in a single pass; repeated tuple concatenation via sum()
        # would copy the growing result on every step (quadratic time).
        return tuple(
            element
            for entry in sorted(self.tree_fprints.items())
            for element in entry
        )
def _get_dir_fingerprint(dirpath):
    """Return a ``DirFingerprint`` for the files ``walk`` reports under *dirpath*.

    Returns ``None`` when at least one reported file has no fingerprint, so
    that the caller falls back to its "no fingerprint" path instead of caching
    against an incomplete view of the tree.
    """
    dprint = DirFingerprint()
    incomplete = False
    for path, fprint in walk(dirpath):
        if fprint is None:
            # A file that could not be stat'ed comes back as None;
            # DirFingerprint.add_file() would fail on fprint.mtime_ns.
            # Keep consuming the walk so its worker threads can finish.
            lgr.debug("Cannot fingerprint %s: no fingerprint for %s", dirpath, path)
            incomplete = True
        else:
            dprint.add_file(path, fprint)
    return None if incomplete else dprint
55 changes: 18 additions & 37 deletions src/fscacher/fastio.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,12 @@
# ==============================================================================
"""Routines for multi-threaded i/o."""

from hashlib import md5
import logging
import os
import sys
import threading
from .util import FileFingerprint

lgr = logging.getLogger(__name__)


def walk(top, threads=60):
Expand All @@ -38,24 +40,24 @@ def walk(top, threads=60):
threads: Size of fixed thread pool.
Yields:
A (path, subdirs, files) tuple for each directory within top, including
itself. These tuples come in no particular order; however, the contents
of each tuple itself is sorted.
A (path, fingerprint) pair for each file within top. These pairs come in
no particular order.
"""
if not os.path.isdir(top):
return
lock = threading.Lock()
on_input = threading.Condition(lock)
on_output = threading.Condition(lock)
state = {"tasks": 1}
tasks = 1
paths = [top]
output = []

def worker():
nonlocal tasks
while True:
with lock:
while True:
if not state["tasks"]:
if not tasks:
output.append(None)
on_output.notify()
return
Expand All @@ -65,34 +67,27 @@ def worker():
path = paths.pop()
break
try:
dirs = []
files = []
for item in sorted(os.listdir(path)):
subpath = os.path.join(path, item)
if os.path.isdir(subpath):
dirs.append(item)
with lock:
state["tasks"] += 1
tasks += 1
paths.append(subpath)
on_input.notify()
else:
with open(subpath, "rb") as fp:
digest = md5()
digest.update(fp.read())
files.append((item, digest.hexdigest()))
with lock:
output.append((path, dirs, files))
on_output.notify()
except OSError as e:
print(e, file=sys.stderr)
with lock:
output.append((subpath, FileFingerprint.for_file(subpath)))
on_output.notify()
except OSError:
lgr.exception("Error scanning directory %s", path)
finally:
with lock:
state["tasks"] -= 1
if not state["tasks"]:
tasks -= 1
if not tasks:
on_input.notify_all()

workers = [
threading.Thread(target=worker, name="fastio.walk %d %s" % (i, top))
threading.Thread(target=worker, name=f"fastio.walk {i} {top}", daemon=True)
for i in range(threads)
]
for w in workers:
Expand All @@ -106,17 +101,3 @@ def worker():
yield item
else:
threads -= 1


if __name__ == "__main__":
    # Command-line driver: walk the directory given as the first argument,
    # optionally with the thread count given as the second argument, print
    # every yielded item, and finish with the total number of files.
    root = sys.argv[1]
    results = walk(root, threads=int(sys.argv[2])) if len(sys.argv) > 2 else walk(root)
    total = 0
    for entry in results:
        total += len(entry[2])
        print(entry)
    print(f"Total: {total}")
52 changes: 52 additions & 0 deletions src/fscacher/util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
from collections import namedtuple
import logging
import os
import time

lgr = logging.getLogger(__name__)


class FileFingerprint(namedtuple("FileFingerprint", "mtime_ns ctime_ns size inode")):
    """Immutable record of the stat fields used to detect that a file changed."""

    @classmethod
    def for_file(cls, path):
        """Return the fingerprint of the file at *path*, or ``None`` on failure.

        Simplistic generic file fingerprinting based on mtime, ctime, size and
        inode.  Symbolic links are followed, so the fingerprint describes the
        link target.  Any exception raised while stat'ing the file is logged
        at DEBUG level and turned into a ``None`` return value.
        """
        try:
            # we can't take everything, since atime can change, etc.
            # So let's take some
            s = os.stat(path, follow_symlinks=True)
            fprint = cls.from_stat(s)
        except Exception as exc:
            # Deliberately broad: fingerprinting is best effort, and callers
            # get None when no fingerprint is available.
            lgr.debug("Cannot fingerprint %s: %s", path, exc)
            return None
        lgr.log(5, "Fingerprint for %s: %s", path, fprint)
        return fprint

    @classmethod
    def from_stat(cls, s):
        """Build a fingerprint from a stat-like object *s*.

        Reads ``st_mtime_ns``, ``st_ctime_ns``, ``st_size`` and ``st_ino``.
        """
        return cls(s.st_mtime_ns, s.st_ctime_ns, s.st_size, s.st_ino)

    def modified_in_window(self, min_dtime):
        """Tell whether the mtime lies strictly within *min_dtime* seconds of now."""
        # mtime_ns is in nanoseconds; scale it to seconds before comparing.
        return abs(time.time() - self.mtime_ns * 1e-9) < min_dtime

    def to_tuple(self):
        """Return the four fields as a plain tuple."""
        return tuple(self)


class DirFingerprint:
    """Aggregate fingerprint of a directory tree, built from per-file fingerprints."""

    def __init__(self):
        # Largest mtime_ns among the files added so far; None until a file is added.
        self.last_modified = None
        # Maps each added path to the fingerprint recorded for it.
        self.tree_fprints = {}

    def add_file(self, path, fprint: "FileFingerprint"):
        """Record *fprint* for *path* and update the newest modification time.

        Adding the same path again replaces its previous fingerprint.
        """
        self.tree_fprints[path] = fprint
        if self.last_modified is None or self.last_modified < fprint.mtime_ns:
            self.last_modified = fprint.mtime_ns

    def modified_in_window(self, min_dtime):
        """Tell whether the newest file mtime is within *min_dtime* seconds of now.

        Returns ``False`` when no file has been added.
        """
        if self.last_modified is None:
            return False
        # last_modified is in nanoseconds; scale it to seconds before comparing.
        return abs(time.time() - self.last_modified * 1e-9) < min_dtime

    def to_tuple(self):
        """Return a flat tuple ``(path1, fprint1, path2, fprint2, ...)`` sorted by path."""
        # Flatten in a single pass; repeated tuple concatenation via sum()
        # would copy the growing result on every step (quadratic time).
        return tuple(
            element
            for entry in sorted(self.tree_fprints.items())
            for element in entry
        )

0 comments on commit 7ec27c1

Please sign in to comment.