Skip to content

Commit ad1efbe

Browse files
committed
Dask Pod first implementation - init and main container OK
1 parent ceb1c27 commit ad1efbe

File tree

7 files changed

+291
-23
lines changed

7 files changed

+291
-23
lines changed

Diff for: calrissian/__about__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
version="0.18.1"
1+
version="0.19.0"

Diff for: calrissian/context.py

+1
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,5 @@ def __init__(self, kwargs=None):
2323
self.pod_serviceaccount = None
2424
self.tool_logs_basepath = None
2525
self.max_gpus = None
26+
self.gateway_url = None
2627
return super(CalrissianRuntimeContext, self).__init__(kwargs)

Diff for: calrissian/job.py

+270-20
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from typing import Dict
22
from cwltool.job import ContainerCommandLineJob, needs_shell_quoting_re
3-
43
# override cwltool.cuda.cuda_check
54
def _cuda_check(cuda_req, requestCount):
65
return 1
@@ -13,6 +12,7 @@ def _cuda_check(cuda_req, requestCount):
1312
from calrissian.k8s import KubernetesClient, CompletionResult
1413
from calrissian.report import Reporter, TimedResourceReport
1514
from cwltool.builder import Builder
15+
import sys
1616
import logging
1717
import os
1818
import yaml
@@ -118,6 +118,7 @@ class KubernetesVolumeBuilder(object):
118118
def __init__(self):
119119
self.persistent_volume_entries = {}
120120
self.emptydir_volume_names = []
121+
self.configmap_volume_names = []
121122
self.volume_mounts = []
122123
self.volumes = []
123124

@@ -153,6 +154,14 @@ def add_emptydir_volume(self, name):
153154
self.emptydir_volume_names.append(name)
154155
self.volumes.append(volume)
155156

157+
def add_configmap_volume(self, name, cm_name):
    """
    Register a pod volume called ``name`` that is backed by the ConfigMap ``cm_name``.

    The volume name is remembered so that a later binding request can be validated.
    """
    self.configmap_volume_names.append(name)
    self.volumes.append({'name': name, 'configMap': {'name': cm_name}})
