Skip to content

Commit 129951a

Browse files
authored
Merge pull request #187 from rkdarst/pr-179-fixups
Merge new feature branch: add unknown job state
2 parents ab0e00e + 87c5eba commit 129951a

File tree

2 files changed

+94
-41
lines changed

2 files changed

+94
-41
lines changed

batchspawner/batchspawner.py

Lines changed: 58 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424

2525
import xml.etree.ElementTree as ET
2626

27+
from enum import Enum
28+
2729
from jinja2 import Template
2830

2931
from tornado import gen
@@ -55,6 +57,11 @@ def format_template(template, *args, **kwargs):
5557
return Template(template).render(*args, **kwargs)
5658
return template.format(*args, **kwargs)
5759

60+
class JobStatus(Enum):
61+
NOTFOUND = 0
62+
RUNNING = 1
63+
PENDING = 2
64+
UNKNOWN = 3
5865

5966
class BatchSpawnerBase(Spawner):
6067
"""Base class for spawners using resource manager batch job submission mechanisms
@@ -256,30 +263,39 @@ async def submit_batch_script(self):
256263
self.job_id = ''
257264
return self.job_id
258265

259-
# Override if your batch system needs something more elaborate to read the job status
266+
# Override if your batch system needs something more elaborate to query the job status
260267
batch_query_cmd = Unicode('',
261-
help="Command to run to read job status. Formatted using req_xyz traits as {xyz} "
268+
help="Command to run to query job status. Formatted using req_xyz traits as {xyz} "
262269
"and self.job_id as {job_id}."
263270
).tag(config=True)
264271

265-
async def read_job_state(self):
272+
async def query_job_status(self):
273+
"""Check job status, return JobStatus object."""
266274
if self.job_id is None or len(self.job_id) == 0:
267-
# job not running
268275
self.job_status = ''
269-
return self.job_status
276+
return JobStatus.NOTFOUND
270277
subvars = self.get_req_subvars()
271278
subvars['job_id'] = self.job_id
272279
cmd = ' '.join((format_template(self.exec_prefix, **subvars),
273280
format_template(self.batch_query_cmd, **subvars)))
274281
self.log.debug('Spawner querying job: ' + cmd)
275282
try:
276-
out = await self.run_command(cmd)
277-
self.job_status = out
283+
self.job_status = await self.run_command(cmd)
284+
except RuntimeError as e:
285+
# e.args[0] is stderr from the process
286+
self.job_status = e.args[0]
278287
except Exception as e:
279288
self.log.error('Error querying job ' + self.job_id)
280289
self.job_status = ''
281-
finally:
282-
return self.job_status
290+
291+
if self.state_isrunning():
292+
return JobStatus.RUNNING
293+
elif self.state_ispending():
294+
return JobStatus.PENDING
295+
elif self.state_isunknown():
296+
return JobStatus.UNKNOWN
297+
else:
298+
return JobStatus.NOTFOUND
283299

284300
batch_cancel_cmd = Unicode('',
285301
help="Command to stop/cancel a previously submitted job. Formatted like batch_query_cmd."
@@ -326,22 +342,20 @@ def state_isrunning(self):
326342
"Return boolean indicating if job is running, likely by parsing self.job_status"
327343
raise NotImplementedError("Subclass must provide implementation")
328344

345+
def state_isunknown(self):
346+
"Return boolean indicating if job state retrieval failed because of the resource manager"
347+
return None
348+
329349
def state_gethost(self):
330350
"Return string, hostname or addr of running job, likely by parsing self.job_status"
331351
raise NotImplementedError("Subclass must provide implementation")
332352

333353
async def poll(self):
334354
"""Poll the process"""
335-
if self.job_id is not None and len(self.job_id) > 0:
336-
await self.read_job_state()
337-
if self.state_isrunning() or self.state_ispending():
338-
return None
339-
else:
340-
self.clear_state()
341-
return 1
342-
343-
if not self.job_id:
344-
# no job id means it's not running
355+
status = await self.query_job_status()
356+
if status in (JobStatus.PENDING, JobStatus.RUNNING, JobStatus.UNKNOWN):
357+
return None
358+
else:
345359
self.clear_state()
346360
return 1
347361

@@ -366,18 +380,20 @@ async def start(self):
366380
if len(self.job_id) == 0:
367381
raise RuntimeError("Jupyter batch job submission failure (no jobid in output)")
368382
while True:
369-
await self.poll()
370-
if self.state_isrunning():
383+
status = await self.query_job_status()
384+
if status == JobStatus.RUNNING:
371385
break
386+
elif status == JobStatus.PENDING:
387+
self.log.debug('Job ' + self.job_id + ' still pending')
388+
elif status == JobStatus.UNKNOWN:
389+
self.log.debug('Job ' + self.job_id + ' still unknown')
372390
else:
373-
if self.state_ispending():
374-
self.log.debug('Job ' + self.job_id + ' still pending')
375-
else:
376-
self.log.warning('Job ' + self.job_id + ' neither pending nor running.\n' +
377-
self.job_status)
378-
raise RuntimeError('The Jupyter batch job has disappeared'
379-
' while pending in the queue or died immediately'
380-
' after starting.')
391+
self.log.warning('Job ' + self.job_id + ' neither pending nor running.\n' +
392+
self.job_status)
393+
self.clear_state()
394+
raise RuntimeError('The Jupyter batch job has disappeared'
395+
' while pending in the queue or died immediately'
396+
' after starting.')
381397
await gen.sleep(self.startup_poll_interval)
382398

383399
self.ip = self.state_gethost()
@@ -410,8 +426,8 @@ async def stop(self, now=False):
410426
if now:
411427
return
412428
for i in range(10):
413-
await self.poll()
414-
if not self.state_isrunning():
429+
status = await self.query_job_status()
430+
if status not in (JobStatus.RUNNING, JobStatus.UNKNOWN):
415431
return
416432
await gen.sleep(1.0)
417433
if self.job_id:
@@ -467,20 +483,22 @@ class BatchSpawnerRegexStates(BatchSpawnerBase):
467483
If this variable is set, the match object will be expanded using this string
468484
to obtain the notebook IP.
469485
See Python docs: re.match.expand""").tag(config=True)
486+
state_unknown_re = Unicode('',
487+
help="Regex that matches job_status if the resource manager is not answering."
488+
"Blank indicates not used.").tag(config=True)
470489

