Skip to content

Commit befe99a

Browse files
committed
DaskGatewayRequirement fully implemented - Begin code cleaning and refactoring
1 parent 4911e8f commit befe99a

File tree

2 files changed

+114
-25
lines changed

2 files changed

+114
-25
lines changed

Diff for: calrissian/job.py

+10-11
Original file line numberDiff line numberDiff line change
@@ -512,8 +512,6 @@ def init_containers(self):
512512
'--max-ram',
513513
str(dask_requirement["clusterMaxMemory"])
514514
]
515-
516-
log.info(init_dask_command)
517515

518516
init_dask_cluster = {
519517
'name': self.init_container_name(),
@@ -560,15 +558,16 @@ def build(self):
560558
'resources': self.container_resources(),
561559
'volumeMounts': self.volume_mounts,
562560
'workingDir': self.container_workingdir(),
563-
}
564-
# {
565-
# 'name': 'sidecar-container',
566-
# 'image': str(self.container_image),
567-
# 'command': sidecar_command,
568-
# 'env': self.container_environment(),
569-
# 'volumeMounts': self.volume_mounts,
570-
# 'workingDir': self.container_workingdir(),
571-
# }
561+
},
562+
{
563+
'name': 'sidecar-container',
564+
'image': str(self.container_image),
565+
'command': sidecar_command,
566+
'env': self.container_environment(),
567+
'resources': self.container_resources(),
568+
'volumeMounts': self.volume_mounts,
569+
'workingDir': self.container_workingdir(),
570+
}
572571
],
573572
'restartPolicy': 'Never',
574573
'volumes': self.volumes,

Diff for: calrissian/k8s.py

+104-14
Original file line numberDiff line numberDiff line change
@@ -156,25 +156,54 @@ def follow_logs(self):
156156

157157
log.info('[{}] follow_logs end'.format(pod_name))
158158

159+
@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
160+
def follow_container_logs(self, status):
161+
pod_name = self.pod.metadata.name
162+
163+
log.info('[{}] follow_logs start'.format(pod_name))
164+
for line in self.core_api_instance.read_namespaced_pod_log(self.pod.metadata.name, self.namespace, follow=True,
165+
_preload_content=False, container=status.name).stream():
166+
# .stream() is only available if _preload_content=False
167+
# .stream() returns a generator, each iteration yields bytes.
168+
# kubernetes-client decodes them as utf-8 when _preload_content is True
169+
# https://github.com/kubernetes-client/python/blob/fcda6fe96beb21cd05522c17f7f08c5a7c0e3dc3/kubernetes/client/rest.py#L215-L216
170+
# So we do the same here
171+
if not status.state.running:
172+
break
173+
line = line.decode('utf-8', errors="ignore").rstrip()
174+
log.debug('[{}] {}'.format(pod_name, line))
175+
self.tool_log.append(self.format_log_entry(pod_name, line))
176+
177+
log.info('[{}] follow_logs end'.format(pod_name))
159178

160179
@retry_exponential_if_exception_type((ApiException, HTTPError, IncompleteStatusException), log)
161180
def wait_for_completion(self) -> CompletionResult:
162181
w = watch.Watch()
163182
for event in w.stream(self.core_api_instance.list_namespaced_pod, self.namespace, field_selector=self._get_pod_field_selector()):
164183
pod = event['object']
165-
status = self.get_first_or_none(pod.status.container_statuses)
166-
log.info('pod name {} with id {} has status {}'.format(pod.metadata.name, pod.metadata.uid, status))
167-
if status is None:
168-
continue
169-
if self.state_is_waiting(status.state):
170-
continue
171-
elif self.state_is_running(status.state):
172-
# Can only get logs once container is running
173-
self.follow_logs() # This will not return until pod completes
174-
elif self.state_is_terminated(status.state):
184+
# status = self.get_first_or_none(pod.status.container_statuses)
185+
last_status = self.get_last_or_none(pod.status.container_statuses)
186+
if last_status == None or not self.state_is_terminated(last_status.state):
187+
statuses = self.get_list_or_none(pod.status.container_statuses)
188+
if statuses == None:
189+
continue
190+
for status in statuses:
191+
log.info('pod name {} with id {} has status {}'.format(pod.metadata.name, pod.metadata.uid, status))
192+
if status is None:
193+
continue
194+
if self.state_is_waiting(status.state):
195+
continue
196+
elif self.state_is_running(status.state):
197+
# Can only get logs once container is running
198+
self.follow_container_logs(status) # This will not return until container completes
199+
elif self.state_is_terminated(status.state):
200+
continue
201+
else:
202+
raise CalrissianJobException('Unexpected pod container status', status)
203+
elif self.state_is_terminated(last_status.state):
175204
log.info('Handling terminated pod name {} with id {}'.format(pod.metadata.name, pod.metadata.uid))
176-
container = self.get_first_or_none(pod.spec.containers)
177-
self._handle_completion(status.state, container)
205+
container = self.get_last_or_none(pod.spec.containers)
206+
self._handle_completion(last_status.state, container)
178207
if self.should_delete_pod():
179208
with PodMonitor() as monitor:
180209
self.delete_pod_name(pod.metadata.name)
@@ -183,14 +212,49 @@ def wait_for_completion(self) -> CompletionResult:
183212
# stop watching for events, our pod is done. Causes wait loop to exit
184213
w.stop()
185214
else:
186-
raise CalrissianJobException('Unexpected pod container status', status)
215+
raise CalrissianJobException('Unexpected pod container status', last_status)
187216

