Skip to content

Commit 71e01da

Browse files
committed
Merge branch 'release_20.01' into release_20.05
2 parents 1a8e271 + 535e4c5 commit 71e01da

File tree

3 files changed

+57
-37
lines changed

3 files changed

+57
-37
lines changed

lib/galaxy/jobs/runners/kubernetes.py

+4-21
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
Offload jobs to a Kubernetes cluster.
33
"""
44

5-
import errno
65
import logging
76
import math
87
import os
@@ -465,30 +464,14 @@ def check_watched_item(self, job_state):
465464
# there is no job responding to this job_id, it is either lost or something happened.
466465
log.error("No Jobs are available under expected selector app=%s", job_state.job_id)
467466
self.mark_as_failed(job_state)
468-
try:
469-
with open(job_state.error_file, 'w') as error_file:
470-
error_file.write("No Kubernetes Jobs are available under expected selector app=%s\n" % job_state.job_id)
471-
except EnvironmentError as e:
472-
# Python 2/3 compatible handling of FileNotFoundError
473-
if e.errno == errno.ENOENT:
474-
log.error("Job directory already cleaned up. Assuming already handled for selector app=%s", job_state.job_id)
475-
else:
476-
raise
477-
return job_state
467+
# job is no longer viable - remove from watched jobs
468+
return None
478469
else:
479470
# there is more than one job associated to the expected unique job id used as selector.
480471
log.error("More than one Kubernetes Job associated to job id '%s'", job_state.job_id)
481472
self.mark_as_failed(job_state)
482-
try:
483-
with open(job_state.error_file, 'w') as error_file:
484-
error_file.write("More than one Kubernetes Job associated with job id '%s'\n" % job_state.job_id)
485-
except EnvironmentError as e:
486-
# Python 2/3 compatible handling of FileNotFoundError
487-
if e.errno == errno.ENOENT:
488-
log.error("Job directory already cleaned up. Assuming already handled for selector app=%s", job_state.job_id)
489-
else:
490-
raise
491-
return job_state
473+
# job is no longer viable - remove from watched jobs
474+
return None
492475

493476
def _handle_job_failure(self, job, job_state):
494477
# Figure out why job has failed

packages/test.sh

+1-1
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ TEST_ENV_DIR=${TEST_ENV_DIR:-$(mktemp -d -t gxpkgtestenvXXXXXX)}
1414

1515
virtualenv -p "$TEST_PYTHON" "$TEST_ENV_DIR"
1616
. "${TEST_ENV_DIR}/bin/activate"
17-
pip install pytest
17+
pip install "pytest<6.1"
1818

1919
# ensure ordered by dependency dag
2020
PACKAGE_DIRS=(

test/integration/test_kubernetes_runner.py

+52-15
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,17 @@ def test_job_environment(self):
216216
job_env = self._run_and_get_environment_properties()
217217
assert job_env.some_env == '42'
218218

219+
@staticmethod
220+
def _wait_for_external_state(sa_session, job, expected):
221+
# Not checking the state here allows the change from queued to running to overwrite
222+
# the change from queued to deleted_new in the API thread - this is a problem because
223+
# the job will still run. See issue https://github.com/galaxyproject/galaxy/issues/4960.
224+
max_tries = 60
225+
while max_tries > 0 and job.job_runner_external_id is None or job.state != expected:
226+
sa_session.refresh(job)
227+
time.sleep(1)
228+
max_tries -= 1
229+
219230
@skip_without_tool('cat_data_and_sleep')
220231
def test_kill_process(self):
221232
with self.dataset_populator.test_history() as history_id:
@@ -234,22 +245,12 @@ def test_kill_process(self):
234245

235246
app = self._app
236247
sa_session = app.model.context.current
237-
external_id = None
238-
state = False
239-
240-
job = sa_session.query(app.model.Job).filter_by(tool_id="cat_data_and_sleep").one()
241-
# Not checking the state here allows the change from queued to running to overwrite
242-
# the change from queued to deleted_new in the API thread - this is a problem because
243-
# the job will still run. See issue https://github.com/galaxyproject/galaxy/issues/4960.
244-
max_tries = 60
245-
while max_tries > 0 and external_id is None or state != app.model.Job.states.RUNNING:
246-
sa_session.refresh(job)
247-
assert not job.finished
248-
external_id = job.job_runner_external_id
249-
state = job.state
250-
time.sleep(1)
251-
max_tries -= 1
248+
job = sa_session.query(app.model.Job).get(app.security.decode_id(job_dict["id"]))
249+
250+
self._wait_for_external_state(sa_session, job, app.model.Job.states.RUNNING)
251+
assert not job.finished
252252

253+
external_id = job.job_runner_external_id
253254
output = unicodify(subprocess.check_output(['kubectl', 'get', 'job', external_id, '-o', 'json']))
254255
status = json.loads(output)
255256
assert status['status']['active'] == 1
@@ -264,6 +265,42 @@ def test_kill_process(self):
264265
subprocess.check_output(['kubectl', 'get', 'job', external_id, '-o', 'json'], stderr=subprocess.STDOUT)
265266
assert "not found" in unicodify(excinfo.value.output)
266267

268+
@skip_without_tool('cat_data_and_sleep')
269+
def test_external_job_delete(self):
270+
with self.dataset_populator.test_history() as history_id:
271+
hda1 = self.dataset_populator.new_dataset(history_id, content="1 2 3")
272+
running_inputs = {
273+
"input1": {"src": "hda", "id": hda1["id"]},
274+
"sleep_time": 240,
275+
}
276+
running_response = self.dataset_populator.run_tool(
277+
"cat_data_and_sleep",
278+
running_inputs,
279+
history_id,
280+
assert_ok=False,
281+
)
282+
job_dict = running_response.json()["jobs"][0]
283+
284+
app = self._app
285+
sa_session = app.model.context.current
286+
job = sa_session.query(app.model.Job).get(app.security.decode_id(job_dict["id"]))
287+
288+
self._wait_for_external_state(sa_session, job, app.model.Job.states.RUNNING)
289+
290+
external_id = job.job_runner_external_id
291+
output = unicodify(subprocess.check_output(['kubectl', 'get', 'job', external_id, '-o', 'json']))
292+
status = json.loads(output)
293+
assert status['status']['active'] == 1
294+
295+
output = unicodify(subprocess.check_output(['kubectl', 'delete', 'job', external_id, '-o', 'name']))
296+
assert 'job.batch/%s' % external_id in output
297+
298+
result = self.dataset_populator.wait_for_tool_run(run_response=running_response, history_id=history_id,
299+
assert_ok=False).json()
300+
details = self.dataset_populator.get_job_details(result['jobs'][0]['id'], full=True).json()
301+
302+
assert details['state'] == app.model.Job.states.ERROR, details
303+
267304
@skip_without_tool('job_properties')
268305
def test_exit_code_127(self):
269306
inputs = {

0 commit comments

Comments (0)