-
-
Notifications
You must be signed in to change notification settings - Fork 231
/
Copy pathtest_reqs_hints.py
58 lines (43 loc) · 1.65 KB
/
test_reqs_hints.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
"""Test for Requirements and Hints in cwltool."""
import json
from io import StringIO
from cwltool.main import main
from .util import get_data
def test_workflow_reqs_are_evaluated_earlier_default_args() -> None:
"""Test that a Workflow process will evaluate the requirements earlier.
Uses the default input values.
This means that workflow steps, such as Expression and Command Line Tools
can both use resources without re-evaluating expressions. This is useful
when you have an expression that, for instance, dynamically decides
how many threads/cpus to use.
Issue: https://github.com/common-workflow-language/cwltool/issues/1330
"""
stream = StringIO()
assert (
main(
[get_data("tests/wf/1330.cwl")],
stdout=stream,
)
== 0
)
out = json.loads(stream.getvalue())
assert out["out"] == "2\n"
def test_workflow_reqs_are_evaluated_earlier_provided_inputs() -> None:
"""Test that a Workflow process will evaluate the requirements earlier.
Passes inputs via a job file.
This means that workflow steps, such as Expression and Command Line Tools
can both use resources without re-evaluating expressions. This is useful
when you have an expression that, for instance, dynamically decides
how many threads/cpus to use.
Issue: https://github.com/common-workflow-language/cwltool/issues/1330
"""
stream = StringIO()
assert (
main(
[get_data("tests/wf/1330.cwl"), get_data("tests/wf/1330.json")],
stdout=stream,
)
== 0
)
out = json.loads(stream.getvalue())
assert out["out"] == "1\n"