Skip to content

MappedBinaryIO, Testimplementation for alternating KaitaiStream - maybe #76

@Hypnootika

Description

@Hypnootika

Hello everyone,

I'm fairly new to working with binary files and Kaitai Struct. I love it, but unfortunately I don't like the ReadWriteStruct.

I created a different approach based on the Python runtime, and I would like some feedback about possible improvements (and/or why it is not suitable for Kaitai).

Please be kind to me; this is my first "package" and definitely the first mmap implementation I have created.

The overall intention is (if you like the approach) that I would try to convert it and improve it further (and create a new/different compiler mode).

If you see mistakes or illogical implementations, please tell me. I want to learn!

Edit 1: Note that there are obviously a lot of functions missing that Kaitai needs. This only covers the use case I am currently building it around. Take it as a prototype for a possible mmap approach.

Edit 2: About the performance: I can't really say much at the moment, but just by testing this I already noticed a gain in speed (the IDE runs the code a lot faster). That's obviously a really bad comparison, but if someone is interested, I could run tests as well.

import math
import os
import struct
from mmap import mmap, ACCESS_COPY
from typing import List, Union


class Parser:
    """Pack and unpack fixed-size numeric values identified by a pattern ID.

    A pattern ID is a short string such as ``"u4le"`` or ``"f8be"``; the
    class-level tables below map each ID to a pre-compiled ``struct.Struct``
    and to the inclusive value range accepted when packing.
    """

    # Pre-compiled struct formats, keyed by pattern ID.
    struct_mapping = {
        "u2be": struct.Struct(">H"),
        "u4be": struct.Struct(">I"),
        "u8be": struct.Struct(">Q"),
        "u2le": struct.Struct("<H"),
        "u4le": struct.Struct("<I"),
        "u8le": struct.Struct("<Q"),
        "s1": struct.Struct("b"),
        "s2be": struct.Struct(">h"),
        "s4be": struct.Struct(">i"),
        "s8be": struct.Struct(">q"),
        "s2le": struct.Struct("<h"),
        "s4le": struct.Struct("<i"),
        "s8le": struct.Struct("<q"),
        "f4be": struct.Struct(">f"),
        "f8be": struct.Struct(">d"),
        "f4le": struct.Struct("<f"),
        "f8le": struct.Struct("<d"),
        "u1": struct.Struct("B"),
    }

    # Largest finite IEEE 754 single-precision value. The previous bound of
    # 3.4e38 rejected packable values between 3.4e38 and this limit.
    _F4_MAX = 3.4028234663852886e38

    # Inclusive (min, max) accepted by pack_value for each pattern ID.
    range_mapping = {
        "u2be": (0, 65535),
        "u4be": (0, 4294967295),
        "u8be": (0, 18446744073709551615),
        "u2le": (0, 65535),
        "u4le": (0, 4294967295),
        "u8le": (0, 18446744073709551615),
        "s1": (-128, 127),
        "s2be": (-32768, 32767),
        "s4be": (-2147483648, 2147483647),
        "s8be": (-9223372036854775808, 9223372036854775807),
        "s2le": (-32768, 32767),
        "s4le": (-2147483648, 2147483647),
        "s8le": (-9223372036854775808, 9223372036854775807),
        "u1": (0, 255),
        "f4be": (-_F4_MAX, _F4_MAX),
        # The old literal 1.8e308 overflowed to infinity when parsed, so the
        # f8 range was effectively unbounded; that is now spelled out.
        "f8be": (float("-inf"), float("inf")),
        "f4le": (-_F4_MAX, _F4_MAX),
        "f8le": (float("-inf"), float("inf")),
    }

    @classmethod
    def is_value_in_range(cls, pattern_id: str, value: Union[int, float]) -> bool:
        """Return True if ``value`` may be packed as ``pattern_id``.

        NaN and +/-infinity are accepted for the float patterns because
        ``struct`` can encode them; every other value must lie within the
        inclusive bounds in ``range_mapping``.

        Raises:
            ValueError: if ``pattern_id`` has no entry in ``range_mapping``.
        """
        bounds = cls.range_mapping.get(pattern_id)
        if bounds is None:
            raise ValueError(f"Pattern ID {pattern_id} not found.")
        if (
            pattern_id.startswith("f")
            and isinstance(value, float)
            and not math.isfinite(value)
        ):
            # Non-finite floats never satisfy a finite range comparison
            # (NaN compares false to everything) but are valid encodings.
            return True
        min_value, max_value = bounds
        return min_value <= value <= max_value

    @classmethod
    def pack_value(cls, pattern_id: str, value: Union[int, float]) -> bytes:
        """Encode ``value`` as the bytes of ``pattern_id``.

        Raises:
            ValueError: if the pattern ID is unknown or ``value`` is outside
                the range accepted by ``is_value_in_range``.
        """
        if not cls.is_value_in_range(pattern_id, value):
            raise ValueError(f"Value {value} out of range for pattern ID {pattern_id}.")
        struct_pattern = cls.struct_mapping.get(pattern_id)
        if struct_pattern is None:
            raise ValueError(f"Invalid pattern ID {pattern_id}.")
        return struct_pattern.pack(value)

    def read(self, data: bytes, pattern_id: str) -> bytes:
        """Return the leading bytes of ``data`` that hold one ``pattern_id`` value.

        An unknown pattern ID falls back to an empty struct (size 0), so the
        result is ``b""`` rather than an error.
        """
        size = self.struct_mapping.get(pattern_id, struct.Struct("")).size
        return data[:size]

    def read_value(self, data: bytes, pattern_id: str) -> Union[int, float]:
        """Decode one ``pattern_id`` value from the start of ``data``.

        Raises:
            KeyError: if ``pattern_id`` is not in ``struct_mapping``.
            struct.error: if ``data`` is shorter than the value's size.
        """
        packed_data = self.read(data, pattern_id)
        return self.struct_mapping[pattern_id].unpack(packed_data)[0]

    def read_array(
        self, data: bytes, count: int, pattern_id: str
    ) -> List[Union[int, float]]:
        """Decode ``count`` consecutive ``pattern_id`` values from ``data``.

        Raises:
            KeyError: if ``pattern_id`` is not in ``struct_mapping``.
            struct.error: if ``data`` holds fewer than ``count`` values.
        """
        struct_pattern = self.struct_mapping[pattern_id]
        size = struct_pattern.size
        # unpack_from decodes in place, avoiding a slice copy per element.
        return [
            struct_pattern.unpack_from(data, position)[0]
            for position in range(0, count * size, size)
        ]