188217
# When the pod is done we should have a completion result
189218
# Otherwise it will lead to further exceptions
190219
if self.completion_result is None:
191220
raise IncompleteStatusException
192-
221+
193222
return self.completion_result
223+
224+
# @retry_exponential_if_exception_type((ApiException, HTTPError, IncompleteStatusException), log)
225+
# def wait_for_completion(self) -> CompletionResult:
226+
# w = watch.Watch()
227+
# for event in w.stream(self.core_api_instance.list_namespaced_pod, self.namespace, field_selector=self._get_pod_field_selector()):
228+
# pod = event['object']
229+
# status = self.get_first_or_none(pod.status.container_statuses)
230+
# log.info('pod name {} with id {} has status {}'.format(pod.metadata.name, pod.metadata.uid, status))
231+
# if status is None:
232+
# continue
233+
# if self.state_is_waiting(status.state):
234+
# continue
235+
# elif self.state_is_running(status.state):
236+
# # Can only get logs once container is running
237+
# self.follow_logs() # This will not return until pod completes
238+
# elif self.state_is_terminated(status.state):
239+
# log.info('Handling terminated pod name {} with id {}'.format(pod.metadata.name, pod.metadata.uid))
240+
# container = self.get_first_or_none(pod.spec.containers)
241+
# self._handle_completion(status.state, container)
242+
# if self.should_delete_pod():
243+
# with PodMonitor() as monitor:
244+
# self.delete_pod_name(pod.metadata.name)
245+
# monitor.remove(pod)
246+
# self._clear_pod()
247+
# # stop watching for events, our pod is done. Causes wait loop to exit
248+
# w.stop()
249+
# else:
250+
# raise CalrissianJobException('Unexpected pod container status', status)
251+
252+
# # When the pod is done we should have a completion result
253+
# # Otherwise it will lead to further exceptions
254+
# if self.completion_result is None:
255+
# raise IncompleteStatusException
256+
257+
# return self.completion_result
194258

195259
def _set_pod(self, pod):
196260
log.info('k8s pod \'{}\' started'.format(pod.metadata.name))
@@ -216,6 +280,32 @@ def state_is_waiting(state):
216280
def state_is_terminated(state):
217281
return state.terminated
218282

283+
@staticmethod
284+
def get_list_or_none(container_list: List[Union[V1ContainerStatus, V1Container]]) -> Union[V1ContainerStatus, V1Container]:
285+
"""
286+
Check the list. Should be 0 or 1 items. If 0, there's no container yet. If 1, there's a
287+
container. If > 1, there's more than 1 container and that's unexpected behavior
288+
:param containers_or_container_statuses: list of V1ContainerStatus or V1Container
289+
:return: first item if len of list is 1, None if 0, and raises CalrissianJobException if > 1
290+
"""
291+
if not container_list: # None or empty list
292+
return None
293+
else:
294+
return list(container_list)
295+
296+
@staticmethod
297+
def get_last_or_none(container_list: List[Union[V1ContainerStatus, V1Container]]) -> Union[V1ContainerStatus, V1Container]:
298+
"""
299+
Check the list. Should be 0 or 1 items. If 0, there's no container yet. If 1, there's a
300+
container. If > 1, there's more than 1 container and that's unexpected behavior
301+
:param containers_or_container_statuses: list of V1ContainerStatus or V1Container
302+
:return: first item if len of list is 1, None if 0, and raises CalrissianJobException if > 1
303+
"""
304+
if not container_list: # None or empty list
305+
return None
306+
else:
307+
return container_list[-1]
308+
219309
@staticmethod
220310
def get_first_or_none(container_list: List[Union[V1ContainerStatus, V1Container]]) -> Union[V1ContainerStatus, V1Container]:
221311
"""

0 commit comments

Comments
 (0)