1
+ """Wrap a CWL document as a callable Python object."""
2
+
3
+ import argparse
4
+ import functools
1
5
import os
6
+ import sys
2
7
from typing import Any , Optional , Union
3
8
4
9
from . import load_tool
5
- from .context import LoadingContext , RuntimeContext
10
+ from .argparser import arg_parser
11
+ from .context import LoadingContext , RuntimeContext , getdefault
6
12
from .errors import WorkflowException
7
13
from .executors import JobExecutor , SingleJobExecutor
14
+ from .main import find_default_container
8
15
from .process import Process
9
- from .utils import CWLObjectType
16
+ from .resolver import tool_resolver
17
+ from .secrets import SecretStore
18
+ from .utils import DEFAULT_TMP_PREFIX , CWLObjectType
10
19
11
20
12
21
class WorkflowStatus (Exception ):
@@ -25,11 +34,15 @@ def __init__(self, t: Process, factory: "Factory") -> None:
25
34
self .t = t
26
35
self .factory = factory
27
36
28
- def __call__ (self , ** kwargs ):
29
- # type: (**Any) -> Union[str, Optional[CWLObjectType]]
30
- runtime_context = self .factory .runtime_context .copy ()
31
- runtime_context .basedir = os .getcwd ()
32
- out , status = self .factory .executor (self .t , kwargs , runtime_context )
37
+ def __call__ (self , ** kwargs : Any ) -> Union [str , Optional [CWLObjectType ]]:
38
+ """
39
+ Execute the process.
40
+
41
+ :raise WorkflowStatus: If the result is not a success.
42
+ """
43
+ if not self .factory .runtime_context .basedir :
44
+ self .factory .runtime_context .basedir = os .getcwd ()
45
+ out , status = self .factory .executor (self .t , kwargs , self .factory .runtime_context )
33
46
if status != "success" :
34
47
raise WorkflowStatus (out , status )
35
48
else :
@@ -47,18 +60,24 @@ def __init__(
47
60
executor : Optional [JobExecutor ] = None ,
48
61
loading_context : Optional [LoadingContext ] = None ,
49
62
runtime_context : Optional [RuntimeContext ] = None ,
63
+ argsl : Optional [list [str ]] = None ,
64
+ args : Optional [argparse .Namespace ] = None ,
50
65
) -> None :
66
+ """Create a CWL Process factory from a CWL document."""
67
+ if argsl is not None :
68
+ args = arg_parser ().parse_args (argsl )
51
69
if executor is None :
52
- executor = SingleJobExecutor ()
53
- self .executor = executor
70
+ self .executor : JobExecutor = SingleJobExecutor ()
71
+ else :
72
+ self .executor = executor
54
73
if runtime_context is None :
55
- self .runtime_context = RuntimeContext ()
74
+ self .runtime_context = RuntimeContext (vars (args ) if args else {})
75
+ self ._fix_runtime_context ()
56
76
else :
57
77
self .runtime_context = runtime_context
58
78
if loading_context is None :
59
- self .loading_context = LoadingContext ()
60
- self .loading_context .singularity = self .runtime_context .singularity
61
- self .loading_context .podman = self .runtime_context .podman
79
+ self .loading_context = LoadingContext (vars (args ) if args else {})
80
+ self ._fix_loading_context (self .runtime_context )
62
81
else :
63
82
self .loading_context = loading_context
64
83
@@ -68,3 +87,45 @@ def make(self, cwl: Union[str, dict[str, Any]]) -> Callable:
68
87
if isinstance (load , int ):
69
88
raise WorkflowException ("Error loading tool" )
70
89
return Callable (load , self )
90
+
91
+ def _fix_loading_context (self , runtime_context : RuntimeContext ) -> None :
92
+ self .loading_context .resolver = getdefault (self .loading_context .resolver , tool_resolver )
93
+ self .loading_context .singularity = runtime_context .singularity
94
+ self .loading_context .podman = runtime_context .podman
95
+
96
+ def _fix_runtime_context (self ) -> None :
97
+ self .runtime_context .basedir = os .getcwd ()
98
+ self .runtime_context .find_default_container = functools .partial (
99
+ find_default_container , default_container = None , use_biocontainers = None
100
+ )
101
+
102
+ if sys .platform == "darwin" :
103
+ default_mac_path = "/private/tmp/docker_tmp"
104
+ if self .runtime_context .tmp_outdir_prefix == DEFAULT_TMP_PREFIX :
105
+ self .runtime_context .tmp_outdir_prefix = default_mac_path
106
+
107
+ for dirprefix in ("tmpdir_prefix" , "tmp_outdir_prefix" , "cachedir" ):
108
+ if (
109
+ getattr (self .runtime_context , dirprefix )
110
+ and getattr (self .runtime_context , dirprefix ) != DEFAULT_TMP_PREFIX
111
+ ):
112
+ sl = (
113
+ "/"
114
+ if getattr (self .runtime_context , dirprefix ).endswith ("/" )
115
+ or dirprefix == "cachedir"
116
+ else ""
117
+ )
118
+ setattr (
119
+ self .runtime_context ,
120
+ dirprefix ,
121
+ os .path .abspath (getattr (self .runtime_context , dirprefix )) + sl ,
122
+ )
123
+ if not os .path .exists (os .path .dirname (getattr (self .runtime_context , dirprefix ))):
124
+ try :
125
+ os .makedirs (os .path .dirname (getattr (self .runtime_context , dirprefix )))
126
+ except Exception as e :
127
+ print ("Failed to create directory: %s" , e )
128
+
129
+ self .runtime_context .secret_store = getdefault (
130
+ self .runtime_context .secret_store , SecretStore ()
131
+ )
0 commit comments