class BaseMappedBinary:
    """Shared file/mmap handling and cursor bookkeeping for reader and writer.

    The file at ``file_path`` is opened (and created if missing) and mapped
    with ``ACCESS_COPY``. Per the ``mmap`` documentation, writes to such a
    mapping affect memory only, not the underlying file. Instances can be
    used as context managers; leaving the ``with`` block calls ``close()``.
    """

    def __init__(self, file_path: str, output_file_path: Union[str, None] = None):
        """Open and map ``file_path`` and place the cursor at offset 0.

        Raises:
            ValueError: from ``mmap`` when the file is empty. This includes
                a file that did not exist and was just created here.
        """
        self.file_path = file_path
        self.output_file_path = output_file_path
        # "w+b" creates a missing file; "r+b" keeps existing content.
        mode = "r+b" if os.path.exists(self.file_path) else "w+b"
        self.file = open(self.file_path, mode)
        try:
            self.mapped_file = mmap(self.file.fileno(), 0, access=ACCESS_COPY)
        except Exception:
            # Don't leak the handle when the mapping cannot be created.
            self.file.close()
            raise
        self.offset = 0
        self.parser = Parser()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _read_from_offset(self, size: int) -> bytes:
        """Return up to ``size`` bytes at the current offset without moving it."""
        return self.mapped_file[self.offset : self.offset + size]

    def _update_offset(self, size: int):
        """Advance the cursor by ``size`` bytes."""
        self.offset += size

    def close(self):
        """Close the mapping and then the file, even if the former raises."""
        try:
            self.mapped_file.close()
        finally:
            self.file.close()

    def seek(self, offset: int) -> int:
        """Move the cursor to the absolute ``offset`` and return it.

        Raises:
            ValueError: if ``offset`` is negative. A negative cursor would be
                interpreted as a from-the-end slice index and silently read
                the wrong bytes.
        """
        if offset < 0:
            raise ValueError(f"Offset must be non-negative, got {offset}.")
        self.offset = offset
        return self.offset

    def tell(self) -> int:
        """Return current offset"""
        return self.offset

    def flush(self):
        """Call ``flush()`` on the mapping."""
        self.mapped_file.flush()


