From 246c3ee43c65b683eed0686fd194f2810ccee20c Mon Sep 17 00:00:00 2001
From: Bert Barabas <17627242+bertbarabas@users.noreply.github.com>
Date: Tue, 3 Sep 2024 20:26:06 -0500
Subject: [PATCH] Improve performance by removing @dataclass and using
 __slots__

---
 CaptureFile/CaptureFile.py | 189 ++++++++++++++++++++++++++-----------
 README.md                  |   1 -
 docs/README.ipynb          |   1 -
 tests/test_captureFile.py  |   2 +-
 4 files changed, 136 insertions(+), 57 deletions(-)

diff --git a/CaptureFile/CaptureFile.py b/CaptureFile/CaptureFile.py
index 34581e9..f87219a 100644
--- a/CaptureFile/CaptureFile.py
+++ b/CaptureFile/CaptureFile.py
@@ -3,7 +3,6 @@
 
 from contextlib import contextmanager
 from copy import deepcopy
-from dataclasses import InitVar, dataclass, field
 from functools import _lru_cache_wrapper, lru_cache
 from io import BytesIO
 from itertools import islice
@@ -29,7 +28,6 @@
 Record = Union[str, bytes]
 
 
-@dataclass
 class CaptureFile:
     """The CaptureFile constructor opens and returns a capture file named
     `file_name` for reading or writing, depending on the value of `to_write`.
@@ -80,8 +78,7 @@ class CaptureFile:
     -1 is the default compromise which currently is equivalent to 6."""
 
     _lock_start_position: ClassVar[int] = 0x7FFFFFFFFFFFFFFE
-    _lock_end_position: ClassVar[int] = 0x7FFFFFFFFFFFFFFF
-    _lock_size: ClassVar[int] = _lock_end_position - _lock_start_position
+    _lock_size: ClassVar[int] = 1
 
     _filenames_opened_for_write_sem: ClassVar[Semaphore] = Semaphore()
     _filenames_opened_for_write: ClassVar[Set[Path]] = set()
@@ -92,40 +89,65 @@ class CaptureFile:
         Dict[Path, "ReferenceCountedLock"]
     ] = dict()
 
+    __slots__ = (
+        "file_name",
+        "to_write",
+        "encoding",
+        "use_os_file_locking",
+        "_file_name",
+        "_metadata",
+        "_config",
+        "_file",
+        "_compression_block",
+        "_current_master_node",
+        "_new_is_in_progress",
+        "_record_count",
+        "_block_cache",
+        "_full_node_cache",
+    )
+
     file_name: str
-    to_write: bool = False
-    initial_metadata: InitVar[Optional[bytes]] = None
-    force_new_empty_file: InitVar[bool] = False
-    encoding: Optional[str] = "utf_8"
-    use_os_file_locking: bool = False
-    compression_block_size: InitVar[int] = 32768
+    to_write: bool
+    encoding: Optional[str]
+    use_os_file_locking: bool
 
-    _file_name: Path = field(init=False)
-    """A "Path" instance of file_name set during __post_init__"""
+    _file_name: Path
+    """A "Path" instance of file_name set during __init__"""
 
-    _metadata: Optional[bytes] = field(init=False, default=None)
+    _metadata: Optional[bytes]
 
-    _config: "CaptureFileConfiguration" = field(init=False)
+    _config: "CaptureFileConfiguration"
 
-    _file: Optional[IO[bytes]] = field(init=False, default=None)
+    _file: Optional[IO[bytes]]
 
-    _compression_block: "BytesStream" = field(init=False)
+    _compression_block: "BytesStream"
 
-    _current_master_node: "MasterNode" = field(init=False)
+    _current_master_node: "MasterNode"
 
-    _new_is_in_progress: bool = field(init=False)
+    _new_is_in_progress: bool
 
-    _record_count: int = field(init=False)
+    _record_count: int
 
-    _block_cache: _lru_cache_wrapper = field(init=False)
-    _full_node_cache: _lru_cache_wrapper = field(init=False)
+    _block_cache: _lru_cache_wrapper
+    _full_node_cache: _lru_cache_wrapper
 
-    def __post_init__(
-        self,
-        initial_metadata: Optional[bytes],
-        force_new_empty_file: bool,
-        compression_block_size: int,
+    def __init__(
+        self,
+        file_name: str,
+        to_write: bool = False,
+        initial_metadata: Optional[bytes] = None,
+        force_new_empty_file: bool = False,
+        encoding: Optional[str] = "utf_8",
+        use_os_file_locking: bool = False,
+        compression_block_size: int = 32768,
     ):
+        self.file_name = file_name
+        self.to_write = to_write
+        self.encoding = encoding
+        self.use_os_file_locking = use_os_file_locking
+
+        self._metadata = None
+        self._file = None
 
         self._block_cache = lru_cache(maxsize=10)(self._block_cache_method)
         self._full_node_cache = lru_cache(maxsize=10)(self._full_node_cache_method)
@@ -135,6 +157,7 @@ def __post_init__(
             self._new_is_in_progress = True
             self._new_file(initial_metadata, compression_block_size)
             self._new_is_in_progress = False
+
         self.open(self.to_write)
 
     def __str__(self):
@@ -330,7 +353,6 @@ def _fetch_sized_data(self, start_position: int, /) -> bytes:
         return self._fetch_data(start_position + 4, size)
 
     def _fetch_data(self, start_position: int, size: int, /) -> bytes:
-        assert self._file
         written_limit = (
             self._file_limit() // self._config.page_size * self._config.page_size
         )
@@ -629,7 +651,6 @@ def _record_generator(
         power: int,
         /,
     ) -> Generator[Record, None, None]:
-
         rightmost_node = rightmost_path.rightmost_node(height)
 
         power = power // self._config.fan_out
@@ -928,7 +949,6 @@ def commit(self, /):
         self._file.flush()
 
 
-@dataclass
 class CaptureFileConfiguration:
     """The persistent configuration values of the capture file that are stored
     in the first bytes of the file.
@@ -939,46 +959,70 @@ class CaptureFileConfiguration:
     Default values are provided if a new instance of this class is created
     directly from its constructor"""
 
-    version: int = 2
+    __slots__ = (
+        "version",
+        "page_size",
+        "compression_block_size",
+        "fan_out",
+        "master_node_size",
+        "master_node_positions",
+        "compression_block_start",
+        "initial_file_limit",
+        "full_node_struct",
+    )
+
+    version: int
     """The version indicates the compatibility of code with file structure.
     Code with a version higher than the one stored in file should be capable
     of reading and writing to the file but a file with a higher version
     number than what is in the code will not be usable."""
 
-    page_size: int = 4096
+    page_size: int
     """Pages of page_size bytes are used in various places as a minimum block
     of data. See DESIGN.md for how pages are used."""
 
-    compression_block_size: int = 32768
+    compression_block_size: int
     """Minimum number of bytes to compress and write out. While data is
     accumulating it is recorded in the master node but after this limit is
     exceeded it will be compressed and written out"""
 
-    fan_out: int = 32
+    fan_out: int
     """The maximum number of children in the index tree's nodes.
     For more information about the tree structure and usage see DESIGN.md"""
 
-    master_node_size: int = field(init=False)
+    master_node_size: int
 
-    master_node_positions: Tuple[int] = field(init=False)
+    master_node_positions: tuple[int, int]
     """The two starting byte positions in the file of the two master nodes"""
 
-    compression_block_start: int = field(init=False)
-    initial_file_limit: int = field(init=False)
-    full_node_struct: Struct = field(init=False)
+    compression_block_start: int
+    initial_file_limit: int
+    full_node_struct: Struct
 
     current_version: ClassVar[int] = 2
     """The code's current version which can support any earlier version
     recorded in the file"""
 
     capture_file_type: ClassVar[bytes] = b"MioCapture\0"
+    """The length of capture_file_type is 11, which is where the 11 in ">11s4L" comes from."""
 
-    struct: ClassVar[Struct] = Struct(f">{len(capture_file_type)}s4L")
+    struct: ClassVar[Struct] = Struct(">11s4L")
     """Struct = String("MioCapture\0"), Long(version), Long(page_size),
     Long(compression_block_size), Long(fan_out)"""
 
-    def __post_init__(self, /):
+    def __init__(
+        self,
+        version: int = 2,
+        page_size: int = 4096,
+        compression_block_size: int = 32768,
+        fan_out: int = 32,
+    ):
+        self.version = version
+        self.page_size = page_size
+        self.compression_block_size = compression_block_size
+        self.fan_out = fan_out
+
         assert (
             self.compression_block_size % self.page_size == 0
         ), "compression block size must be a multiple of page size"
@@ -991,10 +1035,10 @@ def __post_init__(self, /):
         # compress_and_flush_if_full to know for certain no writing is happening
         # on the first page after the file is created even across multiple OS
         # process.
-        self.master_node_positions = [
+        self.master_node_positions = (
             self.page_size,
             self.page_size + self.master_node_size,
-        ]
+        )
         last_master_page_start = self.page_size - 4
         last_master_page_end = last_master_page_start + self.page_size
         self.compression_block_start = last_master_page_end
@@ -1049,7 +1093,6 @@ def write(self, file, /):
         file.write(buffer)
 
 
-@dataclass
 class MasterNode:
     """
     A MasterNode tracks where things are in the capture file.
@@ -1063,6 +1106,15 @@ class MasterNode:
     struct: ClassVar[Struct] = Struct(f">LQL")
     """Struct = serial_number, file_limit, compression_block_len ">LQL" """
 
+    __slots__ = (
+        "serial_number",
+        "file_limit",
+        "metadata_pointer",
+        "rightmost_path",
+        "contents_of_last_page",
+        "compression_block_contents",
+    )
+
     serial_number: int
     """MasterNode with largest serial_number is the active one
@@ -1093,6 +1145,22 @@ class MasterNode:
     the file_limit once there is at least compression_block_size data
     present"""
 
+    def __init__(
+        self,
+        serial_number: int,
+        file_limit: int,
+        metadata_pointer: "DataCoordinates",
+        rightmost_path: "RightmostPath",
+        contents_of_last_page: bytearray,
+        compression_block_contents: bytes,
+    ):
+        self.serial_number = serial_number
+        self.file_limit = file_limit
+        self.metadata_pointer = metadata_pointer
+        self.rightmost_path = rightmost_path
+        self.contents_of_last_page = contents_of_last_page
+        self.compression_block_contents = compression_block_contents
+
     @classmethod
     def new_from(cls, master_node_buffer: bytes, page_size: int, /) -> "MasterNode":
         (serial_number, file_limit, compression_block_len) = cls.struct.unpack_from(
@@ -1170,7 +1238,6 @@ def as_bytes(self, config: CaptureFileConfiguration, /) -> bytes:
         return stream.getvalue()
 
 
-@dataclass
 class RightmostPath:
     """A list of RightmostNodes in height order (leaf -> root), one for each
     level in the tree.
@@ -1181,12 +1248,12 @@ class RightmostPath:
     number_of_children_struct: ClassVar[Struct] = Struct(">L")
     """Big-endian unsigned long ">L" """
 
-    rightmost_nodes: List["RightmostNode"] = field(default_factory=list, init=False)
+    __slots__ = ("rightmost_nodes",)
 
-    buffer: InitVar[Optional[bytes]] = None
-    offset: InitVar[int] = 0
+    rightmost_nodes: List["RightmostNode"]
 
-    def __post_init__(self, buffer: Optional[bytes], offset: int, /):
+    def __init__(self, buffer: Optional[bytes] = None, offset: int = 0) -> None:
+        self.rightmost_nodes = []
         if buffer is not None:
             (
                 total_number_of_children,
@@ -1279,7 +1346,6 @@ def write_rightmost_nodes(self, stream: "BytesStream", /):
             rightmost_node.write_with_height(stream, height)
 
 
-@dataclass
 class RightmostNode:
     """This is the rightmost node of a level in the tree index of all records
     and is not referred to by any parent node.
@@ -1301,7 +1367,12 @@ class RightmostNode:
     and ready to be filled again.
     """
 
-    children: List["DataCoordinates"] = field(default_factory=list, init=False)
+    __slots__ = ("children",)
+
+    children: List["DataCoordinates"]
+
+    def __init__(self) -> None:
+        self.children = []
 
     def add_child(self, data_coordinate: "DataCoordinates", /):
         self.children.append(data_coordinate)
@@ -1333,7 +1404,6 @@ def child_count(self, /) -> int:
         return len(self.children)
 
 
-@dataclass(frozen=True)
 class DataCoordinates:
     """The two-dimensional coordinates of data within a capture file.
@@ -1352,6 +1422,8 @@ class DataCoordinates:
     block_size_struct: ClassVar[Struct] = Struct(">L")
     """Big-endian unsigned long ">L" """
 
+    __slots__ = ("compressed_block_start", "data_start")
+
     compressed_block_start: int
     """The start position of the compressed block in capture file"""
 
@@ -1360,7 +1432,7 @@ class DataCoordinates:
     the compressed block"""
 
     @classmethod
-    def from_bytes(cls, block: bytes, offset: int, /) -> "DataCoordinates":
+    def from_bytes(cls, block: bytes | memoryview, offset: int, /) -> "DataCoordinates":
         return cls(*cls.struct.unpack_from(block, offset))
 
     @classmethod
@@ -1384,6 +1456,10 @@ def from_bytes_with_height_prefix(
     def null(cls, /) -> "DataCoordinates":
         return cls(0, 0)
 
+    def __init__(self, compressed_block_start: int, data_start: int) -> None:
+        self.compressed_block_start = compressed_block_start
+        self.data_start = data_start
+
     def write_data_coordinate(self, stream: "BytesStream", /):
         stream.write(
             DataCoordinates.struct.pack(self.compressed_block_start, self.data_start)
@@ -1447,10 +1523,15 @@ def zero_fill_to(self, end_position: int, /):
         self.write(b"\0" * (end_position - self.tell()))
 
 
-@dataclass
 class ReferenceCountedLock:
-    _reference_count: int = 0
-    _lock: Lock = Lock()
+
+    __slots__ = ("_reference_count", "_lock")
+    _reference_count: int
+    _lock: Lock
+
+    def __init__(self) -> None:
+        self._reference_count = 0
+        self._lock = Lock()
 
     def add_reference(self) -> None:
         self._reference_count += 1
diff --git a/README.md b/README.md
index 9debec3..4f5fa2f 100644
--- a/README.md
+++ b/README.md
@@ -40,7 +40,6 @@ API](https://github.com/MIOsoft/CaptureFile-Python/blob/master/docs/CaptureFile.
 
 The detailed description covers several useful APIs and parameters that are
 not covered in the Quickstart below.
 
-To work with capture files visually, you can use the free [MIObdt](https://miosoft.com/miobdt/) application.
 
 ## Install
diff --git a/docs/README.ipynb b/docs/README.ipynb
index 4a66716..2fdbb04 100644
--- a/docs/README.ipynb
+++ b/docs/README.ipynb
@@ -46,7 +46,6 @@
     "The detailed description covers several useful APIs and parameters that are not\n",
     "covered in the Quickstart below.\n",
     "\n",
-    "To work with capture files visually, you can use the free [MIObdt](https://miosoft.com/miobdt/) application.\n",
     "\n",
     "## Install\n",
     "\n",
diff --git a/tests/test_captureFile.py b/tests/test_captureFile.py
index 13c921f..d5882fd 100644
--- a/tests/test_captureFile.py
+++ b/tests/test_captureFile.py
@@ -338,7 +338,7 @@ def test_timing_of_iterator():
 def test_record_generator_directly():
     start = time.time()
     cfr = CaptureFile(file_name_1)
-    number_of_records = 1_000_000
+    number_of_records = 10_000_000
     start_record = 1
     rg = cfr.record_generator(start_record)
     for i in range(start_record, start_record + number_of_records):
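
A quick way to sanity-check the payoff of dropping @dataclass is a standalone
micro-benchmark. The sketch below is not part of the patch: the class names
(WithDataclass, WithSlots) and the loop count are illustrative assumptions,
with the attribute shape borrowed from DataCoordinates. On CPython, a
__slots__ class has no per-instance __dict__, which typically makes attribute
access and construction faster and instances smaller.

    # Illustrative benchmark only; not part of the patched library.
    import timeit
    from dataclasses import dataclass

    @dataclass
    class WithDataclass:
        compressed_block_start: int
        data_start: int

    class WithSlots:
        __slots__ = ("compressed_block_start", "data_start")

        def __init__(self, compressed_block_start: int, data_start: int) -> None:
            self.compressed_block_start = compressed_block_start
            self.data_start = data_start

    def bench(cls) -> float:
        # Build an instance and read both attributes, mimicking the hot path
        # of record iteration.
        return timeit.timeit(
            lambda: cls(1, 2).compressed_block_start + cls(1, 2).data_start,
            number=1_000_000,
        )

    print(f"@dataclass: {bench(WithDataclass):.3f}s")
    print(f"__slots__:  {bench(WithSlots):.3f}s")

Absolute numbers vary by machine and Python version; the direction of the
difference is what motivates the patch.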