164+
156165
def find_persistent_volume(self, source):
157166
"""
158167
For a given source path, return the volume entry that contains it
@@ -195,6 +204,14 @@ def add_emptydir_volume_binding(self, name, target):
195204
}
196205
self.volume_mounts.append(volume_mount)
197206

207+
def add_configmap_volume_binding(self, name, target):
    """
    Mount the previously registered configMap volume ``name`` at ``target``.

    Raises VolumeBuilderException when no configMap volume called ``name`` has been added.
    """
    if name in self.configmap_volume_names:
        self.volume_mounts.append({'name': name, 'mountPath': target})
        return
    raise VolumeBuilderException('Could not find a configMap volume named {}'.format(name))
198215

199216
class KubernetesPodBuilder(object):
200217

@@ -384,6 +401,188 @@ def build(self):
384401
return spec
385402

386403

404+
class KubernetesDaskPodBuilder(KubernetesPodBuilder):
    """
    Pod builder used for tools that carry a DaskGatewayRequirement.

    Compared to the plain pod builder it:
      * adds an init container that runs /app/init-dask.py with the worker settings
        taken from the requirement (presumably creating the Dask cluster and writing
        its name to /shared/dask_cluster_name.txt - confirm against init-dask.py),
      * wraps the tool command line in a shell script that exports DASK_CLUSTER from
        that file and touches /shared/completed when the script exits,
      * adds PYTHONPATH=/app to the main container environment.
    """

    # CWL class URI identifying the Dask Gateway extension requirement
    DASK_REQUIREMENT_CLASS = 'https://calrissian-cwl.github.io/schema#DaskGatewayRequirement'
    # Container names must be unique within a pod, so the Dask init container gets
    # its own name instead of re-using init_container_name()
    DASK_INIT_CONTAINER_NAME = 'init-dask-cluster'

    def __init__(self,
                 name,
                 container_image,
                 environment,
                 volume_mounts,
                 volumes,
                 command_line,
                 stdout,
                 stderr,
                 stdin,
                 resources,
                 labels,
                 nodeselectors,
                 security_context,
                 serviceaccount,
                 gateway_url,
                 requirements=None,
                 hints=None):
        """
        Store the pod parameters; ``gateway_url`` is the address handed to the
        Dask helper scripts via --gateway-url.
        """
        self.name = name
        self.container_image = container_image
        self.environment = environment
        self.volume_mounts = volume_mounts
        self.volumes = volumes
        self.command_line = command_line
        self.stdout = stdout
        self.stderr = stderr
        self.stdin = stdin
        self.resources = resources
        self.labels = labels
        self.nodeselectors = nodeselectors
        self.security_context = security_context
        self.serviceaccount = serviceaccount
        self.gateway_url = gateway_url
        self.requirements = {} if requirements is None else requirements
        self.hints = [] if hints is None else hints

    def _dask_requirement(self):
        """
        Return the DaskGatewayRequirement entry, searching hints first and then
        requirements. Raises ValueError when the tool does not declare one.
        """
        for entry in list(self.hints) + list(self.requirements):
            try:
                entry_class = entry['class']
            except (KeyError, TypeError, IndexError):
                # Not a mapping with a 'class' key (e.g. a key of a dict-shaped requirements)
                continue
            if entry_class == self.DASK_REQUIREMENT_CLASS:
                return entry
        raise ValueError(
            'Unable to build Dask pod - tool has no {} in its hints or requirements'.format(
                self.DASK_REQUIREMENT_CLASS))

    def _require_gateway_url(self):
        """
        Return the gateway URL, raising ValueError when it was not configured
        (a None entry is not a valid element of a container command).
        """
        if not self.gateway_url:
            raise ValueError('Unable to build Dask pod - no Dask Gateway URL was provided')
        return self.gateway_url

    def container_command(self):
        """
        Return the parent's command with a shell script appended as its last element.

        The script exports DASK_CLUSTER from /shared/dask_cluster_name.txt, runs
        ``python -m app`` with the first element of the parent's container args and
        touches /shared/completed on exit (the trap fires on success and on failure).
        """
        super_command = super().container_command()
        super_args = super().container_args()

        script_lines = [
            "set -e;",
            "trap 'touch /shared/completed' EXIT;",
            "export DASK_CLUSTER=$(cat /shared/dask_cluster_name.txt);",
            "python -m app {}".format(super_args[0]),
        ]
        super_command.append('\n'.join(script_lines) + '\n')
        return super_command

    def container_environment(self):
        """
        Return the environment as a name-sorted list of {'name', 'value'} dicts,
        followed by PYTHONPATH=/app.
        """
        environment = [{'name': name, 'value': value}
                       for name, value in sorted(self.environment.items())]
        environment.append({'name': 'PYTHONPATH', 'value': '/app'})
        return environment

    def init_containers(self):
        """
        Return the init containers of the pod.

        An optional first container creates the directories needed by the stdout /
        stderr paths; it is always followed by the container that runs
        /app/init-dask.py with the settings of the DaskGatewayRequirement.

        Raises ValueError when the requirement or the gateway URL is missing and
        KeyError when the requirement lacks one of the worker settings.
        """
        containers = []
        # get dirname for any actual paths, dropping empty results
        dirs_to_create = [os.path.dirname(p) for p in (self.stdout, self.stderr) if p]
        dirs_to_create = [d for d in dirs_to_create if d]
        # Quote if necessary
        dirs_to_create = quoted_arg_list(dirs_to_create)
        command_list = ['mkdir -p {};'.format(d) for d in dirs_to_create]
        if command_list:
            containers.append({
                'name': self.init_container_name(),
                'image': os.environ.get(INIT_IMAGE_ENV_VARIABLE, DEFAULT_INIT_IMAGE),
                'command': ['/bin/sh', '-c', ' '.join(command_list)],
                'workingDir': self.container_workingdir(),
                'volumeMounts': self.volume_mounts,
            })

        dask_requirement = self._dask_requirement()

        init_dask_command = [
            'python',
            '/app/init-dask.py',
            '--target',
            '/shared/dask_cluster_name.txt',
            '--gateway-url',
            self._require_gateway_url(),
            '--image',
            str(dask_requirement['dockerPull']),
            '--worker-cores',
            str(dask_requirement['workerCores']),
            '--worker-memory',
            str(dask_requirement['workerMemory']),
            '--worker-cores-limit',
            str(dask_requirement['workerCoresLimit']),
            '--max-cores',
            str(dask_requirement['coresMax']),
            '--max-ram',
            str(dask_requirement['ramMax']),
        ]

        log.info(init_dask_command)

        containers.append({
            # Distinct from init_container_name(): duplicate container names are
            # rejected when both init containers are present
            'name': self.DASK_INIT_CONTAINER_NAME,
            'image': str(self.container_image),
            'env': [{'name': 'PYTHONPATH', 'value': '/app'}],
            'command': init_dask_command,
            'workingDir': self.container_workingdir(),
            'volumeMounts': self.volume_mounts,
        })

        return containers

    def build(self):
        """
        Return the pod manifest (a dict) with the init containers and a single
        'main-container'. serviceAccountName is only set when a service account
        was provided.
        """
        # TODO: the sidecar container that would run
        #   python /app/dispose-dask.py --source /shared/dask_cluster_name.txt
        #       --gateway-url <gateway_url> --signal /shared/completed
        # is not part of the pod yet.
        spec = {
            'metadata': {
                'name': self.pod_name(),
                'labels': self.pod_labels(),
            },
            'apiVersion': 'v1',
            'kind': 'Pod',
            'spec': {
                'initContainers': self.init_containers(),
                'containers': [
                    {
                        'name': 'main-container',
                        'image': str(self.container_image),
                        'command': self.container_command(),
                        'env': self.container_environment(),
                        'resources': self.container_resources(),
                        'volumeMounts': self.volume_mounts,
                        'workingDir': self.container_workingdir(),
                    }
                ],
                'restartPolicy': 'Never',
                'volumes': self.volumes,
                'securityContext': self.security_context,
                'nodeSelector': self.pod_nodeselectors()
            }
        }

        if self.serviceaccount:
            spec['spec']['serviceAccountName'] = self.serviceaccount

        return spec
584+
585+
387586
# This now subclasses ContainerCommandLineJob, but only uses two of its methods:
388587
# create_file_and_add_volume and add_volumes
389588
class CalrissianCommandLineJob(ContainerCommandLineJob):
@@ -507,6 +706,14 @@ def _get_container_image(self):
507706
raise CalrissianCommandLineJobException('Unable to create Job - Please ensure tool has a DockerRequirement with dockerPull or specify a default_container')
508707
return container_image
509708

709+
def _get_worker_image(self):
710+
docker_requirement, _ = self.get_requirement('https://calrissian-cwl.github.io/schema#DaskGatewayRequirement')
711+
if docker_requirement:
712+
container_image = docker_requirement['dockerPull']
713+
if not container_image:
714+
raise CalrissianCommandLineJobException('Unable to create Job - Please ensure tool has a DaskGatewayRequirement with dockerPull')
715+
return container_image
716+
510717
def quoted_command_line(self):
511718
return quoted_arg_list(self.command_line)
512719

@@ -541,6 +748,9 @@ def get_pod_env_vars(self, runtimeContext):
541748
else:
542749
return {}
543750

751+
def get_dask_gateway_url(self, runtimeContext):
    """
    Return the Dask Gateway URL stored on ``runtimeContext.gateway_url``.
    """
    gateway_url = runtimeContext.gateway_url
    return gateway_url
753+
544754
def create_kubernetes_runtime(self, runtimeContext):
545755
# In cwltool, the runtime list starts as something like ['docker','run'] and these various builder methods
546756
# append to that list with docker (or singularity) options like volume mount paths
@@ -573,25 +783,60 @@ def create_kubernetes_runtime(self, runtimeContext):
573783
tmpdir_prefix=runtimeContext.tmpdir_prefix,
574784
secret_store=runtimeContext.secret_store,
575785
any_path_okay=any_path_okay)
786+
787+
if self.get_requirement('https://calrissian-cwl.github.io/schema#DaskGatewayRequirement'):
576788

577-
k8s_builder = KubernetesPodBuilder(
578-
self.name,
579-
self._get_container_image(),
580-
self.environment,
581-
self.volume_builder.volume_mounts,
582-
self.volume_builder.volumes,
583-
self.quoted_command_line(),
584-
self.stdout,
585-
self.stderr,
586-
self.stdin,
587-
self.builder.resources,
588-
self.get_pod_labels(runtimeContext),
589-
self.get_pod_nodeselectors(runtimeContext),
590-
self.get_security_context(runtimeContext),
591-
self.get_pod_serviceaccount(runtimeContext),
592-
self.builder.requirements,
593-
self.builder.hints
594-
)
789+
dask_default_conf = '/etc/dask'
790+
shared_data = '/shared'
791+
792+
# self.client.create_dask_gateway_cofig_map(gateway_url=self.get_dask_gateway_url(runtimeContext))
793+
794+
self._add_configmap_volume_and_binding(
795+
name='dask-gateway-cm',
796+
cm_name='dask-gateway-cm',
797+
target=dask_default_conf)
798+
799+
self._add_emptydir_volume_and_binding('shared-data', shared_data)
800+
801+
k8s_builder = KubernetesDaskPodBuilder(
802+
self.name,
803+
self._get_worker_image(),
804+
self.environment,
805+
self.volume_builder.volume_mounts,
806+
self.volume_builder.volumes,
807+
self.quoted_command_line(),
808+
self.stdout,
809+
self.stderr,
810+
self.stdin,
811+
self.builder.resources,
812+
self.get_pod_labels(runtimeContext),
813+
self.get_pod_nodeselectors(runtimeContext),
814+
self.get_security_context(runtimeContext),
815+
self.get_pod_serviceaccount(runtimeContext),
816+
self.get_dask_gateway_url(runtimeContext),
817+
requirements=self.builder.requirements,
818+
hints=self.builder.hints,
819+
)
820+
821+
else:
822+
k8s_builder = KubernetesPodBuilder(
823+
self.name,
824+
self._get_container_image(),
825+
self.environment,
826+
self.volume_builder.volume_mounts,
827+
self.volume_builder.volumes,
828+
self.quoted_command_line(),
829+
self.stdout,
830+
self.stderr,
831+
self.stdin,
832+
self.builder.resources,
833+
self.get_pod_labels(runtimeContext),
834+
self.get_pod_nodeselectors(runtimeContext),
835+
self.get_security_context(runtimeContext),
836+
self.get_pod_serviceaccount(runtimeContext),
837+
self.builder.requirements,
838+
self.builder.hints
839+
)
595840
built = k8s_builder.build()
596841
log.debug('{}\n{}{}\n'.format('-' * 80, yaml.dump(built), '-' * 80))
597842
# Report an error if anything was added to the runtime list
@@ -609,6 +854,10 @@ def _add_emptydir_volume_and_binding(self, name, target):
609854
def _add_volume_binding(self, source, target, writable=False):
610855
self.volume_builder.add_volume_binding(source, target, writable)
611856

857+
def _add_configmap_volume_and_binding(self, name, cm_name, target):
858+
self.volume_builder.add_configmap_volume(name, cm_name)
859+
self.volume_builder.add_configmap_volume_binding(cm_name, target)
860+
612861
# Below are concrete implementations of methods called by add_volumes
613862
# They are based on https://github.com/common-workflow-language/cwltool/blob/1.0.20181201184214/cwltool/docker.py
614863
# But the key difference is that docker is invoked via command-line, so the ones in docker.py append to
@@ -695,7 +944,8 @@ def _required_env(self) -> Dict[str, str]:
695944
def run(self, runtimeContext, tmpdir_lock=None):
696945

697946
def get_pod_command(pod):
698-
return pod['spec']['containers'][0]['args']
947+
if 'args' in pod['spec']['containers'][0].keys():
948+
return pod['spec']['containers'][0]['args']
699949

700950
def get_pod_name(pod):
701951
return pod['spec']['containers'][0]['name']

Diff for: calrissian/k8s.py

+10
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,16 @@ def get_current_pod(self):
272272
raise CalrissianJobException("Missing required environment variable ${}".format(POD_NAME_ENV_VARIABLE))
273273
return self.get_pod_for_name(pod_name)
274274

275+
@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def create_dask_gateway_cofig_map(self, gateway_url: str):
    """
    Create the ``dask-gateway-cm`` ConfigMap in the client's namespace.

    The ConfigMap holds a single ``gateway.yaml`` entry that sets
    ``gateway.address`` to ``gateway_url``.
    """
    # The value is the file content itself: it must be plain YAML (space
    # indentation, a space after each colon) without a block-scalar '|' marker,
    # which only belongs in a hand-written manifest
    gateway_config = "gateway:\n  address: {}\n".format(gateway_url)
    configmap = client.V1ConfigMap(
        metadata=client.V1ObjectMeta(name="dask-gateway-cm"),
        data={"gateway.yaml": gateway_config},
    )

    self.core_api_instance.create_namespaced_config_map(namespace=self.namespace, body=configmap)
275285

276286
class PodMonitor(object):
277287
"""

0 commit comments

Comments
 (0)