Skip to content

Commit 79a018f

Browse files
committed
Merge branch 'release_20.05' into release_20.09
2 parents 311cbe4 + 71e01da commit 79a018f

File tree

2 files changed

+56
-36
lines changed

2 files changed

+56
-36
lines changed

lib/galaxy/jobs/runners/kubernetes.py

+4-21
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
Offload jobs to a Kubernetes cluster.
33
"""
44

5-
import errno
65
import logging
76
import math
87
import os
@@ -459,30 +458,14 @@ def check_watched_item(self, job_state):
459458
# there is no job responding to this job_id, it is either lost or something happened.
460459
log.error("No Jobs are available under expected selector app=%s", job_state.job_id)
461460
self.mark_as_failed(job_state)
462-
try:
463-
with open(job_state.error_file, 'w') as error_file:
464-
error_file.write("No Kubernetes Jobs are available under expected selector app=%s\n" % job_state.job_id)
465-
except OSError as e:
466-
# Python 2/3 compatible handling of FileNotFoundError
467-
if e.errno == errno.ENOENT:
468-
log.error("Job directory already cleaned up. Assuming already handled for selector app=%s", job_state.job_id)
469-
else:
470-
raise
471-
return job_state
461+
# job is no longer viable - remove from watched jobs
462+
return None
472463
else:
473464
# there is more than one job associated to the expected unique job id used as selector.
474465
log.error("More than one Kubernetes Job associated to job id '%s'", job_state.job_id)
475466
self.mark_as_failed(job_state)
476-
try:
477-
with open(job_state.error_file, 'w') as error_file:
478-
error_file.write("More than one Kubernetes Job associated with job id '%s'\n" % job_state.job_id)
479-
except OSError as e:
480-
# Python 2/3 compatible handling of FileNotFoundError
481-
if e.errno == errno.ENOENT:
482-
log.error("Job directory already cleaned up. Assuming already handled for selector app=%s", job_state.job_id)
483-
else:
484-
raise
485-
return job_state
467+
# job is no longer viable - remove from watched jobs
468+
return None
486469

487470
def _handle_job_failure(self, job, job_state):
488471
# Figure out why job has failed

test/integration/test_kubernetes_runner.py

+52-15
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,17 @@ def test_job_environment(self):
216216
job_env = self._run_and_get_environment_properties()
217217
assert job_env.some_env == '42'
218218

219+
@staticmethod
220+
def _wait_for_external_state(sa_session, job, expected):
221+
# Not checking the state here allows the change from queued to running to overwrite
222+
# the change from queued to deleted_new in the API thread - this is a problem because
223+
# the job will still run. See issue https://github.com/galaxyproject/galaxy/issues/4960.
224+
max_tries = 60
225+
while max_tries > 0 and job.job_runner_external_id is None or job.state != expected:
226+
sa_session.refresh(job)
227+
time.sleep(1)
228+
max_tries -= 1
229+
219230
@skip_without_tool('cat_data_and_sleep')
220231
def test_kill_process(self):
221232
with self.dataset_populator.test_history() as history_id:
@@ -234,22 +245,12 @@ def test_kill_process(self):
234245

235246
app = self._app
236247
sa_session = app.model.context.current
237-
external_id = None
238-
state = False
239-
240-
job = sa_session.query(app.model.Job).filter_by(tool_id="cat_data_and_sleep").one()
241-
# Not checking the state here allows the change from queued to running to overwrite
242-
# the change from queued to deleted_new in the API thread - this is a problem because
243-
# the job will still run. See issue https://github.com/galaxyproject/galaxy/issues/4960.
244-
max_tries = 60
245-
while max_tries > 0 and external_id is None or state != app.model.Job.states.RUNNING:
246-
sa_session.refresh(job)
247-
assert not job.finished
248-
external_id = job.job_runner_external_id
249-
state = job.state
250-
time.sleep(1)
251-
max_tries -= 1
248+
job = sa_session.query(app.model.Job).get(app.security.decode_id(job_dict["id"]))
249+
250+
self._wait_for_external_state(sa_session, job, app.model.Job.states.RUNNING)
251+
assert not job.finished
252252

253+
external_id = job.job_runner_external_id
253254
output = unicodify(subprocess.check_output(['kubectl', 'get', 'job', external_id, '-o', 'json']))
254255
status = json.loads(output)
255256
assert status['status']['active'] == 1
@@ -264,6 +265,42 @@ def test_kill_process(self):
264265
subprocess.check_output(['kubectl', 'get', 'job', external_id, '-o', 'json'], stderr=subprocess.STDOUT)
265266
assert "not found" in unicodify(excinfo.value.output)
266267

268+
@skip_without_tool('cat_data_and_sleep')
269+
def test_external_job_delete(self):
270+
with self.dataset_populator.test_history() as history_id:
271+
hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3")
272+
running_inputs = {
273+
"input1": {"src": "hda", "id": hda1["id"]},
274+
"sleep_time": 240,
275+
}
276+
running_response = self.dataset_populator.run_tool(
277+
"cat_data_and_sleep",
278+
running_inputs,
279+
history_id,
280+
assert_ok=False,
281+
)
282+
job_dict = running_response.json()["jobs"][0]
283+
284+
app = self._app
285+
sa_session = app.model.context.current
286+
job = sa_session.query(app.model.Job).get(app.security.decode_id(job_dict["id"]))
287+
288+
self._wait_for_external_state(sa_session, job, app.model.Job.states.RUNNING)
289+
290+
external_id = job.job_runner_external_id
291+
output = unicodify(subprocess.check_output(['kubectl', 'get', 'job', external_id, '-o', 'json']))
292+
status = json.loads(output)
293+
assert status['status']['active'] == 1
294+
295+
output = unicodify(subprocess.check_output(['kubectl', 'delete', 'job', external_id, '-o', 'name']))
296+
assert 'job.batch/%s' % external_id in output
297+
298+
result = self.dataset_populator.wait_for_tool_run(run_response=running_response, history_id=history_id,
299+
assert_ok=False).json()
300+
details = self.dataset_populator.get_job_details(result['jobs'][0]['id'], full=True).json()
301+
302+
assert details['state'] == app.model.Job.states.ERROR, details
303+
267304
@skip_without_tool('job_properties')
268305
def test_exit_code_127(self):
269306
inputs = {

0 commit comments

Comments
 (0)