471490
def state_ispending(self):
472491
assert self.state_pending_re, "Misconfigured: define state_running_re"
473-
if self.job_status and re.search(self.state_pending_re, self.job_status):
474-
return True
475-
else:
476-
return False
492+
return self.job_status and re.search(self.state_pending_re, self.job_status)
477493

478494
def state_isrunning(self):
479495
assert self.state_running_re, "Misconfigured: define state_running_re"
480-
if self.job_status and re.search(self.state_running_re, self.job_status):
481-
return True
482-
else:
483-
return False
496+
return self.job_status and re.search(self.state_running_re, self.job_status)
497+
498+
def state_isunknown(self):
499+
# A blank state_unknown_re means "not set"; in that case this function falls through and returns None.
500+
if self.state_unknown_re:
501+
return self.job_status and re.search(self.state_unknown_re, self.job_status)
484502

485503
def state_gethost(self):
486504
assert self.state_exechost_re, "Misconfigured: define state_exechost_re"
@@ -645,6 +663,7 @@ class SlurmSpawner(UserEnvMixin,BatchSpawnerRegexStates):
645663
# RUNNING, COMPLETING = running
646664
state_pending_re = Unicode(r'^(?:PENDING|CONFIGURING)').tag(config=True)
647665
state_running_re = Unicode(r'^(?:RUNNING|COMPLETING)').tag(config=True)
666+
state_unknown_re = Unicode(r'^slurm_load_jobs error: (?:Socket timed out on send/recv|Unable to contact slurm controller)').tag(config=True)
648667
state_exechost_re = Unicode(r'\s+((?:[\w_-]+\.?)+)$').tag(config=True)
649668

