diff --git a/tests/test_health.py b/tests/test_health.py new file mode 100644 index 0000000..aab8a9a --- /dev/null +++ b/tests/test_health.py @@ -0,0 +1,69 @@ +from __future__ import annotations + +import datetime + +from inline_snapshot import snapshot + +from zabbix_auto_config.health import HealthFile +from zabbix_auto_config.health import ProcessInfo +from zabbix_auto_config.state import State + + +def test_healthfile_to_json(health_file: HealthFile) -> None: + # NOTE: timedeltas are serialized as `PT#S` where # is the number of seconds + + health_file = HealthFile( + date=datetime.datetime(2021, 1, 1, 0, 0, 0), + cwd="/path/to/zac", + pid=1234, + failsafe=123, + processes=[ + ProcessInfo( + name="test_process", + pid=1235, + alive=True, + state=State( + ok=False, + error="Test error", + error_type="CustomException", + error_count=1, + error_time=1736951323.874142, + execution_count=3, + total_duration=datetime.timedelta(seconds=4), + max_duration=datetime.timedelta(seconds=2), + last_duration_warning=datetime.datetime(2021, 1, 2, 0, 0, 0), + ), + ) + ], + ) + assert health_file.to_json() == snapshot( + """\ +{ + "date": "2021-01-01T00:00:00", + "cwd": "/path/to/zac", + "pid": 1234, + "processes": [ + { + "name": "test_process", + "pid": 1235, + "alive": true, + "state": { + "ok": false, + "error": "Test error", + "error_type": "CustomException", + "error_time": 1736951323.874142, + "error_count": 1, + "execution_count": 3, + "total_duration": "PT4S", + "max_duration": "PT2S", + "last_duration_warning": "2021-01-02T00:00:00" + } + } + ], + "queues": [], + "failsafe": 123, + "date_unixtime": 1609455600, + "all_ok": true +}\ +""" + ) diff --git a/zabbix_auto_config/__init__.py b/zabbix_auto_config/__init__.py index 9d885ad..4a954e5 100644 --- a/zabbix_auto_config/__init__.py +++ b/zabbix_auto_config/__init__.py @@ -3,14 +3,12 @@ import datetime import importlib import importlib.metadata -import json import logging import multiprocessing import os import os.path import sys import time -from pathlib import Path from typing import List import multiprocessing_logging @@ -19,11 +17,11 @@ from zabbix_auto_config import models from zabbix_auto_config import processing from zabbix_auto_config.__about__ import __version__ -from zabbix_auto_config._types import HealthDict from zabbix_auto_config._types import HostModifier from zabbix_auto_config._types import HostModifierModule from zabbix_auto_config._types import SourceCollector from zabbix_auto_config._types import SourceCollectorModule +from zabbix_auto_config.health import write_health from zabbix_auto_config.state import get_manager @@ -105,48 +103,6 @@ def get_config() -> models.Settings: return config -def write_health( - health_file: Path, - processes: List[processing.BaseProcess], - queues: List[multiprocessing.Queue], - failsafe: int, -) -> None: - now = datetime.datetime.now() - health: HealthDict = { - "date": now.isoformat(timespec="seconds"), - "date_unixtime": int(now.timestamp()), - "pid": os.getpid(), - "cwd": os.getcwd(), - "all_ok": all(p.state.ok for p in processes), - "processes": [], - "queues": [], - "failsafe": failsafe, - } - - for process in processes: - health["processes"].append( - { - "name": process.name, - "pid": process.pid, - "alive": process.is_alive(), - **process.state.asdict(), - } - ) - - for queue in queues: - health["queues"].append( - { - "size": queue.qsize(), - } - ) - - try: - with open(health_file, "w") as f: - f.write(json.dumps(health)) - except Exception as e: - logging.error("Unable to write health file %s: %s", health_file, e) - - def log_process_status(processes: List[processing.BaseProcess]) -> None: process_statuses = [] diff --git a/zabbix_auto_config/_types.py b/zabbix_auto_config/_types.py index 8a8afb4..957c150 100644 --- a/zabbix_auto_config/_types.py +++ b/zabbix_auto_config/_types.py @@ -62,22 +62,3 @@ class SourceCollector(NamedTuple): name: str module: SourceCollectorModule config: SourceCollectorSettings - - -class QueueDict(TypedDict): - """Queue information for the health check dict.""" - - size: int - - -class HealthDict(TypedDict): - """Application health dict used by `zabbix_auto_config.__init__.write_health`""" - - date: str - date_unixtime: int - pid: int - cwd: str - all_ok: bool - processes: List[dict] - queues: List[QueueDict] - failsafe: int diff --git a/zabbix_auto_config/health.py b/zabbix_auto_config/health.py new file mode 100644 index 0000000..d6fedc4 --- /dev/null +++ b/zabbix_auto_config/health.py @@ -0,0 +1,91 @@ +from __future__ import annotations + +import logging +import multiprocessing +import os +from datetime import datetime +from pathlib import Path +from typing import Optional + +from pydantic import BaseModel +from pydantic import Field +from pydantic import computed_field +from pydantic import field_serializer + +from zabbix_auto_config import processing +from zabbix_auto_config.state import State + + +class HealthFile(BaseModel): + """Health file for the application.""" + + date: datetime = Field(default_factory=datetime.now) + cwd: str + pid: int + processes: list[ProcessInfo] = [] + queues: list[QueueInfo] = [] + failsafe: int + + @computed_field + @property + def date_unixtime(self) -> int: + return int(self.date.timestamp()) + + @computed_field + @property + def all_ok(self) -> bool: + return all(p.alive for p in self.processes) + + @field_serializer("date", when_used="json") + def serialize_date(self, value: datetime) -> str: + return value.isoformat(timespec="seconds") + + def to_json(self) -> str: + return self.model_dump_json(indent=2) + + +class ProcessInfo(BaseModel): + name: str + pid: Optional[int] + alive: bool + state: State + + +class QueueInfo(BaseModel): + size: int + + +def write_health( + health_file: Path, + processes: list[processing.BaseProcess], + queues: list[multiprocessing.Queue], + failsafe: int, +) -> None: + health = HealthFile( + cwd=os.getcwd(), + pid=os.getpid(), + failsafe=failsafe, + ) + + for process in processes: + health.processes.append( + ProcessInfo( + name=process.name, + pid=process.pid, + alive=process.is_alive(), + state=process.state, + ) + ) + + for queue in queues: + health.queues.append( + QueueInfo( + size=queue.qsize(), + ) + ) + + try: + with open(health_file, "w") as f: + f.write(health.to_json()) + except Exception as e: + logging.error("Unable to write health file %s: %s", health_file, e)