Skip to content

Commit

Permalink
recursiveloader: use less memory in assert_directory_verifies
Browse files Browse the repository at this point in the history
In order to avoid loading all manifest entries into memory
at once, make assert_directory_verifies use a shared sort_key
to iterate over directories and manifest entries in the same
order.

For verification of the entire gentoo tree, my tests have
shown that this change reduces the memory footprint by about
63%, while consuming about 20% more time.
  • Loading branch information
zmedico committed Dec 18, 2017
1 parent 21a1d34 commit a0ce724
Showing 1 changed file with 161 additions and 37 deletions.
198 changes: 161 additions & 37 deletions gemato/recursiveloader.py
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ def _iter_unordered_manifests_for_path(self, path, recursive=False):
elif recursive and gemato.util.path_starts_with(d, path):
yield (k, d, v)

def _iter_manifests_for_path(self, path, recursive=False):
def _iter_manifests_for_path(self, path, recursive=False, sort_key=lambda kdv: len(kdv[1])):
"""
Iterate over loaded Manifests that can apply to path.
If @recursive is True, returns also Manifests for subdirectories
Expand All @@ -354,7 +354,7 @@ def _iter_manifests_for_path(self, path, recursive=False):
return sorted(
self._iter_unordered_manifests_for_path(
path, recursive=recursive),
key=lambda kdv: len(kdv[1]),
key=sort_key,
reverse=True)

def load_manifests_for_path(self, path, recursive=False, verify=True):
Expand All @@ -368,20 +368,38 @@ def load_manifests_for_path(self, path, recursive=False, verify=True):
on mismatch. Otherwise, sub-Manifests will be loaded
unconditionally of whether they match parent checksums.
"""
for curmpath, relpath, m in self._iter_load_manifests_for_path(
path, recursive=recursive, verify=verify):
self.loaded_manifests[curmpath] = m

def _iter_load_manifests_for_path(self, path, recursive=False, verify=True,
sort_key=lambda kdv: kdv[1].split(os.sep)):
"""
Traverse manifests in depth-first order with directories sorted by
name. Only hold references to a minimum number of ManifestFile
instances, in order to conserve memory.
The caller can traverse manifest and directory iterators in unison,
minimizing the amount of data in memory.
"""
pool = multiprocessing.Pool(processes=self.max_jobs)

# Manifests pop from the stack in depth-first order
manifest_stack = list(self._iter_manifests_for_path(path,
recursive=recursive, sort_key=sort_key))
traversed = set(curmpath for curmpath, relpath, m in manifest_stack)
try:
# TODO: figure out how to avoid confusing uses of 'recursive'
while True:
while manifest_stack:
to_load = []
for curmpath, relpath, m in self._iter_manifests_for_path(
path, recursive):
for e in m.entries:
curmpath, relpath, m = manifest_stack.pop()
yield (curmpath, relpath, m)

for e in m.entries:
if e.tag != 'MANIFEST':
continue
mpath = os.path.join(relpath, e.path)
if curmpath == mpath or mpath in self.loaded_manifests:
if curmpath == mpath or mpath in traversed:
continue
mdir = os.path.dirname(mpath)
if not verify:
Expand All @@ -390,12 +408,19 @@ def load_manifests_for_path(self, path, recursive=False, verify=True):
to_load.append((mpath, e))
elif recursive and gemato.util.path_starts_with(mdir, path):
to_load.append((mpath, e))
if not to_load:
break

manifests = pool.imap_unordered(self.manifest_loader, to_load,
chunksize=16)
self.loaded_manifests.update(manifests)

manifests = [(mpath, os.path.dirname(mpath), e)
for mpath, e in manifests]

# Manifests pop from the stack in depth-first order
manifests.sort(key=sort_key, reverse=True)
for mpath, mdir, m in manifests:
traversed.add(mpath)
manifest_stack.append(
(mpath, os.path.dirname(mpath), m))

