diff --git a/grizzly/args.py b/grizzly/args.py index b7d7cd46..f512a74d 100644 --- a/grizzly/args.py +++ b/grizzly/args.py @@ -190,7 +190,7 @@ def __init__(self): "--version", "-V", action="version", - version=__version__ if __version__ else "Unknown - Package not installed.", + version=__version__, help="Show version number", ) if system().startswith("Linux"): diff --git a/grizzly/common/utils.py b/grizzly/common/utils.py index dd4f1049..aa43f84c 100644 --- a/grizzly/common/utils.py +++ b/grizzly/common/utils.py @@ -10,6 +10,7 @@ from pathlib import Path from shutil import rmtree from tempfile import gettempdir +from typing import Any, Dict, Iterable, Optional, Tuple, Union from cryptography import x509 from cryptography.hazmat.backends import default_backend @@ -37,7 +38,7 @@ __version__ = version("grizzly-framework") except PackageNotFoundError: # pragma: no cover # package is not installed - __version__ = None + __version__ = "unknown" DEFAULT_TIME_LIMIT = 30 GRZ_TMP = Path(getenv("GRZ_TMP", gettempdir()), "grizzly") @@ -50,14 +51,14 @@ class CertificateBundle: """Contains root CA, host CA and private key files.""" - def __init__(self, path, root, host, key): + def __init__(self, path: Path, root: Path, host: Path, key: Path) -> None: self._base = path self.root = root self.host = host self.key = key @classmethod - def create(cls, path=None): + def create(cls, path: Optional[Path] = None) -> "CertificateBundle": """Create certificate files. Args: @@ -71,7 +72,7 @@ def create(cls, path=None): certs = generate_certificates(path) return cls(path, certs["root"], certs["host"], certs["key"]) - def cleanup(self): + def cleanup(self) -> None: """Remove certificate files. Args: @@ -87,7 +88,7 @@ def cleanup(self): class ConfigError(Exception): """Raised to indicate invalid configuration a state""" - def __init__(self, message, exit_code): + def __init__(self, message: str, exit_code: int) -> None: super().__init__(message) self.exit_code = exit_code @@ -109,11 +110,11 @@ class Exit(IntEnum): FAILURE = 5 -def configure_logging(log_level): +def configure_logging(log_level: int) -> None: """Configure log output level and formatting. Args: - log_level (int): Set log level. + log_level: Set log level. Returns: None @@ -130,13 +131,13 @@ def configure_logging(log_level): basicConfig(format=log_fmt, datefmt=date_fmt, level=log_level) -def display_time_limits(time_limit, timeout, no_harness): +def display_time_limits(time_limit: int, timeout: int, no_harness: bool) -> None: """Output configuration of time limits and harness. Args: - time_limit (int): Time in seconds before harness attempts to close current test. - timeout (int): Time in seconds before iteration is considered a timeout. - no_harness (bool): Indicate whether harness will is disabled. + time_limit: Time in seconds before harness attempts to close current test. + timeout: Time in seconds before iteration is considered a timeout. + no_harness: Indicate whether harness will is disabled. Returns: None @@ -156,13 +157,21 @@ def display_time_limits(time_limit, timeout, no_harness): LOG.warning("TIMEOUT DISABLED, not recommended for automation") -def grz_tmp(*subdir): +def grz_tmp(*subdir: Union[str, Path]) -> Path: + """Create (if needed) a temporary working directory in a known location. + + Args: + subdir: Nested directories. + + Returns: + Path within the temporary working directory. + """ path = Path(GRZ_TMP, *subdir) path.mkdir(parents=True, exist_ok=True) return path -def generate_certificates(cert_dir: Path): +def generate_certificates(cert_dir: Path) -> Dict[str, Path]: """Generate a root CA and host certificate. Taken from https://stackoverflow.com/a/56292132 @@ -229,24 +238,25 @@ def generate_certificates(cert_dir: Path): def time_limits( - time_limit, - timeout, - tests=None, - default_limit=DEFAULT_TIME_LIMIT, - timeout_delay=TIMEOUT_DELAY, -): + time_limit: Optional[int], + timeout: Optional[int], + # NOTE: Any should be TestCase, this function should likely live somewhere else + tests: Optional[Iterable[Any]] = None, + default_limit: int = DEFAULT_TIME_LIMIT, + timeout_delay: int = TIMEOUT_DELAY, +) -> Tuple[int, int]: """Determine the test time limit and timeout. If time_limit or timeout is None it is calculated otherwise the provided value is used. Args: - time_limit (int): Test time limit. - timeout (int): Iteration timeout. - tests (iterable): Testcases that may contain time limit values. - default_limit (int): Value to use as default time limit. - timeout_delay (int): Value to use as delay when calculating timeout. + time_limit: Test time limit. + timeout: Iteration timeout. + tests: Testcases that may contain time limit values. + default_limit: Value to use as default time limit. + timeout_delay: Value to use as delay when calculating timeout. Returns: - tuple (int, int): Time limit and timeout. + Time limit and timeout. """ assert default_limit > 0 assert timeout_delay >= 0 @@ -259,10 +269,9 @@ def time_limits( # add small time buffer to duration test_limits.extend(int(x.duration) + 10 for x in tests if x.duration) time_limit = max(test_limits) - assert time_limit > 0 + assert time_limit is not None and time_limit > 0 # calculate timeout - calc_timeout = timeout is None - if calc_timeout: + if timeout is None: timeout = time_limit + timeout_delay elif calc_limit and time_limit > timeout > 0: LOG.debug("calculated time limit > given timeout, using timeout")