class MappedBinaryReader(BaseMappedBinary):
    """Cursor-based reader over the memory-mapped file.

    Every ``read_*`` method consumes its bytes, i.e. advances the cursor
    past what it returned. The one exception is ``read``, which only peeks.
    """

    def __init__(self, file_path: str):
        super().__init__(file_path, output_file_path=None)

    def read(self, pattern_id: str) -> bytes:
        """Return the raw bytes of one ``pattern_id`` value at the cursor.

        The cursor is NOT advanced.
        """
        return self.parser.read(
            self._read_from_offset(self.parser.struct_mapping[pattern_id].size),
            pattern_id,
        )

    def read_value(self, pattern_id: str) -> Union[int, float]:
        """Decode one ``pattern_id`` value at the cursor and advance past it."""
        size = self.parser.struct_mapping[pattern_id].size
        value = self.parser.read_value(self._read_from_offset(size), pattern_id)
        self._update_offset(size)
        return value

    def read_array(self, count: int, pattern_id: str) -> List[Union[int, float]]:
        """Decode ``count`` consecutive ``pattern_id`` values and advance past them."""
        size = self.parser.struct_mapping[pattern_id].size
        values = self.parser.read_array(
            self._read_from_offset(count * size), count, pattern_id
        )
        self._update_offset(count * size)
        return values

    def read_string(self, count: int) -> str:
        """Read ``count`` bytes, decode them as UTF-8 and advance by ``count``."""
        value = self._read_from_offset(count).decode("utf-8")
        self._update_offset(count)
        return value

    def read_string_array(self, count: int) -> List[str]:
        """Read ``count`` strings, each ``count`` bytes long.

        NOTE(review): ``count`` is used both as the number of strings and as
        the byte length of each one. Confirm this matches the intended format.
        """
        return [self.read_string(count) for _ in range(count)]

    def read_string_array_with_count(self) -> List[str]:
        """Read a ``u4le`` count, then a string array via ``read_string_array``."""
        count = self.read_value("u4le")
        return self.read_string_array(count)

    def read_string_with_count(self) -> str:
        """Read a ``u4le`` byte length, then a UTF-8 string of that length."""
        count = self.read_value("u4le")
        return self.read_string(count)

    def read_bytes(self, count: int) -> bytes:
        """Read ``count`` raw bytes and advance past them.

        Previously the cursor was left in place, so the next read returned
        the same bytes again, unlike every other consuming ``read_*`` method.
        """
        value = self._read_from_offset(count)
        self._update_offset(count)
        return value

    def read_bytes_with_count(self) -> bytes:
        """Read a ``u4le`` byte length, then that many raw bytes.

        The cursor ends up after the payload, not just after the length prefix.
        """
        count = self.read_value("u4le")
        return self.read_bytes(count)

    def read_value_array_with_count(self, pattern_id: str) -> List[Union[int, float]]:
        """Read a ``u4le`` element count, then that many ``pattern_id`` values."""
        count = self.read_value("u4le")
        return self.read_array(count, pattern_id)

    def read_value_array(self, count: int, pattern_id: str) -> List[Union[int, float]]:
        """Alias of ``read_array``."""
        return self.read_array(count, pattern_id)


class MappedBinaryWriter(BaseMappedBinary):
    """Accumulates encoded output in an in-memory ``bytes`` buffer.

    All ``write_*`` methods append to ``self.data``; none of them touch the
    mapped file. Retrieve the result with ``get_data()``.
    """

    def __init__(self, file_path: str):
        super().__init__(file_path, output_file_path=None)
        self.data = b""

    def get_data(self) -> bytes:
        """Return the collected data as bytes"""
        return self.data

    def write(self, pattern_id: str, value: Union[int, float]) -> None:
        """Encode ``value`` as ``pattern_id`` and append it to the buffer.

        Raises:
            ValueError: if the pattern ID is unknown or the value is out of range.
        """
        self.data += self.parser.pack_value(pattern_id, value)

    def write_value(self, pattern_id: str, value: Union[int, float]) -> None:
        """Alias of ``write``."""
        self.write(pattern_id, value)

    def write_array(self, pattern_id: str, values: List[Union[int, float]]) -> None:
        """Append each element of ``values`` encoded as ``pattern_id``."""
        for value in values:
            self.write_value(pattern_id, value)

    def write_value_array(
        self, pattern_id: str, values: List[Union[int, float]]
    ) -> None:
        """Alias of ``write_array``."""
        self.write_array(pattern_id, values)

    def write_bytes(self, value: bytes) -> None:
        """Append raw bytes to the buffer."""
        self.data += value

    def write_bytes_with_count(self, value: bytes) -> None:
        """Append a ``u4le`` byte length followed by the raw bytes."""
        self.write_value("u4le", len(value))
        self.write_bytes(value)

    def write_string(self, value: str) -> None:
        """Append ``value`` encoded as UTF-8, without any length prefix."""
        self.data += value.encode("utf-8")

    def write_string_array(self, values: List[str]) -> None:
        """Append each string UTF-8 encoded, with no separators or lengths."""
        for value in values:
            self.write_string(value)

    def write_string_array_with_count(self, values: List[str]) -> None:
        """Append a ``u4le`` element count followed by the strings."""
        self.write_value("u4le", len(values))
        self.write_string_array(values)

    def write_string_with_count(self, value: str) -> None:
        """Append a ``u4le`` byte length followed by the UTF-8 encoded string.

        The prefix is the length of the encoded bytes, not the number of
        characters. The two differ for non-ASCII text, and a reader consumes
        the prefix as a byte count.
        """
        encoded = value.encode("utf-8")
        self.write_value("u4le", len(encoded))
        self.write_bytes(encoded)

    def write_value_array_with_count(
        self, pattern_id: str, values: List[Union[int, float]]
    ) -> None:
        """Append a ``u4le`` element count followed by the encoded values."""
        self.write_value("u4le", len(values))
        self.write_array(pattern_id, values)