pool.close()
pool.join()
Expand Down Expand Up @@ -511,13 +536,54 @@ def get_file_entry_dict(self, path='', only_types=None,
be verified against MANIFEST entries. Pass False only when
doing updates.
"""
out = {}
for dirpath, dirout in self._iter_file_entry_dict(
path=path, only_types=only_types,
verify_manifests=verify_manifests):
other = out.get(dirpath)
if other is None:
out[dirpath] = dirout
else:
# This happens due to the relpath = '' setting
# for all DIST entries.
for filename, e in dirout.items():
if filename in other:
e = self._merge_entries(other[filename], e)
other[filename] = e
return out

self.load_manifests_for_path(path, recursive=True,
verify=verify_manifests)
@staticmethod
def _merge_entries(e1, e2):
# compare the two entries
ret, diff = gemato.verify.verify_entry_compatibility(
e1, e2)
if not ret:
raise gemato.exceptions.ManifestIncompatibleEntry(
e1, e2, diff)
# we need to construct a single entry with both checksums
if diff:
new_checksums = dict(e2.checksums)
for k, d1, d2 in diff:
if d2 is None:
new_checksums[k] = d1
e1 = type(e1)(e1.path, e1.size, new_checksums)
return e1

def _iter_file_entry_dict(self, path='', only_types=None,
verify_manifests=True,
sort_key=lambda p: p.split(os.sep)):
out = {}
for mpath, relpath, m in self._iter_manifests_for_path(path,
recursive=True):
for e in m.entries:
dir_stack = [path]
iter_load = self._iter_load_manifests_for_path(path,
recursive=True, verify=verify_manifests)
mpath, mdir, m = next(iter_load, (None, None, None))

while dir_stack or mdir is not None:
if not dir_stack or (mdir is not None and
sort_key(mdir) <= sort_key(dir_stack[-1])):
subdirs = []
relpath = mdir
for e in m.entries:
if only_types is not None:
if e.tag not in only_types:
continue
Expand All @@ -533,23 +599,20 @@ def get_file_entry_dict(self, path='', only_types=None,
if gemato.util.path_starts_with(fullpath, path):
dirpath = os.path.dirname(fullpath)
filename = os.path.basename(e.path)
subdirs.append(dirpath)
dirout = out.setdefault(dirpath, {})
if filename in dirout:
# compare the two entries
ret, diff = gemato.verify.verify_entry_compatibility(
dirout[filename], e)
if not ret:
raise gemato.exceptions.ManifestIncompatibleEntry(
dirout[filename], e, diff)
# we need to construct a single entry with both checksums
if diff:
new_checksums = dict(e.checksums)
for k, d1, d2 in diff:
if d2 is None:
new_checksums[k] = d1
e = type(e)(e.path, e.size, new_checksums)
e = self._merge_entries(dirout[filename], e)
dirout[filename] = e
return out
subdirs.sort(key=sort_key, reverse=True)
dir_stack.extend(subdirs)
mpath, mdir, m = next(iter_load, (None, None, None))
else:
dirpath = dir_stack.pop()
try:
yield dirpath, out.pop(dirpath)
except KeyError:
pass

def assert_directory_verifies(self, path='',
fail_handler=gemato.util.throw_exception,
Expand Down Expand Up @@ -580,22 +643,83 @@ def assert_directory_verifies(self, path='',
to None (the default), the number of system CPUs will be used.
"""

entry_dict = self.get_file_entry_dict(path)
remaining_entries = {}
entry_iter = self._iter_file_entry_dict(path)
it = os.walk(os.path.join(self.root_directory, path),
onerror=gemato.util.throw_exception,
followlinks=True)
sort_key = lambda p: p.split(os.sep)
dir_stack = []

def _walk_directory(it):
"""
Pre-process os.walk() result for verification. Yield objects
suitable to passing to subprocesses.
"""
for dirpath, dirnames, filenames in it:
relpath = os.path.relpath(dirpath, self.root_directory)
# strip dot to avoid matching problems
if relpath == '.':
relpath = ''
dirdict = entry_dict.pop(relpath, {})
pop_until = None
entry_dir, entry_dict = next(entry_iter, (None, None))
while True:
if pop_until is not None:
dirpath, dirnames, filenames, relpath = dir_stack.pop()
if pop_until is relpath:
pop_until = None
elif (dir_stack and entry_dir is not None and
gemato.util.path_starts_with(dir_stack[-1][-1], entry_dir)):
dirpath, dirnames, filenames, relpath = dir_stack.pop()
else:
try:
dirpath, dirnames, filenames = next(it)
except StopIteration:
while entry_dir is not None:
remaining_entries[entry_dir] = entry_dict
entry_dir, entry_dict = next(entry_iter, (None, None))
break

relpath = os.path.relpath(dirpath, self.root_directory)

# strip dot to avoid matching problems
if relpath == '.':
relpath = ''

dirnames.sort()

if relpath == entry_dir:
dirdict = entry_dict
entry_dir, entry_dict = next(entry_iter, (None, None))
elif entry_dir is not None and gemato.util.path_starts_with(relpath, entry_dir):
dirdict = {}
else:
relpath_key = sort_key(relpath)
if dir_stack and entry_dir is not None:
entry_dir_key = sort_key(entry_dir)
if relpath_key > entry_dir_key and entry_dir_key <= sort_key(dir_stack[-1][-1]):
# Try to insert it into the stack for later processing.
for i, item in enumerate(dir_stack):
if item[-1] and relpath_key > sort_key(item[-1]):
dir_stack.insert(i, (dirpath, dirnames, filenames, relpath))
dirpath = None
break
if dirpath is None:
if pop_until is None:
pop_until = relpath
continue
while entry_dir is not None and relpath_key > sort_key(entry_dir):
remaining_entries[entry_dir] = entry_dict
entry_dir, entry_dict = next(entry_iter, (None, None))

if relpath == entry_dir:
dirdict = entry_dict
entry_dir, entry_dict = next(entry_iter, (None, None))
elif entry_dir is not None:
relpath_key = sort_key(relpath)
entry_dir_key = sort_key(entry_dir)
if relpath_key < entry_dir_key and len(relpath_key) <= len(entry_dir_key):
dir_stack.append((dirpath, dirnames, filenames, relpath))
continue
else:
dirdict = {}
else:
dirdict = {}

skip_dirs = []
for d in dirnames:
Expand Down Expand Up @@ -643,7 +767,7 @@ def _walk_directory(it):
pool.close()

# check for missing directories
for relpath, dirdict in entry_dict.items():
for relpath, dirdict in remaining_entries.items():
for f, e in dirdict.items():
fpath = os.path.join(relpath, f)
syspath = os.path.join(self.root_directory, fpath)
Expand Down

0 comments on commit a0ce724

Please sign in to comment.