Merge branch 'develop' into feature/data-renderer

ikelos authored Feb 15, 2025
2 parents 07b0503 + 81bac4d commit 9d03f50
Showing 7 changed files with 384 additions and 59 deletions.
263 changes: 262 additions & 1 deletion volatility3/framework/plugins/linux/pagecache.py
@@ -5,11 +5,15 @@
import math
import logging
import datetime
import time
import tarfile
from dataclasses import dataclass, astuple
from typing import IO, List, Optional, Set, Type, Iterable, Tuple
from io import BytesIO
from pathlib import PurePath

from volatility3.framework.constants import architectures
from volatility3.framework import renderers, interfaces, exceptions
from volatility3.framework import constants, renderers, interfaces, exceptions
from volatility3.framework.renderers import format_hints
from volatility3.framework.interfaces import plugins
from volatility3.framework.configuration import requirements
@@ -625,3 +629,260 @@ def run(self):
return renderers.TreeGrid(
headers, Files.format_fields_with_headers(headers, self._generator())
)


class RecoverFs(plugins.PluginInterface):
"""Recovers the cached filesystem (directories, files, symlinks) into a compressed tarball.
Details: level 0 directories are named after the UUID of the parent superblock; metadata aren't replicated to extracted objects; objects modification time is set to the plugin run time; absolute symlinks
are converted to relative symlinks to prevent referencing the analyst's filesystem.
Troubleshooting: to fix extraction errors related to long paths, please consider using https://github.com/mxmlnkn/ratarmount.
"""

_version = (1, 0, 0)
_required_framework_version = (2, 21, 0)

@classmethod
def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]:
return [
requirements.ModuleRequirement(
name="kernel",
description="Linux kernel",
architectures=architectures.LINUX_ARCHS,
),
requirements.PluginRequirement(
name="files", plugin=Files, version=(1, 1, 0)
),
requirements.PluginRequirement(
name="inodepages", plugin=InodePages, version=(3, 0, 0)
),
requirements.ChoiceRequirement(
name="compression_format",
description="Compression format (default: gz)",
choices=["gz", "bz2", "xz"],
default="gz",
optional=True,
),
]

def _tar_add_reg_inode(
self,
context: interfaces.context.ContextInterface,
layer_name: str,
tar: tarfile.TarFile,
reg_inode_in: InodeInternal,
path_prefix: str = "",
mtime: Optional[float] = None,
) -> int:
"""Extracts a REG inode content and writes it to a TarFile object.
Args:
context: The context on which to operate
layer_name: The name of the layer on which to operate
tar: The TarFile object to write to
reg_inode_in: The inode to extract content from
path_prefix: A custom path prefix to prepend the inode path with
mtime: The modification time to set the TarInfo object to
Returns:
The number of extracted bytes
"""
inode_content_buffer = BytesIO()
InodePages.write_inode_content_to_stream(
context, layer_name, reg_inode_in.inode, inode_content_buffer
)
inode_content_buffer.seek(0)
handle_buffer_size = inode_content_buffer.getbuffer().nbytes

tar_info = tarfile.TarInfo(path_prefix + reg_inode_in.path)
# The tarfile module only has read support for sparse files:
# https://docs.python.org/3.12/library/tarfile.html#tarfile.LNKTYPE:~:text=and%20longlink%20extensions%2C-,read%2Donly%20support,-for%20all%20variants
tar_info.type = tarfile.REGTYPE
tar_info.size = handle_buffer_size
tar_info.mode = 0o444
if mtime is not None:
tar_info.mtime = mtime
tar.addfile(tar_info, inode_content_buffer)

return handle_buffer_size
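# The TarInfo/addfile pattern above, reduced to a standalone sketch
# (hypothetical names; tarfile reads exactly tar_info.size bytes from the
# stream it is handed, so the declared size must match the buffer):
#
#     payload = b"recovered file content"
#     info = tarfile.TarInfo("example/recovered.txt")
#     info.type = tarfile.REGTYPE
#     info.size = len(payload)
#     info.mode = 0o444
#     tar.addfile(info, BytesIO(payload))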

def _tar_add_dir(
self,
tar: tarfile.TarFile,
directory_path: str,
mtime: Optional[float] = None,
) -> None:
"""Adds a directory path to a TarFile object, based on a DIR inode.
Args:
tar: The TarFile object to write to
directory_path: The directory path to create
mtime: The modification time to set the TarInfo object to
"""
tar_info = tarfile.TarInfo(directory_path)
tar_info.type = tarfile.DIRTYPE
tar_info.mode = 0o755
if mtime is not None:
tar_info.mtime = mtime
tar.addfile(tar_info)

def _tar_add_lnk(
self,
tar: tarfile.TarFile,
symlink_source: str,
symlink_dest: str,
symlink_source_prefix: str = "",
mtime: float = None,
) -> None:
"""Adds a symlink to a TarFile object.
Args:
tar: The TarFile object to write to
symlink_source: The symlink source path
symlink_dest: The symlink target/destination
symlink_source_prefix: A custom path prefix to prepend the symlink source with
mtime: The modification time to set the TarInfo object to
"""
# Patch symlinks pointing to absolute paths,
# to prevent referencing the host filesystem.
if symlink_dest.startswith("/"):
relative_dest = PurePath(symlink_dest).relative_to(PurePath("/"))
# Remove the leading "/" to prevent an extra undesired "../" in the output
symlink_dest = (
PurePath(
*[".."] * len(PurePath(symlink_source.lstrip("/")).parent.parts)
)
/ relative_dest
).as_posix()
tar_info = tarfile.TarInfo(symlink_source_prefix + symlink_source)
tar_info.type = tarfile.SYMTYPE
tar_info.linkname = symlink_dest
tar_info.mode = 0o444
if mtime is not None:
tar_info.mtime = mtime
tar.addfile(tar_info)
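# Worked example of the re-rooting above (illustration only): a source
# "/etc/mtab" has one parent directory ("etc"), so an absolute destination
# "/proc/self/mounts" becomes "../proc/self/mounts", and the link resolves
# inside the extracted tree instead of on the analyst's "/".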

def _generator(self):
vmlinux_module_name = self.config["kernel"]
vmlinux = self.context.modules[vmlinux_module_name]
vmlinux_layer = self.context.layers[vmlinux.layer_name]
tar_buffer = BytesIO()
tar = tarfile.open(
fileobj=tar_buffer,
mode=f"w:{self.config['compression_format']}",
)
# Set a unique timestamp for all extracted files
mtime = time.time()

inodes_iter = Files.get_inodes(
context=self.context,
vmlinux_module_name=vmlinux_module_name,
follow_symlinks=False,
)

# Prefix paths with the superblock UUID to prevent overlaps.
# Fall back to device major:minor numbers on older kernels (< 2.6.39-rc1).
uuid_as_prefix = vmlinux.get_type("super_block").has_member("s_uuid")
if not uuid_as_prefix:
vollog.warning(
"super_block struct does not support s_uuid attribute. Consequently, level 0 directories won't refer to the superblock uuid's, but to its device_major:device_minor numbers."
)

visited_paths = set()
seen_prefixes = set()
for inode_in in inodes_iter:

# This check slightly duplicates the if/elif chain below, but it
# avoids unneeded tar manipulation when, for example, FIFO or
# socket inodes come through.
if not (
inode_in.inode.is_reg or inode_in.inode.is_dir or inode_in.inode.is_link
):
continue

if not inode_in.path.startswith("/"):
vollog.debug(
f'Skipping processing of potentially smeared inode path "{inode_in.path}" as it does not start with a "/".'
)
continue

# Construct the output path
if uuid_as_prefix:
prefix = f"/{inode_in.superblock.uuid}"
else:
prefix = f"/{inode_in.superblock.major}:{inode_in.superblock.minor}"
prefixed_path = prefix + inode_in.path

# Sanity check for already processed paths
if prefixed_path in visited_paths:
vollog.log(
constants.LOGLEVEL_VV,
f'Already processed prefixed inode path: "{prefixed_path}".',
)
continue
elif prefix not in seen_prefixes:
self._tar_add_dir(tar, prefix, mtime)
seen_prefixes.add(prefix)

visited_paths.add(prefixed_path)
extracted_file_size = renderers.NotApplicableValue()

# An inode's parent directory is yielded first, which ensures
# that a file's parent path exists beforehand. tarfile would
# create missing parent directories anyway.
if inode_in.inode.is_reg:
extracted_file_size = self._tar_add_reg_inode(
self.context,
vmlinux_layer.name,
tar,
inode_in,
prefix,
mtime,
)
elif inode_in.inode.is_dir:
self._tar_add_dir(tar, prefixed_path, mtime)
elif (
inode_in.inode.is_link
and inode_in.inode.has_member("i_link")
and inode_in.inode.i_link
and inode_in.inode.i_link.is_readable()
):
symlink_dest = inode_in.inode.i_link.dereference().cast(
"string", max_length=255, encoding="utf-8", errors="replace"
)
self._tar_add_lnk(tar, inode_in.path, symlink_dest, prefix, mtime)
# Set the path to a user-friendly representation before yielding
inode_in.path = InodeUser.format_symlink(inode_in.path, symlink_dest)
else:
continue

inode_out = inode_in.to_user(vmlinux_layer)
yield (0, astuple(inode_out) + (extracted_file_size,))

tar.close()
tar_buffer.seek(0)
output_filename = f"recovered_fs.tar.{self.config['compression_format']}"
with self.open(output_filename) as f:
f.write(tar_buffer.getvalue())

def run(self):
headers = [
("SuperblockAddr", format_hints.Hex),
("MountPoint", str),
("Device", str),
("InodeNum", int),
("InodeAddr", format_hints.Hex),
("FileType", str),
("InodePages", int),
("CachedPages", int),
("FileMode", str),
("AccessTime", datetime.datetime),
("ModificationTime", datetime.datetime),
("ChangeTime", datetime.datetime),
("FilePath", str),
("InodeSize", int),
("Recovered FileSize", int),
]

return renderers.TreeGrid(
headers, Files.format_fields_with_headers(headers, self._generator())
)
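For context, a minimal consumer of the plugin's output (a sketch, assuming the default gz compression_format and the recovered_fs.tar.gz filename produced above; standard library only):

import tarfile

# List what RecoverFs extracted: type, size and path of every member,
# plus the (relativized) target for symlinks.
with tarfile.open("recovered_fs.tar.gz", mode="r:gz") as tar:
    for member in tar.getmembers():
        kind = {
            tarfile.REGTYPE: "file",
            tarfile.DIRTYPE: "dir",
            tarfile.SYMTYPE: "link",
        }.get(member.type, "other")
        target = f" -> {member.linkname}" if member.issym() else ""
        print(f"{kind:<5} {member.size:>10} {member.name}{target}")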
16 changes: 13 additions & 3 deletions volatility3/framework/plugins/windows/cmdline.py
@@ -70,6 +70,7 @@ def _generator(self, procs):
for proc in procs:
process_name = utility.array_to_string(proc.ImageFileName)
proc_id = "Unknown"
result_text = None

try:
proc_id = proc.UniqueProcessId
@@ -78,13 +79,22 @@
)

except exceptions.SwappedInvalidAddressException as exp:
result_text = f"Required memory at {exp.invalid_address:#x} is inaccessible (swapped)"
vollog.debug(
f"Required memory at {exp.invalid_address:#x} is inaccessible (swapped)"
)

except exceptions.PagedInvalidAddressException as exp:
result_text = f"Required memory at {exp.invalid_address:#x} is not valid (process exited?)"
vollog.debug(
f"Required memory at {exp.invalid_address:#x} is not valid (process exited?)"
)

except exceptions.InvalidAddressException as exp:
result_text = f"Process {proc_id}: Required memory at {exp.invalid_address:#x} is not valid (incomplete layer {exp.layer_name}?)"
vollog.debug(
f"Process {proc_id}: Required memory at {exp.invalid_address:#x} is not valid (incomplete layer {exp.layer_name}?)"
)

if not result_text:
result_text = renderers.UnreadableValue()

yield (0, (proc.UniqueProcessId, process_name, result_text))
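# result_text intentionally starts as None: each except branch above logs the
# precise failure via vollog.debug(), and the rendered table receives only a
# generic renderers.UnreadableValue(), keeping diagnostics in the debug log.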

46 changes: 27 additions & 19 deletions volatility3/framework/plugins/windows/dumpfiles.py
@@ -44,14 +44,16 @@ def get_requirements(cls) -> List[interfaces.configuration.RequirementInterface]
description="Process ID to include (all other processes are excluded)",
optional=True,
),
requirements.IntRequirement(
requirements.ListRequirement(
name="virtaddr",
description="Dump a single _FILE_OBJECT at this virtual address",
element_type=int,
description="Dump the _FILE_OBJECTs at the given virtual address(es)",
optional=True,
),
requirements.IntRequirement(
requirements.ListRequirement(
name="physaddr",
description="Dump a single _FILE_OBJECT at this physical address",
element_type=int,
description="Dump a single _FILE_OBJECTs at the given physical address(es)",
optional=True,
),
requirements.StringRequirement(
@@ -318,24 +320,26 @@ def _generator(self, procs: List, offsets: List):
)

elif offsets:
virtual_layer_name = kernel.layer_name

# FIXME - change this after standard access to physical layer
physical_layer_name = self.context.layers[virtual_layer_name].config[
"memory_layer"
]

# Now process any offsets explicitly requested by the user.
for offset, is_virtual in offsets:
try:
layer_name = kernel.layer_name
# switch to a memory layer if the user provided --physaddr instead of --virtaddr
if not is_virtual:
layer_name = self.context.layers[layer_name].config[
"memory_layer"
]

file_obj = self.context.object(
kernel.symbol_table_name + constants.BANG + "_FILE_OBJECT",
layer_name=layer_name,
native_layer_name=kernel.layer_name,
layer_name=(
virtual_layer_name if is_virtual else physical_layer_name
),
native_layer_name=virtual_layer_name,
offset=offset,
)
for result in self.process_file_object(
self.context, kernel.layer_name, self.open, file_obj
self.context, virtual_layer_name, self.open, file_obj
):
yield (0, result)
except exceptions.InvalidAddressException:
@@ -355,11 +359,15 @@ def run(self):
):
raise ValueError("Cannot use filter flag with an address flag")

if self.config.get("virtaddr", None) is not None:
offsets.append((self.config["virtaddr"], True))
elif self.config.get("physaddr", None) is not None:
offsets.append((self.config["physaddr"], False))
else:
if self.config.get("virtaddr"):
for virtaddr in self.config["virtaddr"]:
offsets.append((virtaddr, True))

if self.config.get("physaddr"):
for physaddr in self.config["physaddr"]:
offsets.append((physaddr, False))

if not offsets:
filter_func = pslist.PsList.create_pid_filter(
[self.config.get("pid", None)]
)
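# With ListRequirement, several addresses can now be dumped in a single run
# (hypothetical invocations; the flags mirror the requirement names above):
#
#     vol -f memory.dmp windows.dumpfiles.DumpFiles --virtaddr 0xfa8003e8b8d0 0xfa8003ea42a0
#     vol -f memory.dmp windows.dumpfiles.DumpFiles --physaddr 0x18c3f0b0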