diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 1d7f1ce3c..8260340f6 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -32,7 +32,7 @@ repos: hooks: - id: codespell args: - - --ignore-words-list=deebot + - --ignore-words-list=deebot,MapP - --skip="./.*,*.csv,*.json" - --quiet-level=2 - --exclude-file=deebot_client/util/continents.py diff --git a/deebot_client/command.py b/deebot_client/command.py index dc0e0cf38..74069979a 100644 --- a/deebot_client/command.py +++ b/deebot_client/command.py @@ -324,8 +324,15 @@ def _pop_or_raise(name: str, type_: type, data: dict[str, Any]) -> Any: try: return type_(value) except ValueError as err: - msg = f'Could not convert "{value}" of {name} into {type_}' - raise DeebotError(msg) from err + if hasattr(type_, "from_xml"): + try: + return type_.from_xml(value) + except ValueError as err2: + msg = f'Could not convert "{value}" of {name} into {type_}' + raise DeebotError(msg) from err2 + else: + msg = f'Could not convert "{value}" of {name} into {type_}' + raise DeebotError(msg) from err class GetCommand(CommandWithMessageHandling, ABC): diff --git a/deebot_client/commands/__init__.py b/deebot_client/commands/__init__.py index 5fb527c47..fc4574353 100644 --- a/deebot_client/commands/__init__.py +++ b/deebot_client/commands/__init__.py @@ -11,6 +11,9 @@ COMMANDS as JSON_COMMANDS, COMMANDS_WITH_MQTT_P2P_HANDLING as JSON_COMMANDS_WITH_MQTT_P2P_HANDLING, ) +from .xml import ( + COMMANDS_WITH_MQTT_P2P_HANDLING as XML_COMMANDS_WITH_MQTT_P2P_HANDLING, +) if TYPE_CHECKING: from deebot_client.command import Command, CommandMqttP2P @@ -18,7 +21,8 @@ COMMANDS: dict[DataType, dict[str, type[Command]]] = {DataType.JSON: JSON_COMMANDS} COMMANDS_WITH_MQTT_P2P_HANDLING: dict[DataType, dict[str, type[CommandMqttP2P]]] = { - DataType.JSON: JSON_COMMANDS_WITH_MQTT_P2P_HANDLING + DataType.JSON: JSON_COMMANDS_WITH_MQTT_P2P_HANDLING, + DataType.XML: XML_COMMANDS_WITH_MQTT_P2P_HANDLING, } diff --git a/deebot_client/commands/xml/__init__.py b/deebot_client/commands/xml/__init__.py index 36b35c8ac..024841798 100644 --- a/deebot_client/commands/xml/__init__.py +++ b/deebot_client/commands/xml/__init__.py @@ -6,34 +6,55 @@ from deebot_client.command import Command, CommandMqttP2P +from .battery import GetBatteryInfo from .charge import Charge from .charge_state import GetChargeState +from .clean import Clean, CleanArea, GetCleanState +from .clean_logs import GetCleanLogs from .error import GetError -from .fan_speed import GetFanSpeed +from .fan_speed import GetCleanSpeed, SetCleanSpeed from .life_span import GetLifeSpan from .play_sound import PlaySound from .pos import GetPos from .stats import GetCleanSum +from .water_info import GetWaterBoxInfo, GetWaterPermeability if TYPE_CHECKING: from .common import XmlCommand __all__ = [ "Charge", + "Clean", + "CleanArea", + "GetBatteryInfo", "GetChargeState", + "GetCleanLogs", + "GetCleanSpeed", + "GetCleanState", "GetCleanSum", "GetError", - "GetFanSpeed", "GetLifeSpan", "GetPos", + "GetWaterBoxInfo", + "GetWaterPermeability", "PlaySound", + "SetCleanSpeed", ] # fmt: off # ordered by file asc _COMMANDS: list[type[XmlCommand]] = [ + Clean, + CleanArea, GetError, + GetBatteryInfo, + GetCleanLogs, + GetCleanSpeed, + GetCleanState, GetLifeSpan, + GetWaterBoxInfo, + GetWaterPermeability, + SetCleanSpeed, PlaySound, ] # fmt: on diff --git a/deebot_client/commands/xml/battery.py b/deebot_client/commands/xml/battery.py new file mode 100644 index 000000000..b78c86a90 --- /dev/null +++ b/deebot_client/commands/xml/battery.py @@ -0,0 +1,40 @@ +"""Battery Info command.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.events import BatteryEvent +from deebot_client.message import HandlingResult + +from .common import XmlCommandWithMessageHandling + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + + +class GetBatteryInfo(XmlCommandWithMessageHandling): + """GetBatteryInfo command.""" + + NAME = "GetBatteryInfo" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if ( + xml.attrib.get("ret") != "ok" + or (battery := xml.find("battery")) is None + or (power := battery.attrib.get("power")) is None + ): + return HandlingResult.analyse() + + if power: + event_bus.notify(BatteryEvent(int(power))) + return HandlingResult.success() + + return HandlingResult.analyse() diff --git a/deebot_client/commands/xml/charge_state.py b/deebot_client/commands/xml/charge_state.py index 78b7110a2..6fff4d568 100644 --- a/deebot_client/commands/xml/charge_state.py +++ b/deebot_client/commands/xml/charge_state.py @@ -38,7 +38,7 @@ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: case "slotcharging" | "slot_charging" | "wirecharging": status = State.DOCKED case "idle": - status = State.IDLE + pass case "going": status = State.RETURNING case _: diff --git a/deebot_client/commands/xml/clean.py b/deebot_client/commands/xml/clean.py new file mode 100644 index 000000000..b04690992 --- /dev/null +++ b/deebot_client/commands/xml/clean.py @@ -0,0 +1,97 @@ +"""Clean commands.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.events import FanSpeedEvent, FanSpeedLevel, StateEvent +from deebot_client.logging_filter import get_logger +from deebot_client.message import HandlingResult +from deebot_client.models import CleanAction, CleanMode, State + +from .common import ExecuteCommand, XmlCommandWithMessageHandling + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + +_LOGGER = get_logger(__name__) + + +class Clean(ExecuteCommand): + """Generic start/pause/stop cleaning command.""" + + NAME = "Clean" + HAS_SUB_ELEMENT = True + + def __init__( + self, action: CleanAction, speed: FanSpeedLevel = FanSpeedLevel.NORMAL + ) -> None: + # + + super().__init__( + { + "type": CleanMode.AUTO.xml_value, + "act": action.xml_value, + "speed": speed.xml_value, + } + ) + + +class CleanArea(ExecuteCommand): + """Clean area command.""" + + NAME = "Clean" + HAS_SUB_ELEMENT = True + + def __init__( + self, + mode: CleanMode, + area: str, + cleanings: int = 1, + speed: FanSpeedLevel = FanSpeedLevel.NORMAL, + ) -> None: + # + + super().__init__( + { + "type": mode.xml_value, + "act": CleanAction.START.xml_value, + "speed": speed.xml_value, + "deep": str(cleanings), + "mid": area, + } + ) + + +class GetCleanState(XmlCommandWithMessageHandling): + """GetCleanState command.""" + + NAME = "GetCleanState" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if xml.attrib.get("ret") != "ok" or (clean := xml.find("clean")) is None: + return HandlingResult.analyse() + + speed_attrib = clean.attrib.get("speed") + if speed_attrib is not None: + fan_speed_level = FanSpeedLevel.from_xml(speed_attrib) + event_bus.notify(FanSpeedEvent(fan_speed_level)) + + clean_attrib = clean.attrib.get("st") + if clean_attrib is not None: + clean_action = CleanAction.from_xml(clean_attrib) + if clean_action == CleanAction.START: + event_bus.notify(StateEvent(State.CLEANING)) + elif clean_action == CleanAction.PAUSE: + event_bus.notify(StateEvent(State.PAUSED)) + else: + _LOGGER.debug("Ignored CleanState %s", clean_action) + + return HandlingResult.success() diff --git a/deebot_client/commands/xml/clean_logs.py b/deebot_client/commands/xml/clean_logs.py new file mode 100644 index 000000000..6e706c398 --- /dev/null +++ b/deebot_client/commands/xml/clean_logs.py @@ -0,0 +1,64 @@ +"""Clean Logs commands.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.command import CommandResult +from deebot_client.events import ( + CleanJobStatus, + CleanLogEntry, + CleanLogEvent, +) +from deebot_client.logging_filter import get_logger +from deebot_client.message import HandlingResult + +from .common import XmlCommandWithMessageHandling + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + +_LOGGER = get_logger(__name__) + + +class GetCleanLogs(XmlCommandWithMessageHandling): + """GetCleanLogs command.""" + + NAME = "GetCleanLogs" + + def __init__(self, count: int = 0) -> None: + super().__init__({"count": str(count)}) + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if ( + xml.attrib.get("ret") != "ok" + or (resp_logs := xml.findall("CleanSt")) is None + ): + return HandlingResult.analyse() + + if len(resp_logs) >= 0: + logs: list[CleanLogEntry] = [] + for log in resp_logs: + try: + logs.append( + CleanLogEntry( + timestamp=int(log.attrib["s"]), + image_url="", # Missing + type=log.attrib["t"], + area=int(log.attrib["a"]), + stop_reason=CleanJobStatus.FINISHED, # To be extracted + duration=int(log.attrib["l"]), + ) + ) + except Exception: # pylint: disable=broad-except + _LOGGER.warning("Skipping log entry: %s", log, exc_info=True) + event_bus.notify(CleanLogEvent(logs)) + return CommandResult.success() + return HandlingResult.analyse() diff --git a/deebot_client/commands/xml/common.py b/deebot_client/commands/xml/common.py index 8044c09b7..4d84bf042 100644 --- a/deebot_client/commands/xml/common.py +++ b/deebot_client/commands/xml/common.py @@ -3,12 +3,18 @@ from __future__ import annotations from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, cast +from typing import TYPE_CHECKING, Any, cast from xml.etree.ElementTree import Element, SubElement from defusedxml import ElementTree # type: ignore[import-untyped] -from deebot_client.command import Command, CommandWithMessageHandling, SetCommand +from deebot_client.command import ( + Command, + CommandMqttP2P, + CommandWithMessageHandling, + GetCommand, + SetCommand, +) from deebot_client.const import DataType from deebot_client.logging_filter import get_logger from deebot_client.message import HandlingResult, HandlingState, MessageStr @@ -81,8 +87,50 @@ def _handle_xml(cls, _: EventBus, xml: Element) -> HandlingResult: return HandlingResult(HandlingState.FAILED) -class XmlSetCommand(ExecuteCommand, SetCommand, ABC): +class XmlCommandMqttP2P(XmlCommand, CommandMqttP2P, ABC): + """Json base command for mqtt p2p channel.""" + + @classmethod + def create_from_mqtt(cls, payload: str | bytes | bytearray) -> CommandMqttP2P: + """Create a command from the mqtt data.""" + xml = ElementTree.fromstring(payload) + return cls._create_from_mqtt(xml.attrib) + + def handle_mqtt_p2p( + self, event_bus: EventBus, response_payload: str | bytes | bytearray + ) -> None: + """Handle response received over the mqtt channel "p2p".""" + if isinstance(response_payload, bytearray): + data = bytes(response_payload).decode() + elif isinstance(response_payload, bytes): + data = response_payload.decode() + elif isinstance(response_payload, str): + data = response_payload + else: + msg = "Unsupported message data type {message_type}" + raise TypeError(msg.format(essage_type=type(response_payload))) + self._handle_mqtt_p2p(event_bus, data) + + @abstractmethod + def _handle_mqtt_p2p( + self, event_bus: EventBus, response: dict[str, Any] | str + ) -> None: + """Handle response received over the mqtt channel "p2p".""" + + +class XmlSetCommand(ExecuteCommand, SetCommand, XmlCommandMqttP2P, ABC): """Xml base set command. Command needs to be linked to the "get" command, for handling (updating) the sensors. """ + + +class XmlGetCommand(XmlCommandWithMessageHandling, GetCommand, ABC): + """Xml get command.""" + + @classmethod + @abstractmethod + def handle_set_args( + cls, event_bus: EventBus, args: dict[str, Any] + ) -> HandlingResult: + """Handle arguments of set command.""" diff --git a/deebot_client/commands/xml/fan_speed.py b/deebot_client/commands/xml/fan_speed.py index 2a834554c..085acc5ba 100644 --- a/deebot_client/commands/xml/fan_speed.py +++ b/deebot_client/commands/xml/fan_speed.py @@ -2,12 +2,15 @@ from __future__ import annotations -from typing import TYPE_CHECKING +from types import MappingProxyType +from typing import TYPE_CHECKING, Any +from deebot_client.command import InitParam from deebot_client.events import FanSpeedEvent, FanSpeedLevel from deebot_client.message import HandlingResult +from deebot_client.util import get_enum -from .common import XmlCommandWithMessageHandling +from .common import XmlGetCommand, XmlSetCommand if TYPE_CHECKING: from xml.etree.ElementTree import Element @@ -15,11 +18,22 @@ from deebot_client.event_bus import EventBus -class GetFanSpeed(XmlCommandWithMessageHandling): - """GetFanSpeed command.""" +class GetCleanSpeed(XmlGetCommand): + """GetCleanSpeed command.""" NAME = "GetCleanSpeed" + @classmethod + def handle_set_args( + cls, event_bus: EventBus, args: dict[str, Any] + ) -> HandlingResult: + """Handle message->body->data and notify the correct event subscribers. + + :return: A message response + """ + event_bus.notify(FanSpeedEvent(FanSpeedLevel.from_xml(str(args["speed"])))) + return HandlingResult.success() + @classmethod def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: """Handle xml message and notify the correct event subscribers. @@ -29,16 +43,18 @@ def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: if xml.attrib.get("ret") != "ok" or not (speed := xml.attrib.get("speed")): return HandlingResult.analyse() - event: FanSpeedEvent | None = None + event_bus.notify(FanSpeedEvent(FanSpeedLevel.from_xml(speed))) + return HandlingResult.success() + - match speed.lower(): - case "standard": - event = FanSpeedEvent(FanSpeedLevel.NORMAL) - case "strong": - event = FanSpeedEvent(FanSpeedLevel.MAX) +class SetCleanSpeed(XmlSetCommand): + """SetCleanSpeed command.""" - if event: - event_bus.notify(event) - return HandlingResult.success() + NAME = "SetCleanSpeed" + get_command = GetCleanSpeed + _mqtt_params = MappingProxyType({"speed": InitParam(FanSpeedLevel)}) - return HandlingResult.analyse() + def __init__(self, speed: FanSpeedLevel | str) -> None: + if isinstance(speed, str): + speed = get_enum(FanSpeedLevel, speed) + super().__init__({"speed": speed.xml_value}) diff --git a/deebot_client/commands/xml/water_info.py b/deebot_client/commands/xml/water_info.py new file mode 100644 index 000000000..f0da9cad3 --- /dev/null +++ b/deebot_client/commands/xml/water_info.py @@ -0,0 +1,82 @@ +"""WaterBox command module.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from deebot_client.events import ( + WaterAmount, + WaterInfoEvent, +) +from deebot_client.message import HandlingResult + +from .common import XmlGetCommand + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + + +class GetWaterPermeability(XmlGetCommand): + """GetWaterPermeability command.""" + + NAME = "GetWaterPermeability" + + @classmethod + def handle_set_args( + cls, event_bus: EventBus, args: dict[str, Any] + ) -> HandlingResult: + """Handle message->body->data and notify the correct event subscribers. + + :return: A message response + """ + event_bus.notify(WaterInfoEvent(amount=WaterAmount(int(args["v"])))) + return HandlingResult.success() + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if xml.attrib.get("ret") != "ok" or not (value := xml.attrib.get("v")): + return HandlingResult.analyse() + + event_bus.notify(WaterInfoEvent(amount=WaterAmount(int(value)))) + return HandlingResult.success() + + +class GetWaterBoxInfo(XmlGetCommand): + """GetWaterBoxInfo command.""" + + NAME = "GetWaterBoxInfo" + + @classmethod + def handle_set_args( + cls, event_bus: EventBus, args: dict[str, Any] + ) -> HandlingResult: + """Handle message->body->data and notify the correct event subscribers. + + :return: A message response + """ + event_bus.notify( + WaterInfoEvent( + amount=WaterAmount.HIGH, mop_attached=(str(args["on"]) != "0") + ) + ) + return HandlingResult.success() + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if xml.attrib.get("ret") != "ok" or not (on := xml.attrib.get("on")): + return HandlingResult.analyse() + + event_bus.notify( + WaterInfoEvent(amount=WaterAmount.HIGH, mop_attached=on != "0") + ) + return HandlingResult.success() diff --git a/deebot_client/device.py b/deebot_client/device.py index 0b2838e7c..88aa36ccb 100644 --- a/deebot_client/device.py +++ b/deebot_client/device.py @@ -14,6 +14,7 @@ from deebot_client.util import cancel from .command import Command +from .const import DataType from .event_bus import EventBus from .events import ( AvailabilityEvent, @@ -38,7 +39,6 @@ _LOGGER = get_logger(__name__) _AVAILABLE_CHECK_INTERVAL = 60 - DeviceCommandExecute = Callable[[Command], Coroutine[Any, Any, dict[str, Any]]] @@ -200,15 +200,30 @@ def _handle_message( try: _LOGGER.debug("Try to handle message %s: %s", message_name, message_data) - if message := get_message(message_name, self._device_info.static.data_type): + message_data_type = self._device_info.static.data_type + if message := get_message(message_name, message_data_type): if isinstance(message_data, dict): data = message_data - else: + elif message_data_type == DataType.JSON: data = json.loads(message_data) + elif isinstance(message_data, bytes): + data = message_data.decode() + elif isinstance(message_data, bytearray): + data = bytes(message_data).decode() + elif isinstance(message_data, str): + data = message_data + else: + msg = "Unsupported message data type {message_name}: {message_type}" + raise TypeError( + msg.format( + message_name=message_name, message_type=type(message_data) + ) + ) - fw_version = data.get("header", {}).get("fwVer", None) - if fw_version: - self.fw_version = fw_version + if isinstance(data, dict): + fw_version = data.get("header", {}).get("fwVer", None) + if fw_version: + self.fw_version = fw_version message.handle(self.events, data) except Exception: # pylint: disable=broad-except diff --git a/deebot_client/events/fan_speed.py b/deebot_client/events/fan_speed.py index 4a8f599e1..2a1dec40f 100644 --- a/deebot_client/events/fan_speed.py +++ b/deebot_client/events/fan_speed.py @@ -3,20 +3,22 @@ from __future__ import annotations from dataclasses import dataclass -from enum import IntEnum, unique +from enum import unique + +from deebot_client.util.enum import IntEnumWithXml from .base import Event @unique -class FanSpeedLevel(IntEnum): +class FanSpeedLevel(IntEnumWithXml): """Enum class for all possible fan speed levels.""" # Values should be sort from low to high on their meanings - QUIET = 1000 - NORMAL = 0 - MAX = 1 - MAX_PLUS = 2 + QUIET = 1000, "" + NORMAL = 0, "standard" + MAX = 1, "strong" + MAX_PLUS = 2, "" @dataclass(frozen=True) diff --git a/deebot_client/hardware/deebot/2pv572.py b/deebot_client/hardware/deebot/2pv572.py new file mode 100644 index 000000000..555c79f56 --- /dev/null +++ b/deebot_client/hardware/deebot/2pv572.py @@ -0,0 +1,103 @@ +"""2pv572 Capabilities.""" + +from __future__ import annotations + +from deebot_client.capabilities import ( + Capabilities, + CapabilityClean, + CapabilityCleanAction, + CapabilityCustomCommand, + CapabilityEvent, + CapabilityExecute, + CapabilityLifeSpan, + CapabilitySet, + CapabilitySettings, + CapabilitySetTypes, + CapabilityStats, + DeviceType, +) +from deebot_client.commands.json import SetVolume +from deebot_client.commands.json.custom import CustomCommand +from deebot_client.commands.xml import ( + Charge, + Clean, + CleanArea, + GetBatteryInfo, + GetCleanLogs, + GetCleanSpeed, + GetCleanState, + PlaySound, + SetCleanSpeed, +) +from deebot_client.commands.xml.charge_state import GetChargeState +from deebot_client.commands.xml.error import GetError +from deebot_client.commands.xml.stats import GetCleanSum +from deebot_client.const import DataType +from deebot_client.events import ( + AvailabilityEvent, + BatteryEvent, + CleanLogEvent, + CustomCommandEvent, + ErrorEvent, + FanSpeedEvent, + FanSpeedLevel, + LifeSpanEvent, + NetworkInfoEvent, + ReportStatsEvent, + StateEvent, + StatsEvent, + TotalStatsEvent, + VolumeEvent, +) +from deebot_client.models import StaticDeviceInfo +from deebot_client.util import short_name + +from . import DEVICES + +DEVICES[short_name(__name__)] = StaticDeviceInfo( + DataType.XML, + Capabilities( + availability=CapabilityEvent(AvailabilityEvent, []), + battery=CapabilityEvent(BatteryEvent, [GetBatteryInfo()]), + charge=CapabilityExecute(Charge), + clean=CapabilityClean( + action=CapabilityCleanAction(command=Clean, area=CleanArea), + log=CapabilityEvent(CleanLogEvent, [GetCleanLogs()]), + ), + custom=CapabilityCustomCommand( + event=CustomCommandEvent, get=[], set=CustomCommand + ), + device_type=DeviceType.VACUUM, + error=CapabilityEvent(ErrorEvent, [GetError()]), + fan_speed=CapabilitySetTypes( + event=FanSpeedEvent, + get=[GetCleanSpeed()], + set=SetCleanSpeed, + types=( + FanSpeedLevel.NORMAL, + FanSpeedLevel.MAX, + ), + ), + life_span=CapabilityLifeSpan( + types=(), + event=LifeSpanEvent, + get=[], + reset=CustomCommand, + ), + network=CapabilityEvent(NetworkInfoEvent, []), + play_sound=CapabilityExecute(PlaySound), + settings=CapabilitySettings( + volume=CapabilitySet( + event=VolumeEvent, + get=[], + set=SetVolume, + ), + ), + state=CapabilityEvent(StateEvent, [GetChargeState(), GetCleanState()]), + stats=CapabilityStats( + clean=CapabilityEvent(StatsEvent, [GetCleanSum()]), + report=CapabilityEvent(ReportStatsEvent, []), + total=CapabilityEvent(TotalStatsEvent, [GetCleanSum()]), + ), + ), +) diff --git a/deebot_client/hardware/deebot/ls1ok3.py b/deebot_client/hardware/deebot/ls1ok3.py new file mode 100644 index 000000000..fb76e34d3 --- /dev/null +++ b/deebot_client/hardware/deebot/ls1ok3.py @@ -0,0 +1,101 @@ +"""ls1ok3 Capabilities.""" + +from __future__ import annotations + +from deebot_client.capabilities import ( + Capabilities, + CapabilityClean, + CapabilityCleanAction, + CapabilityCustomCommand, + CapabilityEvent, + CapabilityExecute, + CapabilityLifeSpan, + CapabilitySet, + CapabilitySettings, + CapabilitySetTypes, + CapabilityStats, + DeviceType, +) +from deebot_client.commands.json import SetVolume +from deebot_client.commands.json.custom import CustomCommand +from deebot_client.commands.xml import ( + Charge, + Clean, + CleanArea, + GetBatteryInfo, + GetCleanLogs, + GetCleanSpeed, + GetCleanState, + PlaySound, + SetCleanSpeed, +) +from deebot_client.commands.xml.charge_state import GetChargeState +from deebot_client.commands.xml.error import GetError +from deebot_client.commands.xml.stats import GetCleanSum +from deebot_client.const import DataType +from deebot_client.events import ( + AvailabilityEvent, + BatteryEvent, + CleanLogEvent, + CustomCommandEvent, + ErrorEvent, + FanSpeedEvent, + FanSpeedLevel, + LifeSpanEvent, + NetworkInfoEvent, + ReportStatsEvent, + StateEvent, + StatsEvent, + TotalStatsEvent, + VolumeEvent, +) +from deebot_client.models import StaticDeviceInfo +from deebot_client.util import short_name + +from . import DEVICES + +DEVICES[short_name(__name__)] = StaticDeviceInfo( + DataType.XML, + Capabilities( + availability=CapabilityEvent(AvailabilityEvent, []), + battery=CapabilityEvent(BatteryEvent, [GetBatteryInfo()]), + charge=CapabilityExecute(Charge), + clean=CapabilityClean( + action=CapabilityCleanAction(command=Clean, area=CleanArea), + log=CapabilityEvent(CleanLogEvent, [GetCleanLogs()]), + ), + custom=CapabilityCustomCommand(event=CustomCommandEvent, get=[], set=CustomCommand), + device_type=DeviceType.VACUUM, + error=CapabilityEvent(ErrorEvent, [GetError()]), + fan_speed=CapabilitySetTypes( + event=FanSpeedEvent, + get=[GetCleanSpeed()], + set=SetCleanSpeed, + types=( + FanSpeedLevel.NORMAL, + FanSpeedLevel.MAX, + ), + ), + life_span=CapabilityLifeSpan( + types=(), + event=LifeSpanEvent, + get=[], + reset=CustomCommand, + ), + network=CapabilityEvent(NetworkInfoEvent, []), + play_sound=CapabilityExecute(PlaySound), + settings=CapabilitySettings( + volume=CapabilitySet( + event=VolumeEvent, + get=[], + set=SetVolume, + ), + ), + state=CapabilityEvent(StateEvent, [GetChargeState(), GetCleanState()]), + stats=CapabilityStats( + clean=CapabilityEvent(StatsEvent, [GetCleanSum()]), + report=CapabilityEvent(ReportStatsEvent, []), + total=CapabilityEvent(TotalStatsEvent, [GetCleanSum()]), + ), + ), +) \ No newline at end of file diff --git a/deebot_client/messages/__init__.py b/deebot_client/messages/__init__.py index f4e529d2d..46f39ebdc 100644 --- a/deebot_client/messages/__init__.py +++ b/deebot_client/messages/__init__.py @@ -9,11 +9,13 @@ from deebot_client.message import Message from .json import MESSAGES as JSON_MESSAGES +from .xml import MESSAGES as XML_MESSAGES _LOGGER = get_logger(__name__) MESSAGES = { DataType.JSON: JSON_MESSAGES, + DataType.XML: XML_MESSAGES, } _LEGACY_USE_GET_COMMAND = [ diff --git a/deebot_client/messages/xml/__init__.py b/deebot_client/messages/xml/__init__.py new file mode 100644 index 000000000..024ce5605 --- /dev/null +++ b/deebot_client/messages/xml/__init__.py @@ -0,0 +1,39 @@ +"""XML messages.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.messages.xml.battery import BatteryInfo +from deebot_client.messages.xml.charge import ChargeState +from deebot_client.messages.xml.clean import CleanReport, CleanSt +from deebot_client.messages.xml.map import MapP +from deebot_client.messages.xml.pos import Pos +from deebot_client.messages.xml.water_info import WaterBoxInfo + +if TYPE_CHECKING: + from deebot_client.message import Message + +__all__ = [ + "BatteryInfo", + "ChargeState", + "CleanReport", + "CleanSt", + "MapP", + "Pos", + "WaterBoxInfo", +] +# fmt: off +# ordered by file asc +_MESSAGES: list[type[Message]] = [ + BatteryInfo, + ChargeState, + CleanReport, + WaterBoxInfo, + Pos, + MapP, + CleanSt +] +# fmt: on + +MESSAGES: dict[str, type[Message]] = {message.NAME: message for message in _MESSAGES} diff --git a/deebot_client/messages/xml/battery.py b/deebot_client/messages/xml/battery.py new file mode 100644 index 000000000..8c650d97c --- /dev/null +++ b/deebot_client/messages/xml/battery.py @@ -0,0 +1,37 @@ +"""Battery messages.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.events import BatteryEvent +from deebot_client.message import HandlingResult +from deebot_client.messages.xml.common import XmlMessage + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + + +class BatteryInfo(XmlMessage): + """BatteryInfo message.""" + + NAME = "BatteryInfo" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if (battery := xml.find("battery")) is None or ( + power := battery.attrib.get("power") + ) is None: + return HandlingResult.analyse() + + if power: + event_bus.notify(BatteryEvent(int(power))) + return HandlingResult.success() + + return HandlingResult.analyse() diff --git a/deebot_client/messages/xml/charge.py b/deebot_client/messages/xml/charge.py new file mode 100644 index 000000000..e6fff7ea6 --- /dev/null +++ b/deebot_client/messages/xml/charge.py @@ -0,0 +1,47 @@ +"""Charge messages.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.events import StateEvent +from deebot_client.message import HandlingResult +from deebot_client.messages.xml.common import XmlMessage +from deebot_client.models import State + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + + +class ChargeState(XmlMessage): + """ChargeState message.""" + + NAME = "ChargeState" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + status: State | None = None + + if (charge := xml.find("charge")) is not None: + type_ = charge.attrib["type"].lower() + match type_: + case "slotcharging" | "slot_charging" | "wirecharging": + status = State.DOCKED + case "idle": + pass + case "going": + status = State.RETURNING + case _: + status = State.ERROR + + if status: + event_bus.notify(StateEvent(status)) + return HandlingResult.success() + + return HandlingResult.analyse() diff --git a/deebot_client/messages/xml/clean.py b/deebot_client/messages/xml/clean.py new file mode 100644 index 000000000..1de3e2aae --- /dev/null +++ b/deebot_client/messages/xml/clean.py @@ -0,0 +1,66 @@ +"""Clean messages.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.events import FanSpeedEvent, FanSpeedLevel, StateEvent +from deebot_client.logging_filter import get_logger +from deebot_client.message import HandlingResult +from deebot_client.messages.xml.common import XmlMessage +from deebot_client.models import CleanAction, State + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + +_LOGGER = get_logger(__name__) + + +class CleanSt(XmlMessage): + """CleanSt message.""" + + NAME = "CleanSt" + + @classmethod + def _handle_xml(cls, _event_bus: EventBus, _xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + b"" + + :return: A message response + """ + return HandlingResult.analyse() + + +class CleanReport(XmlMessage): + """CleanReport message.""" + + NAME = "CleanReport" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if (clean := xml.find("clean")) is None: + return HandlingResult.analyse() + + speed_attrib = clean.attrib.get("speed") + if speed_attrib is not None: + fan_speed_level = FanSpeedLevel.from_xml(speed_attrib) + event_bus.notify(FanSpeedEvent(fan_speed_level)) + + clean_attrib = clean.attrib.get("st") + if clean_attrib is not None: + clean_action = CleanAction.from_xml(clean_attrib) + if clean_action == CleanAction.START: + event_bus.notify(StateEvent(State.CLEANING)) + elif clean_action == CleanAction.PAUSE: + event_bus.notify(StateEvent(State.PAUSED)) + else: + _LOGGER.debug("Ignored CleanState %s", clean_action) + + return HandlingResult.success() diff --git a/deebot_client/messages/xml/common.py b/deebot_client/messages/xml/common.py new file mode 100644 index 000000000..e63493d65 --- /dev/null +++ b/deebot_client/messages/xml/common.py @@ -0,0 +1,36 @@ +"""Common xml based messages.""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from typing import TYPE_CHECKING +from xml.etree import ElementTree as ET + +from deebot_client.message import MessageStr + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + from deebot_client.message import HandlingResult + + +class XmlMessage(MessageStr, ABC): + """Xml message.""" + + @classmethod + def _handle_str(cls, event_bus: EventBus, message: str) -> HandlingResult: + """Handle string message and notify the correct event subscribers. + + :return: A message response + """ + xml = ET.fromstring(message) # noqa: S314 + return cls._handle_xml(event_bus, xml) + + @classmethod + @abstractmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ diff --git a/deebot_client/messages/xml/map.py b/deebot_client/messages/xml/map.py new file mode 100644 index 000000000..5b6b954c1 --- /dev/null +++ b/deebot_client/messages/xml/map.py @@ -0,0 +1,32 @@ +"""Map messages.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.message import HandlingResult +from deebot_client.messages.xml.common import XmlMessage + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + + +class MapP(XmlMessage): + """MapP message. + + Probably a map piece + """ + + NAME = "MapP" + + @classmethod + def _handle_xml(cls, _event_bus: EventBus, _xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + b"" + + :return: A message response + """ + return HandlingResult.analyse() diff --git a/deebot_client/messages/xml/pos.py b/deebot_client/messages/xml/pos.py new file mode 100644 index 000000000..703495337 --- /dev/null +++ b/deebot_client/messages/xml/pos.py @@ -0,0 +1,34 @@ +"""Pos messages.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.events import Position, PositionsEvent +from deebot_client.message import HandlingResult +from deebot_client.messages.xml.common import XmlMessage +from deebot_client.rs.map import PositionType + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + + +class Pos(XmlMessage): + """Pos message.""" + + NAME = "Pos" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + if p := xml.attrib.get("p"): + p_x, p_y = p.split(",", 2) + p_a = xml.attrib.get("a", 0) + position = Position( + type=PositionType.DEEBOT, x=int(p_x), y=int(p_y), a=int(p_a) + ) + event_bus.notify(PositionsEvent(positions=[position])) + return HandlingResult.success() + + return HandlingResult.analyse() diff --git a/deebot_client/messages/xml/water_info.py b/deebot_client/messages/xml/water_info.py new file mode 100644 index 000000000..97ec912b6 --- /dev/null +++ b/deebot_client/messages/xml/water_info.py @@ -0,0 +1,34 @@ +"""Water messages.""" + +from __future__ import annotations + +from typing import TYPE_CHECKING + +from deebot_client.events import WaterAmount, WaterInfoEvent +from deebot_client.message import HandlingResult +from deebot_client.messages.xml.common import XmlMessage + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + + from deebot_client.event_bus import EventBus + + +class WaterBoxInfo(XmlMessage): + """WaterBoxInfo message.""" + + NAME = "WaterBoxInfo" + + @classmethod + def _handle_xml(cls, event_bus: EventBus, xml: Element) -> HandlingResult: + """Handle xml message and notify the correct event subscribers. + + :return: A message response + """ + if (on := xml.attrib.get("on")) is None: + return HandlingResult.analyse() + + event_bus.notify( + WaterInfoEvent(amount=WaterAmount.HIGH, mop_attached=on != "0") + ) + return HandlingResult.success() diff --git a/deebot_client/models.py b/deebot_client/models.py index c88ccabb0..426dccf6e 100644 --- a/deebot_client/models.py +++ b/deebot_client/models.py @@ -3,10 +3,12 @@ from __future__ import annotations from dataclasses import dataclass -from enum import IntEnum, StrEnum, unique +from enum import IntEnum, unique from pathlib import Path from typing import TYPE_CHECKING, Required, TypedDict +from deebot_client.util.enum import StrEnumWithXml + if TYPE_CHECKING: from deebot_client.capabilities import Capabilities from deebot_client.const import DataType @@ -64,22 +66,22 @@ class State(IntEnum): @unique -class CleanAction(StrEnum): +class CleanAction(StrEnumWithXml): """Enum class for all possible clean actions.""" - START = "start" - PAUSE = "pause" - RESUME = "resume" - STOP = "stop" + START = "start", "s" + PAUSE = "pause", "p" + RESUME = "resume", "r" + STOP = "stop", "h" @unique -class CleanMode(StrEnum): +class CleanMode(StrEnumWithXml): """Enum class for all possible clean modes.""" - AUTO = "auto" - SPOT_AREA = "spotArea" - CUSTOM_AREA = "customArea" + AUTO = "auto", "auto" + SPOT_AREA = "spotArea", "SpotArea" + CUSTOM_AREA = "customArea", "spot" @dataclass(frozen=True) diff --git a/deebot_client/util/enum.py b/deebot_client/util/enum.py new file mode 100644 index 000000000..80edf531d --- /dev/null +++ b/deebot_client/util/enum.py @@ -0,0 +1,52 @@ +"""Enum util.""" + +from __future__ import annotations + +from enum import IntEnum, StrEnum +from typing import Self + + +class StrEnumWithXml(StrEnum): + """String enum with xml value.""" + + xml_value: str + + def __new__(cls, value: str, xml_value: str = "") -> Self: + """Create new StrEnumWithXml.""" + obj = str.__new__(cls, value) + obj._value_ = value + obj.xml_value = xml_value + return obj + + @classmethod + def from_xml(cls, value: str) -> Self: + """Convert from xml value.""" + for member in cls: + if member.xml_value == value: + return member + + msg = f"{value} is not a valid {cls.__name__}" + raise ValueError(msg) + + +class IntEnumWithXml(IntEnum): + """Int enum with xml value.""" + + xml_value: str + + def __new__(cls, value: int, xml_value: str = "") -> Self: + """Create new StrEnumWithXml.""" + obj = int.__new__(cls, value) + obj._value_ = value + obj.xml_value = xml_value + return obj + + @classmethod + def from_xml(cls, value: str) -> Self: + """Convert from xml value.""" + for member in cls: + if member.xml_value == value: + return member + + msg = f"{value} is not a valid {cls.__name__}" + raise ValueError(msg) diff --git a/tests/commands/xml/test_battery.py b/tests/commands/xml/test_battery.py new file mode 100644 index 000000000..7806969db --- /dev/null +++ b/tests/commands/xml/test_battery.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING + +import pytest + +from deebot_client.command import CommandResult +from deebot_client.commands.xml import GetBatteryInfo +from deebot_client.events import BatteryEvent +from deebot_client.message import HandlingState +from tests.commands import assert_command + +from . import get_request_xml + +if TYPE_CHECKING: + from deebot_client.events.base import Event + + +@pytest.mark.parametrize( + ("power", "expected_event"), + [ + (40, BatteryEvent(40)), + ], + ids=["40_pct_battery"], +) +async def test_get_battery_info(power: int, expected_event: Event) -> None: + json = get_request_xml(f"") + await assert_command(GetBatteryInfo(), json, expected_event) + + +@pytest.mark.parametrize( + "xml", + ["", ""], + ids=["error", "no_state"], +) +async def test_get_battery_info_error(xml: str) -> None: + json = get_request_xml(xml) + await assert_command( + GetBatteryInfo(), + json, + None, + command_result=CommandResult(HandlingState.ANALYSE_LOGGED), + ) diff --git a/tests/commands/xml/test_clean.py b/tests/commands/xml/test_clean.py new file mode 100644 index 000000000..4e36527c0 --- /dev/null +++ b/tests/commands/xml/test_clean.py @@ -0,0 +1,81 @@ +from __future__ import annotations + +import pytest + +from deebot_client.command import CommandResult +from deebot_client.commands.xml import GetCleanState +from deebot_client.commands.xml.clean import CleanArea +from deebot_client.events import FanSpeedEvent, FanSpeedLevel, StateEvent +from deebot_client.message import HandlingState +from deebot_client.models import CleanMode, State +from tests.commands import assert_command + +from . import get_request_xml + + +@pytest.mark.parametrize( + ("command", "command_result"), + [ + (CleanArea(CleanMode.SPOT_AREA, "4", 1), HandlingState.SUCCESS), + ], +) +async def test_CleanArea(command: CleanArea, command_result: HandlingState) -> None: + json = get_request_xml("") + await assert_command( + command, json, None, command_result=CommandResult(command_result) + ) + + +@pytest.mark.parametrize( + ("speed", "state", "expected_fan_speed_event", "expected_state_event"), + [ + ( + "standard", + "s", + FanSpeedEvent(FanSpeedLevel.NORMAL), + StateEvent(State.CLEANING), + ), + ( + "strong", + "s", + FanSpeedEvent(FanSpeedLevel.MAX), + StateEvent(State.CLEANING), + ), + ( + "standard", + "p", + FanSpeedEvent(FanSpeedLevel.NORMAL), + None, + ), + ], + ids=["standard_cleaning", "strong_cleaning", "paused"], +) +async def test_get_clean_state( + speed: str, + state: str, + expected_fan_speed_event: FanSpeedEvent, + expected_state_event: StateEvent, +) -> None: + json = get_request_xml( + f"" + ) + await assert_command( + GetCleanState(), + json, + [x for x in [expected_fan_speed_event, expected_state_event] if x is not None], + ) + + +@pytest.mark.parametrize( + "xml", + ["", ""], + ids=["error", "no_state"], +) +async def test_get_clean_state_error(xml: str) -> None: + json = get_request_xml(xml) + await assert_command( + GetCleanState(), + json, + None, + command_result=CommandResult(HandlingState.ANALYSE_LOGGED), + ) diff --git a/tests/commands/xml/test_fan_speed.py b/tests/commands/xml/test_fan_speed.py index bf728ecfc..288b3c1e2 100644 --- a/tests/commands/xml/test_fan_speed.py +++ b/tests/commands/xml/test_fan_speed.py @@ -4,8 +4,8 @@ import pytest -from deebot_client.command import CommandResult -from deebot_client.commands.xml import GetFanSpeed +from deebot_client.command import CommandResult, CommandWithMessageHandling +from deebot_client.commands.xml import GetCleanSpeed, SetCleanSpeed from deebot_client.events import FanSpeedEvent, FanSpeedLevel from deebot_client.message import HandlingState from tests.commands import assert_command @@ -26,19 +26,36 @@ ) async def test_get_fan_speed(speed: str, expected_event: Event) -> None: json = get_request_xml(f"") - await assert_command(GetFanSpeed(), json, expected_event) + await assert_command(GetCleanSpeed(), json, expected_event) @pytest.mark.parametrize( "xml", - ["", ""], - ids=["error", "no_state"], + [""], + ids=["error"], ) async def test_get_fan_speed_error(xml: str) -> None: json = get_request_xml(xml) await assert_command( - GetFanSpeed(), + GetCleanSpeed(), json, None, command_result=CommandResult(HandlingState.ANALYSE_LOGGED), ) + + +@pytest.mark.parametrize( + ("command", "xml", "result"), + [ + ( + SetCleanSpeed(FanSpeedLevel.MAX), + "", + HandlingState.SUCCESS, + ), + ], +) +async def test_set_fan_speed( + command: CommandWithMessageHandling, xml: str, result: HandlingState +) -> None: + json = get_request_xml(xml) + await assert_command(command, json, None, command_result=CommandResult(result)) diff --git a/tests/util/test_enum.py b/tests/util/test_enum.py new file mode 100644 index 000000000..8b34fe47a --- /dev/null +++ b/tests/util/test_enum.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +import pytest + +from deebot_client.util.enum import StrEnumWithXml + + +class TestStrEnumWithXml(StrEnumWithXml): + """Simple Enum for testing.""" + + ENUM1 = "value1", "xmlvalue1" + ENUM2 = "value2", "xmlvalue2" + + +@pytest.mark.parametrize( + ("test_enum", "test_value", "test_xml_value"), + [ + (TestStrEnumWithXml.ENUM1, "value1", "xmlvalue1"), + (TestStrEnumWithXml.ENUM2, "value2", "xmlvalue2"), + ], +) +def test_StrEnumWithXml_values( + test_enum: TestStrEnumWithXml, test_value: str, test_xml_value: str +) -> None: + assert test_enum.value == test_value + assert test_enum.xml_value == test_xml_value + assert test_value == TestStrEnumWithXml.from_xml(test_xml_value).value + assert test_xml_value == TestStrEnumWithXml(test_value).xml_value + + # test ENUM from invalid xml + try: + TestStrEnumWithXml.from_xml("this_is_invalid") + raise AssertionError + except ValueError: + assert True