diff --git a/dfindexeddb/indexeddb/chromium/blink.py b/dfindexeddb/indexeddb/chromium/blink.py index 5959986..f4ad8fc 100644 --- a/dfindexeddb/indexeddb/chromium/blink.py +++ b/dfindexeddb/indexeddb/chromium/blink.py @@ -13,14 +13,340 @@ # See the License for the specific language governing permissions and # limitations under the License. """Parsers for blink javascript serialized objects.""" +from __future__ import annotations +from dataclasses import dataclass import io -from typing import Any +from typing import Any, Optional, Union +from dfindexeddb import errors from dfindexeddb import utils from dfindexeddb.indexeddb.chromium import definitions from dfindexeddb.indexeddb.chromium import v8 +_MS_PER_SECOND = 1000 + + +@dataclass +class AudioData: + """A Javascript AudioData class. + + Attributes: + audio_frame_index: the Audio Frame index. + """ + audio_frame_index: int + + +@dataclass +class BaseIndex: + """A base index class.""" + index: int + + +@dataclass +class BitMap: + """A Javascript BitMap.""" + + +@dataclass +class Blob: + """A Javascript Blob class type. + + Attributes: + uuid: the UUID of the Blob. + type: the type of the Blob. + size: the size of the Blob. + """ + uuid: str + type: str + size: int + + +@dataclass +class BlobIndex(BaseIndex): + """A Javascript BlobIndex class type.""" + + +@dataclass +class CryptoKey: + """A Javascript CryptoKey class type. + + Attributes: + algorithm: the CryptoKey algorithm. + key_type: the CryptoKey type. + extractable: True if the CryptoKey is extractable. + usages: the CryptoKey usage. + key_data: the raw key data. + """ + algorithm_parameters: dict[str, Any] + key_type: Union[ + definitions.WebCryptoKeyType, definitions.AsymmetricCryptoKeyType] + extractable: bool + usages: definitions.CryptoKeyUsage + key_data: bytes + + +@dataclass +class DOMException: + """A Javascript DOMException. + + Attributes: + name: the name. + message: the message. + stack_unused: the stack unused. + """ + name: str + message: str + stack_unused: str + + +@dataclass +class DOMFileSystem: + """A Javascript DOMFileSystem. + + Attributes: + raw_type: the raw type. + name: the name. + root_url: the root URL. + """ + raw_type: int + name: str + root_url: str + + +@dataclass +class DOMMatrix2D: + """A Javascript DOMMatrix2D. + + Attributes: + values: the values. + """ + values: list[float] + + +@dataclass +class DOMMatrix2DReadOnly(DOMMatrix2D): + """A Javascript Read-Only DOMMatrix2D.""" + + +@dataclass +class DOMMatrix: + """A Javascript DOMMatrix. + + Attributes: + values: the values. + """ + values: list[float] + + +@dataclass +class DOMMatrixReadOnly(DOMMatrix): + """A Javascript Read-Only DOMMatrix.""" + + +@dataclass +class DOMPoint: + """A Javascript DOMPoint. + + Attributes: + x: the X coordinate. + y: the Y coordinate. + z: the Z coordinate. + w: the W coordinate. + """ + x: float + y: float + z: float + w: float + + +@dataclass +class DOMPointReadOnly(DOMPoint): + """A Javascript Read-Only DOMPoint.""" + + +@dataclass +class DOMQuad: + """A Javascript DOMQuad. + + Attributes: + p1: the first point. + p2: the second point. + p3: the third point. + p4: the fourth point. + """ + p1: DOMPoint + p2: DOMPoint + p3: DOMPoint + p4: DOMPoint + + +@dataclass +class DOMRect: + """A Javascript DOMRect. + + Attributes: + x: the X coordinate. + y: the Y coordinate. + width: the width. + height: the height. + """ + x: float + y: float + width: float + height: float + + +@dataclass +class DOMRectReadOnly(DOMRect): + """A Javascript Read-only DOMRect.""" + + +@dataclass +class EncodedAudioChunk(BaseIndex): + """A Javascript EncodedAudioChunk.""" + + +@dataclass +class EncodedVideoChunk(BaseIndex): + """A Javascript EncodedVideoChunk.""" + + +@dataclass +class File: + """A Javascript File. + + Attributes: + path: the file path. + name: the file name. + relative_path: the file relative path. + uuid: the file UUID. + type: the file type. + has_snapshot: True if the file has a snapshot. + size: the file size. + last_modified_ms: the file last modified in milliseconds. + is_user_visible: True if the file is user visible. + """ + path: str + name: Optional[str] + relative_path: Optional[str] + uuid: str + type: str + has_snapshot: int + size: Optional[int] + last_modified_ms: Optional[float] + is_user_visible: int + + +@dataclass +class FileIndex(BaseIndex): + """A Javascript FileIndex.""" + + +@dataclass +class FileList: + """A Javascript FileList. + + Attributes: + files: the list of files. + """ + files: list[File] + + +@dataclass +class FileListIndex: + """A Javascript FileListIndex. + + Attributes: + file_indexes: the list of file indexes. + """ + file_indexes: list[FileIndex] + + +@dataclass +class FileSystemHandle: + """A Javascript FileSystemHandle. + + Attributes: + name: the file system handle name. + token_index: the file system token index. + """ + name: str + token_index: int + + +@dataclass +class ImageBitmapTransfer(BaseIndex): + """A Javascript ImageBitmapTransfer.""" + + +@dataclass +class MediaSourceHandle(BaseIndex): + """A Javascript MediaSourceHandle.""" + + +@dataclass +class MessagePort(BaseIndex): + """A Javascript MessagePort.""" + + +@dataclass +class MojoHandle(BaseIndex): + """A Javascript MojoHandle.""" + + +@dataclass +class ReadableStreamTransfer(BaseIndex): + """A Javascript ReadableStreamTransfer.""" + + +@dataclass +class RTCEncodedAudioFrame(BaseIndex): + """A Javascript RTCEncodedAudioFrame.""" + + +@dataclass +class RTCEncodedVideoFrame(BaseIndex): + """A Javascript RTCEncodedVideoFrame.""" + + +@dataclass +class WriteableStreamTransfer(BaseIndex): + """A Javascript WriteableStreamTransfer.""" + + +@dataclass +class TransformStreamTransfer(BaseIndex): + """A Javascript TransformStreamTransfer.""" + + +@dataclass +class OffscreenCanvasTransfer: + """A Javascript Offscreen Canvas Transfer.""" + width: int + height: int + canvas_id: int + client_id: int + sink_id: int + filter_quality: int + + +@dataclass +class UnguessableToken: + """A Javascript Unguessable Token. + + Attributes: + high: the high part. + low: the low part. + """ + high: int + low: int + + +@dataclass +class VideoFrame(BaseIndex): + """A Javascript VideoFrame.""" + class V8ScriptValueDecoder: """A Blink V8 Serialized Script Value (SSV) decoder. @@ -40,6 +366,8 @@ def __init__(self, raw_data: bytes): self.deserializer = None self.raw_data = raw_data self.version = 0 + self.trailer_offset = None + self.trailer_size = None def _ReadVersionEnvelope(self) -> int: """Reads the Blink version envelope. @@ -52,42 +380,600 @@ def _ReadVersionEnvelope(self) -> int: return 0 decoder = utils.StreamDecoder(io.BytesIO(self.raw_data)) - _, tag_value = decoder.DecodeUint8() - tag = definitions.BlinkSerializationTag(tag_value) + _, tag = decoder.DecodeUint8() if tag != definitions.BlinkSerializationTag.VERSION: return 0 - _, version = decoder.DecodeUint32Varint() - if version < self._MIN_VERSION_FOR_SEPARATE_ENVELOPE: + _, self.version = decoder.DecodeUint32Varint() + if self.version < self._MIN_VERSION_FOR_SEPARATE_ENVELOPE: return 0 consumed_bytes = decoder.stream.tell() - if version >= self._MIN_WIRE_FORMAT_VERSION: + if self.version >= self._MIN_WIRE_FORMAT_VERSION: + _, trailer_offset_tag = decoder.DecodeUint8() + if (trailer_offset_tag != + definitions.BlinkSerializationTag.TRAILER_OFFSET): + raise errors.ParserError('Trailer offset not found') + _, self.trailer_offset = decoder.DecodeInt( + byte_count=8, byte_order='big', signed=False) + _, self.trailer_size = decoder.DecodeInt( + byte_count=4, byte_order='big', signed=False) trailer_offset_data_size = 1 + 8 + 4 # 1 + sizeof(uint64) + sizeof(uint32) consumed_bytes += trailer_offset_data_size if consumed_bytes >= len(self.raw_data): return 0 return consumed_bytes + def _ReadAESKey(self) -> tuple[definitions.WebCryptoKeyType, dict[str, Any]]: + """Reads an AES CryptoKey. + + Returns: + The AES key algorithm parameters + """ + _, raw_id = self.deserializer.decoder.DecodeUint32Varint() + crypto_key_algorithm_id = definitions.CryptoKeyAlgorithm(raw_id) + + _, length_bytes = self.deserializer.decoder.DecodeUint32Varint() + + algorithm_parameters = { + 'id': crypto_key_algorithm_id, + 'length_bits': length_bytes*8 + } + + return definitions.WebCryptoKeyType.SECRET, algorithm_parameters + + def _ReadHMACKey(self) -> tuple[definitions.WebCryptoKeyType, dict[str, Any]]: + """Reads a HMAC CryptoKey. + + Returns: + The HMAC key algorithm parameters + """ + _, length_bytes = self.deserializer.decoder.DecodeUint32Varint() + _, raw_hash = self.deserializer.decoder.DecodeUint32Varint() + crypto_key_algorithm = definitions.CryptoKeyAlgorithm(raw_hash) + + algorithm_parameters = { + 'id': crypto_key_algorithm, + 'length_bits': length_bytes*8 + } + + return definitions.WebCryptoKeyType.SECRET, algorithm_parameters + + def _ReadRSAHashedKey( + self + ) -> tuple[definitions.AsymmetricCryptoKeyType, dict[str, Any]]: + """Reads an RSA Hashed CryptoKey. + + Returns: + The RSA Hashed key algorithm parameters + """ + _, raw_id = self.deserializer.decoder.DecodeUint32Varint() + crypto_key_algorithm = definitions.CryptoKeyAlgorithm(raw_id) + + _, raw_key_type = self.deserializer.decoder.DecodeUint32Varint() + key_type = definitions.AsymmetricCryptoKeyType(raw_key_type) + + _, modulus_length_bits = self.deserializer.decoder.DecodeUint32Varint() + _, public_exponent_size = self.deserializer.decoder.DecodeUint32Varint() + _, public_exponent_bytes = self.deserializer.decoder.ReadBytes( + count=public_exponent_size) + + _, raw_hash = self.deserializer.decoder.DecodeUint32Varint() + hash_algorithm = definitions.CryptoKeyAlgorithm(raw_hash) + + algorithm_parameters = { + 'id': crypto_key_algorithm, + 'modulus_length_bits': modulus_length_bits, + 'public_exponent_size': public_exponent_size, + 'public_exponent_bytes': public_exponent_bytes, + 'hash': hash_algorithm + } + + return key_type, algorithm_parameters + + def _ReadECKey( + self + ) -> tuple[definitions.AsymmetricCryptoKeyType, dict[str, Any]]: + """Reads an EC Key parameters from the current decoder position. + + Returns: + The EC Key algorithm parameters. + """ + _, raw_id = self.deserializer.decoder.DecodeUint32Varint() + crypto_key_algorithm = definitions.CryptoKeyAlgorithm(raw_id) + + _, raw_key_type = self.deserializer.decoder.DecodeUint32Varint() + key_type = definitions.AsymmetricCryptoKeyType(raw_key_type) + + _, raw_named_curve = self.deserializer.decoder.DecodeUint32Varint() + named_curve_type = definitions.NamedCurve(raw_named_curve) + + algorithm_parameters = { + 'crypto_key_algorithm': crypto_key_algorithm, + 'named_curve_type': named_curve_type + } + + return key_type, algorithm_parameters + + def _ReadED25519Key( + self + ) -> tuple[definitions.AsymmetricCryptoKeyType, dict[str, Any]]: + """Reads a ED25519 CryptoKey from the current decoder position. + + Returns: + The ED25519 key algorithm parameters. + """ + _, raw_id = self.deserializer.decoder.DecodeUint32Varint() + crypto_key_algorithm = definitions.CryptoKeyAlgorithm(raw_id) + + _, raw_key_type = self.deserializer.decoder.DecodeUint32Varint() + key_type = definitions.AsymmetricCryptoKeyType( + raw_key_type) + + algorithm_parameters = { + 'crypto_key_algorithm': crypto_key_algorithm + } + + return key_type, algorithm_parameters + + def ReadNoParamsKey( + self + ) -> tuple[definitions.WebCryptoKeyType, dict[str, Any]]: + """Reads a No Params CryptoKey from the current decoder position. + + Returns: + The No Parameters key algorithm parameters. + """ + _, raw_id = self.deserializer.decoder.DecodeUint32Varint() + crypto_key_algorithm = definitions.CryptoKeyAlgorithm(raw_id) + + algorithm_parameters = { + 'crypto_key_algorithm': crypto_key_algorithm + } + + return definitions.WebCryptoKeyType.SECRET, algorithm_parameters + + def _ReadBlob(self) -> Optional[Blob]: + """Reads a Blob from the current position. + + Returns: + A Blob if the version is less then 3, None otherwise. + """ + if self.version and self.version < 3: + return None + + uuid = self.deserializer.ReadUTF8String() + blob_type = self.deserializer.ReadUTF8String() + _, size = self.deserializer.decoder.DecodeUint64Varint() + return Blob(uuid=uuid, type=blob_type, size=size) + + def _ReadBlobIndex(self) -> Optional[BlobIndex]: + """Reads a BlobIndex from the current decoder position. + + Returns: + A parsed BlobIndex if the version is greater than 6, None otherwise. + """ + if self.version < 6: + return None + _, index = self.deserializer.decoder.DecodeUint32Varint() + return BlobIndex(index=index) + + def _ReadFile(self) -> Optional[File]: + """Reads a Javascript File from the current position. + + Returns: + A File instance, None if the version is less than 3. + """ + if self.version < 3: + return None + + path = self.deserializer.ReadUTF8String() + name = self.deserializer.ReadUTF8String() if self.version >= 4 else None + relative_path = ( + self.deserializer.ReadUTF8String() if self.version >= 4 else None) + uuid = self.deserializer.ReadUTF8String() + file_type = self.deserializer.ReadUTF8String() + has_snapshot = ( + self.deserializer.decoder.DecodeUint32Varint()[1] + if self.version >= 4 else 0) + + if has_snapshot: + _, size = self.deserializer.decoder.DecodeUint64Varint() + _, last_modified_ms = self.deserializer.decoder.DecodeDouble() + if self.version < 8: + last_modified_ms *= _MS_PER_SECOND + else: + size = None + last_modified_ms = None + + is_user_visible = ( + self.deserializer.decoder.DecodeUint32Varint()[1] + if self.version >= 7 else 1) + + return File( + path=path, + name=name, + relative_path=relative_path, + uuid=uuid, + type=file_type, + has_snapshot=has_snapshot, + size=size, + last_modified_ms=last_modified_ms, + is_user_visible=is_user_visible) + + def _ReadFileIndex(self) -> Optional[FileIndex]: + """Reads a FileIndex from the current position. + + Returns: + A FileIndex, or None if the version is less than 6. + """ + if self.version < 6: + return None + _, index = self.deserializer.decoder.DecodeUint32Varint() + return FileIndex(index=index) + + def _ReadFileList(self) -> Optional[FileList]: + """Reads a Javascript FileList from the current position. + + Returns: + A FileList, or None if a File could not be read. + """ + _, length = self.deserializer.decoder.DecodeUint32Varint() + files = [] + for _ in range(length): + decoded_file = self._ReadFile() + if not decoded_file: + return None + files.append(decoded_file) + return FileList(files=files) + + def _ReadFileListIndex(self) -> Optional[FileListIndex]: + """Reads a Javascript FileListIndex from the current position. + + Returns: + A FileListIndex, or None if a FileIndex could not be read. + """ + _, length = self.deserializer.decoder.DecodeUint32Varint() + file_indexes = [] + for _ in range(length): + decoded_file_index = self._ReadFileIndex() + if not decoded_file_index: + return None + file_indexes.append(decoded_file_index) + return FileListIndex(file_indexes=file_indexes) + + def _ReadImageBitmap(self): + """Reads an ImageBitmap.""" + raise NotImplementedError('V8ScriptValueDecoder._ReadImageBitmap') + + def _ReadImageBitmapTransfer(self) -> ImageBitmapTransfer: + """Reads an ImageBitmapTransfer.""" + _, index = self.deserializer.decoder.DecodeUint32Varint() + return ImageBitmapTransfer(index=index) + + def _ReadImageData(self): + """Reads an ImageData from the current position.""" + raise NotImplementedError('V8ScriptValueDecoder._ReadImageData') + + def _ReadDOMPoint(self) -> DOMPoint: + """Reads a DOMPoint from the current position.""" + _, x = self.deserializer.decoder.DecodeDouble() + _, y = self.deserializer.decoder.DecodeDouble() + _, z = self.deserializer.decoder.DecodeDouble() + _, w = self.deserializer.decoder.DecodeDouble() + return DOMPoint(x=x, y=y, z=z, w=w) + + def _ReadDOMPointReadOnly(self) -> DOMPointReadOnly: + """Reads a DOMPointReadOnly from the current position.""" + _, x = self.deserializer.decoder.DecodeDouble() + _, y = self.deserializer.decoder.DecodeDouble() + _, z = self.deserializer.decoder.DecodeDouble() + _, w = self.deserializer.decoder.DecodeDouble() + return DOMPointReadOnly(x=x, y=y, z=z, w=w) + + def _ReadDOMRect(self) -> DOMRect: + """Reads a DOMRect from the current position.""" + _, x = self.deserializer.decoder.DecodeDouble() + _, y = self.deserializer.decoder.DecodeDouble() + _, width = self.deserializer.decoder.DecodeDouble() + _, height = self.deserializer.decoder.DecodeDouble() + return DOMRect(x=x, y=y, width=width, height=height) + + def _ReadDOMRectReadOnly(self) -> DOMRectReadOnly: + """Reads a DOMRectReadOnly from the current position.""" + _, x = self.deserializer.decoder.DecodeDouble() + _, y = self.deserializer.decoder.DecodeDouble() + _, width = self.deserializer.decoder.DecodeDouble() + _, height = self.deserializer.decoder.DecodeDouble() + return DOMRectReadOnly(x=x, y=y, width=width, height=height) + + def _ReadDOMQuad(self) -> DOMQuad: + """Reads a DOMQuad from the current position.""" + p1 = self._ReadDOMPoint() + p2 = self._ReadDOMPoint() + p3 = self._ReadDOMPoint() + p4 = self._ReadDOMPoint() + return DOMQuad(p1=p1, p2=p2, p3=p3, p4=p4) + + def _ReadDOMMatrix2D(self) -> DOMMatrix2D: + """Reads a Javascript DOMMatrix2D from the current position.""" + values = [self.deserializer.decoder.DecodeDouble()[1] for _ in range(6)] + return DOMMatrix2D(values=values) + + def _ReadDOMMatrix2DReadOnly(self) -> DOMMatrix2DReadOnly: + """Reads a Javascript Read-Only DOMMatrix2D from the current position.""" + values = [self.deserializer.decoder.DecodeDouble()[1] for _ in range(6)] + return DOMMatrix2DReadOnly(values=values) + + def _ReadDOMMatrix(self) -> DOMMatrix: + """Reads a Javascript DOMMatrix from the current position.""" + values = [self.deserializer.decoder.DecodeDouble()[1] for _ in range(16)] + return DOMMatrix(values=values) + + def _ReadDOMMatrixReadOnly(self) -> DOMMatrixReadOnly: + """Reads a Javascript Read-Only DOMMatrix from the current position.""" + values = [self.deserializer.decoder.DecodeDouble()[1] for _ in range(16)] + return DOMMatrixReadOnly(values=values) + + def _ReadMessagePort(self) -> MessagePort: + """Reads a MessagePort from the current position.""" + _, index = self.deserializer.decoder.DecodeUint32Varint() + return MessagePort(index=index) + + def _ReadMojoHandle(self) -> MojoHandle: + """Reads a MojoHandle from the current position.""" + _, index = self.deserializer.decoder.DecodeUint32Varint() + return MojoHandle(index=index) + + def _ReadOffscreenCanvasTransfer(self): + """Reads a Offscreen Canvas Transfer from the current position.""" + _, width = self.deserializer.decoder.DecodeUint32Varint() + _, height = self.deserializer.decoder.DecodeUint32Varint() + _, canvas_id = self.deserializer.decoder.DecodeUint32Varint() + _, client_id = self.deserializer.decoder.DecodeUint32Varint() + _, sink_id = self.deserializer.decoder.DecodeUint32Varint() + _, filter_quality = self.deserializer.decoder.DecodeUint32Varint() + + return OffscreenCanvasTransfer( + width=width, + height=height, + canvas_id=canvas_id, + client_id=client_id, + sink_id=sink_id, + filter_quality=filter_quality) + + def _ReadReadableStreamTransfer(self) -> ReadableStreamTransfer: + """Reads a Readable Stream Transfer from the current position.""" + _, index = self.deserializer.decoder.DecodeUint32Varint() + return ReadableStreamTransfer(index=index) + + def _ReadWriteableStreamTransfer(self) -> WriteableStreamTransfer: + """Reads a Writeable Stream Transfer from the current position.""" + _, index = self.deserializer.decoder.DecodeUint32Varint() + return WriteableStreamTransfer(index=index) + + def _ReadTransformStreamTransfer(self) -> TransformStreamTransfer: + """Reads a TransformStreamTransfer from the current position.""" + _, index = self.deserializer.decoder.DecodeUint32Varint() + return TransformStreamTransfer(index=index) + + def _ReadDOMException(self) -> DOMException: + """Reads a DOMException from the current position.""" + name = self.deserializer.ReadUTF8String() + message = self.deserializer.ReadUTF8String() + stack_unused = self.deserializer.ReadUTF8String() + return DOMException(name=name, message=message, stack_unused=stack_unused) + + def _ReadRTCEncodedAudioFrame(self) -> RTCEncodedAudioFrame: + """Reads a RTC Encoded Audio Frame from the current position.""" + _, index = self.deserializer.decoder.DecodeUint32Varint() + return RTCEncodedAudioFrame(index=index) + + def _ReadRTCEncodedVideoFrame(self) -> RTCEncodedVideoFrame: + """Reads a RTC Encoded Video Frame from the current position.""" + _, index = self.deserializer.decoder.DecodeUint32Varint() + return RTCEncodedVideoFrame(index=index) + + def _ReadCryptoKey(self) -> CryptoKey: + """Reads a CryptoKey from the current position. + + Returns: + A parsed CryptoKey. + """ + _, raw_key_byte = self.deserializer.decoder.DecodeUint8() + key_byte = definitions.CryptoKeySubTag(raw_key_byte) + if key_byte == definitions.CryptoKeySubTag.AES_KEY: + key_type, algorithm_parameters = self._ReadAESKey() + elif key_byte == definitions.CryptoKeySubTag.HMAC_KEY: + key_type, algorithm_parameters = self._ReadHMACKey() + elif key_byte == definitions.CryptoKeySubTag.RSA_HASHED_KEY: + key_type, algorithm_parameters = self._ReadRSAHashedKey() + elif key_byte == definitions.CryptoKeySubTag.EC_KEY: + key_type, algorithm_parameters = self._ReadECKey() + elif key_byte == definitions.CryptoKeySubTag.ED25519_KEY: + key_type, algorithm_parameters = self._ReadED25519Key() + elif key_byte == definitions.CryptoKeySubTag.NO_PARAMS_KEY: + key_type, algorithm_parameters = self.ReadNoParamsKey() + + _, raw_usages = self.deserializer.decoder.DecodeUint32Varint() + usages = definitions.CryptoKeyUsage(raw_usages) + + extractable = bool(raw_usages & definitions.CryptoKeyUsage.EXTRACTABLE) + _, key_data_length = self.deserializer.decoder.DecodeUint32Varint() + + _, key_data = self.deserializer.decoder.ReadBytes(count=key_data_length) + + return CryptoKey( + key_type=key_type, + algorithm_parameters=algorithm_parameters, + extractable=extractable, + usages=usages, + key_data=key_data + ) + + def _ReadAudioData(self) -> AudioData: + """Reads an AudioData from the current position.""" + _, audio_frame_index = self.deserializer.decoder.DecodeUint32Varint() + return AudioData(audio_frame_index=audio_frame_index) + + def _ReadDomFileSystem(self) -> DOMFileSystem: + """Reads an DOMFileSystem from the current position.""" + _, raw_type = self.deserializer.decoder.DecodeUint32Varint() + name = self.deserializer.ReadUTF8String() + root_url = self.deserializer.ReadUTF8String() + return DOMFileSystem(raw_type=raw_type, name=name, root_url=root_url) + + def _ReadFileSystemFileHandle(self) -> FileSystemHandle: + """Reads a FileSystemHandle from the current position.""" + name = self.deserializer.ReadUTF8String() + _, token_index = self.deserializer.decoder.DecodeUint32Varint() + return FileSystemHandle(name=name, token_index=token_index) + + def _ReadVideoFrame(self) -> VideoFrame: + """Reads the video frame from the current position.""" + _, index = self.deserializer.decoder.DecodeUint32Varint() + return VideoFrame(index=index) + + def _ReadEncodedAudioChunk(self) -> EncodedAudioChunk: + """Reads the encoded audio chunk from the current position.""" + _, index = self.deserializer.decoder.DecodeUint32Varint() + return EncodedAudioChunk(index=index) + + def _ReadEncodedVideoChunk(self) -> EncodedVideoChunk: + """Reads the encoded video chunk from the current position.""" + _, index = self.deserializer.decoder.DecodeUint32Varint() + return EncodedVideoChunk(index=index) + + def _ReadMediaStreamTrack(self): + """Reads the media stream track from the current position.""" + raise NotImplementedError('V8ScriptValueDecoder._ReadMediaStreamTrack') + + def _ReadCropTarget(self): + """Reads the crop target from the current position.""" + raise NotImplementedError('V8ScriptValueDecoder._ReadCropTarget') + + def _ReadRestrictionTarget(self): + """Reads the restriction target from the current position.""" + raise NotImplementedError('V8ScriptValueDecoder._ReadRestrictionTarget') + + def _ReadMediaSourceHandle(self) -> MediaSourceHandle: + """Reads the media source handle from the current position.""" + _, index = self.deserializer.decoder.DecodeUint32Varint() + return MediaSourceHandle(index=index) + + def _ReadFencedFrameConfig(self): + """Reads the fenced frame target from the current position.""" + raise NotImplementedError('V8ScriptValueDecoder._ReadFencedFrameConfig') + def ReadTag(self) -> definitions.BlinkSerializationTag: - """Reads a blink serialization tag. + """Reads a blink serialization tag from the current position. Returns: The blink serialization tag. + + Raises: + ParserError: if an invalid blink tag is read. """ _, tag_value = self.deserializer.decoder.DecodeUint8() - return definitions.BlinkSerializationTag(tag_value) + try: + return definitions.BlinkSerializationTag(tag_value) + except ValueError as err: + offset = self.deserializer.decoder.stream.tell() + raise errors.ParserError( + f'Invalid blink tag encountered at offset {offset}') from err def ReadHostObject(self) -> Any: - """Reads a host DOM object. + """Reads a host object from the current position. - Raises: - NotImplementedError: when called. + Returns: + The Host Object. """ tag = self.ReadTag() - raise NotImplementedError( - f'V8ScriptValueDecoder.ReadHostObject - {tag.name}') + dom_object = None + + # V8ScriptValueDeserializer + if tag == definitions.BlinkSerializationTag.BLOB: + dom_object = self._ReadBlob() + elif tag == definitions.BlinkSerializationTag.BLOB_INDEX: + dom_object = self._ReadBlobIndex() + elif tag == definitions.BlinkSerializationTag.FILE: + dom_object = self._ReadFile() + elif tag == definitions.BlinkSerializationTag.FILE_INDEX: + dom_object = self._ReadFileIndex() + elif tag == definitions.BlinkSerializationTag.FILE_LIST: + dom_object = self._ReadFileList() + elif tag == definitions.BlinkSerializationTag.FILE_LIST_INDEX: + dom_object = self._ReadFileListIndex() + elif tag == definitions.BlinkSerializationTag.IMAGE_BITMAP: + dom_object = self._ReadImageBitmap() + elif tag == definitions.BlinkSerializationTag.IMAGE_BITMAP_TRANSFER: + dom_object = self._ReadImageBitmapTransfer() + elif tag == definitions.BlinkSerializationTag.IMAGE_DATA: + dom_object = self._ReadImageData() + elif tag == definitions.BlinkSerializationTag.DOM_POINT: + dom_object = self._ReadDOMPoint() + elif tag == definitions.BlinkSerializationTag.DOM_POINT_READ_ONLY: + dom_object = self._ReadDOMPointReadOnly() + elif tag == definitions.BlinkSerializationTag.DOM_RECT: + dom_object = self._ReadDOMRect() + elif tag == definitions.BlinkSerializationTag.DOM_RECT_READ_ONLY: + dom_object = self._ReadDOMRectReadOnly() + elif tag == definitions.BlinkSerializationTag.DOM_QUAD: + dom_object = self._ReadDOMQuad() + elif tag == definitions.BlinkSerializationTag.DOM_MATRIX2D: + dom_object = self._ReadDOMMatrix2D() + elif tag == definitions.BlinkSerializationTag.DOM_MATRIX2D_READ_ONLY: + dom_object = self._ReadDOMMatrix2DReadOnly() + elif tag == definitions.BlinkSerializationTag.DOM_MATRIX: + dom_object = self._ReadDOMMatrix() + elif tag == definitions.BlinkSerializationTag.DOM_MATRIX_READ_ONLY: + dom_object = self._ReadDOMMatrixReadOnly() + elif tag == definitions.BlinkSerializationTag.MESSAGE_PORT: + dom_object = self._ReadMessagePort() + elif tag == definitions.BlinkSerializationTag.MOJO_HANDLE: + dom_object = self._ReadMojoHandle() + elif tag == definitions.BlinkSerializationTag.OFFSCREEN_CANVAS_TRANSFER: + dom_object = self._ReadOffscreenCanvasTransfer() + elif tag == definitions.BlinkSerializationTag.READABLE_STREAM_TRANSFER: + dom_object = self._ReadReadableStreamTransfer() + elif tag == definitions.BlinkSerializationTag.WRITABLE_STREAM_TRANSFER: + dom_object = self._ReadWriteableStreamTransfer() + elif tag == definitions.BlinkSerializationTag.TRANSFORM_STREAM_TRANSFER: + dom_object = self._ReadTransformStreamTransfer() + elif tag == definitions.BlinkSerializationTag.DOM_EXCEPTION: + dom_object = self._ReadDOMException() + + # V8ScriptValueDeserializerForModules + elif tag == definitions.BlinkSerializationTag.CRYPTO_KEY: + dom_object = self._ReadCryptoKey() + elif tag == definitions.BlinkSerializationTag.DOM_FILE_SYSTEM: + dom_object = self._ReadDomFileSystem() + elif tag == definitions.BlinkSerializationTag.FILE_SYSTEM_FILE_HANDLE: + dom_object = self._ReadFileSystemFileHandle() + elif tag == definitions.BlinkSerializationTag.RTC_ENCODED_AUDIO_FRAME: + dom_object = self._ReadRTCEncodedAudioFrame() + elif tag == definitions.BlinkSerializationTag.RTC_ENCODED_VIDEO_FRAME: + dom_object = self._ReadRTCEncodedVideoFrame() + elif tag == definitions.BlinkSerializationTag.AUDIO_DATA: + dom_object = self._ReadAudioData() + elif tag == definitions.BlinkSerializationTag.VIDEO_FRAME: + dom_object = self._ReadVideoFrame() + elif tag == definitions.BlinkSerializationTag.ENCODED_AUDIO_CHUNK: + dom_object = self._ReadEncodedAudioChunk() + elif tag == definitions.BlinkSerializationTag.ENCODED_VIDEO_CHUNK: + dom_object = self._ReadEncodedVideoChunk() + elif tag == definitions.BlinkSerializationTag.MEDIA_STREAM_TRACK: + dom_object = self._ReadMediaStreamTrack() + elif tag == definitions.BlinkSerializationTag.CROP_TARGET: + dom_object = self._ReadCropTarget() + elif tag == definitions.BlinkSerializationTag.RESTRICTION_TARGET: + dom_object = self._ReadRestrictionTarget() + elif tag == definitions.BlinkSerializationTag.MEDIA_SOURCE_HANDLE: + dom_object = self._ReadMediaSourceHandle() + elif tag == definitions.BlinkSerializationTag.FENCED_FRAME_CONFIG: + dom_object = self._ReadFencedFrameConfig() + return dom_object def Deserialize(self) -> Any: """Deserializes a Blink SSV. @@ -97,13 +983,24 @@ def Deserialize(self) -> Any: Returns: The deserialized script value. + + Raises: + ParserError: if an unsupported header version was found. """ version_envelope_size = self._ReadVersionEnvelope() - self.deserializer = v8.ValueDeserializer( - io.BytesIO(self.raw_data[version_envelope_size:]), delegate=self) - v8_version = self.deserializer.ReadHeader() - if not self.version: - self.version = v8_version + if self.trailer_size: + self.deserializer = v8.ValueDeserializer( + io.BytesIO( + self.raw_data[version_envelope_size:self.trailer_offset]), + delegate=self) + else: + self.deserializer = v8.ValueDeserializer( + io.BytesIO( + self.raw_data[version_envelope_size:]), + delegate=self) + is_supported = self.deserializer.ReadHeader() + if not is_supported: + raise errors.ParserError('Unsupported header') return self.deserializer.ReadValue() @classmethod diff --git a/dfindexeddb/indexeddb/chromium/definitions.py b/dfindexeddb/indexeddb/chromium/definitions.py index 180d435..3df700b 100644 --- a/dfindexeddb/indexeddb/chromium/definitions.py +++ b/dfindexeddb/indexeddb/chromium/definitions.py @@ -141,10 +141,12 @@ class BlinkSerializationTag(IntEnum): ENCODED_AUDIO_CHUNK = ord('y') ENCODED_VIDEO_CHUNK = ord('z') CROP_TARGET = ord('c') + RESTRICTION_TARGET = ord('D') MEDIA_SOURCE_HANDLE = ord('S') DEPRECATED_DETECTED_BARCODE = ord('B') DEPRECATED_DETECTED_FACE = ord('F') DEPRECATED_DETECTED_TEXT = ord('t') + FENCED_FRAME_CONFIG = ord('C') DOM_EXCEPTION = ord('x') TRAILER_OFFSET = 0xFE VERSION = 0xFF @@ -304,3 +306,67 @@ class V8ErrorTag(IntEnum): CAUSE = ord('c') STACK = ord('s') END = ord('.') + + +class ImageSerializationTag(IntEnum): + """Image Serialization tags.""" + END = 0 + PREDEFINED_COLOR_SPACE = 1 + CANVAS_PIXEL_FORMAT = 2 + IMAGE_DATA_STORAGE_FORMAT = 3 + ORIGIN_CLEAN = 4 + IS_PREMULTIPLIED = 5 + CANVAS_OPACITY_MODE = 6 + PARAMETRIC_COLOR_SPACE = 7 + IMAGE_ORIENTATION = 8 + LAST = IMAGE_ORIENTATION + + +class SerializedPredefinedColorSpace(IntEnum): + """Serialized Predefined Color Space enumeration.""" + LEGACY_OBSOLETE = 0 + SRGB = 1 + REC2020 = 2 + P3 = 3 + REC2100HLG = 4 + REC2100PQ = 5 + SRGB_LINEAR = 6 + LAST = SRGB_LINEAR + + +class SerializedPixelFormat(IntEnum): + """Serialized Pixel Format enumeration.""" + NATIVE8_LEGACY_OBSOLETE = 0 + F16 = 1 + RGBA8 = 2 + BGRA8 = 3 + RGBX8 = 4 + LAST = RGBX8 + + +class SerializedImageDataStorageFormat(IntEnum): + """The Serialized Image Data Storage Format.""" + UINT8CLAMPED = 0 + UINT16 = 1 + FLOAT32 = 2 + LAST = FLOAT32 + + +class SerializedOpacityMode(IntEnum): + """The Serialized Opacity Mode.""" + KNONOPAQUE = 0 + KOPAQUE = 1 + KLAST = KOPAQUE + + +class SerializedImageOrientation(IntEnum): + """The Serialized Image Orientation.""" + TOP_LEFT = 0 + TOP_RIGHT = 1 + BOTTOM_RIGHT = 2 + BOTTOM_LEFT = 3 + LEFT_TOP = 4 + RIGHT_TOP = 5 + RIGHT_BOTTOM = 6 + LEFT_BOTTOM = 7 + LAST = LEFT_BOTTOM diff --git a/dfindexeddb/indexeddb/chromium/record.py b/dfindexeddb/indexeddb/chromium/record.py index 51f604f..6ee0520 100644 --- a/dfindexeddb/indexeddb/chromium/record.py +++ b/dfindexeddb/indexeddb/chromium/record.py @@ -546,7 +546,7 @@ def FromDecoder( @dataclass -class EarlistCompactionTimeKey(BaseIndexedDBKey): +class EarliestCompactionTimeKey(BaseIndexedDBKey): """An earliest compaction time IndexedDB key.""" def DecodeValue(self, decoder: utils.LevelDBDecoder) -> int: @@ -558,11 +558,11 @@ def DecodeValue(self, decoder: utils.LevelDBDecoder) -> int: def FromDecoder( cls, decoder: utils.LevelDBDecoder, key_prefix: KeyPrefix, base_offset: int = 0 - ) -> EarlistCompactionTimeKey: + ) -> EarliestCompactionTimeKey: """Decodes the earliest compaction time key.""" offset, key_type = decoder.DecodeUint8() if key_type != definitions.GlobalMetadataKeyType.EARLIEST_COMPACTION_TIME: - raise errors.ParserError('Not a EarlistCompactionTimeKey') + raise errors.ParserError('Not a EarliestCompactionTimeKey') return cls(offset=base_offset + offset, key_prefix=key_prefix) @@ -668,7 +668,7 @@ class GlobalMetaDataKey(BaseIndexedDBKey): definitions.GlobalMetadataKeyType .EARLIEST_SWEEP: EarliestSweepKey, definitions.GlobalMetadataKeyType - .EARLIEST_COMPACTION_TIME: EarlistCompactionTimeKey, + .EARLIEST_COMPACTION_TIME: EarliestCompactionTimeKey, definitions.GlobalMetadataKeyType .SCOPES_PREFIX: ScopesPrefixKey, definitions.GlobalMetadataKeyType @@ -692,7 +692,7 @@ def FromDecoder( Type[DatabaseFreeListKey], Type[DatabaseNameKey], Type[EarliestSweepKey], - Type[EarlistCompactionTimeKey], + Type[EarliestCompactionTimeKey], Type[MaxDatabaseIdKey], Type[RecoveryBlobJournalKey], Type[SchemaVersionKey], @@ -972,7 +972,7 @@ class ObjectStoreDataValue: blob_offset: the blob offset, only valid if wrapped. value: the blink serialized value, only valid if not wrapped. """ - unkown: int + unknown: int is_wrapped: bool blob_size: Optional[int] blob_offset: Optional[int] @@ -1003,7 +1003,7 @@ def DecodeValue( _, blob_size = decoder.DecodeVarint() _, blob_offset = decoder.DecodeVarint() return ObjectStoreDataValue( - unkown=unknown_integer, + unknown=unknown_integer, is_wrapped=True, blob_size=blob_size, blob_offset=blob_offset, @@ -1011,7 +1011,7 @@ def DecodeValue( _, blink_bytes = decoder.ReadBytes() blink_value = blink.V8ScriptValueDecoder.FromBytes(blink_bytes) return ObjectStoreDataValue( - unkown=unknown_integer, + unknown=unknown_integer, is_wrapped=False, blob_size=None, blob_offset=None, diff --git a/dfindexeddb/indexeddb/chromium/v8.py b/dfindexeddb/indexeddb/chromium/v8.py index d6ddb30..e178410 100644 --- a/dfindexeddb/indexeddb/chromium/v8.py +++ b/dfindexeddb/indexeddb/chromium/v8.py @@ -152,7 +152,12 @@ def _PeekTag(self) -> Optional[definitions.V8SerializationTag]: _, tag_value = self.decoder.PeekBytes(1) except errors.DecoderError: return None - return definitions.V8SerializationTag(tag_value[0]) + try: + return definitions.V8SerializationTag(tag_value[0]) + except ValueError as error: + raise errors.ParserError( + f'Invalid v8 tag value {tag_value} at offset' + f' {self.decoder.stream.tell()}') from error def _ReadTag(self) -> definitions.V8SerializationTag: """Returns the next non-padding serialization tag. @@ -269,7 +274,7 @@ def _ReadObjectInternal(self) -> Tuple[definitions.V8SerializationTag, Any]: self.version >= 15): parsed_object = self.ReadSharedObject() elif self.version < 13: - self.decoder.stream.seek(-1) + self.decoder.stream.seek(-1, os.SEEK_CUR) parsed_object = self.ReadHostObject() else: parsed_object = None @@ -492,7 +497,7 @@ def _ReadJSPrimitiveWrapper( return value def _ReadJSRegExp(self) -> RegExp: - """Reads a Javscript regular expression from the current position.""" + """Reads a Javascript regular expression from the current position.""" next_id = self._GetNextId() pattern = self.ReadString() _, flags = self.decoder.DecodeUint32Varint() # TODO: verify flags diff --git a/dfindexeddb/indexeddb/cli.py b/dfindexeddb/indexeddb/cli.py index e9f2611..faf8c0c 100644 --- a/dfindexeddb/indexeddb/cli.py +++ b/dfindexeddb/indexeddb/cli.py @@ -15,6 +15,7 @@ """A CLI tool for dfindexeddb.""" import argparse import dataclasses +import enum from datetime import datetime import json import pathlib @@ -57,6 +58,8 @@ def default(self, o): return list(o) if isinstance(o, v8.RegExp): return str(o) + if isinstance(o, enum.Enum): + return o.name return json.JSONEncoder.default(self, o) @@ -85,6 +88,7 @@ def IndexeddbCommand(args): (f'Error parsing blink value: {err} for {record.__class__.__name__} ' f'at offset {record.offset} in {db_record.path}'), file=sys.stderr) print(f'Traceback: {traceback.format_exc()}', file=sys.stderr) + print(f'Record: {record}', file=sys.stderr) _Output(db_record, output=args.output) diff --git a/tests/dfindexeddb/indexeddb/chromium/__init__.py b/tests/dfindexeddb/indexeddb/chromium/__init__.py index e69de29..d3a5e8e 100644 --- a/tests/dfindexeddb/indexeddb/chromium/__init__.py +++ b/tests/dfindexeddb/indexeddb/chromium/__init__.py @@ -0,0 +1,14 @@ +# -*- coding: utf-8 -*- +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. diff --git a/tests/dfindexeddb/indexeddb/chromium/blink.py b/tests/dfindexeddb/indexeddb/chromium/blink.py new file mode 100644 index 0000000..ae20039 --- /dev/null +++ b/tests/dfindexeddb/indexeddb/chromium/blink.py @@ -0,0 +1,657 @@ +# -*- coding: utf-8 -*- +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Unit tests for blink serialized objects.""" +import unittest + +from dfindexeddb.indexeddb.chromium import blink +from dfindexeddb.indexeddb.chromium import definitions + + +class BlinkTest(unittest.TestCase): + """Unit tests for blink serialized objects.""" + + def test_ReadBlob(self): + """Tests Blink Blob decoding.""" + serialized_value = bytes([ + 0xff, 0x09, 0x3f, 0x00, 0x62, 0x01, 0x61, 0x01, 0x62, 0x00]) + expected_value = blink.Blob(uuid='a', type='b', size=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadBlobIndex(self): + """Tests Blink BlobIndex decoding.""" + serialized_value = bytes([ + 0xff, 0x09, 0x3f, 0x00, 0x69, 0x00]) + expected_value = blink.BlobIndex(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadFile(self): + """Tests Blink File decoding.""" + with self.subTest('file_v3'): + serialized_value = bytes([ + 0xff, 0x03, 0x3f, 0x00, 0x66, + 0x04, 0x70, 0x61, 0x74, 0x68, + 0x04, 0x75, 0x75, 0x69, 0x64, + 0x0a, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x70, 0x6c, + 0x61, 0x69, 0x6e]) + expected_value = blink.File( + path='path', + name=None, + relative_path=None, + uuid='uuid', + type='text/plain', + has_snapshot=0, + size=None, + last_modified_ms=None, + is_user_visible=1) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + with self.subTest('file_v4'): + serialized_value = bytes([ + 0xff, 0x04, 0x3f, 0x00, 0x66, + 0x04, 0x70, 0x61, 0x74, 0x68, + 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, + 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, + 0x04, 0x75, 0x75, 0x69, 0x64, + 0x0a, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x70, 0x6c, + 0x61, 0x69, 0x6e, 0x00]) + expected_value = blink.File( + path='path', + name='name', + relative_path='relative_path', + uuid='uuid', + type='text/plain', + has_snapshot=0, + size=None, + last_modified_ms=None, + is_user_visible=1) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + with self.subTest('file_v8'): + serialized_value = bytes([ + 0xff, 0x08, 0x3f, 0x00, 0x66, + 0x04, 0x70, 0x61, 0x74, 0x68, + 0x04, 0x6e, 0x61, 0x6d, 0x65, + 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, + 0x65, 0x5f, 0x70, 0x61, 0x74, 0x68, + 0x04, 0x75, 0x75, 0x69, 0x64, + 0x0a, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x70, 0x6c, + 0x61, 0x69, 0x6e, 0x01, 0x80, 0x04, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xd0, 0xbf, 0x01, 0x00]) + expected_value = blink.File( + path='path', + name='name', + relative_path='relative_path', + uuid='uuid', + type='text/plain', + has_snapshot=1, + size=512, + last_modified_ms=-0.25, + is_user_visible=1) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadFileIndex(self): + """Tests Blink FileIndex decoding.""" + serialized_value = bytes([ + 0xff, 0x09, 0x3f, 0x00, 0x65, 0x00]) + expected_value = blink.FileIndex(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadFileList(self): + """Tests Blink BlobIndex decoding.""" + serialized_value = bytes([ + 0xff, 0x03, 0x3f, 0x00, 0x6c, 0x01, + 0x04, 0x70, 0x61, 0x74, 0x68, + 0x0d, 0x72, 0x65, 0x6c, 0x61, 0x74, 0x69, 0x76, 0x65, + 0x5f, 0x70, 0x61, 0x74, 0x68, + 0x0a, 0x74, 0x65, 0x78, 0x74, 0x2f, 0x70, 0x6c, 0x61, + 0x69, 0x6e]) + parsed_file = blink.File( + path='path', + name=None, + relative_path=None, + uuid='relative_path', + type='text/plain', + has_snapshot=0, + size=None, + last_modified_ms=None, + is_user_visible=1) + expected_value = blink.FileList(files=[parsed_file]) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadFileListIndex(self): + """Tests Blink FileListIndex decoding.""" + serialized_value = bytes([ + 0xff, 0x09, 0x3f, 0x00, 0x4c, 0x01, 0x00, 0x00]) + expected_value = blink.FileListIndex( + file_indexes=[blink.FileIndex(index=0)]) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadImageBitmap(self): + """Tests Blink ImageBitmap decoding.""" + with self.assertRaisesRegex(NotImplementedError, 'ReadImageBitmap'): + serialized_value = bytes([ + 0xff, 0x09, 0x3f, 0x00, 0x67, 0x01, 0x01, 0x02, 0x01, + 0x08, 0x00, 0x00, 0xff, 0xff, 0x00, 0xff, 0x00, 0xff]) + _ = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + + def test_ReadImageBitmapTransfer(self): + """Tests Blink ImageBitmapTransfer decoding.""" + serialized_value = bytes([ + 0xff, 0x09, 0x3f, 0x00, 0x47, 0x00]) + expected_value = blink.ImageBitmapTransfer(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadImageData(self): + """Tests Blink ImageData decoding.""" + with self.assertRaisesRegex(NotImplementedError, 'ReadImageData'): + serialized_value = bytes([ + 0xff, 0x09, 0x3f, 0x00, 0x23, 0x00]) + _ = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + + def test_ReadDOMPoint(self): + """Tests Blink DOMPoint decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x51, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x40, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x40]) + expected_value = blink.DOMPoint(x=1.0, y=2.0, z=3.0, w=4.0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadDOMPointReadOnly(self): + """Tests Blink DOMPointReadOnly decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x57, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x40, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x40]) + expected_value = blink.DOMPointReadOnly(x=1.0, y=2.0, z=3.0, w=4.0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadDOMRect(self): + """Tests Blink DOMRect decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x45, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x40, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x40]) + expected_value = blink.DOMRect(x=1.0, y=2.0, width=3.0, height=4.0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadDOMRectReadOnly(self): + """Tests Blink DOMReadReadOnly decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x52, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x40, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x40]) + expected_value = blink.DOMRectReadOnly(x=1.0, y=2.0, width=3.0, height=4.0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadDOMQuad(self): + """Tests Blink DOMQuad decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x54, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x14, 0x40, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x22, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x2a, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x18, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x24, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x2c, 0x40, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x08, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x1c, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x26, 0x40, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x2e, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x10, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x20, 0x40, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x28, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x30, 0x40]) + expected_value = blink.DOMQuad( + p1=blink.DOMPoint(x=1.0, y=5.0, z=9.0, w=13.0), + p2=blink.DOMPoint(x=2.0, y=6.0, z=10.0, w=14.0), + p3=blink.DOMPoint(x=3.0, y=7.0, z=11.0, w=15.0), + p4=blink.DOMPoint(x=4.0, y=8.0, z=12.0, w=16.0)) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadDOMMatrix2D(self): + """Tests Blink DOMMatrix2D decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x49, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x08, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x10, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x14, 0x40, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x18, 0x40, 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x49, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x40, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x40, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x14, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x18, 0x40 + ]) + expected_value = blink.DOMMatrix2D(values=[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadDOMMatrix2DReadOnly(self): + """Tests Blink DOMMatrix2DReadOnly decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x4f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x08, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x10, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x14, 0x40, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x18, 0x40, 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x49, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xf0, 0x3f, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0x40, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x40, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x14, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x18, 0x40 + ]) + expected_value = blink.DOMMatrix2DReadOnly( + values=[1.0, 2.0, 3.0, 4.0, 5.0, 6.0]) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadDOMMatrix(self): + """Tests Blink DOMMatrix decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x59, 0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, + 0xf1, 0x3f, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0xf3, 0x3f, 0xcd, 0xcc, + 0xcc, 0xcc, 0xcc, 0xcc, 0xf4, 0x3f, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, + 0xf6, 0x3f, 0xcd, 0xcc, 0xcc, 0xcc, 0xcc, 0xcc, 0x00, 0x40, 0x9a, 0x99, + 0x99, 0x99, 0x99, 0x99, 0x01, 0x40, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, + 0x02, 0x40, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x03, 0x40, 0xcd, 0xcc, + 0xcc, 0xcc, 0xcc, 0xcc, 0x08, 0x40, 0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, + 0x09, 0x40, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x0a, 0x40, 0x33, 0x33, + 0x33, 0x33, 0x33, 0x33, 0x0b, 0x40, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, + 0x10, 0x40, 0xcd, 0xcc, 0xcc, 0xcc, 0xcc, 0xcc, 0x10, 0x40, 0x33, 0x33, + 0x33, 0x33, 0x33, 0x33, 0x11, 0x40, 0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, + 0x11, 0x40,]) + expected_value = blink.DOMMatrix( + values=[ + 1.1, 1.2, 1.3, 1.4, 2.1, 2.2, 2.3, 2.4, + 3.1, 3.2, 3.3, 3.4, 4.1, 4.2, 4.3, 4.4]) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadDOMMatrixReadOnly(self): + """Tests Blink DOMMatrixReadonly decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x55, 0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, + 0xf1, 0x3f, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0xf3, 0x3f, 0xcd, 0xcc, + 0xcc, 0xcc, 0xcc, 0xcc, 0xf4, 0x3f, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, + 0xf6, 0x3f, 0xcd, 0xcc, 0xcc, 0xcc, 0xcc, 0xcc, 0x00, 0x40, 0x9a, 0x99, + 0x99, 0x99, 0x99, 0x99, 0x01, 0x40, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, + 0x02, 0x40, 0x33, 0x33, 0x33, 0x33, 0x33, 0x33, 0x03, 0x40, 0xcd, 0xcc, + 0xcc, 0xcc, 0xcc, 0xcc, 0x08, 0x40, 0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, + 0x09, 0x40, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, 0x0a, 0x40, 0x33, 0x33, + 0x33, 0x33, 0x33, 0x33, 0x0b, 0x40, 0x66, 0x66, 0x66, 0x66, 0x66, 0x66, + 0x10, 0x40, 0xcd, 0xcc, 0xcc, 0xcc, 0xcc, 0xcc, 0x10, 0x40, 0x33, 0x33, + 0x33, 0x33, 0x33, 0x33, 0x11, 0x40, 0x9a, 0x99, 0x99, 0x99, 0x99, 0x99, + 0x11, 0x40,]) + expected_value = blink.DOMMatrixReadOnly( + values=[ + 1.1, 1.2, 1.3, 1.4, 2.1, 2.2, 2.3, 2.4, + 3.1, 3.2, 3.3, 3.4, 4.1, 4.2, 4.3, 4.4]) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadMessagePort(self): + """Tests Blink MessagePort decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x4d, 0x00]) + expected_value = blink.MessagePort(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadMojoHandle(self): + """Tests Blink MojoHandle decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x68, 0x00]) + expected_value = blink.MojoHandle(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_OffscreenCanvasTransfer(self): + """Tests Blink OffscreenCanvasTransfer decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x48, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00]) + expected_value = blink.OffscreenCanvasTransfer( + width=0, + height=0, + canvas_id=0, + client_id=0, + sink_id=0, + filter_quality=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadableStreamTransfer(self): + """Tests Blink ReadableStreamTransfer decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x72, 0x00]) + expected_value = blink.ReadableStreamTransfer(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_WriteableStreamTransfer(self): + """Tests Blink WriteableStreamTransfer decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x77, 0x00]) + expected_value = blink.WriteableStreamTransfer(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_TransformStreamTransfer(self): + """Tests Blink TransformStreamTransfer decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x6d, 0x00]) + expected_value = blink.TransformStreamTransfer(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadDOMException(self): + """Tests Blink DOMException decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x78, 0x01, 0x41, 0x01, + 0x42, 0x01, 0x43]) + expected_value = blink.DOMException( + name='A', message='B', stack_unused='C') + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadCryptoKey(self): + """Tests Blink CryptoKey decoding.""" + with self.subTest('AES'): + serialized_value = bytes([ + 0xff, 0x09, 0x3f, 0x00, 0x4b, 0x01, 0x01, 0x10, + 0x04, 0x10, + 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, # key data + 0x08, 0x09, 0x10, 0x11, 0x12, 0x13, 0x14, 0x15]) + expected_value = blink.CryptoKey( + algorithm_parameters={ + 'id': definitions.CryptoKeyAlgorithm.AES_CBC, + 'length_bits': 128}, + key_type=definitions.WebCryptoKeyType.SECRET, + extractable=False, + usages=definitions.CryptoKeyUsage.DECRYPT, + key_data=( + b'\x00\x01\x02\x03\x04\x05\x06\x07' + b'\x08\x09\x10\x11\x12\x13\x14\x15')) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + with self.subTest('HMAC'): + serialized_value = bytes([ + 0xff, 0x09, 0x3f, 0x00, 0x4b, 0x02, 0x40, 0x06, + 0x10, 0x40, + 0x84, 0x04, 0x10, 0xb5, 0xb3, 0x15, 0xec, 0x95, # key data + 0x4c, 0x3b, 0xc7, 0x9c, 0x6a, 0x89, 0x06, 0xaa, + 0x25, 0x9a, 0xfe, 0xeb, 0x30, 0x14, 0xfa, 0x81, + 0x95, 0xd2, 0x69, 0x48, 0x87, 0xe8, 0x60, 0x2c, + 0x89, 0xb3, 0x22, 0xfd, 0x6a, 0x74, 0xe2, 0xdc, + 0x60, 0xbd, 0xd1, 0xcd, 0x2e, 0x2f, 0x1f, 0xb6, + 0x7f, 0xf6, 0x22, 0xf1, 0x25, 0xe8, 0xd7, 0x90, + 0xfb, 0x80, 0x21, 0x7e, 0x79, 0xc3, 0x5f, 0x98]) + expected_value = blink.CryptoKey( + algorithm_parameters={ + 'id': definitions.CryptoKeyAlgorithm.SHA256, + 'length_bits': 512}, + key_type=definitions.WebCryptoKeyType.SECRET, + extractable=False, + usages=definitions.CryptoKeyUsage.VERIFY, + key_data=( + b'\x84\x04\x10\xb5\xb3\x15\xec\x95' + b'\x4c\x3b\xc7\x9c\x6a\x89\x06\xaa' + b'\x25\x9a\xfe\xeb\x30\x14\xfa\x81' + b'\x95\xd2\x69\x48\x87\xe8\x60\x2c' + b'\x89\xb3\x22\xfd\x6a\x74\xe2\xdc' + b'\x60\xbd\xd1\xcd\x2e\x2f\x1f\xb6' + b'\x7f\xf6\x22\xf1\x25\xe8\xd7\x90' + b'\xfb\x80\x21\x7e\x79\xc3\x5f\x98')) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + with self.subTest('RSAHashedKey'): + serialized_value = bytes([ + 0xff, 0x09, 0x3f, 0x00, 0x4b, 0x04, 0x0d, 0x01, + 0x80, 0x08, 0x03, 0x01, 0x00, 0x01, 0x06, 0x11, + 0xa2, 0x01, + 0x0a, 0xc9, 0x9b, 0xea, 0x4a, 0x34, 0xec, 0x34, # key data + 0xe0, 0x79, 0x5e, 0x8d, 0x12, 0x25, 0x4f, 0x19, + 0x93, 0x82, 0x32, 0xd8, 0x87, 0x60, 0xe7, 0x1d, + 0x9c, 0xe5, 0x42, 0x49, 0x28, 0x26, 0x45, 0x72, + 0x9d, 0x69, 0x86, 0x23, 0xd2, 0xa7, 0x51, 0x97, + 0xb8, 0x06, 0x9c, 0xb2, 0x03, 0xe3, 0xb8, 0xdc, + 0xab, 0x5f, 0xb4, 0xc1, 0x2b, 0xc2, 0xd5, 0x37, + 0x12, 0x3d, 0x25, 0x1a, 0xfb, 0x42, 0x6d, 0xd6, + 0x51, 0xb3, 0x64, 0xbf, 0x50, 0xaf, 0x31, 0x25, + 0xcb, 0x98, 0x1a, 0xb7, 0xac, 0x67, 0x86, 0x68, + 0xf1, 0x4c, 0xcc, 0x34, 0x28, 0xa6, 0xc9, 0x9a, + 0x3e, 0x5c, 0x75, 0x61, 0xa1, 0x78, 0x39, 0x6e, + 0x4e, 0x10, 0x48, 0xf8, 0x5e, 0x69, 0xf1, 0xb4, + 0x3c, 0x12, 0x11, 0x63, 0x68, 0x7b, 0x86, 0xd8, + 0xa8, 0xe3, 0x83, 0xf9, 0x3f, 0xa0, 0xc6, 0xfb, + 0x0d, 0x5a, 0x01, 0x82, 0x9b, 0x9e, 0x7a, 0xa4, + 0x57, 0xa3, 0x84, 0x2b, 0xb6, 0xc7, 0xae, 0x0e, + 0xad, 0xeb, 0x86, 0xc8, 0x72, 0xb3, 0x2a, 0x5e, + 0xf3, 0x1a, 0x33, 0x59, 0xe3, 0x61, 0xba, 0x91, + 0x85, 0xf8, 0x48, 0x3b, 0x9a, 0xb6, 0xbf, 0xe9, + 0xe3, 0xb3]) + expected_value = blink.CryptoKey( + algorithm_parameters={ + 'id': definitions.CryptoKeyAlgorithm.RSA_PSS, + 'modulus_length_bits': 1024, + 'public_exponent_size': 3, + 'public_exponent_bytes': b'\x01\x00\x01', + 'hash': definitions.CryptoKeyAlgorithm.SHA256}, + key_type=definitions.AsymmetricCryptoKeyType.PUBLIC_KEY, + extractable=True, + usages=( + definitions.CryptoKeyUsage.VERIFY | + definitions.CryptoKeyUsage.EXTRACTABLE), + key_data=( + b'\x0a\xc9\x9b\xea\x4a\x34\xec\x34' + b'\xe0\x79\x5e\x8d\x12\x25\x4f\x19' + b'\x93\x82\x32\xd8\x87\x60\xe7\x1d' + b'\x9c\xe5\x42\x49\x28\x26\x45\x72' + b'\x9d\x69\x86\x23\xd2\xa7\x51\x97' + b'\xb8\x06\x9c\xb2\x03\xe3\xb8\xdc' + b'\xab\x5f\xb4\xc1\x2b\xc2\xd5\x37' + b'\x12\x3d\x25\x1a\xfb\x42\x6d\xd6' + b'\x51\xb3\x64\xbf\x50\xaf\x31\x25' + b'\xcb\x98\x1a\xb7\xac\x67\x86\x68' + b'\xf1\x4c\xcc\x34\x28\xa6\xc9\x9a' + b'\x3e\x5c\x75\x61\xa1\x78\x39\x6e' + b'\x4e\x10\x48\xf8\x5e\x69\xf1\xb4' + b'\x3c\x12\x11\x63\x68\x7b\x86\xd8' + b'\xa8\xe3\x83\xf9\x3f\xa0\xc6\xfb' + b'\x0d\x5a\x01\x82\x9b\x9e\x7a\xa4' + b'\x57\xa3\x84\x2b\xb6\xc7\xae\x0e' + b'\xad\xeb\x86\xc8\x72\xb3\x2a\x5e' + b'\xf3\x1a\x33\x59\xe3\x61\xba\x91' + b'\x85\xf8\x48\x3b\x9a\xb6\xbf\xe9' + b'\xe3\xb3')) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + with self.subTest('ECDSA Key'): + serialized_value = bytes([ + 0xff, 0x09, 0x3f, 0x00, 0x4b, 0x05, 0x0e, 0x01, + 0x01, 0x11, 0x5b, + 0xcb, 0x98, 0x1a, 0xb7, 0xac, 0x67, 0x86, 0x68, # key data + 0xf1, 0x4c, 0xcc, 0x34, 0x28, 0xa6, 0xc9, 0x9a, + 0x3e, 0x5c, 0x75, 0x61, 0xa1, 0x78, 0x39, 0x6e, + 0x4e, 0x10, 0x48, 0xf8, 0x5e, 0x69, 0xf1, 0xb4, + 0x3c, 0x12, 0x11, 0x63, 0x68, 0x7b, 0x86, 0xd8, + 0xa8, 0xe3, 0x83, 0xf9, 0x3f, 0xa0, 0xc6, 0xfb, + 0x0d, 0x5a, 0x01, 0x82, 0x9b, 0x9e, 0x7a, 0xa4, + 0x57, 0xa3, 0x84, 0x2b, 0xb6, 0xc7, 0xae, 0x0e, + 0xad, 0xeb, 0x86, 0xc8, 0x72, 0xb3, 0x2a, 0x5e, + 0xf3, 0x1a, 0x33, 0x59, 0xe3, 0x61, 0xba, 0x91, + 0x85, 0xf8, 0x48, 0x3b, 0x9a, 0xb6, 0xbf, 0xe9, + 0xe3, 0xb3, 0x30]) + expected_value = blink.CryptoKey( + algorithm_parameters={ + 'crypto_key_algorithm': definitions.CryptoKeyAlgorithm.ECDSA, + 'named_curve_type': definitions.NamedCurve.P256}, + key_type=definitions.AsymmetricCryptoKeyType.PUBLIC_KEY, + extractable=True, + usages=( + definitions.CryptoKeyUsage.VERIFY + | definitions.CryptoKeyUsage.EXTRACTABLE), + key_data=( + b'\xcb\x98\x1a\xb7\xac\x67\x86\x68' + b'\xf1\x4c\xcc\x34\x28\xa6\xc9\x9a' + b'\x3e\x5c\x75\x61\xa1\x78\x39\x6e' + b'\x4e\x10\x48\xf8\x5e\x69\xf1\xb4' + b'\x3c\x12\x11\x63\x68\x7b\x86\xd8' + b'\xa8\xe3\x83\xf9\x3f\xa0\xc6\xfb' + b'\x0d\x5a\x01\x82\x9b\x9e\x7a\xa4' + b'\x57\xa3\x84\x2b\xb6\xc7\xae\x0e' + b'\xad\xeb\x86\xc8\x72\xb3\x2a\x5e' + b'\xf3\x1a\x33\x59\xe3\x61\xba\x91' + b'\x85\xf8\x48\x3b\x9a\xb6\xbf\xe9' + b'\xe3\xb3\x30')) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + with self.subTest('NoParams Key'): + serialized_value = bytes([ + 0xff, 0x09, 0x3f, 0x00, 0x4b, 0x06, 0x11, 0xa0, 0x02, + 0x03, 0x01, 0x02, 0x03, 0x00]) + expected_value = blink.CryptoKey( + algorithm_parameters={ + 'crypto_key_algorithm': definitions.CryptoKeyAlgorithm.PBKDF2}, + key_type=definitions.WebCryptoKeyType.SECRET, + extractable=False, + usages=( + definitions.CryptoKeyUsage.DRIVE_BITS + | definitions.CryptoKeyUsage.DERIVE_KEY), + key_data=b'\x01\x02\x03' + ) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadDomFileSystem(self): + """Tests UnguessableToken decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x64, 0x00, 0x00, 0x00]) + expected_value = blink.DOMFileSystem(raw_type=0, name='', root_url='') + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadFileSystemFileHandle(self): + """Tests UnguessableToken decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x64, 0x00, 0x00, 0x00]) + expected_value = blink.DOMFileSystem(raw_type=0, name='', root_url='') + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadRTCEncodedAudioFrame(self): + """Tests Blink RTCEncodedAudioFrame decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x41, 0x00]) + expected_value = blink.RTCEncodedAudioFrame(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadRTCEncodedVideoFrame(self): + """Tests Blink RTCEncodedVideoFrame decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x56, 0x00]) + expected_value = blink.RTCEncodedVideoFrame(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadAudioData(self): + """Tests AudioData decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x61, 0x00, 0x00]) + expected_value = blink.AudioData(audio_frame_index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadVideoFrame(self): + """Tests VideoFrame decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x76, 0x00]) + expected_value = blink.VideoFrame(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadEncodedAudioChunk(self): + """Tests UnguessableToken decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x79, 0x00]) + expected_value = blink.EncodedAudioChunk(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadEncodedVideoChunk(self): + """Tests UnguessableToken decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x7a, 0x00]) + expected_value = blink.EncodedVideoChunk(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadMediaStreamTrack(self): + """Tests MediaStream decoding.""" + with self.assertRaisesRegex(NotImplementedError, 'ReadMediaStream'): + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x73, 0x00]) + _ = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + + def test_ReadCropTarget(self): + """Tests CropTarget decoding.""" + with self.assertRaisesRegex(NotImplementedError, 'ReadCropTarget'): + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x63, 0x00]) + _ = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + + def test_ReadRestrictionTarget(self): + """Tests RestrictionTarget decoding.""" + with self.assertRaisesRegex(NotImplementedError, 'ReadRestrictionTarget'): + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x44, 0x00]) + _ = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + + def test_ReadMediaSourceHandle(self): + """Tests MediaSourceHandle decoding.""" + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x53, 0x00]) + expected_value = blink.MediaSourceHandle(index=0) + parsed_value = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + self.assertEqual(parsed_value, expected_value) + + def test_ReadFencedFrameConfig(self): + """Tests FencedFrameConfig decoding.""" + with self.assertRaisesRegex(NotImplementedError, 'ReadFencedFrameConfig'): + serialized_value = bytes([ + 0xff, 0x11, 0xff, 0x0d, 0x5c, 0x43, 0x00]) + _ = blink.V8ScriptValueDecoder.FromBytes(serialized_value) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/dfindexeddb/indexeddb/chromium/record.py b/tests/dfindexeddb/indexeddb/chromium/record.py index a1b6f92..f3b4a2f 100644 --- a/tests/dfindexeddb/indexeddb/chromium/record.py +++ b/tests/dfindexeddb/indexeddb/chromium/record.py @@ -126,13 +126,13 @@ def test_earliest_sweep_key(self): def test_earliest_compaction_key(self): """Tests the EarliestCompactionTimeKey.""" - expected_key = record.EarlistCompactionTimeKey( + expected_key = record.EarliestCompactionTimeKey( offset=4, key_prefix=record.KeyPrefix( offset=0, database_id=0, object_store_id=0, index_id=0)) expected_value = 13318229420051176 record_bytes = ((b'\x00\x00\x00\x00\x06'), (b'\xe8\x8a\x9e\xed\xdbP/')) - parsed_key = record.EarlistCompactionTimeKey.FromBytes(record_bytes[0]) + parsed_key = record.EarliestCompactionTimeKey.FromBytes(record_bytes[0]) parsed_value = parsed_key.ParseValue(record_bytes[1]) self.assertEqual(expected_key, parsed_key) @@ -579,7 +579,7 @@ def test_object_store_data_key(self): encoded_user_key=record.IDBKey( offset=4, type=definitions.IDBKeyType.NUMBER, value=3.0)) expected_value = record.ObjectStoreDataValue( - unkown=4, + unknown=4, is_wrapped=True, blob_offset=1, blob_size=2303,