Skip to content

Commit 4911e8f

Browse files
committed
Custom schema implementation - add schema file in temporary location
1 parent ad1efbe commit 4911e8f

File tree

3 files changed

+76
-17
lines changed

3 files changed

+76
-17
lines changed

Diff for: calrissian/custom_schema/schema.yaml

+48
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
$base: https://calrissian-cwl.github.io/schema#
2+
$namespaces:
3+
cwl: "https://w3id.org/cwl/cwl#"
4+
$graph:
5+
- $import: https://w3id.org/cwl/CommonWorkflowLanguage.yml
6+
7+
- name: DaskGatewayRequirement
8+
type: record
9+
extends: cwl:ProcessRequirement
10+
inVocab: false
11+
doc: "Indicates that a process requires a Dask cluster procured via [Dask Gateway](https://gateway.dask.org/) runtime."
12+
fields:
13+
class:
14+
type: 'string'
15+
doc: "Always 'DaskGatewayRequirement'"
16+
jsonldPredicate:
17+
"_id": "@type"
18+
"_type": "@vocab"
19+
workerCores:
20+
type:
21+
- 'int'
22+
- 'cwl:Expression'
23+
doc: |
24+
Number of cpu-cores available for a dask worker.
25+
workerCoresLimit:
26+
type:
27+
- 'int'
28+
- 'cwl:Expression'
29+
doc: |
30+
Maximum number of cpu-cores available for a dask worker.
31+
workerMemory:
32+
type:
33+
- 'string'
34+
- 'cwl:Expression'
35+
doc: |
36+
Maximum number of bytes available for a dask worker.
37+
clustermaxCore:
38+
type:
39+
- 'int'
40+
- 'cwl:Expression'
41+
doc: |
42+
Maximum number of cores to configure the cluster
43+
clusterMaxMemory:
44+
type:
45+
- 'string'
46+
- 'cwl:Expression'
47+
doc: |
48+
Maximum amount of memory to configure the cluster

Diff for: calrissian/job.py

+5-15
Original file line numberDiff line numberDiff line change
@@ -490,7 +490,7 @@ def init_containers(self):
490490
'volumeMounts': self.volume_mounts,
491491
})
492492

493-
dask_requirement = next((elem for elem in self.hints if elem['class'] == 'https://calrissian-cwl.github.io/schema#DaskGatewayRequirement'), None)
493+
dask_requirement = next((elem for elem in self.requirements if elem['class'] == 'https://calrissian-cwl.github.io/schema#DaskGatewayRequirement'), None)
494494

495495
init_dask_command = [
496496
'python',
@@ -500,17 +500,17 @@ def init_containers(self):
500500
'--gateway-url',
501501
self.gateway_url,
502502
'--image',
503-
str(dask_requirement['dockerPull']),
503+
str(self.container_image),
504504
'--worker-cores',
505505
str(dask_requirement["workerCores"]),
506506
'--worker-memory',
507507
str(dask_requirement["workerMemory"]),
508508
'--worker-cores-limit',
509509
str(dask_requirement["workerCoresLimit"]),
510510
'--max-cores',
511-
str(dask_requirement["coresMax"]),
511+
str(dask_requirement["clustermaxCore"]),
512512
'--max-ram',
513-
str(dask_requirement["ramMax"])
513+
str(dask_requirement["clusterMaxMemory"])
514514
]
515515

516516
log.info(init_dask_command)
@@ -693,8 +693,6 @@ def check_requirements(self, runtimeContext):
693693
if not field in self.supported_features[feature]:
694694
raise UnsupportedRequirement('Error: feature {}.{} is not supported'.format(feature, field))
695695

696-
697-
698696
def _get_container_image(self):
699697
docker_requirement, _ = self.get_requirement('DockerRequirement')
700698
if docker_requirement:
@@ -706,14 +704,6 @@ def _get_container_image(self):
706704
raise CalrissianCommandLineJobException('Unable to create Job - Please ensure tool has a DockerRequirement with dockerPull or specify a default_container')
707705
return container_image
708706

709-
def _get_worker_image(self):
710-
docker_requirement, _ = self.get_requirement('https://calrissian-cwl.github.io/schema#DaskGatewayRequirement')
711-
if docker_requirement:
712-
container_image = docker_requirement['dockerPull']
713-
if not container_image:
714-
raise CalrissianCommandLineJobException('Unable to create Job - Please ensure tool has a DaskGatewayRequirement with dockerPull')
715-
return container_image
716-
717707
def quoted_command_line(self):
718708
return quoted_arg_list(self.command_line)
719709

@@ -800,7 +790,7 @@ def create_kubernetes_runtime(self, runtimeContext):
800790

801791
k8s_builder = KubernetesDaskPodBuilder(
802792
self.name,
803-
self._get_worker_image(),
793+
self._get_container_image(),
804794
self.environment,
805795
self.volume_builder.volume_mounts,
806796
self.volume_builder.volumes,

Diff for: calrissian/main.py

+23-2
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
import contextlib
2+
import re
3+
4+
import cwltool
5+
from cwltool.process import use_custom_schema, get_schema
26
from calrissian.executor import ThreadPoolJobExecutor
37
from calrissian.context import CalrissianLoadingContext, CalrissianRuntimeContext
48
from calrissian.version import version
@@ -124,14 +128,30 @@ def flush_tees():
124128
sys.stderr.flush()
125129

126130

131+
def add_custom_schema():
132+
cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE = re.compile(r".*")
133+
cwltool.command_line_tool.ACCEPTLIST_RE = cwltool.command_line_tool.ACCEPTLIST_EN_RELAXED_RE
134+
supported_versions = ["v1.0", "v1.1", "v1.2"]
135+
136+
with open(os.path.join(os.path.dirname(__file__), "custom_schema/schema.yaml")) as f:
137+
schema_content = f.read()
138+
139+
for s in supported_versions:
140+
use_custom_schema(s, "https://calrissian-cwl.github.io/schema", schema_content)
141+
get_schema(s)
142+
143+
cwltool.process.supportedProcessRequirements.extend([
144+
"https://calrissian-cwl.github.io/schema#DaskGatewayRequirement"
145+
])
146+
147+
127148
def main():
128149
parser = arg_parser()
129150
add_arguments(parser)
130151
parsed_args = parse_arguments(parser)
131152
level = get_log_level(parsed_args)
132153
activate_logging(level)
133154
install_tees(parsed_args.stdout, parsed_args.stderr)
134-
log.info("CALRISSIAN LOG")
135155
max_ram_megabytes = MemoryParser.parse_to_megabytes(parsed_args.max_ram)
136156
max_cores = CPUParser.parse(parsed_args.max_cores)
137157
max_gpus = int(parsed_args.max_gpus) if parsed_args.max_gpus else 0
@@ -146,7 +166,8 @@ def main():
146166
loadingContext=CalrissianLoadingContext(),
147167
runtimeContext=runtime_context,
148168
versionfunc=version,
149-
)
169+
custom_schema_callback=(add_custom_schema if parsed_args.gateway_url else None)
170+
)
150171
finally:
151172
# Always clean up after cwlmain
152173
delete_pods()

0 commit comments

Comments
 (0)