From eb455f9ab472245605656683ae0769e8bff2ae2e Mon Sep 17 00:00:00 2001 From: "Bruno P. Kinoshita" Date: Thu, 3 Feb 2022 00:53:16 +1300 Subject: [PATCH] Move functions to its own module, load them dynamically in expr apply. --- wdl2cwl/errors.py | 7 +- wdl2cwl/expr.py | 259 +++++++++++++++ wdl2cwl/functions.py | 270 ++++++++++++++++ wdl2cwl/main.py | 480 ++-------------------------- wdl2cwl/tests/__init__.py | 1 + wdl2cwl/tests/test_cwl.py | 3 +- wdl2cwl/tests/test_wdlsourceline.py | 3 +- wdl2cwl/util.py | 64 ++++ 8 files changed, 627 insertions(+), 460 deletions(-) create mode 100644 wdl2cwl/expr.py create mode 100644 wdl2cwl/functions.py create mode 100644 wdl2cwl/util.py diff --git a/wdl2cwl/errors.py b/wdl2cwl/errors.py index ff43961b..bda90435 100644 --- a/wdl2cwl/errors.py +++ b/wdl2cwl/errors.py @@ -5,7 +5,6 @@ from typing import Any, Callable, Optional, Type, cast import WDL -from WDL._error_util import SourcePosition # Inspired by https://github.com/common-workflow-language/schema_salad/blob/661fb0fa8c745ed70253dda93bd12002007f6b33/schema_salad/sourceline.py#L232 @@ -46,7 +45,7 @@ def __exit__( def makeLead(self) -> str: """Caculate the error message prefix.""" - pos: SourcePosition = cast(SourcePosition, self.item.pos) + pos: WDL.SourcePosition = cast(WDL.SourcePosition, self.item.pos) return f"{pos.uri}:{pos.line}:{pos.column}:" def makeError(self, msg: str) -> Any: @@ -69,3 +68,7 @@ def makeError(self, msg: str) -> Any: else: errs.append(f"{lead} {m}") return self.raise_type("\n".join(errs)) + + +class ConversionException(Exception): + """Error during conversion.""" diff --git a/wdl2cwl/expr.py b/wdl2cwl/expr.py new file mode 100644 index 00000000..e9bb37f6 --- /dev/null +++ b/wdl2cwl/expr.py @@ -0,0 +1,259 @@ +"""WDL Expressions (literal values, arithmetic, comparison, conditional, string interpolation, array, map, and functions).""" + +from typing import Union, Any, cast + +import WDL +from wdl2cwl.errors import WDLSourceLine, ConversionException +from wdl2cwl.util import get_input, ConversionContext + + +def get_literal_name( + expr: Union[WDL.Expr.Boolean, WDL.Expr.Int, WDL.Expr.Float, WDL.Expr.Array] +) -> str: + """Translate WDL Boolean, Int, Float, or Array Expression.""" + # if the literal expr is used inside WDL.Expr.Apply + # the literal value is what's needed + parent = expr.parent # type: ignore[union-attr] + if isinstance(parent, (WDL.Expr.Apply, WDL.Expr.IfThenElse)): + return expr.literal.value # type: ignore + raise WDLSourceLine(expr, ConversionException).makeError( + f"The parent expression for {expr} is not WDL.Expr.Apply, but {parent}." + ) + + +def get_expr_ifthenelse( + wdl_ifthenelse: WDL.Expr.IfThenElse, ctx: ConversionContext +) -> str: + """Translate WDL IfThenElse Expressions.""" + condition = get_expr(wdl_ifthenelse.condition, ctx) + if_true = get_expr(wdl_ifthenelse.consequent, ctx) + if_false = get_expr(wdl_ifthenelse.alternative, ctx) + return f"{condition} ? {if_true} : {if_false}" + + +def translate_wdl_placeholder( + wdl_placeholder: WDL.Expr.Placeholder, ctx: ConversionContext +) -> str: + """Translate WDL Expr Placeholder to a valid CWL command string.""" + cwl_command_str = "" + expr = wdl_placeholder.expr + if expr is None: + raise WDLSourceLine(wdl_placeholder, ConversionException).makeError( + f"Placeholder '{wdl_placeholder}' has no expr." + ) + placeholder_expr = get_expr(expr, ctx) + options = wdl_placeholder.options + if options: + if "true" in options: + true_value = options["true"] + false_value = options["false"] + true_str = f'"{true_value}"' if '"' not in true_value else f"'{true_value}'" + false_str = ( + f'"{false_value}"' if '"' not in false_value else f"'{false_value}'" + ) + is_optional = False + if isinstance(expr, WDL.Expr.Get): + is_optional = expr.type.optional + elif isinstance(expr, WDL.Expr.Apply): + is_optional = expr.arguments[0].type.optional + if not is_optional: + cwl_command_str = f"$({placeholder_expr} ? {true_str} : {false_str})" + else: + cwl_command_str = ( + f"$({placeholder_expr} === null ? {false_str} : {true_str})" + ) + elif "sep" in options: + seperator = options["sep"] + if isinstance(expr.type, WDL.Type.Array): + item_type: WDL.Expr.Base = expr.type.item_type # type: ignore + if isinstance(item_type, WDL.Type.String): + cwl_command_str = f'$({placeholder_expr}.join("{seperator}"))' + elif isinstance(item_type, WDL.Type.File): + cwl_command_str = ( + f"$({placeholder_expr}.map(" + + 'function(el) {return el.path}).join("' + + seperator + + '"))' + ) + else: + raise WDLSourceLine(wdl_placeholder, ConversionException).makeError( + f"{wdl_placeholder} with separator and item type {item_type} is not yet handled" + ) + else: + raise WDLSourceLine(wdl_placeholder, ConversionException).makeError( + f"{wdl_placeholder} with expr of type {expr.type} is not yet handled" + ) + else: + raise WDLSourceLine(wdl_placeholder, ConversionException).makeError( + f"Placeholders with options {options} are not yet handled." + ) + else: + # for the one case where the $(input.some_input_name) is used within the placeholder_expr + # we return the placeholder_expr without enclosing in another $() + cwl_command_str = ( + f"$({placeholder_expr})" + if placeholder_expr[-1] != ")" + else placeholder_expr + ) + # sometimes placeholders are used inside WDL.Expr.String. + # with the parent and grand_parent we can confirm that we are in + # the command string (WDL.Expr.String) and task (WDL.Tree.Task) respectively + parent = wdl_placeholder.parent # type: ignore + grand_parent = parent.parent + return ( + cwl_command_str + if isinstance(parent, WDL.Expr.String) + and isinstance(grand_parent, WDL.Tree.Task) + else cwl_command_str[2:-1] + ) + + +def get_expr_string(wdl_expr_string: WDL.Expr.String, ctx: ConversionContext) -> str: + """Translate WDL String Expressions.""" + if wdl_expr_string.literal is not None: + return f'"{wdl_expr_string.literal.value}"' + string = "" + parts = wdl_expr_string.parts + for index, part in enumerate(parts[1:-1], start=1): + if isinstance( + part, + (WDL.Expr.Placeholder, WDL.Expr.Apply, WDL.Expr.Get, WDL.Expr.Ident), + ): + placeholder = get_expr(part, ctx) + part = ( + "" if parts[index - 1] == '"' or parts[index - 1] == "'" else "' + " # type: ignore + ) + part += placeholder + part += ( + "" if parts[index + 1] == '"' or parts[index + 1] == "'" else " + '" # type: ignore + ) + string += part + # condition to determine if the opening and closing quotes should be added to string + # for cases where a placeholder begins or ends a WDL.Expr.String + if type(parts[1]) == str: + string = "'" + string + if type(parts[-2]) == str: + string = string + "'" + return string + + +def get_expr_name(wdl_expr: WDL.Expr.Ident) -> str: + """Extract name from WDL expr.""" + if not hasattr(wdl_expr, "name"): + raise WDLSourceLine(wdl_expr, ConversionException).makeError( + f"{type(wdl_expr)} has not attribute 'name'" + ) + return get_input(wdl_expr.name) + + +def get_expr(wdl_expr: Any, ctx: ConversionContext) -> str: + """Translate WDL Expressions.""" + if isinstance(wdl_expr, WDL.Expr.Apply): + return get_expr_apply(wdl_expr, ctx) + elif isinstance(wdl_expr, WDL.Expr.Get): + return get_expr_get(wdl_expr, ctx) + elif isinstance(wdl_expr, WDL.Expr.IfThenElse): + return get_expr_ifthenelse(wdl_expr, ctx) + elif isinstance(wdl_expr, WDL.Expr.Placeholder): + return translate_wdl_placeholder(wdl_expr, ctx) + elif isinstance(wdl_expr, WDL.Expr.String): + return get_expr_string(wdl_expr, ctx) + elif isinstance(wdl_expr, WDL.Tree.Decl): + return get_expr(wdl_expr.expr, ctx) + elif isinstance( + wdl_expr, + ( + WDL.Expr.Boolean, + WDL.Expr.Int, + WDL.Expr.Float, + WDL.Expr.Array, + ), + ): + return get_literal_name(wdl_expr) + else: + raise WDLSourceLine(wdl_expr, ConversionException).makeError( + f"The expression '{wdl_expr}' is not handled yet." + ) + + +def get_expr_apply(wdl_apply_expr: WDL.Expr.Apply, ctx: ConversionContext) -> str: + """Translate WDL Apply Expressions.""" + # N.B: This import here avoids circular dependency error when loading the modules. + from wdl2cwl import functions + + function_name = wdl_apply_expr.function_name + arguments = wdl_apply_expr.arguments + if not arguments: + raise WDLSourceLine(wdl_apply_expr, ConversionException).makeError( + f"The '{wdl_apply_expr}' expression has no arguments." + ) + treat_as_optional = wdl_apply_expr.type.optional + + # Call the function if we have it in our wdl2cwl.functions module + if hasattr(functions, function_name): + kwargs = { + "treat_as_optional": treat_as_optional, + "wdl_apply_expr": wdl_apply_expr, + } + return cast( + str, + getattr(functions, function_name)(arguments, ctx, **kwargs), + ) + + raise WDLSourceLine(wdl_apply_expr, ConversionException).makeError( + f"Function name '{function_name}' not yet handled." + ) + + +def get_expr_get(wdl_get_expr: WDL.Expr.Get, ctx: ConversionContext) -> str: + """Translate WDL Get Expressions.""" + member = wdl_get_expr.member + if not member: + return get_expr_ident(wdl_get_expr.expr, ctx) # type: ignore + struct_name = get_expr(wdl_get_expr.expr, ctx) + member_str = f"{struct_name}.{member}" + return ( + member_str + if not isinstance(wdl_get_expr.type, WDL.Type.File) + else f"{member_str}.path" + ) + + +def get_expr_ident(wdl_ident_expr: WDL.Expr.Ident, ctx: ConversionContext) -> str: + """Translate WDL Ident Expressions.""" + id_name = wdl_ident_expr.name + ident_name = get_input(id_name) + referee: Any = wdl_ident_expr.referee + optional = wdl_ident_expr.type.optional + if referee: + with WDLSourceLine(referee, ConversionException): + if isinstance(referee, WDL.Tree.Call): + return id_name + if referee.expr and ( + wdl_ident_expr.name in ctx.optional_cwl_null + or wdl_ident_expr.name not in ctx.non_static_values + ): + return get_expr(referee.expr, ctx) + if optional and isinstance(wdl_ident_expr.type, WDL.Type.File): + # To prevent null showing on the terminal for inputs of type File + name_with_file_check = get_expr_name_with_is_file_check(wdl_ident_expr) + return f'{ident_name} === null ? "" : {name_with_file_check}' + return ( + ident_name + if not isinstance(wdl_ident_expr.type, WDL.Type.File) + else f"{ident_name}.path" + ) + + +def get_expr_name_with_is_file_check(wdl_expr: WDL.Expr.Ident) -> str: + """Extract name from WDL expr and check if it's a file path.""" + if wdl_expr is None or not hasattr(wdl_expr, "name"): + raise WDLSourceLine(wdl_expr, ConversionException).makeError( + f"{type(wdl_expr)} has not attribute 'name'" + ) + expr_name = get_input(wdl_expr.name) + is_file = isinstance(wdl_expr.type, WDL.Type.File) + return expr_name if not is_file else f"{expr_name}.path" + + +__all__ = ["get_expr", "get_expr_string", "translate_wdl_placeholder"] diff --git a/wdl2cwl/functions.py b/wdl2cwl/functions.py new file mode 100644 index 00000000..de9cb546 --- /dev/null +++ b/wdl2cwl/functions.py @@ -0,0 +1,270 @@ +"""WDL Stdlib Functions.""" + +import re +from typing import List, Any, cast + +import WDL +from WDL.Expr import Base +from wdl2cwl.errors import WDLSourceLine, ConversionException +from wdl2cwl.expr import ( + get_expr, + get_expr_get, + get_expr_name, + get_expr_apply, + get_expr_name_with_is_file_check, +) +from wdl2cwl.util import ConversionContext, get_input, get_mem_in_bytes + + +def _add(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib add function.""" + add_left_operand = arguments[0] + add_right_operand = get_expr(arguments[1], ctx) + add_left_operand_value = get_expr(add_left_operand, ctx) + treat_as_optional = ( + kwargs["treat_as_optional"] if "treat_as_optional" in kwargs else False + ) + referer = "" + if getattr(add_left_operand, "function_name", None) == "basename": + if "wdl_apply_expr" not in kwargs: + raise ConversionException('Missing "wdl_apply_expr" in _add') + referer = kwargs["wdl_apply_expr"].parent.name + treat_as_optional = True if referer in ctx.non_static_values else False + return ( + f"{add_left_operand_value} + {add_right_operand}" + if not treat_as_optional + else f"{get_input(referer)} === null ? {add_left_operand_value} + {add_right_operand} : {get_input(referer)}" + ) + + +def basename(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib basename function.""" + if len(arguments) == 1: + only_operand = arguments[0] + is_file = isinstance(only_operand.type, WDL.Type.File) + with WDLSourceLine(only_operand, ConversionException): + only_operand_name = get_expr_name(only_operand.expr) # type: ignore + return ( + f"{only_operand_name}.basename" + if is_file + else f"{only_operand_name}.split('/').reverse()[0]" + ) + elif len(arguments) == 2: + operand, suffix = arguments + is_file = isinstance(operand.type, WDL.Type.File) + if isinstance(operand, WDL.Expr.Get): + operand = get_expr_name(operand.expr) # type: ignore + elif isinstance(operand, WDL.Expr.Apply): + operand = get_expr(operand, ctx) # type: ignore + suffix_str = suffix.literal.value # type: ignore + regex_str = re.escape(suffix_str) + return ( + f"{operand}.basename.replace(/{regex_str}$/, '') " + if is_file + else f"{operand}.split('/').reverse()[0].replace(/{regex_str}$/, '')" + ) + raise ConversionException( + f"Invalid number of arguments for basename {len(arguments)}" + ) + + +# noinspection PyUnusedLocal +def defined(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib defined function.""" + only_arg = cast(WDL.Expr.Ident, arguments[0].expr) # type: ignore + return get_expr_name(only_arg) + + +def _interpolation_add( + arguments: List[Base], ctx: ConversionContext, **kwargs: Any +) -> str: + """WDL Stdlib _interpolation_add function.""" + arg_value, arg_name = arguments + if isinstance(arg_name, WDL.Expr.String) and isinstance(arg_value, WDL.Expr.Apply): + return f"{get_expr_apply(arg_value, ctx)} + {get_expr(arg_name, ctx)}" + just_arg_name = get_expr_name(arg_name.expr) # type: ignore + arg_name_with_file_check = get_expr_name_with_is_file_check( + arg_name.expr # type: ignore + ) + treat_as_optional = ( + kwargs["treat_as_optional"] if "treat_as_optional" in kwargs else False + ) + with WDLSourceLine(arg_value, ConversionException): + arg_value = arg_value.literal.value # type: ignore + return ( + f'{just_arg_name} === null ? "" : "{arg_value}" + {arg_name_with_file_check}' + if treat_as_optional + else f"{arg_value} $({arg_name_with_file_check})" + ) + + +def sub(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib sub function.""" + wdl_apply, arg_string, arg_sub = arguments + wdl_apply_expr = get_expr(wdl_apply, ctx) + arg_string_expr = get_expr(arg_string, ctx) + arg_sub_expr = get_expr(arg_sub, ctx) + return f"{wdl_apply_expr}.replace({arg_string_expr}, {arg_sub_expr}) " + + +def _at(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib _at function.""" + iterable_object, index = arguments + iterable_object_expr, index_expr = get_expr(iterable_object, ctx), get_expr( + index, ctx + ) + return f"{iterable_object_expr}[{index_expr}]" + + +def _gt(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib _gt function.""" + left_operand, right_operand = arguments + if isinstance(left_operand, WDL.Expr.Get): + left_operand_expr = get_expr(left_operand, ctx) + else: + left_operand_expr = get_expr_apply(left_operand, ctx) # type: ignore + right_operand_expr = get_expr(right_operand, ctx) + return f"{left_operand_expr} > {right_operand_expr}" + + +def _lt(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib _lt function.""" + left_operand, right_operand = arguments + if isinstance(left_operand, WDL.Expr.Get): + left_operand_expr = get_expr(left_operand, ctx) + else: + left_operand_expr = get_expr_apply(left_operand, ctx) # type: ignore + right_operand_expr = get_expr(right_operand, ctx) + return f"{left_operand_expr} < {right_operand_expr}" + + +def _lor(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib _lor function.""" + left_operand, right_operand = arguments + left_operand_expr = get_expr_apply(left_operand, ctx) # type: ignore + right_operand_expr = get_expr(right_operand, ctx) + return f"{left_operand_expr} || {right_operand_expr}" + + +def length(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib length function.""" + only_arg_expr = get_expr_get(cast(WDL.Expr.Get, arguments[0]), ctx) + return f"{only_arg_expr}.length" + + +def _neq(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib _neq function.""" + left_operand, right_operand = arguments + if isinstance(left_operand, WDL.Expr.Apply): + left_operand = get_expr_apply(left_operand, ctx) # type: ignore + if isinstance(right_operand, WDL.Expr.Apply): + right_operand = get_expr_apply(right_operand, ctx) # type: ignore + return f"{left_operand} !== {right_operand}" + + +def read_string(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib read_string function.""" + return get_expr(arguments[0], ctx) + + +def read_float(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib read_float function.""" + return get_expr(arguments[0], ctx) + + +def glob(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib glob function.""" + return get_expr(arguments[0], ctx) + + +def select_first(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib select_first function.""" + array_obj = arguments[0] + array_items = [get_expr(item, ctx) for item in array_obj.items] # type: ignore + items_str = ", ".join(array_items) + return f"[{items_str}].find(element => element !== null) " + + +def _mul(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib _mul function.""" + left_operand, right_operand = arguments + left_str = get_expr(left_operand, ctx) + right_str = get_expr(right_operand, ctx) + return f"{left_str}*{right_str}" + + +def _eqeq(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib _eqeq function.""" + left_operand, right_operand = arguments + left_str = get_expr(left_operand, ctx) + right_str = get_expr(right_operand, ctx) + return f"{left_str} === {right_str}" + + +def ceil(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib ceil function.""" + only_arg = get_expr(arguments[0], ctx) + return f"Math.ceil({only_arg}) " + + +def _div(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib _div function.""" + left_operand, right_operand = arguments + left_str = get_expr(left_operand, ctx) + right_str = get_expr(right_operand, ctx) + return f"{left_str}/{right_str}" + + +def _sub(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib _sub function.""" + left_operand, right_operand = arguments + left_str = get_expr(left_operand, ctx) + right_str = get_expr(right_operand, ctx) + return f"{left_str}-{right_str}" + + +def size(arguments: List[Base], ctx: ConversionContext, **kwargs: Any) -> str: + """WDL Stdlib size function.""" + left_operand, right_operand = arguments + if isinstance(left_operand, WDL.Expr.Array): + array_items = [get_expr(item, ctx) for item in left_operand.items] + left = ", ".join(array_items) + left_str = f"[{left}]" + else: + left_str = get_expr(left_operand, ctx) + size_unit = get_expr(right_operand, ctx)[1:-1] + unit_value = get_mem_in_bytes(size_unit) + return ( + "(function(size_of=0)" + + "{" + + f"{left_str}.forEach(function(element)" + + "{ if (element) {" + + "size_of += element.size" + + "}})}" + + f") / {unit_value}" + ) + + +__all__ = [ + "_add", + "basename", + "defined", + "_interpolation_add", + "sub", + "_at", + "_gt", + "_lt", + "_lor", + "length", + "_neq", + "read_string", + "read_float", + "glob", + "select_first", + "_mul", + "_eqeq", + "ceil", + "_div", + "_sub", + "size", +] diff --git a/wdl2cwl/main.py b/wdl2cwl/main.py index b2028f0f..494d7d2f 100644 --- a/wdl2cwl/main.py +++ b/wdl2cwl/main.py @@ -4,33 +4,24 @@ import re import sys import textwrap -from typing import Any, Dict, List, Optional, Set, Union, cast +from typing import Any, Dict, List, Optional, Union, cast import cwl_utils.parser.cwl_v1_2 as cwl -import regex # type: ignore -import WDL from ruamel.yaml import scalarstring from ruamel.yaml.main import YAML +import WDL from wdl2cwl import _logger -from wdl2cwl.errors import WDLSourceLine - -valid_js_identifier = regex.compile( - r"^(?!(?:do|if|in|for|let|new|try|var|case|else|enum|eval|null|this|true|" - r"void|with|break|catch|class|const|false|super|throw|while|yield|delete|export|" - r"import|public|return|static|switch|typeof|default|extends|finally|package|" - r"private|continue|debugger|function|arguments|interface|protected|implements|" - r"instanceof)$)(?:[$_\p{ID_Start}])(?:[$_\u200C\u200D\p{ID_Continue}])*$" +from wdl2cwl.errors import WDLSourceLine, ConversionException +from wdl2cwl.expr import ( + get_expr, + get_expr_apply, + get_expr_get, + get_expr_name, + get_expr_string, + translate_wdl_placeholder, ) - -# ^^ is a combination of https://github.com/tc39/proposal-regexp-unicode-property-escapes#other-examples -# and regex at the bottom of https://stackoverflow.com/a/9392578 -# double checked against https://262.ecma-international.org/5.1/#sec-7.6 -# eval is not on the official list of reserved words, but it is a built-in function - - -class ConversionException(Exception): - """Error during conversion.""" +from wdl2cwl.util import ConversionContext, get_mem_in_bytes def convert(doc: str) -> Dict[str, Any]: @@ -69,32 +60,6 @@ def get_cwl_type(input_type: WDL.Type.Base) -> str: return type_of -def get_mem_in_bytes(unit: str) -> str: - """Determine the value of a memory unit in bytes.""" - with WDLSourceLine(unit, ConversionException): - if unit == "KiB" or unit == "Ki": - mem_in_bytes = "1024^1" - elif unit == "MiB" or unit == "Mi": - mem_in_bytes = "1024^2" - elif unit == "GiB" or unit == "Gi": - mem_in_bytes = "1024^3" - elif unit == "TiB" or unit == "Ti": - mem_in_bytes = "1024^4" - elif unit == "B": - mem_in_bytes = "1024^0" - elif unit == "KB" or unit == "K": - mem_in_bytes = "1000^1" - elif unit == "MB" or unit == "M": - mem_in_bytes = "1000^2" - elif unit == "GB" or unit == "G": - mem_in_bytes = "1000^3" - elif unit == "TB" or unit == "T": - mem_in_bytes = "1000^4" - else: - raise ConversionException(f"Invalid memory unit: ${unit}") - return mem_in_bytes - - def get_outdir_requirement(outdir: Union[WDL.Expr.Get, WDL.Expr.Apply]) -> int: """Produce the memory requirement for the output directory from WDL runtime disks.""" # This is yet to be implemented. After Feature Parity. @@ -102,13 +67,6 @@ def get_outdir_requirement(outdir: Union[WDL.Expr.Get, WDL.Expr.Apply]) -> int: return int(outdir.literal.value) * 1024 # type: ignore -def get_input(input_name: str) -> str: - """Produce a concise, valid CWL expr/param reference lookup string for a given input name.""" - if valid_js_identifier.match(input_name): - return f"inputs.{input_name}" - return f'inputs["{input_name}"]' - - def get_cwl_docker_requirements( wdl_docker: Union[WDL.Expr.Get, WDL.Expr.String] ) -> cwl.ProcessRequirement: @@ -139,52 +97,12 @@ def get_cwl_docker_requirements( return cwl.DockerRequirement(dockerPull=dockerpull) -def get_literal_name( - expr: Union[ - WDL.Expr.Boolean, - WDL.Expr.Int, - WDL.Expr.Float, - WDL.Expr.Array, - ], -) -> str: - """Translate WDL Boolean, Int, Float, or Array Expression.""" - # if the literal expr is used inside WDL.Expr.Apply - # the literal value is what's needed - parent = expr.parent # type: ignore[union-attr] - if isinstance(parent, (WDL.Expr.Apply, WDL.Expr.IfThenElse)): - return expr.literal.value # type: ignore - raise WDLSourceLine(expr, ConversionException).makeError( - f"The parent expression for {expr} is not WDL.Expr.Apply, but {parent}." - ) - - -def get_expr_name(wdl_expr: WDL.Expr.Ident) -> str: - """Extract name from WDL expr.""" - if not hasattr(wdl_expr, "name"): - raise WDLSourceLine(wdl_expr, ConversionException).makeError( - f"{type(wdl_expr)} has not attribute 'name'" - ) - return get_input(wdl_expr.name) - - -def get_expr_name_with_is_file_check(wdl_expr: WDL.Expr.Ident) -> str: - """Extract name from WDL expr and check if it's a file path.""" - if wdl_expr is None or not hasattr(wdl_expr, "name"): - raise WDLSourceLine(wdl_expr, ConversionException).makeError( - f"{type(wdl_expr)} has not attribute 'name'" - ) - expr_name = get_input(wdl_expr.name) - is_file = isinstance(wdl_expr.type, WDL.Type.File) - return expr_name if not is_file else f"{expr_name}.path" - - class Converter: """Object that handles WDL Workflows and task conversion to CWL.""" def __init__(self) -> None: - """Initialize the sets used by the object and prevent inconsistent behaviours.""" - self.non_static_values: Set[str] = set() - self.optional_cwl_null: Set[str] = set() + """Construct a converter.""" + self.ctx = ConversionContext() def load_wdl_objects( self, obj: Union[WDL.Tree.Task, WDL.Tree.Workflow] @@ -200,7 +118,7 @@ def get_workflow_input_expr( ) -> str: """Get name of expression referenced in workflow call inputs.""" if isinstance(wf_expr, WDL.Expr.String): - return self.get_expr_string(wf_expr)[1:-1] + return get_expr_string(wf_expr, self.ctx)[1:-1] wdl_expr = wf_expr.expr if not isinstance(wdl_expr, WDL.Expr.Ident): raise WDLSourceLine(wdl_expr, ConversionException).makeError( @@ -383,7 +301,7 @@ def get_time_minutes_requirement( if isinstance(time_minutes, (WDL.Expr.Int, WDL.Expr.Float)): literal = time_minutes.literal.value # type: ignore return literal * 60 # type: ignore - time_minutes_str = self.get_expr(time_minutes) + time_minutes_str = get_expr(time_minutes, self.ctx) return f"$({time_minutes_str} * 60)" def get_memory_requirement( @@ -403,12 +321,12 @@ def get_memory_literal(self, memory_runtime: WDL.Expr.String) -> float: _, placeholder, unit, _ = memory_runtime.parts with WDLSourceLine(placeholder, ConversionException): if isinstance(placeholder.expr, WDL.Expr.Get): # type: ignore - value_name = self.get_expr_get(placeholder.expr) # type: ignore + value_name = get_expr_get(placeholder.expr, self.ctx) # type: ignore else: - value_name = self.get_expr_apply(placeholder.expr) # type: ignore + value_name = get_expr_apply(placeholder.expr, self.ctx) # type: ignore return self.get_ram_min_js(value_name, unit.strip()) # type: ignore - ram_min = self.get_expr_string(memory_runtime)[1:-1] + ram_min = get_expr_string(memory_runtime, self.ctx)[1:-1] unit_result = re.search(r"[a-zA-Z]+", ram_min) if not unit_result: raise ConversionException("Missing Memory units, yet still a string?") @@ -448,280 +366,6 @@ def get_ram_min_js(self, ram_min_ref_name: str, unit: str) -> str: return js_str - def get_expr(self, wdl_expr: Any) -> str: - """Translate WDL Expressions.""" - if isinstance(wdl_expr, WDL.Expr.Apply): - return self.get_expr_apply(wdl_expr) - elif isinstance(wdl_expr, WDL.Expr.Get): - return self.get_expr_get(wdl_expr) - elif isinstance(wdl_expr, WDL.Expr.IfThenElse): - return self.get_expr_ifthenelse(wdl_expr) - elif isinstance(wdl_expr, WDL.Expr.Placeholder): - return self.translate_wdl_placeholder(wdl_expr) - elif isinstance(wdl_expr, WDL.Expr.String): - return self.get_expr_string(wdl_expr) - elif isinstance(wdl_expr, WDL.Tree.Decl): - return self.get_expr(wdl_expr.expr) - elif isinstance( - wdl_expr, - ( - WDL.Expr.Boolean, - WDL.Expr.Int, - WDL.Expr.Float, - WDL.Expr.Array, - ), - ): - return get_literal_name(wdl_expr) - else: - raise WDLSourceLine(wdl_expr, ConversionException).makeError( - f"The expression '{wdl_expr}' is not handled yet." - ) - - def get_expr_string(self, wdl_expr_string: WDL.Expr.String) -> str: - """Translate WDL String Expressions.""" - if wdl_expr_string.literal is not None: - return f'"{wdl_expr_string.literal.value}"' - string = "" - parts = wdl_expr_string.parts - for index, part in enumerate(parts[1:-1], start=1): - if isinstance( - part, - (WDL.Expr.Placeholder, WDL.Expr.Apply, WDL.Expr.Get, WDL.Expr.Ident), - ): - placeholder = self.get_expr(part) - part = ( - "" if parts[index - 1] == '"' or parts[index - 1] == "'" else "' + " # type: ignore - ) - part += placeholder - part += ( - "" if parts[index + 1] == '"' or parts[index + 1] == "'" else " + '" # type: ignore - ) - string += part - # condition to determine if the opening and closing quotes should be added to string - # for cases where a placeholder begins or ends a WDL.Expr.String - if type(parts[1]) == str: - string = "'" + string - if type(parts[-2]) == str: - string = string + "'" - return string - - def get_expr_ifthenelse(self, wdl_ifthenelse: WDL.Expr.IfThenElse) -> str: - """Translate WDL IfThenElse Expressions.""" - condition = self.get_expr(wdl_ifthenelse.condition) - if_true = self.get_expr(wdl_ifthenelse.consequent) - if_false = self.get_expr(wdl_ifthenelse.alternative) - return f"{condition} ? {if_true} : {if_false}" - - def get_expr_apply(self, wdl_apply_expr: WDL.Expr.Apply) -> str: - """Translate WDL Apply Expressions.""" - function_name = wdl_apply_expr.function_name - arguments = wdl_apply_expr.arguments - if not arguments: - raise WDLSourceLine(wdl_apply_expr, ConversionException).makeError( - f"The '{wdl_apply_expr}' expression has no arguments." - ) - treat_as_optional = wdl_apply_expr.type.optional - if function_name == "_add": - add_left_operand = arguments[0] - add_right_operand = self.get_expr(arguments[1]) - add_left_operand_value = self.get_expr(add_left_operand) - if getattr(add_left_operand, "function_name", None) == "basename": - referer = wdl_apply_expr.parent.name # type: ignore - treat_as_optional = True if referer in self.non_static_values else False - return ( - f"{add_left_operand_value} + {add_right_operand}" - if not treat_as_optional - else f"{get_input(referer)} === null ? {add_left_operand_value} + {add_right_operand} : {get_input(referer)}" - ) - elif function_name == "basename": - if len(arguments) == 1: - only_operand = arguments[0] - is_file = isinstance(only_operand.type, WDL.Type.File) - with WDLSourceLine(only_operand, ConversionException): - only_operand_name = get_expr_name(only_operand.expr) # type: ignore[attr-defined] - return ( - f"{only_operand_name}.basename" - if is_file - else f"{only_operand_name}.split('/').reverse()[0]" - ) - elif len(arguments) == 2: - operand, suffix = arguments - is_file = isinstance(operand.type, WDL.Type.File) - if isinstance(operand, WDL.Expr.Get): - operand = get_expr_name(operand.expr) # type: ignore - elif isinstance(operand, WDL.Expr.Apply): - operand = self.get_expr(operand) # type: ignore - suffix_str = suffix.literal.value # type: ignore - regex_str = re.escape(suffix_str) - return ( - f"{operand}.basename.replace(/{regex_str}$/, '') " - if is_file - else f"{operand}.split('/').reverse()[0].replace(/{regex_str}$/, '')" - ) - elif function_name == "defined": - only_operand = arguments[0] - return get_expr_name(only_operand.expr) # type: ignore - elif function_name == "_interpolation_add": - arg_value, arg_name = arguments - if isinstance(arg_name, WDL.Expr.String) and isinstance( - arg_value, WDL.Expr.Apply - ): - return f"{self.get_expr_apply(arg_value)} + {self.get_expr(arg_name)}" - just_arg_name = get_expr_name(arg_name.expr) # type: ignore - arg_name_with_file_check = get_expr_name_with_is_file_check( - arg_name.expr # type: ignore - ) - with WDLSourceLine(arg_value, ConversionException): - arg_value = arg_value.literal.value # type: ignore - return ( - f'{just_arg_name} === null ? "" : "{arg_value}" + {arg_name_with_file_check}' - if treat_as_optional - else f"{arg_value} $({arg_name_with_file_check})" - ) - elif function_name == "sub": - wdl_apply, arg_string, arg_sub = arguments - wdl_apply_expr = self.get_expr(wdl_apply) # type: ignore - arg_string_expr = self.get_expr(arg_string) - arg_sub_expr = self.get_expr(arg_sub) - return f"{wdl_apply_expr}.replace({arg_string_expr}, {arg_sub_expr}) " - - elif function_name == "_at": - iterable_object, index = arguments - iterable_object_expr, index_expr = self.get_expr( - iterable_object - ), self.get_expr(index) - return f"{iterable_object_expr}[{index_expr}]" - elif function_name == "_gt": - left_operand, right_operand = arguments - if isinstance(left_operand, WDL.Expr.Get): - left_operand_expr = self.get_expr(left_operand) - else: - left_operand_expr = self.get_expr_apply(left_operand) # type: ignore - right_operand_expr = self.get_expr(right_operand) - return f"{left_operand_expr} > {right_operand_expr}" - elif function_name == "_lt": - left_operand, right_operand = arguments - if isinstance(left_operand, WDL.Expr.Get): - left_operand_expr = self.get_expr(left_operand) - else: - left_operand_expr = self.get_expr_apply(left_operand) # type: ignore - right_operand_expr = self.get_expr(right_operand) - return f"{left_operand_expr} < {right_operand_expr}" - elif function_name == "_lor": - left_operand, right_operand = arguments - left_operand_expr = self.get_expr_apply(left_operand) # type: ignore - right_operand_expr = self.get_expr(right_operand) - return f"{left_operand_expr} || {right_operand_expr}" - elif function_name == "length": - only_arg_expr = self.get_expr_get(arguments[0]) # type: ignore - return f"{only_arg_expr}.length" - elif function_name == "_neq": - left_operand, right_operand = arguments - if isinstance(left_operand, WDL.Expr.Apply): - left_operand = self.get_expr_apply(left_operand) # type: ignore - if isinstance(right_operand, WDL.Expr.Apply): - right_operand = self.get_expr_apply(right_operand) # type: ignore - return f"{left_operand} !== {right_operand}" - elif function_name == "read_string": - only_arg = arguments[0] - return self.get_expr(only_arg) - elif function_name == "read_float": - only_arg = arguments[0] - return self.get_expr(only_arg) - elif function_name == "glob": - only_arg = arguments[0] - return self.get_expr(only_arg) - elif function_name == "select_first": - array_obj = arguments[0] - array_items = [self.get_expr(item) for item in array_obj.items] # type: ignore - items_str = ", ".join(array_items) - return f"[{items_str}].find(element => element !== null) " - elif function_name == "_mul": - left_operand, right_operand = arguments - left_str = self.get_expr(left_operand) - right_str = self.get_expr(right_operand) - return f"{left_str}*{right_str}" - elif function_name == "_eqeq": - left_operand, right_operand = arguments - left_str = self.get_expr(left_operand) - right_str = self.get_expr(right_operand) - return f"{left_str} === {right_str}" - elif function_name == "ceil": - only_arg = self.get_expr(arguments[0]) # type: ignore - return f"Math.ceil({only_arg}) " - elif function_name == "_div": - left_operand, right_operand = arguments - left_str = self.get_expr(left_operand) - right_str = self.get_expr(right_operand) - return f"{left_str}/{right_str}" - elif function_name == "_sub": - left_operand, right_operand = arguments - left_str = self.get_expr(left_operand) - right_str = self.get_expr(right_operand) - return f"{left_str}-{right_str}" - elif function_name == "size": - left_operand, right_operand = arguments - if isinstance(left_operand, WDL.Expr.Array): - array_items = [self.get_expr(item) for item in left_operand.items] - left = ", ".join(array_items) - left_str = f"[{left}]" - else: - left_str = self.get_expr(left_operand) - size_unit = self.get_expr(right_operand)[1:-1] - unit_value = get_mem_in_bytes(size_unit) - return ( - "(function(size_of=0)" - + "{" - + f"{left_str}.forEach(function(element)" - + "{ if (element) {" - + "size_of += element.size" - + "}})}" - + f") / {unit_value}" - ) - - raise WDLSourceLine(wdl_apply_expr, ConversionException).makeError( - f"Function name '{function_name}' not yet handled." - ) - - def get_expr_get(self, wdl_get_expr: WDL.Expr.Get) -> str: - """Translate WDL Get Expressions.""" - member = wdl_get_expr.member - - if not member: - return self.get_expr_ident(wdl_get_expr.expr) # type: ignore - struct_name = self.get_expr(wdl_get_expr.expr) - member_str = f"{struct_name}.{member}" - return ( - member_str - if not isinstance(wdl_get_expr.type, WDL.Type.File) - else f"{member_str}.path" - ) - - def get_expr_ident(self, wdl_ident_expr: WDL.Expr.Ident) -> str: - """Translate WDL Ident Expressions.""" - id_name = wdl_ident_expr.name - ident_name = get_input(id_name) - referee = wdl_ident_expr.referee - optional = wdl_ident_expr.type.optional - if referee: - with WDLSourceLine(referee, ConversionException): - if isinstance(referee, WDL.Tree.Call): - return id_name - if referee.expr and ( - wdl_ident_expr.name in self.optional_cwl_null - or wdl_ident_expr.name not in self.non_static_values - ): - return self.get_expr(referee.expr) - if optional and isinstance(wdl_ident_expr.type, WDL.Type.File): - # To prevent null showing on the terminal for inputs of type File - name_with_file_check = get_expr_name_with_is_file_check(wdl_ident_expr) - return f'{ident_name} === null ? "" : {name_with_file_check}' - return ( - ident_name - if not isinstance(wdl_ident_expr.type, WDL.Type.File) - else f"{ident_name}.path" - ) - def get_cpu_requirement(self, cpu_runtime: WDL.Expr.Base) -> str: """Translate WDL Runtime CPU requirement to CWL Resource Requirement.""" if isinstance(cpu_runtime, (WDL.Expr.Int, WDL.Expr.Float)): @@ -733,7 +377,7 @@ def get_cpu_requirement(self, cpu_runtime: WDL.Expr.Base) -> str: int(literal_str) if "." not in literal_str else float(literal_str) ) return numeral # type: ignore - cpu_str = self.get_expr(cpu_runtime) + cpu_str = get_expr(cpu_runtime, self.ctx) return f"$({cpu_str})" def get_cwl_command_requirements( @@ -745,87 +389,13 @@ def get_cwl_command_requirements( if isinstance(wdl_command, str): command_str += wdl_command.replace("$(", "\\$(") elif isinstance(wdl_command, WDL.Expr.Placeholder): - command_str += self.translate_wdl_placeholder(wdl_command) + command_str += translate_wdl_placeholder(wdl_command, self.ctx) command_str = textwrap.dedent(command_str) return cwl.InitialWorkDirRequirement( listing=[cwl.Dirent(entry=command_str, entryname="script.bash")] ) - def translate_wdl_placeholder(self, wdl_placeholder: WDL.Expr.Placeholder) -> str: - """Translate WDL Expr Placeholder to a valid CWL command string.""" - cwl_command_str = "" - expr = wdl_placeholder.expr - if expr is None: - raise WDLSourceLine(wdl_placeholder, ConversionException).makeError( - f"Placeholder '{wdl_placeholder}' has no expr." - ) - placeholder_expr = self.get_expr(expr) - options = wdl_placeholder.options - if options: - if "true" in options: - true_value = options["true"] - false_value = options["false"] - true_str = ( - f'"{true_value}"' if '"' not in true_value else f"'{true_value}'" - ) - false_str = ( - f'"{false_value}"' if '"' not in false_value else f"'{false_value}'" - ) - is_optional = False - if isinstance(expr, WDL.Expr.Get): - is_optional = expr.type.optional - elif isinstance(expr, WDL.Expr.Apply): - is_optional = expr.arguments[0].type.optional - if not is_optional: - cwl_command_str = ( - f"$({placeholder_expr} ? {true_str} : {false_str})" - ) - else: - cwl_command_str = ( - f"$({placeholder_expr} === null ? {false_str} : {true_str})" - ) - elif "sep" in options: - seperator = options["sep"] - if isinstance(expr.type, WDL.Type.Array): - item_type = expr.type.item_type - if isinstance(item_type, WDL.Type.String): - cwl_command_str = f'$({placeholder_expr}.join("{seperator}"))' - elif isinstance(item_type, WDL.Type.File): - cwl_command_str = ( - f"$({placeholder_expr}.map(" - + 'function(el) {return el.path}).join("' - + seperator - + '"))' - ) - else: - raise WDLSourceLine(wdl_placeholder, ConversionException).makeError( - f"{wdl_placeholder} with expr of type {expr.type} is not yet handled" - ) - else: - raise WDLSourceLine(wdl_placeholder, ConversionException).makeError( - f"Placeholders with options {options} are not yet handled." - ) - else: - # for the one case where the $(input.some_input_name) is used within the placeholder_expr - # we return the placeholder_expr without enclosing in another $() - cwl_command_str = ( - f"$({placeholder_expr})" - if placeholder_expr[-1] != ")" - else placeholder_expr - ) - # sometimes placeholders are used inside WDL.Expr.String. - # with the parent and grand_parent we can confirm that we are in - # the command string (WDL.Expr.String) and task (WDL.Tree.Task) respectively - parent = wdl_placeholder.parent # type: ignore - grand_parent = parent.parent - return ( - cwl_command_str - if isinstance(parent, WDL.Expr.String) - and isinstance(grand_parent, WDL.Tree.Task) - else cwl_command_str[2:-1] - ) - def get_cwl_task_inputs( self, wdl_inputs: Optional[List[WDL.Tree.Decl]] ) -> List[cwl.CommandInputParameter]: @@ -837,7 +407,7 @@ def get_cwl_task_inputs( for wdl_input in wdl_inputs: input_name = wdl_input.name - self.non_static_values.add(input_name) + self.ctx.non_static_values.add(input_name) input_value = None type_of: Union[ str, cwl.CommandInputArraySchema, cwl.CommandInputRecordSchema @@ -872,7 +442,7 @@ def get_cwl_task_inputs( cwl.CommandInputRecordSchema, ] = [type_of, "null"] if isinstance(wdl_input.expr, WDL.Expr.Apply): - self.optional_cwl_null.add(input_name) + self.ctx.optional_cwl_null.add(input_name) else: final_type_of = type_of @@ -942,7 +512,7 @@ def get_cwl_task_outputs( isinstance(wdl_output.expr, WDL.Expr.Apply) and wdl_output.expr.function_name == "read_string" ): - glob_expr = self.get_expr(wdl_output) + glob_expr = get_expr(wdl_output, self.ctx) is_literal = wdl_output.expr.arguments[0].literal if is_literal: glob_str = glob_expr[ @@ -966,7 +536,7 @@ def get_cwl_task_outputs( isinstance(wdl_output.expr, WDL.Expr.Apply) and wdl_output.expr.function_name == "read_float" ): - glob_expr = self.get_expr(wdl_output) + glob_expr = get_expr(wdl_output, self.ctx) is_literal = wdl_output.expr.arguments[0].literal if is_literal: glob_str = glob_expr[ @@ -997,7 +567,7 @@ def get_cwl_task_outputs( ) ) else: - glob_expr = self.get_expr(wdl_output) + glob_expr = get_expr(wdl_output, self.ctx) glob_str = f"$({glob_expr})" if wdl_output.type.optional: diff --git a/wdl2cwl/tests/__init__.py b/wdl2cwl/tests/__init__.py index e69de29b..d420712d 100644 --- a/wdl2cwl/tests/__init__.py +++ b/wdl2cwl/tests/__init__.py @@ -0,0 +1 @@ +"""Tests.""" diff --git a/wdl2cwl/tests/test_cwl.py b/wdl2cwl/tests/test_cwl.py index db4316cb..338c633e 100644 --- a/wdl2cwl/tests/test_cwl.py +++ b/wdl2cwl/tests/test_cwl.py @@ -6,7 +6,8 @@ import pytest -from ..main import ConversionException, Converter, main +from wdl2cwl.main import Converter, main +from wdl2cwl.errors import ConversionException def get_file(path: str) -> str: diff --git a/wdl2cwl/tests/test_wdlsourceline.py b/wdl2cwl/tests/test_wdlsourceline.py index 7267cfcb..32b0cf7f 100644 --- a/wdl2cwl/tests/test_wdlsourceline.py +++ b/wdl2cwl/tests/test_wdlsourceline.py @@ -5,8 +5,7 @@ import pytest import WDL -from ..errors import WDLSourceLine -from ..main import ConversionException, Converter +from wdl2cwl.errors import WDLSourceLine, ConversionException from .test_cwl import get_file diff --git a/wdl2cwl/util.py b/wdl2cwl/util.py new file mode 100644 index 00000000..d5d92f07 --- /dev/null +++ b/wdl2cwl/util.py @@ -0,0 +1,64 @@ +"""Temporary module for WDL functions that do not belong to the converter.""" + +from dataclasses import dataclass, field +from typing import Set + +import regex # type: ignore + +from wdl2cwl.errors import ConversionException, WDLSourceLine + +# A combination of https://github.com/tc39/proposal-regexp-unicode-property-escapes#other-examples +# and regex at the bottom of https://stackoverflow.com/a/9392578. +# Double-checked against https://262.ecma-international.org/5.1/#sec-7.6. +# ``eval`` is not on the official list of reserved-words, but it is a built-in function. +valid_js_identifier = regex.compile( + r"^(?!(?:do|if|in|for|let|new|try|var|case|else|enum|eval|null|this|true|" + r"void|with|break|catch|class|const|false|super|throw|while|yield|delete|export|" + r"import|public|return|static|switch|typeof|default|extends|finally|package|" + r"private|continue|debugger|function|arguments|interface|protected|implements|" + r"instanceof)$)(?:[$_\p{ID_Start}])(?:[$_\u200C\u200D\p{ID_Continue}])*$" +) + + +def get_input(input_name: str) -> str: + """Produce a concise, valid CWL expr/param reference lookup string for a given input name.""" + if valid_js_identifier.match(input_name): + return f"inputs.{input_name}" + return f'inputs["{input_name}"]' + + +def get_mem_in_bytes(unit: str) -> str: + """Determine the value of a memory unit in bytes.""" + with WDLSourceLine(unit, ConversionException): + if unit == "KiB" or unit == "Ki": + mem_in_bytes = "1024^1" + elif unit == "MiB" or unit == "Mi": + mem_in_bytes = "1024^2" + elif unit == "GiB" or unit == "Gi": + mem_in_bytes = "1024^3" + elif unit == "TiB" or unit == "Ti": + mem_in_bytes = "1024^4" + elif unit == "B": + mem_in_bytes = "1024^0" + elif unit == "KB" or unit == "K": + mem_in_bytes = "1000^1" + elif unit == "MB" or unit == "M": + mem_in_bytes = "1000^2" + elif unit == "GB" or unit == "G": + mem_in_bytes = "1000^3" + elif unit == "TB" or unit == "T": + mem_in_bytes = "1000^4" + else: + raise ConversionException(f"Invalid memory unit: ${unit}") + return mem_in_bytes + + +@dataclass +class ConversionContext: + """Converstion context, including variables used throughout the conversion process.""" + + non_static_values: Set[str] = field(default_factory=set) + optional_cwl_null: Set[str] = field(default_factory=set) + + +__all__ = ["get_input", "get_mem_in_bytes", "ConversionContext"]