Skip to content

Commit 8dbc724

Browse files
committed
Pod deleted on init-container failure - Unittest for Dask (not complete)
1 parent 6e66163 commit 8dbc724

File tree

4 files changed

+437
-15
lines changed

4 files changed

+437
-15
lines changed

Diff for: calrissian/dask.py

+15-4
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,7 @@ def container_environment(self):
164164

165165
return environment
166166

167+
167168
def init_containers(self):
168169
containers = []
169170
# get dirname for any actual paths
@@ -252,7 +253,7 @@ class CalrissianCommandLineDaskJob(CalrissianCommandLineJob):
252253
daskGateway_controller_cm_name = 'dask-cluster-controller-cm'
253254

254255
def __init__(self, *args, **kwargs):
255-
super(CalrissianCommandLineJob, self).__init__(*args, **kwargs)
256+
# super(CalrissianCommandLineJob, self).__init__(*args, **kwargs)
256257
super(CalrissianCommandLineDaskJob, self).__init__(*args, **kwargs)
257258
self.client = KubernetesDaskClient()
258259

@@ -360,8 +361,7 @@ def create_kubernetes_runtime(self, runtimeContext):
360361

361362
def run(self, runtimeContext, tmpdir_lock=None):
362363
def get_pod_command(pod):
363-
if 'args' in pod['spec']['containers'][0].keys():
364-
return pod['spec']['containers'][0]['args']
364+
return pod['spec']['containers'][0]['args']
365365

366366
def get_pod_name(pod):
367367
return pod['spec']['containers'][0]['name']
@@ -429,7 +429,18 @@ def wait_for_completion(self, cm_name: str) -> CompletionResult:
429429
w = watch.Watch()
430430
for event in w.stream(self.core_api_instance.list_namespaced_pod, self.namespace, field_selector=self._get_pod_field_selector()):
431431
pod = event['object']
432-
# status = self.get_first_or_none(pod.status.container_statuses)
432+
433+
init_status = self.get_list_or_none(pod.status.init_container_statuses)
434+
if init_status and self.state_is_terminated(init_status[0].state):
435+
init_status_code = init_status[0].state.terminated.exit_code
436+
if init_status_code is not None and init_status_code != 0:
437+
with DaskPodMonitor() as monitor:
438+
self.delete_pod_name(pod.metadata.name)
439+
self.delete_configmap_name(cm_name=cm_name)
440+
monitor.remove(pod)
441+
self._clear_pod()
442+
w.stop()
443+
433444
last_status = self.get_last_or_none(pod.status.container_statuses)
434445
if last_status == None or not self.state_is_terminated(last_status.state):
435446
statuses = self.get_list_or_none(pod.status.container_statuses)

0 commit comments

Comments
 (0)