650669
def parse_job_id(self, output):

batchspawner/tests/test_spawners.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import re
44
from unittest import mock
5-
from .. import BatchSpawnerRegexStates
5+
from .. import BatchSpawnerRegexStates, JobStatus
66
from traitlets import Unicode
77
import time
88
import pytest
@@ -28,6 +28,7 @@ class BatchDummy(BatchSpawnerRegexStates):
2828
state_pending_re = Unicode('PEND')
2929
state_running_re = Unicode('RUN')
3030
state_exechost_re = Unicode('RUN (.*)$')
31+
state_unknown_re = Unicode('UNKNOWN')
3132

3233
cmd_expectlist = None
3334
out_expectlist = None
@@ -76,6 +77,7 @@ def new_spawner(db, spawner_class=BatchDummy, **kwargs):
7677
spawner.mock_port = testport
7778
return spawner
7879

80+
@pytest.mark.slow
7981
def test_stress_submit(db, io_loop):
8082
for i in range(200):
8183
time.sleep(0.01)
@@ -134,15 +136,46 @@ def test_submit_failure(db, io_loop):
134136
assert spawner.job_id == ''
135137
assert spawner.job_status == ''
136138

137-
def test_pending_fails(db, io_loop):
139+
def test_submit_pending_fails(db, io_loop):
140+
"""Submission works, but the batch query command immediately fails"""
138141
spawner = new_spawner(db=db)
139142
assert spawner.get_state() == {}
140143
spawner.batch_query_cmd = 'echo xyz'
141144
with pytest.raises(RuntimeError) as e_info:
142145
io_loop.run_sync(spawner.start, timeout=30)
146+
status = io_loop.run_sync(spawner.query_job_status, timeout=30)
147+
assert status == JobStatus.NOTFOUND
143148
assert spawner.job_id == ''
144149
assert spawner.job_status == ''
145150

151+
def test_poll_fails(db, io_loop):
152+
"""Submission works, but a later .poll() fails"""
153+
spawner = new_spawner(db=db)
154+
assert spawner.get_state() == {}
155+
# The start is successful:
156+
io_loop.run_sync(spawner.start, timeout=30)
157+
spawner.batch_query_cmd = 'echo xyz'
158+
# Now, the poll fails:
159+
io_loop.run_sync(spawner.poll, timeout=30)
160+
# .poll() calls self.clear_state() when the job is not found:
161+
assert spawner.job_id == ''
162+
assert spawner.job_status == ''
163+
164+
def test_unknown_status(db, io_loop):
165+
"""Polling returns an unknown status"""
166+
spawner = new_spawner(db=db)
167+
assert spawner.get_state() == {}
168+
# The start is successful:
169+
io_loop.run_sync(spawner.start, timeout=30)
170+
spawner.batch_query_cmd = 'echo UNKNOWN'
171+
# This poll should not fail:
172+
io_loop.run_sync(spawner.poll, timeout=30)
173+
status = io_loop.run_sync(spawner.query_job_status, timeout=30)
174+
assert status == JobStatus.UNKNOWN
175+
assert spawner.job_id == '12345'
176+
assert spawner.job_status != ''
177+
178+
146179
def test_templates(db, io_loop):
147180
"""Test templates in the run_command commands"""
148181
spawner = new_spawner(db=db)
@@ -394,6 +427,7 @@ def test_slurm(db, io_loop):
394427
normal_slurm_script = [
395428
(re.compile(r'sudo.*sbatch'), str(testjob)),
396429
(re.compile(r'sudo.*squeue'), 'PENDING '), # pending
430+
(re.compile(r'sudo.*squeue'), 'slurm_load_jobs error: Unable to contact slurm controller'), # unknown
397431
(re.compile(r'sudo.*squeue'), 'RUNNING '+testhost), # running
398432
(re.compile(r'sudo.*squeue'), 'RUNNING '+testhost),
399433
(re.compile(r'sudo.*scancel'), 'STOP'),

0 commit comments

Comments
 (0)