-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathservices.py
116 lines (103 loc) · 5.24 KB
/
services.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
""" Implement the services of this implementation """
from home_connect_async import HomeConnect, HomeConnectError, Appliance
from homeassistant.core import HomeAssistant
from homeassistant.exceptions import HomeAssistantError
from homeassistant.helpers import device_registry as dr
class Services():
""" Collection of the Services offered by the integration """
def __init__(self, hass:HomeAssistant, homeconnect:HomeConnect) -> None:
self.homeconnect = homeconnect
self.hass = hass
self.dr = dr.async_get(hass)
async def async_select_program(self, call) -> None:
""" Service for selecting a program """
data = call.data
appliance = self.get_appliance_from_device_id(data['device_id'])
if appliance:
program_key = data['program_key']
options = data.get('options')
validate = data.get('validate')
try:
await appliance.async_select_program(program_key, options, validate)
except HomeConnectError as ex:
raise HomeAssistantError(ex.error_description if ex.error_description else ex.msg) from ex
async def async_start_program(self, call) -> None:
""" Service for starting the currently selected program """
data = call.data
appliance = self.get_appliance_from_device_id(data['device_id'])
if appliance:
program_key = data.get('program_key')
options = data.get('options')
validate = data.get('validate')
try:
await appliance.async_start_program(program_key, options, validate)
except HomeConnectError as ex:
raise HomeAssistantError(ex.error_description if ex.error_description else ex.msg) from ex
async def async_stop_program(self, call) -> None:
""" Service for stopping the currently active program """
data = call.data
appliance = self.get_appliance_from_device_id(data['device_id'])
if appliance:
try:
await appliance.async_stop_active_program()
except HomeConnectError as ex:
raise HomeAssistantError(ex.error_description if ex.error_description else ex.msg) from ex
async def async_pause_program(self, call) -> None:
""" Service for pausing the currently active program """
data = call.data
appliance = self.get_appliance_from_device_id(data['device_id'])
if appliance:
try:
await appliance.async_pause_active_program()
except HomeConnectError as ex:
raise HomeAssistantError(ex.error_description if ex.error_description else ex.msg) from ex
async def async_resume_program(self, call) -> None:
""" Service for stopping the currently active program """
data = call.data
appliance = self.get_appliance_from_device_id(data['device_id'])
if appliance:
try:
await appliance.async_resume_paused_program()
except HomeConnectError as ex:
raise HomeAssistantError(ex.error_description if ex.error_description else ex.msg) from ex
async def async_set_program_option(self, call) -> None:
""" Service for setting an option on the current program """
data = call.data
appliance = self.get_appliance_from_device_id(data['device_id'])
if appliance:
try:
await appliance.async_set_option(data['key'], data['value'])
except HomeConnectError as ex:
raise HomeAssistantError(ex.error_description if ex.error_description else ex.msg) from ex
except ValueError as ex:
raise HomeAssistantError(str(ex)) from ex
async def async_apply_setting(self, call) -> None:
""" Service for applying an appliance setting """
data = call.data
appliance = self.get_appliance_from_device_id(data['device_id'])
if appliance:
try:
await appliance.async_apply_setting(data['key'], data['value'])
except HomeConnectError as ex:
raise HomeAssistantError(ex.error_description if ex.error_description else ex.msg) from ex
except ValueError as ex:
raise HomeAssistantError(str(ex)) from ex
async def async_run_command(self, call) -> None:
""" Service for running a command on an appliance """
data = call.data
appliance = self.get_appliance_from_device_id(data['device_id'])
if appliance:
try:
await appliance.async_send_command(data['key'], data['value'])
except HomeConnectError as ex:
raise HomeAssistantError(ex.error_description if ex.error_description else ex.msg) from ex
except ValueError as ex:
raise HomeAssistantError(str(ex)) from ex
def get_appliance_from_device_id(self, device_id) -> Appliance|None:
""" Helper function to get an appliance from the Home Assistant device_id """
device = self.dr.devices[device_id]
haId = list(device.identifiers)[0][1]
for (key, appliance) in self.homeconnect.appliances.items():
if key.lower().replace('-','_') == haId:
return appliance
return None