diff --git a/dfindexeddb/indexeddb/cli.py b/dfindexeddb/indexeddb/cli.py index 17b6fac..5246dce 100644 --- a/dfindexeddb/indexeddb/cli.py +++ b/dfindexeddb/indexeddb/cli.py @@ -20,6 +20,7 @@ import json import pathlib +from dfindexeddb import utils from dfindexeddb import version from dfindexeddb.indexeddb.chromium import blink from dfindexeddb.indexeddb.chromium import record as chromium_record @@ -36,7 +37,7 @@ class Encoder(json.JSONEncoder): """A JSON encoder class for dfindexeddb fields.""" def default(self, o): if dataclasses.is_dataclass(o): - o_dict = dataclasses.asdict(o) + o_dict = utils.asdict(o) return o_dict if isinstance(o, bytes): out = [] diff --git a/dfindexeddb/leveldb/cli.py b/dfindexeddb/leveldb/cli.py index ab4250e..8c23cda 100644 --- a/dfindexeddb/leveldb/cli.py +++ b/dfindexeddb/leveldb/cli.py @@ -19,11 +19,13 @@ import json import pathlib +from dfindexeddb import utils from dfindexeddb import version from dfindexeddb.leveldb import descriptor from dfindexeddb.leveldb import ldb from dfindexeddb.leveldb import log from dfindexeddb.leveldb import record +from dfindexeddb.leveldb.plugins import manager _VALID_PRINTABLE_CHARACTERS = ( @@ -37,7 +39,7 @@ class Encoder(json.JSONEncoder): def default(self, o): """Returns a serializable object for o.""" if dataclasses.is_dataclass(o): - o_dict = dataclasses.asdict(o) + o_dict = utils.asdict(o) return o_dict if isinstance(o, bytes): out = [] @@ -66,15 +68,37 @@ def _Output(structure, output): def DbCommand(args): """The CLI for processing leveldb folders.""" + if args.plugin and args.plugin == 'list': + for plugin, _ in manager.LeveldbPluginManager.GetPlugins(): + print(plugin) + return + elif args.plugin: + plugin_class = manager.LeveldbPluginManager.GetPlugin(args.plugin) + else: + plugin_class = None + for leveldb_record in record.FolderReader( args.source).GetRecords( use_manifest=args.use_manifest, use_sequence_number=args.use_sequence_number): - _Output(leveldb_record, output=args.output) + if plugin_class: + plugin_record = plugin_class.FromKeyValueRecord(leveldb_record) + _Output(plugin_record, output=args.output) + else: + _Output(leveldb_record, output=args.output) def LdbCommand(args): """The CLI for processing ldb files.""" + if args.plugin and args.plugin == 'list': + for plugin, _ in manager.LeveldbPluginManager.GetPlugins(): + print(plugin) + return + elif args.plugin: + plugin_class = manager.LeveldbPluginManager.GetPlugin(args.plugin) + else: + plugin_class = None + ldb_file = ldb.FileReader(args.source) if args.structure_type == 'blocks': @@ -85,7 +109,11 @@ def LdbCommand(args): elif args.structure_type == 'records' or not args.structure_type: # Prints key value record information. for key_value_record in ldb_file.GetKeyValueRecords(): - _Output(key_value_record, output=args.output) + if plugin_class: + plugin_record = plugin_class.FromKeyValueRecord(key_value_record) + _Output(plugin_record, output=args.output) + else: + _Output(key_value_record, output=args.output) else: print(f'{args.structure_type} is not supported for ldb files.') @@ -93,6 +121,15 @@ def LdbCommand(args): def LogCommand(args): """The CLI for processing log files.""" + if args.plugin and args.plugin == 'list': + for plugin, _ in manager.LeveldbPluginManager.GetPlugins(): + print(plugin) + return + elif args.plugin: + plugin_class = manager.LeveldbPluginManager.GetPlugin(args.plugin) + else: + plugin_class = None + log_file = log.FileReader(args.source) if args.structure_type == 'blocks': @@ -114,7 +151,11 @@ def LogCommand(args): or not args.structure_type): # Prints key value record information. for internal_key_record in log_file.GetParsedInternalKeys(): - _Output(internal_key_record, output=args.output) + if plugin_class: + plugin_record = plugin_class.FromKeyValueRecord(internal_key_record) + _Output(plugin_record, output=args.output) + else: + _Output(internal_key_record, output=args.output) else: print(f'{args.structure_type} is not supported for log files.') @@ -146,6 +187,7 @@ def DescriptorCommand(args): else: print(f'{args.structure_type} is not supported for descriptor files.') + def App(): """The CLI app entrypoint for parsing leveldb files.""" parser = argparse.ArgumentParser( @@ -182,6 +224,9 @@ def App(): 'repr'], default='json', help='Output format. Default is json') + parser_db.add_argument( + '--plugin', + help='Use plugin to parse records.') parser_db.set_defaults(func=DbCommand) parser_log = subparsers.add_parser( @@ -200,6 +245,9 @@ def App(): 'repr'], default='json', help='Output format. Default is json') + parser_log.add_argument( + '--plugin', + help='Use plugin to parse records.') parser_log.add_argument( '-t', '--structure_type', @@ -227,6 +275,9 @@ def App(): 'repr'], default='json', help='Output format. Default is json') + parser_ldb.add_argument( + '--plugin', + help='Use plugin to parse records.') parser_ldb.add_argument( '-t', '--structure_type', diff --git a/dfindexeddb/leveldb/plugins/__init__.py b/dfindexeddb/leveldb/plugins/__init__.py new file mode 100644 index 0000000..241442e --- /dev/null +++ b/dfindexeddb/leveldb/plugins/__init__.py @@ -0,0 +1,17 @@ +# -*- coding: utf-8 -*- +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Leveldb Plugin module.""" + +from dfindexeddb.leveldb.plugins import chrome_notifications diff --git a/dfindexeddb/leveldb/plugins/chrome_notifications.py b/dfindexeddb/leveldb/plugins/chrome_notifications.py new file mode 100644 index 0000000..d2955d1 --- /dev/null +++ b/dfindexeddb/leveldb/plugins/chrome_notifications.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Parser plugin for Chrome Notifications.""" +from __future__ import annotations + +import dataclasses +import logging + +from typing import Any, Union + +try: + from dfdatetime import webkit_time + from google.protobuf.json_format import MessageToJson + from dfindexeddb.leveldb.plugins import notification_database_data_pb2 as notification_pb2 + _has_import_dependencies = True +except ImportError as err: + _has_import_dependencies = False + logging.warning(f'Could not import dependencies for leveldb.plugins.chrome_notifications: %s', err) + +from dfindexeddb.indexeddb.chromium import blink +from dfindexeddb.leveldb.plugins import interface +from dfindexeddb.leveldb.plugins import manager +from dfindexeddb.leveldb import record + + +@dataclasses.dataclass +class ChromeNotificationRecord(interface.LeveldbPlugin): + src_file: str = None + offset: int = None + key: str = None + sequence_number: int = None + type: int = None + origin: str = None + service_worker_registration_id: int = None + notification_title: str = None + notification_direction: str = None + notification_lang: str = None + notification_body: str = None + notification_tag: str = None + notification_icon: str = None + notification_silent: bool = None + notification_data: str = None + notification_require_interaction: bool = None + notification_time: str = None + notification_renotify: bool = None + notification_badge: str = None + notification_image: str = None + notification_id: str = None + replaced_existing_notification: bool = None + num_clicks: int = None + num_action_button_clicks: int = None + creation_time: str = None + closed_reason: str = None + has_triggered: bool = None + + @classmethod + def FromKeyValueRecord( + cls, + ldb_record + ) -> ChromeNotificationRecord: + record = cls() + record.offset = ldb_record.offset + record.key = ldb_record.key.decode() + record.sequence_number = ldb_record.sequence_number + record.type = ldb_record.record_type + + if not ldb_record.value: + return record + + notification_proto = notification_pb2.NotificationDatabaseDataProto() + notification_proto.ParseFromString(ldb_record.value) + + record.origin = notification_proto.origin + record.service_worker_registration_id = ( + notification_proto.service_worker_registration_id) + record.notification_title = notification_proto.notification_data.title + record.notification_direction = ( + notification_proto.notification_data.direction) + record.notification_lang = notification_proto.notification_data.lang + record.notification_body = notification_proto.notification_data.body + record.notification_tag = notification_proto.notification_data.tag + record.notification_icon = notification_proto.notification_data.icon + record.notification_silent = notification_proto.notification_data.silent + record.notification_data = notification_proto.notification_data.data + record.notification_require_interaction = ( + notification_proto.notification_data.require_interaction) + record.notification_time = webkit_time.WebKitTime( + timestamp=notification_proto.notification_data.timestamp + ).CopyToDateTimeString() + record.notification_renotify = notification_proto.notification_data.renotify + record.notification_badge = notification_proto.notification_data.badge + record.notification_image = notification_proto.notification_data.image + record.notification_id = notification_proto.notification_id + record.replaced_existing_notification = ( + notification_proto.replaced_existing_notification) + record.num_clicks = notification_proto.num_clicks + record.num_action_button_clicks = ( + notification_proto.num_action_button_clicks) + record.creation_time = webkit_time.WebKitTime( + timestamp=notification_proto.creation_time_millis + ).CopyToDateTimeString() + record.closed_reason = notification_proto.closed_reason + record.has_triggered = notification_proto.has_triggered + + if not notification_proto.notification_data.data: + return record + + notification_data = blink.V8ScriptValueDecoder( + raw_data=notification_proto.notification_data.data).Deserialize() + record.notification_data = notification_data + + return record + + +# check if dependencies are in existence.. + +if _has_import_dependencies: + manager.PluginManager.RegisterPlugin(ChromeNotificationRecord) diff --git a/dfindexeddb/leveldb/plugins/interface.py b/dfindexeddb/leveldb/plugins/interface.py new file mode 100644 index 0000000..d003af3 --- /dev/null +++ b/dfindexeddb/leveldb/plugins/interface.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Interface for leveldb plugins.""" +from typing import Union + +from dfindexeddb.leveldb import record + + +class LeveldbPlugin: + + @classmethod + def FromLevelDBRecord(cls, + ldb_record: record.LevelDBRecord): + """Parses a leveldb record.""" + parsed_record = cls.FromKeyValueRecord(ldb_record.record) + ldb_record.record = parsed_record + return ldb_record + + @classmethod + def FromKeyValueRecord(cls, ldb_record): + """Parses a leveldb key value record.""" diff --git a/dfindexeddb/leveldb/plugins/manager.py b/dfindexeddb/leveldb/plugins/manager.py new file mode 100644 index 0000000..a9121d9 --- /dev/null +++ b/dfindexeddb/leveldb/plugins/manager.py @@ -0,0 +1,74 @@ +# -*- coding: utf-8 -*- +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Leveldb plugin manager.""" + +from dfindexeddb.leveldb.plugins import interface + + +class LeveldbPluginManager: + """The leveldb plugin manager.""" + + _class_registry = {} + + @classmethod + def GetPlugins(cls): + """Retrieves the registered leveldb plugins. + + Yields: + tuple: containing: + str: the name of the leveldb plugin. + class: the plugin class. + """ + yield from cls._class_registry.items() + + @classmethod + def GetPlugin(cls, plugin_name: str) -> interface.LeveldbPlugin: + """Retrieves a class object of a specific plugin. + + Args: + plugin_name: name of the plugin. + + Returns: + the LeveldbPlugin class. + + Raises: + KeyError: if the plugin is not found/registered in the manager. + """ + try: + return cls._class_registry[plugin_name] + except KeyError: + raise KeyError(f'Plugin not found: {plugin_name}') + + @classmethod + def RegisterPlugin(cls, plugin_class: interface.LeveldbPlugin): + """Registers a leveldb plugin. + + Args: + plugin_class (class): the plugin class to register. + + Raises: + KeyError: if class is already set for the corresponding name. + """ + plugin_name = plugin_class.__name__ + if plugin_name in cls._class_registry: + raise KeyError(f'Plugin already registered {plugin_name}') + cls._class_registry[plugin_name] = plugin_class + + @classmethod + def ClearPlugins(cls): + """Clears all plugin registrations.""" + cls._class_registry = {} + +PluginManager = LeveldbPluginManager() diff --git a/dfindexeddb/leveldb/plugins/notification_database_data_pb2.py b/dfindexeddb/leveldb/plugins/notification_database_data_pb2.py new file mode 100644 index 0000000..8ab8666 --- /dev/null +++ b/dfindexeddb/leveldb/plugins/notification_database_data_pb2.py @@ -0,0 +1,38 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: notification_database_data.proto +# Protobuf Python Version: 4.25.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n notification_database_data.proto\"\xff\t\n\x1dNotificationDatabaseDataProto\x12\"\n\x1apersistent_notification_id\x18\x01 \x01(\x03\x12\x17\n\x0fnotification_id\x18\x05 \x01(\t\x12\x0e\n\x06origin\x18\x02 \x01(\t\x12&\n\x1eservice_worker_registration_id\x18\x03 \x01(\x03\x12&\n\x1ereplaced_existing_notification\x18\x06 \x01(\x08\x12\x12\n\nnum_clicks\x18\x07 \x01(\x05\x12 \n\x18num_action_button_clicks\x18\x08 \x01(\x05\x12\x1c\n\x14\x63reation_time_millis\x18\t \x01(\x03\x12%\n\x1dtime_until_first_click_millis\x18\n \x01(\x03\x12$\n\x1ctime_until_last_click_millis\x18\x0b \x01(\x03\x12\x1f\n\x17time_until_close_millis\x18\x0c \x01(\x03\x12\x42\n\rclosed_reason\x18\r \x01(\x0e\x32+.NotificationDatabaseDataProto.ClosedReason\x12J\n\x11notification_data\x18\x04 \x01(\x0b\x32/.NotificationDatabaseDataProto.NotificationData\x12\x15\n\rhas_triggered\x18\x0e \x01(\x08\x1a\xba\x01\n\x12NotificationAction\x12\x0e\n\x06\x61\x63tion\x18\x01 \x01(\t\x12\r\n\x05title\x18\x02 \x01(\t\x12\x0c\n\x04icon\x18\x03 \x01(\t\x12\x44\n\x04type\x18\x04 \x01(\x0e\x32\x36.NotificationDatabaseDataProto.NotificationAction.Type\x12\x13\n\x0bplaceholder\x18\x05 \x01(\t\"\x1c\n\x04Type\x12\n\n\x06\x42UTTON\x10\x00\x12\x08\n\x04TEXT\x10\x01\x1a\xe4\x03\n\x10NotificationData\x12\r\n\x05title\x18\x01 \x01(\t\x12L\n\tdirection\x18\x02 \x01(\x0e\x32\x39.NotificationDatabaseDataProto.NotificationData.Direction\x12\x0c\n\x04lang\x18\x03 \x01(\t\x12\x0c\n\x04\x62ody\x18\x04 \x01(\t\x12\x0b\n\x03tag\x18\x05 \x01(\t\x12\r\n\x05image\x18\x0f \x01(\t\x12\x0c\n\x04icon\x18\x06 \x01(\t\x12\r\n\x05\x62\x61\x64ge\x18\x0e \x01(\t\x12\x1d\n\x11vibration_pattern\x18\t \x03(\x05\x42\x02\x10\x01\x12\x11\n\ttimestamp\x18\x0c \x01(\x03\x12\x10\n\x08renotify\x18\r \x01(\x08\x12\x0e\n\x06silent\x18\x07 \x01(\x08\x12\x1b\n\x13require_interaction\x18\x0b \x01(\x08\x12\x0c\n\x04\x64\x61ta\x18\x08 \x01(\x0c\x12\x42\n\x07\x61\x63tions\x18\n \x03(\x0b\x32\x31.NotificationDatabaseDataProto.NotificationAction\x12\x1e\n\x16show_trigger_timestamp\x18\x10 \x01(\x03\";\n\tDirection\x12\x11\n\rLEFT_TO_RIGHT\x10\x00\x12\x11\n\rRIGHT_TO_LEFT\x10\x01\x12\x08\n\x04\x41UTO\x10\x02\"4\n\x0c\x43losedReason\x12\x08\n\x04USER\x10\x00\x12\r\n\tDEVELOPER\x10\x01\x12\x0b\n\x07UNKNOWN\x10\x02') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'notification_database_data_pb2', _globals) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _globals['_NOTIFICATIONDATABASEDATAPROTO_NOTIFICATIONDATA'].fields_by_name['vibration_pattern']._options = None + _globals['_NOTIFICATIONDATABASEDATAPROTO_NOTIFICATIONDATA'].fields_by_name['vibration_pattern']._serialized_options = b'\020\001' + _globals['_NOTIFICATIONDATABASEDATAPROTO']._serialized_start=37 + _globals['_NOTIFICATIONDATABASEDATAPROTO']._serialized_end=1316 + _globals['_NOTIFICATIONDATABASEDATAPROTO_NOTIFICATIONACTION']._serialized_start=589 + _globals['_NOTIFICATIONDATABASEDATAPROTO_NOTIFICATIONACTION']._serialized_end=775 + _globals['_NOTIFICATIONDATABASEDATAPROTO_NOTIFICATIONACTION_TYPE']._serialized_start=747 + _globals['_NOTIFICATIONDATABASEDATAPROTO_NOTIFICATIONACTION_TYPE']._serialized_end=775 + _globals['_NOTIFICATIONDATABASEDATAPROTO_NOTIFICATIONDATA']._serialized_start=778 + _globals['_NOTIFICATIONDATABASEDATAPROTO_NOTIFICATIONDATA']._serialized_end=1262 + _globals['_NOTIFICATIONDATABASEDATAPROTO_NOTIFICATIONDATA_DIRECTION']._serialized_start=1203 + _globals['_NOTIFICATIONDATABASEDATAPROTO_NOTIFICATIONDATA_DIRECTION']._serialized_end=1262 + _globals['_NOTIFICATIONDATABASEDATAPROTO_CLOSEDREASON']._serialized_start=1264 + _globals['_NOTIFICATIONDATABASEDATAPROTO_CLOSEDREASON']._serialized_end=1316 +# @@protoc_insertion_point(module_scope) diff --git a/dfindexeddb/utils.py b/dfindexeddb/utils.py index dbdb796..0b7c012 100644 --- a/dfindexeddb/utils.py +++ b/dfindexeddb/utils.py @@ -14,6 +14,8 @@ # limitations under the License. """Utilities for dfindexeddb.""" from __future__ import annotations +import copy +import dataclasses import io import os import struct @@ -259,3 +261,31 @@ def FromBytes( """ stream = io.BytesIO(raw_data) return cls.FromStream(stream=stream, base_offset=base_offset) + + +def asdict(obj, *, dict_factory=dict): + """Custom implementation of the asdict dataclasses method to include the + class name under the __type__ attribute name. + """ + if not dataclasses.is_dataclass(obj): + raise TypeError("asdict() should be called on dataclass instances") + return _asdict_inner(obj, dict_factory) + + +def _asdict_inner(obj, dict_factory): + if dataclasses.is_dataclass(obj): + result = [('__type__', obj.__class__.__name__)] + for f in dataclasses.fields(obj): + value = _asdict_inner(getattr(obj, f.name), dict_factory) + result.append((f.name, value)) + return dict_factory(result) + elif isinstance(obj, tuple) and hasattr(obj, '_fields'): + return type(obj)(*[_asdict_inner(v, dict_factory) for v in obj]) + elif isinstance(obj, (list, tuple)): + return type(obj)(_asdict_inner(v, dict_factory) for v in obj) + elif isinstance(obj, dict): + return type(obj)((_asdict_inner(k, dict_factory), + _asdict_inner(v, dict_factory)) + for k, v in obj.items()) + else: + return copy.deepcopy(obj)