diff --git a/src/azanium/install.py b/src/azanium/install.py index 7b7e784..4ac185a 100644 --- a/src/azanium/install.py +++ b/src/azanium/install.py @@ -140,7 +140,26 @@ def pseudoace(context, afct, **kw): to_directory=download_dir) tempdir = tempfile.mkdtemp() with tarfile.open(dl_path) as tf: - tf.extractall(path=tempdir) + def is_within_directory(directory, target): + + abs_directory = os.path.abspath(directory) + abs_target = os.path.abspath(target) + + prefix = os.path.commonprefix([abs_directory, abs_target]) + + return prefix == abs_directory + + def safe_extract(tar, path=".", members=None, *, numeric_owner=False): + + for member in tar.getmembers(): + member_path = os.path.join(path, member.name) + if not is_within_directory(path, member_path): + raise Exception("Attempted Path Traversal in Tar File") + + tar.extractall(path, members, numeric_owner=numeric_owner) + + + safe_extract(tf, path=tempdir) archive_filename = os.path.split(dl_path)[-1] fullname = archive_filename.rsplit('.', 2)[0] tmp_src_path = os.path.join(tempdir, fullname) diff --git a/src/azanium/runcommand.py b/src/azanium/runcommand.py index 53a2012..db390bf 100644 --- a/src/azanium/runcommand.py +++ b/src/azanium/runcommand.py @@ -83,7 +83,26 @@ def acedb_database(context, afct, file_selector_regexp, for path in downloaded: with tarfile.open(path) as tf: logger.info('Extracting {} to {}', path, afct.install_dir) - tf.extractall(path=afct.install_dir) + def is_within_directory(directory, target): + + abs_directory = os.path.abspath(directory) + abs_target = os.path.abspath(target) + + prefix = os.path.commonprefix([abs_directory, abs_target]) + + return prefix == abs_directory + + def safe_extract(tar, path=".", members=None, *, numeric_owner=False): + + for member in tar.getmembers(): + member_path = os.path.join(path, member.name) + if not is_within_directory(path, member_path): + raise Exception("Attempted Path Traversal in Tar File") + + tar.extractall(path, members, numeric_owner=numeric_owner) + + + safe_extract(tf, path=afct.install_dir) # Enable the Dump command (requires adding user to ACeDB pw file) passwd_path = os.path.join(wspec_dir, 'passwd.wrm')