diff --git a/dfvfs/helpers/source_scanner.py b/dfvfs/helpers/source_scanner.py
index 3cbd43bb..4234db91 100644
--- a/dfvfs/helpers/source_scanner.py
+++ b/dfvfs/helpers/source_scanner.py
@@ -460,6 +460,13 @@ def _ScanNode(self, scan_context, scan_node, auto_recurse=True):
         if not auto_recurse:
           return
 
+      else:
+        # Check for a compressed RAW storage media image.
+        source_path_spec = self._ScanForCompressedStream(scan_node.path_spec)
+        if source_path_spec:
+          scan_node.scanned = True
+          scan_node = scan_context.AddScanNode(source_path_spec, scan_node)
+
     # In case we did not find a storage media image type we keep looking
     # since not all RAW storage media image naming schemas are known and
     # its type can only detected by its content.
@@ -595,6 +602,47 @@ def _ScanEncryptedVolumeNode(self, scan_context, scan_node):
         if path_spec:
           scan_context.AddScanNode(path_spec, scan_node.parent_node)
 
+  def _ScanForCompressedStream(self, source_path_spec):
+    """Scans the path specification for a supported compressed stream type.
+
+    Args:
+      source_path_spec (PathSpec): source path specification.
+
+    Returns:
+      PathSpec: compressed stream path specification, with the source path
+          specification as its parent, or None if no supported compressed
+          stream type was found.
+
+    Raises:
+      BackEndError: if the source cannot be scanned, more than one compressed
+          stream type is found or the compressed stream type is not
+          supported.
+    """
+    try:
+      type_indicators = analyzer.Analyzer.GetCompressedStreamTypeIndicators(
+          source_path_spec, resolver_context=self._resolver_context)
+    except RuntimeError as exception:
+      # Chain the original exception so the root cause stays in the traceback.
+      raise errors.BackEndError((
+          'Unable to process source path specification with error: '
+          f'{exception!s}')) from exception
+
+    if not type_indicators:
+      return None
+
+    if len(type_indicators) > 1:
+      raise errors.BackEndError(
+          'Unsupported source found more than one compressed stream types.')
+
+    # Only gzip compressed streams are currently handled here.
+    type_indicator = type_indicators[0]
+    if type_indicator != definitions.TYPE_INDICATOR_GZIP:
+      raise errors.BackEndError(
+          f'Unsupported compressed stream type indicator: {type_indicator:s}')
+
+    return path_spec_factory.Factory.NewPathSpec(
+        definitions.TYPE_INDICATOR_GZIP, parent=source_path_spec)
+
   def _ScanVolumeSystemRootNode(
       self, scan_context, scan_node, auto_recurse=True):
     """Scans a volume system root node for supported formats.
diff --git a/test_data/ext2.raw.gz b/test_data/ext2.raw.gz
new file mode 100644
index 00000000..a395ca4e
Binary files /dev/null and b/test_data/ext2.raw.gz differ
diff --git a/test_data/mbr.raw.gz b/test_data/mbr.raw.gz
new file mode 100644
index 00000000..82912cf4
Binary files /dev/null and b/test_data/mbr.raw.gz differ
diff --git a/tests/helpers/source_scanner.py b/tests/helpers/source_scanner.py
index 47068e28..63ec41bb 100644
--- a/tests/helpers/source_scanner.py
+++ b/tests/helpers/source_scanner.py
@@ -782,6 +782,48 @@ def testScanOnRAW(self):
 
     self.assertEqual(len(scan_node.sub_nodes), 0)
 
+  def testScanOnGzipCompressedRAW(self):
+    """Test the Scan function on a gzip compressed RAW image."""
+    test_path = self._GetTestFilePath(['ext2.raw.gz'])
+    self._SkipIfPathNotExists(test_path)
+
+    scan_context = source_scanner.SourceScannerContext()
+    scan_context.OpenSourcePath(test_path)
+
+    self._source_scanner.Scan(scan_context)
+    self.assertEqual(
+        scan_context.source_type, definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE)
+
+    scan_node = self._GetTestScanNode(scan_context)
+    self.assertIsNotNone(scan_node)
+    self.assertIsNotNone(scan_node.path_spec)
+    self.assertEqual(
+        scan_node.type_indicator, definitions.PREFERRED_EXT_BACK_END)
+
+    self.assertEqual(len(scan_node.sub_nodes), 0)
+
+    test_path = self._GetTestFilePath(['mbr.raw.gz'])
+    self._SkipIfPathNotExists(test_path)
+
+    scan_context = source_scanner.SourceScannerContext()
+    scan_context.OpenSourcePath(test_path)
+
+    self._source_scanner.Scan(scan_context)
+    self.assertEqual(
+        scan_context.source_type, definitions.SOURCE_TYPE_STORAGE_MEDIA_IMAGE)
+
+    scan_node = self._GetTestScanNode(scan_context)
+    self.assertIsNotNone(scan_node)
+    self.assertEqual(
+        scan_node.type_indicator, definitions.TYPE_INDICATOR_TSK_PARTITION)
+
+    self.assertEqual(len(scan_node.sub_nodes), 8)
+
+    scan_node = scan_node.sub_nodes[6].GetSubNodeByLocation('/')
+    self.assertIsNotNone(scan_node)
+    self.assertEqual(
+        scan_node.type_indicator, definitions.PREFERRED_EXT_BACK_END)
+
   def testScanOnNonExisting(self):
     """Test the Scan function on non-existing image file."""
     test_path = self._GetTestFilePath(['nosuchfile.raw'])