-
Notifications
You must be signed in to change notification settings - Fork 685
/
Copy pathtest_job_manager.py
231 lines (196 loc) · 6.48 KB
/
test_job_manager.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
"""Test the condition decorators."""
import asyncio
from unittest.mock import ANY, AsyncMock
import pytest
from supervisor.coresys import CoreSys
from supervisor.exceptions import JobStartException
# Job name passed to coresys.jobs.new_job() by the tests in this module.
TEST_JOB = "test"
async def test_add_job(coresys: CoreSys):
    """Check that a job created through the manager shows up in its job list."""
    manager = coresys.jobs
    created = manager.new_job(TEST_JOB)

    # The manager must list the job right after creating it.
    assert created in manager.jobs
async def test_remove_job_directly(coresys: CoreSys, caplog: pytest.LogCaptureFixture):
    """Check removing jobs from the manager, idle and while running.

    Removing a job that was never started must not log the "incomplete job"
    message; removing one from inside its ``start()`` context must log it.
    """
    manager = coresys.jobs

    # Case 1: the job is removed before it was ever started.
    idle_job = manager.new_job(TEST_JOB)
    assert idle_job in manager.jobs

    manager.remove_job(idle_job)
    assert idle_job not in manager.jobs
    assert f"Removing incomplete job {idle_job.name}" not in caplog.text

    # Case 2: the job is removed while its start() context is still open.
    running_job = manager.new_job(TEST_JOB)
    assert running_job in manager.jobs

    with running_job.start():
        manager.remove_job(running_job)
        assert running_job not in manager.jobs
        assert f"Removing incomplete job {running_job.name}" in caplog.text
async def test_job_done(coresys: CoreSys):
    """Check ``done`` and the manager's current-job state over a job's lifetime."""
    manager = coresys.jobs
    job = manager.new_job(TEST_JOB)

    # Freshly created: not done, and the manager reports no active job.
    assert not job.done
    assert not manager.is_job

    # While running: the manager reports this job as current, still not done.
    with job.start():
        assert manager.is_job
        assert manager.current == job
        assert not job.done

    # After the context exits: no active job and the job is marked done.
    assert not manager.is_job
    assert job.done

    # Starting the same job a second time is rejected.
    with pytest.raises(JobStartException):
        with job.start():
            pass
async def test_job_start_bad_parent(coresys: CoreSys):
    """Check an unrelated job cannot be started while another job is running."""
    manager = coresys.jobs
    first = manager.new_job(TEST_JOB)
    second = manager.new_job(f"{TEST_JOB}_2")

    # Starting ``second`` inside ``first``'s context must raise.
    with first.start():
        with pytest.raises(JobStartException):
            with second.start():
                pass

    # Once ``first`` has finished, ``second`` starts and becomes current.
    with second.start():
        assert manager.current == second
async def test_update_job(coresys: CoreSys):
    """Check progress and stage can be set and that bad progress values raise."""
    job = coresys.jobs.new_job(TEST_JOB)

    # Valid assignments are stored and read back unchanged.
    job.progress = 50
    assert job.progress == 50
    job.stage = "stage"
    assert job.stage == "stage"

    # Progress values of 110 and -10 are both rejected with ValueError.
    for invalid_progress in (110, -10):
        with pytest.raises(ValueError):
            job.progress = invalid_progress
def _job_event(*, reference=None, stage=None, done=None, errors=None, progress=50):
    """Build the ``supervisor/event`` message expected for the test job.

    Only the fields that vary between assertions are parameters; ``uuid`` and
    ``created`` are matched with ``ANY`` and ``parent_id`` is always ``None``.

    Args:
        reference: Expected ``reference`` value of the job.
        stage: Expected ``stage`` value of the job.
        done: Expected ``done`` value of the job.
        errors: Expected list of error dicts; ``None`` means an empty list.
        progress: Expected ``progress`` value of the job.

    Returns:
        The dict to compare against the argument of ``async_send_command``.
    """
    return {
        "type": "supervisor/event",
        "data": {
            "event": "job",
            "data": {
                "name": TEST_JOB,
                "reference": reference,
                "uuid": ANY,
                "progress": progress,
                "stage": stage,
                "done": done,
                "parent_id": None,
                "errors": [] if errors is None else errors,
                "created": ANY,
            },
        },
    }


async def test_notify_on_change(coresys: CoreSys, ha_ws_client: AsyncMock):
    """Test jobs notify Home Assistant on changes.

    Each change to the job (progress, stage, reference, start, captured error,
    completion) is followed by a check that the most recent call to
    ``ha_ws_client.async_send_command`` carried the job's updated state.
    """
    send_command = ha_ws_client.async_send_command
    # Error entry expected after ``job.capture_error()`` is called with no arguments.
    unknown_error = {
        "type": "HassioError",
        "message": "Unknown error, see supervisor logs",
    }

    job = coresys.jobs.new_job(TEST_JOB)

    # Each ``sleep(0)`` yields to the event loop once before asserting
    # (presumably the notification is sent from a scheduled task - confirm
    # against the job manager implementation).
    job.progress = 50
    await asyncio.sleep(0)
    send_command.assert_called_with(_job_event())

    job.stage = "test"
    await asyncio.sleep(0)
    send_command.assert_called_with(_job_event(stage="test"))

    job.reference = "test"
    await asyncio.sleep(0)
    send_command.assert_called_with(_job_event(reference="test", stage="test"))

    with job.start():
        # Starting the job flips ``done`` from None to False.
        await asyncio.sleep(0)
        send_command.assert_called_with(
            _job_event(reference="test", stage="test", done=False)
        )

        job.capture_error()
        await asyncio.sleep(0)
        send_command.assert_called_with(
            _job_event(
                reference="test", stage="test", done=False, errors=[unknown_error]
            )
        )

    # Leaving the start() context marks the job done; the error is retained.
    await asyncio.sleep(0)
    send_command.assert_called_with(
        _job_event(reference="test", stage="test", done=True, errors=[unknown_error])
    )