class MappedBinaryIO(MappedBinaryReader, MappedBinaryWriter):
    """Facade that pairs a ``MappedBinaryReader`` with a ``MappedBinaryWriter``.

    Both helpers are opened on the same ``file_path`` and are exposed as
    ``self.reader`` and ``self.writer``. The methods defined here delegate
    to them.

    NOTE(review): the base-class ``__init__`` is deliberately not called, so
    this object has no ``mapped_file``, ``offset``, ``parser`` or ``data`` of
    its own. Inherited methods that are not overridden here (for example
    ``read_string`` or ``write_bytes``) rely on that state and should be
    called on ``self.reader`` / ``self.writer`` instead.
    """

    def __init__(self, file_path: str, output_file_path: Union[str, None] = None):
        """Open the reader and writer.

        ``output_file_path`` defaults to ``file_path + ".bin"``.
        """
        self.file_path = file_path

        if output_file_path is None:
            self.output_file_path = file_path + ".bin"
        else:
            self.output_file_path = output_file_path
        self.reader = MappedBinaryReader(self.file_path)
        try:
            self.writer = MappedBinaryWriter(self.file_path)
        except Exception:
            # Don't leave the reader's mapping and handle open when the
            # second open fails.
            self.reader.close()
            raise

    def read_value(self, pattern_id: str) -> Union[int, float]:
        """Decode one value through the reader, advancing its cursor."""
        return self.reader.read_value(pattern_id)

    def write_value(self, pattern_id: str, value: Union[int, float]) -> None:
        """Append one encoded value to the writer's buffer."""
        self.writer.write_value(pattern_id, value)

    def flush(self) -> None:
        """Flush the writer's mapping."""
        self.writer.flush()

    def seek(self, offset: int) -> int:
        """Move the reader's cursor to ``offset`` and return it."""
        return self.reader.seek(offset)

    def tell(self) -> int:
        """Return the reader's current offset."""
        return self.reader.tell()

    def close(self) -> None:
        """Close the reader and the writer.

        The writer is closed even if closing the reader raises.
        """
        try:
            self.reader.close()
        finally:
            self.writer.close()

And a test file class:



class ExpFile(MappedBinaryIO):
    """Test format on top of ``MappedBinaryIO``: a small fixed header.

    The header (4-byte magic string, ``u2le`` version, ``u4le`` uk and
    ``u4le`` header_size) is parsed in the constructor and can be written
    back out with ``write_to_file``.
    """

    def __init__(self, file_path: str, output_file_path: str = None):
        super().__init__(file_path)
        self._read()
        # Snapshot of the writer's buffer as it is right after parsing.
        self.data = self.writer.get_data()
        self.output_file_path = (
            file_path + ".bin" if output_file_path is None else output_file_path
        )
        self.mapped_file = self.reader.mapped_file

    def _read(self):
        """Parse the header fields from the reader into attributes."""
        self.magic = self.reader.read_string(4)
        numeric_fields = (
            ("version", "u2le"),
            ("uk", "u4le"),
            ("header_size", "u4le"),
        )
        for attr_name, pattern_id in numeric_fields:
            setattr(self, attr_name, self.reader.read_value(pattern_id))

    def __repr__(self):
        return f"ExpFile({self.magic=}, {self.version=}, {self.uk=}, {self.header_size=})"

    def _write(self):
        """Append the header fields to the writer and return its whole buffer.

        NOTE(review): the writer buffer only ever grows, so a second call
        appends the header again. Confirm whether that is intended.
        """
        sink = self.writer
        sink.write_string(self.magic)
        numeric_fields = (
            ("u2le", "version"),
            ("u4le", "uk"),
            ("u4le", "header_size"),
        )
        for pattern_id, attr_name in numeric_fields:
            sink.write(pattern_id, getattr(self, attr_name))
        return sink.get_data()

    def write_to_file(self):
        """Serialize via ``_write`` and store the bytes at ``output_file_path``."""
        with open(self.output_file_path, "wb") as out_file:
            payload = self._write()
            out_file.write(payload)


if __name__ == "__main__":
    # Use the context-manager protocol so the reader/writer mappings and file
    # handles are closed even if writing or printing raises.
    with ExpFile(r"D:\binparser\eso0001.dat") as mt:
        mt.write_to_file()
        print(mt)
        print(mt.tell())

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions