Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Changed source scanner to support gzip compressed images #720

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions dfvfs/helpers/source_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,13 @@ def _ScanNode(self, scan_context, scan_node, auto_recurse=True):
if not auto_recurse:
return

else:
# Check for a compressed RAW storage media image.
source_path_spec = self._ScanForCompressedStream(scan_node.path_spec)
if source_path_spec:
scan_node.scanned = True
scan_node = scan_context.AddScanNode(source_path_spec, scan_node)

# In case we did not find a storage media image type we keep looking
# since not all RAW storage media image naming schemas are known and
# its type can only be detected by its content.
Expand Down Expand Up @@ -595,6 +602,43 @@ def _ScanEncryptedVolumeNode(self, scan_context, scan_node):
if path_spec:
scan_context.AddScanNode(path_spec, scan_node.parent_node)

def _ScanForCompressedStream(self, source_path_spec):
  """Scans the path specification for a supported compressed stream type.

  Only gzip compressed streams are currently supported.

  Args:
    source_path_spec (PathSpec): source path specification.

  Returns:
    PathSpec: gzip compressed stream path specification, with the source
        path specification as its parent, or None if no compressed stream
        type was found.

  Raises:
    BackEndError: if the source cannot be scanned, more than one compressed
        stream type is found or the compressed stream type is not supported.
  """
  try:
    type_indicators = analyzer.Analyzer.GetCompressedStreamTypeIndicators(
        source_path_spec, resolver_context=self._resolver_context)
  except RuntimeError as exception:
    # Chain explicitly so the analyzer error is preserved as the cause of
    # the back-end error instead of only as implicit context.
    raise errors.BackEndError((
        'Unable to process source path specification with error: '
        f'{exception!s}')) from exception

  if not type_indicators:
    return None

  # Reject ambiguous results before looking at any individual indicator.
  if len(type_indicators) > 1:
    raise errors.BackEndError(
        'Unsupported source found more than one compressed stream types.')

  type_indicator = type_indicators[0]
  if type_indicator != definitions.TYPE_INDICATOR_GZIP:
    raise errors.BackEndError(
        f'Unsupported compressed stream type indicator: {type_indicator:s}')

  return path_spec_factory.Factory.NewPathSpec(
      definitions.TYPE_INDICATOR_GZIP, parent=source_path_spec)

def _ScanVolumeSystemRootNode(
self, scan_context, scan_node, auto_recurse=True):
"""Scans a volume system root node for supported formats.
Expand Down
Binary file added test_data/ext2.raw.gz
Binary file not shown.
Binary file added test_data/mbr.raw.gz
Binary file not shown.
42 changes: 42 additions & 0 deletions tests/helpers/source_scanner.py
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,48 @@ def testScanOnRAW(self):

self.assertEqual(len(scan_node.sub_nodes), 0)

def testScanOnGzipCompressedRAW(self):
  """Tests the Scan function on gzip compressed RAW images."""
  # First image: ext2.raw.gz, expected to resolve to a single EXT node
  # without any sub nodes.
  ext2_image_path = self._GetTestFilePath(['ext2.raw.gz'])
  self._SkipIfPathNotExists(ext2_image_path)

  ext2_context = source_scanner.SourceScannerContext()
  ext2_context.OpenSourcePath(ext2_image_path)
  self._source_scanner.Scan(ext2_context)

  self.assertEqual(
      ext2_context.source_type, definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE)

  ext2_node = self._GetTestScanNode(ext2_context)
  self.assertIsNotNone(ext2_node)
  self.assertIsNotNone(ext2_node.path_spec)
  self.assertEqual(
      ext2_node.type_indicator, definitions.PREFERRED_EXT_BACK_END)
  self.assertEqual(len(ext2_node.sub_nodes), 0)

  # Second image: mbr.raw.gz, expected to resolve to a TSK partition node
  # with 8 sub nodes, where sub node 6 holds an EXT file system at '/'.
  mbr_image_path = self._GetTestFilePath(['mbr.raw.gz'])
  self._SkipIfPathNotExists(mbr_image_path)

  mbr_context = source_scanner.SourceScannerContext()
  mbr_context.OpenSourcePath(mbr_image_path)
  self._source_scanner.Scan(mbr_context)

  self.assertEqual(
      mbr_context.source_type, definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE)

  partition_node = self._GetTestScanNode(mbr_context)
  self.assertIsNotNone(partition_node)
  self.assertEqual(
      partition_node.type_indicator, definitions.TYPE_INDICATOR_TSK_PARTITION)
  self.assertEqual(len(partition_node.sub_nodes), 8)

  file_system_node = partition_node.sub_nodes[6].GetSubNodeByLocation('/')
  self.assertIsNotNone(file_system_node)
  self.assertEqual(
      file_system_node.type_indicator, definitions.PREFERRED_EXT_BACK_END)

def testScanOnNonExisting(self):
"""Test the Scan function on non-existing image file."""
test_path = self._GetTestFilePath(['nosuchfile.raw'])
Expand Down