diff --git a/docs/source/conf.py b/docs/source/conf.py index db411cb073..58175c2c8b 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -11,6 +11,7 @@ # # All configuration values have a default; values that are commented out # serve to show the default. +from __future__ import annotations import importlib import os @@ -18,7 +19,6 @@ import sys from inspect import getmembers, isclass, isfunction from pathlib import Path -from typing import List, Tuple from click import secho, style @@ -55,7 +55,7 @@ "notfound.extension", ] -# enable autosummary plugin (table of contents for modules/classes/class +# enable autosummary plugin (table of contents for modules/classes/class # methods) autosummary_generate = True autosummary_generate_overwrite = False @@ -102,6 +102,9 @@ "tuple", "Any", "Dict", + "dict", + "list", + "set", "typing.Dict", "typing.Iterable", "typing.List", @@ -122,7 +125,9 @@ "kedro.framework.context.context.KedroContext", "kedro.framework.startup.ProjectMetadata", "abc.ABC", + "Path", "pathlib.Path", + "PurePosixPath", "pathlib.PurePosixPath", "requests.auth.AuthBase", "google.oauth2.credentials.Credentials", @@ -133,6 +138,7 @@ "kedro.extras.datasets.pandas.json_dataset.JSONDataSet", "kedro_datasets.pandas.json_dataset.JSONDataSet", "pluggy._manager.PluginManager", + "PluginManager", "_DI", "_DO", # The statements below were added after subclassing UserDict in AbstractConfigLoader. @@ -330,7 +336,7 @@ def remove_arrows_in_examples(lines): lines[i] = line.replace(">>>", "") -def autolink_replacements(what: str) -> List[Tuple[str, str, str]]: +def autolink_replacements(what: str) -> list[tuple[str, str, str]]: """ Create a list containing replacement tuples of the form: (``regex``, ``replacement``, ``obj``) for all classes and methods which are @@ -402,7 +408,7 @@ def autolink_replacements(what: str) -> List[Tuple[str, str, str]]: return replacements, suggestions -def log_suggestions(lines: List[str], name: str): +def log_suggestions(lines: list[str], name: str): """Use the ``suggestions`` list to log in the terminal places where the developer has forgotten to surround with double back-ticks class name/function name references. diff --git a/features/environment.py b/features/environment.py index 4bf320f922..c98246dc85 100644 --- a/features/environment.py +++ b/features/environment.py @@ -1,16 +1,16 @@ """Behave environment setup commands.""" # pylint: disable=unused-argument +from __future__ import annotations import os import shutil import tempfile import venv from pathlib import Path -from typing import Set from features.steps.sh_run import run -_PATHS_TO_REMOVE: Set[Path] = set() +_PATHS_TO_REMOVE: set[Path] = set() FRESH_VENV_TAG = "fresh_venv" diff --git a/features/steps/sh_run.py b/features/steps/sh_run.py index 52f3beafef..7e49e7a0ca 100644 --- a/features/steps/sh_run.py +++ b/features/steps/sh_run.py @@ -1,12 +1,14 @@ +from __future__ import annotations + import shlex import subprocess -from typing import Any, List, Union +from typing import Any import psutil def run( - cmd: Union[list, str], split: bool = True, print_output: bool = False, **kwargs: Any + cmd: list | str, split: bool = True, print_output: bool = False, **kwargs: Any ) -> subprocess.CompletedProcess: """Run a shell command. 
@@ -45,7 +47,7 @@ def run( return result -def check_run(cmd: Union[list, str], print_output: bool = False) -> None: +def check_run(cmd: list | str, print_output: bool = False) -> None: """ Run cmd using subprocess.check_call (throws error if non-zero value returned) @@ -74,7 +76,7 @@ class ChildTerminatingPopen(subprocess.Popen): dies (so-called orphan processes) """ - def __init__(self, cmd: List[str], **kwargs) -> None: + def __init__(self, cmd: list[str], **kwargs) -> None: """ Initializer pipes stderr and stdout. diff --git a/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipeline_registry.py b/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipeline_registry.py index 2566f5e95d..4f2f1eeb89 100644 --- a/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipeline_registry.py +++ b/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipeline_registry.py @@ -1,11 +1,11 @@ """Project pipelines.""" -from typing import Dict +from __future__ import annotations from kedro.framework.project import find_pipelines from kedro.pipeline import Pipeline, pipeline -def register_pipelines() -> Dict[str, Pipeline]: +def register_pipelines() -> dict[str, Pipeline]: """Register the project's pipelines. Returns: diff --git a/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_engineering/nodes.py b/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_engineering/nodes.py index 87e650db74..3f9c8e1337 100644 --- a/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_engineering/nodes.py +++ b/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_engineering/nodes.py @@ -3,13 +3,14 @@ PLEASE DELETE THIS FILE ONCE YOU START WORKING ON YOUR OWN PROJECT! """ +from __future__ import annotations -from typing import Any, Dict +from typing import Any import pandas as pd -def split_data(data: pd.DataFrame, example_test_data_ratio: float) -> Dict[str, Any]: +def split_data(data: pd.DataFrame, example_test_data_ratio: float) -> dict[str, Any]: """Node for splitting the classical Iris data set into training and test sets, each split into features and labels. The split ratio parameter is taken from conf/project/parameters.yml. diff --git a/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_science/nodes.py b/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_science/nodes.py index b91e2c2c37..41efe547cd 100644 --- a/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_science/nodes.py +++ b/features/steps/test_starter/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipelines/data_science/nodes.py @@ -4,16 +4,17 @@ Delete this when you start working on your own Kedro project. 
""" # pylint: disable=invalid-name +from __future__ import annotations import logging -from typing import Any, Dict +from typing import Any import numpy as np import pandas as pd def train_model( - train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: Dict[str, Any] + train_x: pd.DataFrame, train_y: pd.DataFrame, parameters: dict[str, Any] ) -> np.ndarray: """Node for training a simple multi-class logistic regression model. The number of training iterations as well as the learning rate are taken from diff --git a/features/steps/util.py b/features/steps/util.py index ee903e0940..f65a4adfa3 100644 --- a/features/steps/util.py +++ b/features/steps/util.py @@ -1,12 +1,13 @@ """Common functions for e2e testing. """ +from __future__ import annotations import os import re from contextlib import contextmanager from pathlib import Path from time import sleep, time -from typing import Any, Callable, Iterator, List +from typing import Any, Callable, Iterator @contextmanager @@ -72,7 +73,7 @@ def wait_for( ) -def parse_csv(text: str) -> List[str]: +def parse_csv(text: str) -> list[str]: """Parse comma separated **double quoted** strings in behave steps Args: diff --git a/kedro/config/abstract_config.py b/kedro/config/abstract_config.py index 0515b96ee7..70a463fc15 100644 --- a/kedro/config/abstract_config.py +++ b/kedro/config/abstract_config.py @@ -1,8 +1,10 @@ """This module provides ``kedro.abstract_config`` with the baseline class model for a `ConfigLoader` implementation. """ +from __future__ import annotations + from collections import UserDict -from typing import Any, Dict +from typing import Any class AbstractConfigLoader(UserDict): @@ -16,8 +18,8 @@ def __init__( self, conf_source: str, env: str = None, - runtime_params: Dict[str, Any] = None, - **kwargs + runtime_params: dict[str, Any] = None, + **kwargs, ): super().__init__() self.conf_source = conf_source diff --git a/kedro/config/common.py b/kedro/config/common.py index f4ea092fb0..afa823cc9d 100644 --- a/kedro/config/common.py +++ b/kedro/config/common.py @@ -1,11 +1,12 @@ """This module contains methods and facade interfaces for various ConfigLoader implementations. """ +from __future__ import annotations import logging from glob import iglob from pathlib import Path -from typing import AbstractSet, Any, Dict, Iterable, List, Set +from typing import AbstractSet, Any, Iterable from warnings import warn from yaml.parser import ParserError @@ -28,8 +29,8 @@ def _get_config_from_patterns( conf_paths: Iterable[str], patterns: Iterable[str] = None, ac_template: bool = False, - ac_context: Dict[str, Any] = None, -) -> Dict[str, Any]: + ac_context: dict[str, Any] = None, +) -> dict[str, Any]: """Recursively scan for configuration files, load and merge them, and return them in the form of a config dictionary. @@ -64,8 +65,8 @@ def _get_config_from_patterns( "pattern to match config filenames against." ) - config: Dict[str, Any] = {} - processed_files: Set[Path] = set() + config: dict[str, Any] = {} + processed_files: set[Path] = set() for conf_path in conf_paths: if not Path(conf_path).is_dir(): @@ -104,8 +105,8 @@ def _get_config_from_patterns( def _load_config_file( - config_file: Path, ac_template: bool = False, ac_context: Dict[str, Any] = None -) -> Dict[str, Any]: + config_file: Path, ac_template: bool = False, ac_context: dict[str, Any] = None +) -> dict[str, Any]: """Load an individual config file using `anyconfig` as a backend. 
Args: @@ -149,8 +150,8 @@ def _load_config_file( def _load_configs( - config_filepaths: List[Path], ac_template: bool, ac_context: Dict[str, Any] = None -) -> Dict[str, Any]: + config_filepaths: list[Path], ac_template: bool, ac_context: dict[str, Any] = None +) -> dict[str, Any]: """Recursively load all configuration files, which satisfy a given list of glob patterns from a specific path. @@ -173,7 +174,7 @@ def _load_configs( """ aggregate_config = {} - seen_file_to_keys: Dict[Path, AbstractSet[str]] = {} + seen_file_to_keys: dict[Path, AbstractSet[str]] = {} for config_filepath in config_filepaths: single_config = _load_config_file( @@ -189,9 +190,9 @@ def _load_configs( def _lookup_config_filepaths( conf_path: Path, patterns: Iterable[str], - processed_files: Set[Path], + processed_files: set[Path], logger: Any, -) -> List[Path]: +) -> list[Path]: config_files = _path_lookup(conf_path, patterns) seen_files = config_files & processed_files @@ -207,7 +208,7 @@ def _lookup_config_filepaths( def _remove_duplicates(items: Iterable[str]): """Remove duplicates while preserving the order.""" - unique_items: List[str] = [] + unique_items: list[str] = [] for item in items: if item not in unique_items: unique_items.append(item) @@ -220,7 +221,7 @@ def _remove_duplicates(items: Iterable[str]): def _check_duplicate_keys( - processed_files: Dict[Path, AbstractSet[str]], filepath: Path, conf: Dict[str, Any] + processed_files: dict[Path, AbstractSet[str]], filepath: Path, conf: dict[str, Any] ) -> None: duplicates = [] @@ -238,7 +239,7 @@ def _check_duplicate_keys( raise ValueError(f"Duplicate keys found in {filepath} and:\n- {dup_str}") -def _path_lookup(conf_path: Path, patterns: Iterable[str]) -> Set[Path]: +def _path_lookup(conf_path: Path, patterns: Iterable[str]) -> set[Path]: """Return a set of all configuration files from ``conf_path`` or its subdirectories, which satisfy a given list of glob patterns. diff --git a/kedro/config/config.py b/kedro/config/config.py index 557138f9ed..f50e4b8f52 100644 --- a/kedro/config/config.py +++ b/kedro/config/config.py @@ -1,8 +1,10 @@ """This module provides ``kedro.config`` with the functionality to load one or more configuration files from specified paths. 
""" +from __future__ import annotations + from pathlib import Path -from typing import Any, Dict, Iterable, List +from typing import Any, Iterable from kedro.config.abstract_config import AbstractConfigLoader from kedro.config.common import _get_config_from_patterns, _remove_duplicates @@ -68,8 +70,8 @@ def __init__( self, conf_source: str, env: str = None, - runtime_params: Dict[str, Any] = None, - config_patterns: Dict[str, List[str]] = None, + runtime_params: dict[str, Any] = None, + config_patterns: dict[str, list[str]] = None, *, base_env: str = "base", default_run_env: str = "local", @@ -125,7 +127,7 @@ def conf_paths(self): """Property method to return deduplicated configuration paths.""" return _remove_duplicates(self._build_conf_paths()) - def get(self, *patterns: str) -> Dict[str, Any]: # type: ignore + def get(self, *patterns: str) -> dict[str, Any]: # type: ignore return _get_config_from_patterns( conf_paths=self.conf_paths, patterns=list(patterns) ) diff --git a/kedro/config/omegaconf_config.py b/kedro/config/omegaconf_config.py index ca1dbc2173..d04ea35b3b 100644 --- a/kedro/config/omegaconf_config.py +++ b/kedro/config/omegaconf_config.py @@ -1,11 +1,13 @@ """This module provides ``kedro.config`` with the functionality to load one or more configuration files of yaml or json type from specified paths through OmegaConf. """ +from __future__ import annotations + import io import logging import mimetypes from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional, Set # noqa +from typing import Any, Iterable import fsspec from omegaconf import OmegaConf @@ -75,9 +77,9 @@ def __init__( self, conf_source: str, env: str = None, - runtime_params: Dict[str, Any] = None, + runtime_params: dict[str, Any] = None, *, - config_patterns: Dict[str, List[str]] = None, + config_patterns: dict[str, list[str]] = None, base_env: str = "base", default_run_env: str = "local", ): @@ -130,7 +132,7 @@ def __init__( runtime_params=runtime_params, ) - def __getitem__(self, key) -> Dict[str, Any]: + def __getitem__(self, key) -> dict[str, Any]: """Get configuration files by key, load and merge them, and return them in the form of a config dictionary. @@ -209,8 +211,8 @@ def load_and_merge_dir_config( self, conf_path: str, patterns: Iterable[str], - read_environment_variables: Optional[bool] = False, - ) -> Dict[str, Any]: + read_environment_variables: bool | None = False, + ) -> dict[str, Any]: """Recursively load and merge all configuration files in a directory using OmegaConf, which satisfy a given list of glob patterns from a specific path. @@ -287,7 +289,7 @@ def _is_valid_config_path(self, path): ] @staticmethod - def _check_duplicates(seen_files_to_keys: Dict[Path, Set[Any]]): + def _check_duplicates(seen_files_to_keys: dict[Path, set[Any]]): duplicates = [] filepaths = list(seen_files_to_keys.keys()) @@ -311,7 +313,7 @@ def _check_duplicates(seen_files_to_keys: Dict[Path, Set[Any]]): raise ValueError(f"{dup_str}") @staticmethod - def _resolve_environment_variables(config: Dict[str, Any]) -> None: + def _resolve_environment_variables(config: dict[str, Any]) -> None: """Use the ``oc.env`` resolver to read environment variables and replace them in-place, clearing the resolver after the operation is complete if it was not registered beforehand. 
diff --git a/kedro/config/templated_config.py b/kedro/config/templated_config.py index c6cec1bbf5..b0e319ad8f 100644 --- a/kedro/config/templated_config.py +++ b/kedro/config/templated_config.py @@ -2,10 +2,12 @@ or more configuration files from specified paths, and format template strings with the values from the passed dictionary. """ +from __future__ import annotations + import re from copy import deepcopy from pathlib import Path -from typing import Any, Dict, Iterable, List, Optional +from typing import Any, Iterable import jmespath @@ -91,13 +93,13 @@ def __init__( self, conf_source: str, env: str = None, - runtime_params: Dict[str, Any] = None, - config_patterns: Dict[str, List[str]] = None, + runtime_params: dict[str, Any] = None, + config_patterns: dict[str, list[str]] = None, *, base_env: str = "base", default_run_env: str = "local", - globals_pattern: Optional[str] = None, - globals_dict: Optional[Dict[str, Any]] = None, + globals_pattern: str | None = None, + globals_dict: dict[str, Any] | None = None, ): """Instantiates a ``TemplatedConfigLoader``. @@ -162,7 +164,7 @@ def conf_paths(self): """Property method to return deduplicated configuration paths.""" return _remove_duplicates(self._build_conf_paths()) - def get(self, *patterns: str) -> Dict[str, Any]: # type: ignore + def get(self, *patterns: str) -> dict[str, Any]: # type: ignore """Tries to resolve the template variables in the config dictionary provided by the ``ConfigLoader`` (super class) ``get`` method using the dictionary of replacement values obtained in the ``__init__`` method. @@ -194,7 +196,7 @@ def _build_conf_paths(self) -> Iterable[str]: ] -def _format_object(val: Any, format_dict: Dict[str, Any]) -> Any: +def _format_object(val: Any, format_dict: dict[str, Any]) -> Any: """Recursive function that loops through the values of a map. In case another map or a list is encountered, it calls itself. When a string is encountered, it will use the `format_dict` to replace strings that look like `${expr}`, diff --git a/kedro/framework/cli/hooks/specs.py b/kedro/framework/cli/hooks/specs.py index 855504d585..cc8c23a9f2 100644 --- a/kedro/framework/cli/hooks/specs.py +++ b/kedro/framework/cli/hooks/specs.py @@ -2,7 +2,7 @@ For more information about these specifications, please visit [Pluggy's documentation](https://pluggy.readthedocs.io/en/stable/#specs) """ -from typing import List +from __future__ import annotations from kedro.framework.startup import ProjectMetadata @@ -16,7 +16,7 @@ class CLICommandSpecs: def before_command_run( self, project_metadata: ProjectMetadata, - command_args: List[str], + command_args: list[str], ): """Hooks to be invoked before a CLI command runs. It receives the ``project_metadata`` as well as @@ -31,7 +31,7 @@ def before_command_run( @cli_hook_spec def after_command_run( - self, project_metadata: ProjectMetadata, command_args: List[str], exit_code: int + self, project_metadata: ProjectMetadata, command_args: list[str], exit_code: int ): """Hooks to be invoked after a CLI command runs. It receives the ``project_metadata`` as well as diff --git a/kedro/framework/cli/jupyter.py b/kedro/framework/cli/jupyter.py index f2a420af56..68eb8b2c85 100644 --- a/kedro/framework/cli/jupyter.py +++ b/kedro/framework/cli/jupyter.py @@ -1,6 +1,8 @@ """A collection of helper functions to integrate with Jupyter/IPython and CLI commands for working with Kedro catalog. 
""" +from __future__ import annotations + import json import os import shutil @@ -8,7 +10,7 @@ from collections import Counter from glob import iglob from pathlib import Path -from typing import Any, Dict +from typing import Any from warnings import warn import click @@ -300,7 +302,7 @@ def _export_nodes(filepath: Path, output_path: Path) -> None: warn(f"Skipping notebook '{filepath}' - no nodes to export.") -def _append_source_code(cell: Dict[str, Any], path: Path) -> None: +def _append_source_code(cell: dict[str, Any], path: Path) -> None: source_code = "".join(cell["source"]).strip() + "\n" with path.open(mode="a") as file_: file_.write(source_code) diff --git a/kedro/framework/cli/micropkg.py b/kedro/framework/cli/micropkg.py index 42b3a61a0d..ce7d28fb8b 100644 --- a/kedro/framework/cli/micropkg.py +++ b/kedro/framework/cli/micropkg.py @@ -1,4 +1,5 @@ """A collection of CLI commands for working with Kedro micro-packages.""" +from __future__ import annotations import re import shutil @@ -7,7 +8,7 @@ import tempfile from importlib import import_module from pathlib import Path -from typing import Iterable, List, Optional, Set, Tuple, Union +from typing import Iterable, List, Tuple, Union import click import pkg_resources @@ -279,7 +280,7 @@ def package_micropkg( click.secho(message, fg="green") -def _get_fsspec_filesystem(location: str, fs_args: Optional[str]): +def _get_fsspec_filesystem(location: str, fs_args: str | None): # pylint: disable=import-outside-toplevel import anyconfig import fsspec @@ -314,7 +315,7 @@ def safe_extract(tar, path): tar.extractall(path) # nosec B202 -def _unpack_sdist(location: str, destination: Path, fs_args: Optional[str]) -> None: +def _unpack_sdist(location: str, destination: Path, fs_args: str | None) -> None: filesystem = _get_fsspec_filesystem(location, fs_args) if location.endswith(".tar.gz") and filesystem and filesystem.exists(location): @@ -353,10 +354,10 @@ def _refactor_code_for_unpacking( project: Project, package_path: Path, tests_path: Path, - alias: Optional[str], - destination: Optional[str], + alias: str | None, + destination: str | None, project_metadata: ProjectMetadata, -) -> Tuple[Path, Path]: +) -> tuple[Path, Path]: """This is the reverse operation of `_refactor_code_for_package`, i.e we go from: # also the root of the Rope project @@ -478,9 +479,9 @@ def _install_files( # pylint: disable=too-many-arguments, too-many-locals def _find_config_files( - source_config_dir: Path, glob_patterns: List[str] -) -> List[Tuple[Path, str]]: - config_files: List[Tuple[Path, str]] = [] + source_config_dir: Path, glob_patterns: list[str] +) -> list[tuple[Path, str]]: + config_files: list[tuple[Path, str]] = [] if source_config_dir.is_dir(): config_files = [ @@ -561,13 +562,13 @@ def _get_sdist_name(name, version): return f"{name}-{version}.tar.gz" -def _sync_path_list(source: List[Tuple[Path, str]], target: Path) -> None: +def _sync_path_list(source: list[tuple[Path, str]], target: Path) -> None: for source_path, suffix in source: target_with_suffix = (target / suffix).resolve() _sync_dirs(source_path, target_with_suffix) -def _make_install_requires(requirements_txt: Path) -> List[str]: +def _make_install_requires(requirements_txt: Path) -> list[str]: """Parses each line of requirements.txt into a version specifier valid to put in install_requires.""" if not requirements_txt.exists(): @@ -628,7 +629,7 @@ def _refactor_code_for_package( project: Project, package_path: Path, tests_path: Path, - alias: Optional[str], + alias: str | None, 
project_metadata: ProjectMetadata, ) -> None: """In order to refactor the imports properly, we need to recreate @@ -714,7 +715,7 @@ def _move_package_with_conflicting_name(target: Path, conflicting_name: str): def _generate_sdist_file( micropkg_name: str, destination: Path, - source_paths: Tuple[_SourcePathType, ...], + source_paths: tuple[_SourcePathType, ...], version: str, metadata: ProjectMetadata, alias: str = None, @@ -787,7 +788,7 @@ def _generate_manifest_file(output_dir: Path): def _generate_setup_file( - package_name: str, version: str, install_requires: List[str], output_dir: Path + package_name: str, version: str, install_requires: list[str], output_dir: Path ) -> Path: setup_file = output_dir / "setup.py" @@ -803,7 +804,7 @@ def _generate_setup_file( def _get_package_artifacts( source_path: Path, package_name: str -) -> Tuple[Path, Path, Path]: +) -> tuple[Path, Path, Path]: """From existing package, returns in order: source_path, tests_path, config_path """ @@ -817,7 +818,7 @@ def _get_package_artifacts( def _append_package_reqs( - requirements_txt: Path, package_reqs: List[str], package_name: str + requirements_txt: Path, package_reqs: list[str], package_name: str ) -> None: """Appends micro-package requirements to project level requirements.txt""" incoming_reqs = _safe_parse_requirements(package_reqs) @@ -854,8 +855,8 @@ def _append_package_reqs( def _safe_parse_requirements( - requirements: Union[str, Iterable[str]] -) -> Set[pkg_resources.Requirement]: + requirements: str | Iterable[str], +) -> set[pkg_resources.Requirement]: """Safely parse a requirement or set of requirements. This effectively replaces pkg_resources.parse_requirements, which blows up with a ValueError as soon as it encounters a requirement it cannot parse (e.g. `-r requirements.txt`). This way diff --git a/kedro/framework/cli/pipeline.py b/kedro/framework/cli/pipeline.py index 0ed1098805..225ad5fd74 100644 --- a/kedro/framework/cli/pipeline.py +++ b/kedro/framework/cli/pipeline.py @@ -1,9 +1,11 @@ """A collection of CLI commands for working with Kedro pipelines.""" +from __future__ import annotations + import re import shutil from pathlib import Path from textwrap import indent -from typing import List, NamedTuple, Tuple +from typing import NamedTuple import click @@ -173,7 +175,7 @@ def delete_pipeline( ) -def _echo_deletion_warning(message: str, **paths: List[Path]): +def _echo_deletion_warning(message: str, **paths: list[Path]): paths = {key: values for key, values in paths.items() if values} if paths: @@ -281,7 +283,7 @@ def _get_pipeline_artifacts( def _get_artifacts_to_package( project_metadata: ProjectMetadata, module_path: str, env: str -) -> Tuple[Path, Path, Path]: +) -> tuple[Path, Path, Path]: """From existing project, returns in order: source_path, tests_path, config_paths""" package_dir = project_metadata.source_dir / project_metadata.package_name project_conf_path = project_metadata.project_path / settings.CONF_SOURCE diff --git a/kedro/framework/cli/starters.py b/kedro/framework/cli/starters.py index 5e9aeb9960..d7a7015987 100644 --- a/kedro/framework/cli/starters.py +++ b/kedro/framework/cli/starters.py @@ -3,6 +3,8 @@ This module implements commands available from the kedro CLI for creating projects. 
""" +from __future__ import annotations + import os import re import shutil @@ -11,7 +13,7 @@ from collections import OrderedDict from itertools import groupby from pathlib import Path -from typing import Any, Callable, Dict, List, Optional, Tuple +from typing import Any, Callable import click import yaml @@ -49,8 +51,8 @@ class KedroStarterSpec: # pylint: disable=too-few-public-methods alias: str template_path: str - directory: Optional[str] = None - origin: Optional[str] = field(init=False) + directory: str | None = None + origin: str | None = field(init=False) _OFFICIAL_STARTER_SPECS = [ @@ -90,7 +92,7 @@ class KedroStarterSpec: # pylint: disable=too-few-public-methods # pylint: disable=unused-argument -def _remove_readonly(func: Callable, path: Path, excinfo: Tuple): # pragma: no cover +def _remove_readonly(func: Callable, path: Path, excinfo: tuple): # pragma: no cover """Remove readonly files on Windows See: https://docs.python.org/3/library/shutil.html?highlight=shutil#rmtree-example """ @@ -98,7 +100,7 @@ def _remove_readonly(func: Callable, path: Path, excinfo: Tuple): # pragma: no func(path) -def _get_starters_dict() -> Dict[str, KedroStarterSpec]: +def _get_starters_dict() -> dict[str, KedroStarterSpec]: """This function lists all the starter aliases declared in the core repo and in plugins entry points. @@ -145,10 +147,10 @@ def _get_starters_dict() -> Dict[str, KedroStarterSpec]: def _starter_spec_to_dict( - starter_specs: Dict[str, KedroStarterSpec] -) -> Dict[str, Dict[str, str]]: + starter_specs: dict[str, KedroStarterSpec] +) -> dict[str, dict[str, str]]: """Convert a dictionary of starters spec to a nicely formatted dictionary""" - format_dict: Dict[str, Dict[str, str]] = {} + format_dict: dict[str, dict[str, str]] = {} for alias, spec in starter_specs.items(): format_dict[alias] = {} # Each dictionary represent 1 starter format_dict[alias]["template_path"] = spec.template_path @@ -244,7 +246,7 @@ def list_starters(): starters_dict = _get_starters_dict() # Group all specs by origin as nested dict and sort it. - sorted_starters_dict: Dict[str, Dict[str, KedroStarterSpec]] = { + sorted_starters_dict: dict[str, dict[str, KedroStarterSpec]] = { origin: dict(sorted(starters_dict_by_origin)) for origin, starters_dict_by_origin in groupby( starters_dict.items(), lambda item: item[1].origin @@ -263,7 +265,7 @@ def list_starters(): ) -def _fetch_config_from_file(config_path: str) -> Dict[str, str]: +def _fetch_config_from_file(config_path: str) -> dict[str, str]: """Obtains configuration for a new kedro project non-interactively from a file. Args: @@ -294,10 +296,10 @@ def _fetch_config_from_file(config_path: str) -> Dict[str, str]: def _make_cookiecutter_args( - config: Dict[str, str], + config: dict[str, str], checkout: str, directory: str, -) -> Dict[str, Any]: +) -> dict[str, Any]: """Creates a dictionary of arguments to pass to cookiecutter. Args: @@ -330,7 +332,7 @@ def _make_cookiecutter_args( return cookiecutter_args -def _create_project(template_path: str, cookiecutter_args: Dict[str, Any]): +def _create_project(template_path: str, cookiecutter_args: dict[str, Any]): """Creates a new kedro project using cookiecutter. 
Args: @@ -416,7 +418,7 @@ def _get_cookiecutter_dir( return Path(cookiecutter_dir) -def _get_prompts_required(cookiecutter_dir: Path) -> Optional[Dict[str, Any]]: +def _get_prompts_required(cookiecutter_dir: Path) -> dict[str, Any] | None: """Finds the information a user must supply according to prompts.yml.""" prompts_yml = cookiecutter_dir / "prompts.yml" if not prompts_yml.is_file(): @@ -432,8 +434,8 @@ def _get_prompts_required(cookiecutter_dir: Path) -> Optional[Dict[str, Any]]: def _fetch_config_from_user_prompts( - prompts: Dict[str, Any], cookiecutter_context: OrderedDict -) -> Dict[str, str]: + prompts: dict[str, Any], cookiecutter_context: OrderedDict +) -> dict[str, str]: """Interactively obtains information from user prompts. Args: @@ -448,7 +450,7 @@ def _fetch_config_from_user_prompts( from cookiecutter.environment import StrictEnvironment from cookiecutter.prompt import read_user_variable, render_variable - config: Dict[str, str] = {} + config: dict[str, str] = {} for variable_name, prompt_dict in prompts.items(): prompt = _Prompt(**prompt_dict) @@ -507,7 +509,7 @@ def validate(self, user_input: str) -> None: raise ValueError(message, self.error_message) -def _get_available_tags(template_path: str) -> List: +def _get_available_tags(template_path: str) -> list: # Not at top level so that kedro CLI works without a working git executable. # pylint: disable=import-outside-toplevel import git @@ -527,7 +529,7 @@ def _get_available_tags(template_path: str) -> List: return sorted(unique_tags) -def _validate_config_file(config: Dict[str, str], prompts: Dict[str, Any]): +def _validate_config_file(config: dict[str, str], prompts: dict[str, Any]): """Checks that the configuration file contains all needed variables. Args: diff --git a/kedro/framework/cli/utils.py b/kedro/framework/cli/utils.py index 1e1cce3f53..bd1c59a2ec 100644 --- a/kedro/framework/cli/utils.py +++ b/kedro/framework/cli/utils.py @@ -1,4 +1,6 @@ """Utilities for use with click.""" +from __future__ import annotations + import difflib import logging import re @@ -14,7 +16,7 @@ from importlib import import_module from itertools import chain from pathlib import Path -from typing import Dict, Iterable, List, Mapping, Sequence, Set, Tuple, Union +from typing import Iterable, Sequence import click import importlib_metadata @@ -39,7 +41,7 @@ logger = logging.getLogger(__name__) -def call(cmd: List[str], **kwargs): # pragma: no cover +def call(cmd: list[str], **kwargs): # pragma: no cover """Run a subprocess command and raise if it fails. Args: @@ -110,7 +112,7 @@ def _suggest_cli_command( class CommandCollection(click.CommandCollection): """Modified from the Click one to still run the source groups function.""" - def __init__(self, *groups: Tuple[str, Sequence[click.MultiCommand]]): + def __init__(self, *groups: tuple[str, Sequence[click.MultiCommand]]): self.groups = [ (title, self._merge_same_name_collections(cli_list)) for title, cli_list in groups @@ -137,7 +139,7 @@ def _dedupe_commands(cli_collections: Sequence[click.CommandCollection]): """Deduplicate commands by keeping the ones from the last source in the list. 
""" - seen_names: Set[str] = set() + seen_names: set[str] = set() for cli_collection in reversed(cli_collections): for cmd_group in reversed(cli_collection.sources): cmd_group.commands = { # type: ignore @@ -157,8 +159,8 @@ def _dedupe_commands(cli_collections: Sequence[click.CommandCollection]): @staticmethod def _merge_same_name_collections(groups: Sequence[click.MultiCommand]): - named_groups: Mapping[str, List[click.MultiCommand]] = defaultdict(list) - helps: Mapping[str, list] = defaultdict(list) + named_groups: defaultdict[str, list[click.MultiCommand]] = defaultdict(list) + helps: defaultdict[str, list] = defaultdict(list) for group in groups: named_groups[group.name].append(group) if group.help: @@ -176,7 +178,7 @@ def _merge_same_name_collections(groups: Sequence[click.MultiCommand]): if cli_list ] - def resolve_command(self, ctx: click.core.Context, args: List): + def resolve_command(self, ctx: click.core.Context, args: list): try: return super().resolve_command(ctx, args) except click.exceptions.UsageError as exc: @@ -199,7 +201,7 @@ def format_commands( group.format_commands(ctx, formatter) -def get_pkg_version(reqs_path: (Union[str, Path]), package_name: str) -> str: +def get_pkg_version(reqs_path: (str | Path), package_name: str) -> str: """Get package version from requirements.txt. Args: @@ -295,7 +297,7 @@ def split_string(ctx, param, value): # pylint: disable=unused-argument # pylint: disable=unused-argument,missing-param-doc,missing-type-doc -def split_node_names(ctx, param, to_split: str) -> List[str]: +def split_node_names(ctx, param, to_split: str) -> list[str]: """Split string by comma, ignoring commas enclosed by square parentheses. This avoids splitting the string of nodes names on commas included in default node names, which have the pattern @@ -417,7 +419,7 @@ def _config_file_callback(ctx, param, value): # pylint: disable=unused-argument return value -def _reformat_load_versions(ctx, param, value) -> Dict[str, str]: +def _reformat_load_versions(ctx, param, value) -> dict[str, str]: """Reformat data structure from tuple to dictionary for `load-version`, e.g.: ('dataset1:time1', 'dataset2:time2') -> {"dataset1": "time1", "dataset2": "time2"}. """ @@ -475,7 +477,7 @@ def _split_load_versions(ctx, param, value): return _reformat_load_versions(ctx, param, lv_tuple) if value else {} -def _get_values_as_tuple(values: Iterable[str]) -> Tuple[str, ...]: +def _get_values_as_tuple(values: Iterable[str]) -> tuple[str, ...]: return tuple(chain.from_iterable(value.split(",") for value in values)) diff --git a/kedro/framework/context/context.py b/kedro/framework/context/context.py index 8681e0bafc..699da24329 100644 --- a/kedro/framework/context/context.py +++ b/kedro/framework/context/context.py @@ -1,9 +1,10 @@ """This module provides context for Kedro project.""" +from __future__ import annotations import logging from copy import deepcopy from pathlib import Path, PurePosixPath, PureWindowsPath -from typing import Any, Dict, Optional, Union +from typing import Any from urllib.parse import urlparse from warnings import warn @@ -52,8 +53,8 @@ def _is_relative_path(path_string: str) -> bool: def _convert_paths_to_absolute_posix( - project_path: Path, conf_dictionary: Dict[str, Any] -) -> Dict[str, Any]: + project_path: Path, conf_dictionary: dict[str, Any] +) -> dict[str, Any]: """Turn all relative paths inside ``conf_dictionary`` into absolute paths by appending them to ``project_path`` and convert absolute Windows paths to POSIX format. 
This is a hack to make sure that we don't have to change user's working directory for logging and datasets to @@ -143,7 +144,7 @@ def _find_conflicts(): ) -def _update_nested_dict(old_dict: Dict[Any, Any], new_dict: Dict[Any, Any]) -> None: +def _update_nested_dict(old_dict: dict[Any, Any], new_dict: dict[Any, Any]) -> None: """Update a nested dict with values of new_dict. Args: @@ -171,11 +172,11 @@ class KedroContext: def __init__( self, package_name: str, - project_path: Union[Path, str], + project_path: Path | str, config_loader: ConfigLoader, hook_manager: PluginManager, env: str = None, - extra_params: Dict[str, Any] = None, + extra_params: dict[str, Any] = None, ): # pylint: disable=too-many-arguments """Create a context object by providing the root of a Kedro project and the environment configuration subfolders @@ -204,7 +205,7 @@ def __init__( self._hook_manager = hook_manager @property # type: ignore - def env(self) -> Optional[str]: + def env(self) -> str | None: """Property for the current Kedro environment. Returns: @@ -236,7 +237,7 @@ def catalog(self) -> DataCatalog: return self._get_catalog() @property - def params(self) -> Dict[str, Any]: + def params(self) -> dict[str, Any]: """Read-only property referring to Kedro's parameters for this context. Returns: @@ -265,7 +266,7 @@ def config_loader(self): def _get_catalog( self, save_version: str = None, - load_versions: Dict[str, str] = None, + load_versions: dict[str, str] = None, ) -> DataCatalog: """A hook for changing the creation of a DataCatalog instance. @@ -305,7 +306,7 @@ def _get_catalog( ) return catalog - def _get_feed_dict(self) -> Dict[str, Any]: + def _get_feed_dict(self) -> dict[str, Any]: """Get parameters and return the feed dictionary.""" params = self.params feed_dict = {"parameters": params} @@ -334,7 +335,7 @@ def _add_param_to_feed_dict(param_name, param_value): return feed_dict - def _get_config_credentials(self) -> Dict[str, Any]: + def _get_config_credentials(self) -> dict[str, Any]: """Getter for credentials specified in credentials directory.""" try: conf_creds = self.config_loader["credentials"] diff --git a/kedro/framework/hooks/specs.py b/kedro/framework/hooks/specs.py index 8f91452c7d..44acf28d1d 100644 --- a/kedro/framework/hooks/specs.py +++ b/kedro/framework/hooks/specs.py @@ -2,7 +2,9 @@ For more information about these specifications, please visit [Pluggy's documentation](https://pluggy.readthedocs.io/en/stable/#specs) """ -from typing import Any, Dict, Optional +from __future__ import annotations + +from typing import Any from kedro.framework.context import KedroContext from kedro.io import DataCatalog @@ -19,11 +21,11 @@ class DataCatalogSpecs: def after_catalog_created( # pylint: disable=too-many-arguments self, catalog: DataCatalog, - conf_catalog: Dict[str, Any], - conf_creds: Dict[str, Any], - feed_dict: Dict[str, Any], + conf_catalog: dict[str, Any], + conf_creds: dict[str, Any], + feed_dict: dict[str, Any], save_version: str, - load_versions: Dict[str, str], + load_versions: dict[str, str], ) -> None: """Hooks to be invoked after a data catalog is created. It receives the ``catalog`` as well as @@ -50,10 +52,10 @@ def before_node_run( # pylint: disable=too-many-arguments self, node: Node, catalog: DataCatalog, - inputs: Dict[str, Any], + inputs: dict[str, Any], is_async: bool, session_id: str, - ) -> Optional[Dict[str, Any]]: + ) -> dict[str, Any] | None: """Hook to be invoked before a node runs. 
The arguments received are the same as those used by ``kedro.runner.run_node`` @@ -78,8 +80,8 @@ def after_node_run( # pylint: disable=too-many-arguments self, node: Node, catalog: DataCatalog, - inputs: Dict[str, Any], - outputs: Dict[str, Any], + inputs: dict[str, Any], + outputs: dict[str, Any], is_async: bool, session_id: str, ) -> None: @@ -107,7 +109,7 @@ def on_node_error( # pylint: disable=too-many-arguments error: Exception, node: Node, catalog: DataCatalog, - inputs: Dict[str, Any], + inputs: dict[str, Any], is_async: bool, session_id: str, ): @@ -133,7 +135,7 @@ class PipelineSpecs: @hook_spec def before_pipeline_run( - self, run_params: Dict[str, Any], pipeline: Pipeline, catalog: DataCatalog + self, run_params: dict[str, Any], pipeline: Pipeline, catalog: DataCatalog ) -> None: """Hook to be invoked before a pipeline runs. @@ -165,8 +167,8 @@ def before_pipeline_run( @hook_spec def after_pipeline_run( self, - run_params: Dict[str, Any], - run_result: Dict[str, Any], + run_params: dict[str, Any], + run_result: dict[str, Any], pipeline: Pipeline, catalog: DataCatalog, ) -> None: @@ -202,7 +204,7 @@ def after_pipeline_run( def on_pipeline_error( self, error: Exception, - run_params: Dict[str, Any], + run_params: dict[str, Any], pipeline: Pipeline, catalog: DataCatalog, ): diff --git a/kedro/framework/project/__init__.py b/kedro/framework/project/__init__.py index 96d5b839ba..741c9fe8d2 100644 --- a/kedro/framework/project/__init__.py +++ b/kedro/framework/project/__init__.py @@ -1,6 +1,8 @@ """``kedro.framework.project`` module provides utitlity to configure a Kedro project and access its settings.""" # pylint: disable=redefined-outer-name,unused-argument,global-statement +from __future__ import annotations + import importlib import logging.config import operator @@ -12,7 +14,7 @@ from collections import UserDict from collections.abc import MutableMapping from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any import click import importlib_resources @@ -159,9 +161,9 @@ class _ProjectPipelines(MutableMapping): """ def __init__(self) -> None: - self._pipelines_module: Optional[str] = None + self._pipelines_module: str | None = None self._is_data_loaded = False - self._content: Dict[str, Pipeline] = {} + self._content: dict[str, Pipeline] = {} @staticmethod def _get_pipelines_registry_callable(pipelines_module: str): @@ -185,7 +187,7 @@ def _load_data(self): self._content = project_pipelines self._is_data_loaded = True - def configure(self, pipelines_module: Optional[str] = None) -> None: + def configure(self, pipelines_module: str | None = None) -> None: """Configure the pipelines_module to load the pipelines dictionary. Reset the data loading state so that after every ``configure`` call, data are reloaded. @@ -232,7 +234,7 @@ def __init__(self): rich.traceback.install(suppress=[click, str(Path(sys.executable).parent)]) rich.pretty.install() - def configure(self, logging_config: Dict[str, Any]) -> None: + def configure(self, logging_config: dict[str, Any]) -> None: """Configure project logging using ``logging_config`` (e.g. from project logging.yml). We store this in the UserDict data so that it can be reconfigured in _bootstrap_subprocess. 
@@ -267,7 +269,7 @@ def configure_project(package_name: str): PACKAGE_NAME = package_name -def configure_logging(logging_config: Dict[str, Any]) -> None: +def configure_logging(logging_config: dict[str, Any]) -> None: """Configure logging according to ``logging_config`` dictionary.""" LOGGING.configure(logging_config) @@ -290,7 +292,7 @@ def validate_settings(): importlib.import_module(f"{PACKAGE_NAME}.settings") -def _create_pipeline(pipeline_module: types.ModuleType) -> Optional[Pipeline]: +def _create_pipeline(pipeline_module: types.ModuleType) -> Pipeline | None: if not hasattr(pipeline_module, "create_pipeline"): warnings.warn( f"The '{pipeline_module.__name__}' module does not " @@ -313,7 +315,7 @@ def _create_pipeline(pipeline_module: types.ModuleType) -> Optional[Pipeline]: return obj -def find_pipelines() -> Dict[str, Pipeline]: +def find_pipelines() -> dict[str, Pipeline]: """Automatically find modular pipelines having a ``create_pipeline`` function. By default, projects created using Kedro 0.18.3 and higher call this function to autoregister pipelines upon creation/addition. diff --git a/kedro/framework/session/session.py b/kedro/framework/session/session.py index 658310f816..bd2b062da5 100644 --- a/kedro/framework/session/session.py +++ b/kedro/framework/session/session.py @@ -1,4 +1,6 @@ """This module implements Kedro session responsible for project lifecycle.""" +from __future__ import annotations + import getpass import logging import logging.config @@ -8,7 +10,7 @@ import traceback from copy import deepcopy from pathlib import Path -from typing import Any, Dict, Iterable, Optional, Union +from typing import Any, Iterable import click from omegaconf import OmegaConf, omegaconf @@ -30,7 +32,7 @@ from kedro.runner import AbstractRunner, SequentialRunner -def _describe_git(project_path: Path) -> Dict[str, Dict[str, Any]]: +def _describe_git(project_path: Path) -> dict[str, dict[str, Any]]: project_path = str(project_path) try: res = subprocess.check_output( @@ -38,7 +40,7 @@ def _describe_git(project_path: Path) -> Dict[str, Dict[str, Any]]: cwd=project_path, stderr=subprocess.STDOUT, ) - git_data: Dict[str, Any] = {"commit_sha": res.decode().strip()} + git_data: dict[str, Any] = {"commit_sha": res.decode().strip()} git_status_res = subprocess.check_output( ["git", "status", "--short"], cwd=project_path, @@ -56,7 +58,7 @@ def _describe_git(project_path: Path) -> Dict[str, Dict[str, Any]]: return {"git": git_data} -def _jsonify_cli_context(ctx: click.core.Context) -> Dict[str, Any]: +def _jsonify_cli_context(ctx: click.core.Context) -> dict[str, Any]: return { "args": ctx.args, "params": ctx.params, @@ -103,9 +105,9 @@ def __init__( self, session_id: str, package_name: str = None, - project_path: Union[Path, str] = None, + project_path: Path | str | None = None, save_on_close: bool = False, - conf_source: Optional[str] = None, + conf_source: str | None = None, ): self._project_path = Path(project_path or Path.cwd()).resolve() self.session_id = session_id @@ -127,12 +129,12 @@ def __init__( def create( # pylint: disable=too-many-arguments cls, package_name: str = None, - project_path: Union[Path, str] = None, + project_path: Path | str | None = None, save_on_close: bool = True, env: str = None, - extra_params: Dict[str, Any] = None, - conf_source: Optional[str] = None, - ) -> "KedroSession": + extra_params: dict[str, Any] = None, + conf_source: str | None = None, + ) -> KedroSession: """Create a new instance of ``KedroSession`` with the session data. 
Args: @@ -163,7 +165,7 @@ def create( # pylint: disable=too-many-arguments # have to explicitly type session_data otherwise mypy will complain # possibly related to this: https://github.com/python/mypy/issues/1430 - session_data: Dict[str, Any] = { + session_data: dict[str, Any] = { "package_name": session._package_name, "project_path": session._project_path, "session_id": session.session_id, @@ -196,7 +198,7 @@ def create( # pylint: disable=too-many-arguments return session - def _get_logging_config(self) -> Dict[str, Any]: + def _get_logging_config(self) -> dict[str, Any]: logging_config = self._get_config_loader()["logging"] if isinstance(logging_config, omegaconf.DictConfig): logging_config = OmegaConf.to_container(logging_config) @@ -254,7 +256,7 @@ def _logger(self) -> logging.Logger: return logging.getLogger(__name__) @property - def store(self) -> Dict[str, Any]: + def store(self) -> dict[str, Any]: """Return a copy of internal store.""" return dict(self._store) @@ -317,9 +319,9 @@ def run( # pylint: disable=too-many-arguments,too-many-locals to_nodes: Iterable[str] = None, from_inputs: Iterable[str] = None, to_outputs: Iterable[str] = None, - load_versions: Dict[str, str] = None, + load_versions: dict[str, str] = None, namespace: str = None, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """Runs the pipeline with a specified runner. Args: diff --git a/kedro/framework/session/shelvestore.py b/kedro/framework/session/shelvestore.py index dbb4596f1c..3bf34157bc 100644 --- a/kedro/framework/session/shelvestore.py +++ b/kedro/framework/session/shelvestore.py @@ -1,11 +1,13 @@ """This module implements a dict-like store object used to persist Kedro sessions. This module is separated from store.py to ensure it's only imported when exported explicitly. """ +from __future__ import annotations + import dbm import shelve from multiprocessing import Lock from pathlib import Path -from typing import Any, Dict +from typing import Any from .store import BaseSessionStore @@ -20,9 +22,9 @@ class ShelveStore(BaseSessionStore): def _location(self) -> Path: return Path(self._path).expanduser().resolve() / self._session_id / "store" - def read(self) -> Dict[str, Any]: + def read(self) -> dict[str, Any]: """Read the data from disk using `shelve` package.""" - data: Dict[str, Any] = {} + data: dict[str, Any] = {} try: with shelve.open(str(self._location), flag="r") as _sh: # nosec data = dict(_sh) diff --git a/kedro/framework/session/store.py b/kedro/framework/session/store.py index 847517aadc..6aee727528 100644 --- a/kedro/framework/session/store.py +++ b/kedro/framework/session/store.py @@ -1,7 +1,9 @@ """This module implements a dict-like store object used to persist Kedro sessions.""" +from __future__ import annotations + import logging from collections import UserDict -from typing import Any, Dict +from typing import Any class BaseSessionStore(UserDict): @@ -19,7 +21,7 @@ def __init__(self, path: str, session_id: str): def _logger(self) -> logging.Logger: return logging.getLogger(__name__) - def read(self) -> Dict[str, Any]: + def read(self) -> dict[str, Any]: """Read the data from the session store. 
Returns: diff --git a/kedro/io/cached_dataset.py b/kedro/io/cached_dataset.py index 3a54727f34..c63241eecb 100644 --- a/kedro/io/cached_dataset.py +++ b/kedro/io/cached_dataset.py @@ -2,8 +2,10 @@ This module contains ``CachedDataSet``, a dataset wrapper which caches in memory the data saved, so that the user avoids io operations with slow storage media """ +from __future__ import annotations + import logging -from typing import Any, Dict, Union +from typing import Any from kedro.io.core import VERSIONED_FLAG_KEY, AbstractDataSet, Version from kedro.io.memory_dataset import MemoryDataSet @@ -34,7 +36,7 @@ class as shown above. def __init__( self, - dataset: Union[AbstractDataSet, Dict], + dataset: AbstractDataSet | dict, version: Version = None, copy_mode: str = None, ): @@ -84,7 +86,7 @@ def _from_config(config, version): ) return AbstractDataSet.from_config("_cached", config) - def _describe(self) -> Dict[str, Any]: + def _describe(self) -> dict[str, Any]: return { "dataset": self._dataset._describe(), # pylint: disable=protected-access "cache": self._cache._describe(), # pylint: disable=protected-access diff --git a/kedro/io/core.py b/kedro/io/core.py index d467e346ee..cdcb8a6219 100644 --- a/kedro/io/core.py +++ b/kedro/io/core.py @@ -1,6 +1,7 @@ """This module provides a set of classes which underpin the data loading and saving functionality provided by ``kedro.io``. """ +from __future__ import annotations import abc import copy @@ -13,7 +14,7 @@ from glob import iglob from operator import attrgetter from pathlib import Path, PurePath, PurePosixPath -from typing import Any, Callable, Dict, Generic, List, Optional, Tuple, Type, TypeVar +from typing import Any, Callable, Generic, TypeVar from urllib.parse import urlsplit from cachetools import Cache, cachedmethod @@ -115,12 +116,12 @@ class AbstractDataSet(abc.ABC, Generic[_DI, _DO]): @classmethod def from_config( - cls: Type, + cls: type, name: str, - config: Dict[str, Any], + config: dict[str, Any], load_version: str = None, save_version: str = None, - ) -> "AbstractDataSet": + ) -> AbstractDataSet: """Create a data set instance using the configuration provided. Args: @@ -262,7 +263,7 @@ def _save(self, data: _DI) -> None: ) @abc.abstractmethod - def _describe(self) -> Dict[str, Any]: + def _describe(self) -> dict[str, Any]: raise NotImplementedError( f"'{self.__class__.__name__}' is a subclass of AbstractDataSet and " f"it must implement the '_describe' method" @@ -312,7 +313,7 @@ def release(self) -> None: def _release(self) -> None: pass - def _copy(self, **overwrite_params) -> "AbstractDataSet": + def _copy(self, **overwrite_params) -> AbstractDataSet: dataset_copy = copy.deepcopy(self) for name, value in overwrite_params.items(): setattr(dataset_copy, name, value) @@ -354,8 +355,8 @@ class Version(namedtuple("Version", ["load", "save"])): def parse_dataset_definition( - config: Dict[str, Any], load_version: str = None, save_version: str = None -) -> Tuple[Type[AbstractDataSet], Dict[str, Any]]: + config: dict[str, Any], load_version: str = None, save_version: str = None +) -> tuple[type[AbstractDataSet], dict[str, Any]]: """Parse and instantiate a dataset class using the configuration provided. 
Args: @@ -424,7 +425,7 @@ def parse_dataset_definition( return class_obj, config -def _load_obj(class_path: str) -> Optional[object]: +def _load_obj(class_path: str) -> object | None: mod_path, _, class_name = class_path.rpartition(".") try: available_classes = load_obj(f"{mod_path}.__all__") @@ -506,9 +507,9 @@ class AbstractVersionedDataSet(AbstractDataSet[_DI, _DO], abc.ABC): def __init__( self, filepath: PurePosixPath, - version: Optional[Version], + version: Version | None, exists_function: Callable[[str], bool] = None, - glob_function: Callable[[str], List[str]] = None, + glob_function: Callable[[str], list[str]] = None, ): """Creates a new instance of ``AbstractVersionedDataSet``. @@ -560,7 +561,7 @@ def _fetch_latest_save_version(self) -> str: # pylint: disable=no-self-use """Generate and cache the current save version""" return generate_timestamp() - def resolve_load_version(self) -> Optional[str]: + def resolve_load_version(self) -> str | None: """Compute the version the dataset should be loaded with.""" if not self._version: return None @@ -576,7 +577,7 @@ def _get_load_path(self) -> PurePosixPath: load_version = self.resolve_load_version() return self._get_versioned_path(load_version) # type: ignore - def resolve_save_version(self) -> Optional[str]: + def resolve_save_version(self) -> str | None: """Compute the version the dataset should be saved with.""" if not self._version: return None @@ -659,7 +660,7 @@ def _release(self) -> None: self._version_cache.clear() -def _parse_filepath(filepath: str) -> Dict[str, str]: +def _parse_filepath(filepath: str) -> dict[str, str]: """Split filepath on protocol and path. Based on `fsspec.utils.infer_storage_options`. Args: @@ -700,7 +701,7 @@ def _parse_filepath(filepath: str) -> Dict[str, str]: return options -def get_protocol_and_path(filepath: str, version: Version = None) -> Tuple[str, str]: +def get_protocol_and_path(filepath: str, version: Version = None) -> tuple[str, str]: """Parses filepath on protocol and path. Args: diff --git a/kedro/io/data_catalog.py b/kedro/io/data_catalog.py index 8014a02edd..57c31682c0 100644 --- a/kedro/io/data_catalog.py +++ b/kedro/io/data_catalog.py @@ -4,12 +4,14 @@ sets. Then it will act as a single point of reference for your calls, relaying load and save functions to the underlying data sets. """ +from __future__ import annotations + import copy import difflib import logging import re from collections import defaultdict -from typing import Any, Dict, List, Optional, Set, Type, Union +from typing import Any from kedro.io.core import ( AbstractDataSet, @@ -28,8 +30,8 @@ def _get_credentials( - credentials_name: str, credentials: Dict[str, Any] -) -> Dict[str, Any]: + credentials_name: str, credentials: dict[str, Any] +) -> dict[str, Any]: """Return a set of credentials from the provided credentials dict. Args: @@ -56,8 +58,8 @@ def _get_credentials( def _resolve_credentials( - config: Dict[str, Any], credentials: Dict[str, Any] -) -> Dict[str, Any]: + config: dict[str, Any], credentials: dict[str, Any] +) -> dict[str, Any]: """Return the dataset configuration where credentials are resolved using credentials dictionary provided. @@ -97,7 +99,7 @@ class _FrozenDatasets: def __init__( self, - *datasets_collections: Union["_FrozenDatasets", Dict[str, AbstractDataSet]], + *datasets_collections: _FrozenDatasets | dict[str, AbstractDataSet], ): """Return a _FrozenDatasets instance from some datasets collections. Each collection could either be another _FrozenDatasets or a dictionary. 
@@ -136,9 +138,9 @@ class DataCatalog: def __init__( self, - data_sets: Dict[str, AbstractDataSet] = None, - feed_dict: Dict[str, Any] = None, - layers: Dict[str, Set[str]] = None, + data_sets: dict[str, AbstractDataSet] = None, + feed_dict: dict[str, Any] = None, + layers: dict[str, set[str]] = None, ) -> None: """``DataCatalog`` stores instances of ``AbstractDataSet`` implementations to provide ``load`` and ``save`` capabilities from @@ -179,12 +181,12 @@ def _logger(self): @classmethod def from_config( - cls: Type, - catalog: Optional[Dict[str, Dict[str, Any]]], - credentials: Dict[str, Dict[str, Any]] = None, - load_versions: Dict[str, str] = None, + cls: type, + catalog: dict[str, dict[str, Any]] | None, + credentials: dict[str, dict[str, Any]] = None, + load_versions: dict[str, str] = None, save_version: str = None, - ) -> "DataCatalog": + ) -> DataCatalog: """Create a ``DataCatalog`` instance from configuration. This is a factory method used to provide developers with a way to instantiate ``DataCatalog`` with configuration parsed from configuration files. @@ -267,7 +269,7 @@ class to be loaded is specified with the key ``type`` and their f"are not found in the catalog." ) - layers: Dict[str, Set[str]] = defaultdict(set) + layers: dict[str, set[str]] = defaultdict(set) for ds_name, ds_config in catalog.items(): ds_layer = ds_config.pop("layer", None) if ds_layer is not None: @@ -453,7 +455,7 @@ def add( self.datasets = _FrozenDatasets(self.datasets, {data_set_name: data_set}) def add_all( - self, data_sets: Dict[str, AbstractDataSet], replace: bool = False + self, data_sets: dict[str, AbstractDataSet], replace: bool = False ) -> None: """Adds a group of new data sets to the ``DataCatalog``. @@ -487,7 +489,7 @@ def add_all( for name, data_set in data_sets.items(): self.add(name, data_set, replace) - def add_feed_dict(self, feed_dict: Dict[str, Any], replace: bool = False) -> None: + def add_feed_dict(self, feed_dict: dict[str, Any], replace: bool = False) -> None: """Adds instances of ``MemoryDataSet``, containing the data provided through feed_dict. @@ -520,7 +522,7 @@ def add_feed_dict(self, feed_dict: Dict[str, Any], replace: bool = False) -> Non self.add(data_set_name, data_set, replace) - def list(self, regex_search: Optional[str] = None) -> List[str]: + def list(self, regex_search: str | None = None) -> list[str]: """ List of all ``DataSet`` names registered in the catalog. This can be filtered by providing an optional regular expression @@ -565,7 +567,7 @@ def list(self, regex_search: Optional[str] = None) -> List[str]: ) from exc return [dset_name for dset_name in self._data_sets if pattern.search(dset_name)] - def shallow_copy(self) -> "DataCatalog": + def shallow_copy(self) -> DataCatalog: """Returns a shallow copy of the current object. Returns: diff --git a/kedro/io/lambda_dataset.py b/kedro/io/lambda_dataset.py index 1cbf5ef882..2bf1e65ac4 100644 --- a/kedro/io/lambda_dataset.py +++ b/kedro/io/lambda_dataset.py @@ -2,7 +2,9 @@ providing custom load, save, and exists methods without extending ``AbstractDataSet``. 
""" -from typing import Any, Callable, Dict, Optional +from __future__ import annotations + +from typing import Any, Callable from kedro.io.core import AbstractDataSet, DataSetError @@ -29,7 +31,7 @@ class LambdaDataSet(AbstractDataSet): >>> data_set = LambdaDataSet(load, None) """ - def _describe(self) -> Dict[str, Any]: + def _describe(self) -> dict[str, Any]: def _to_str(func): if not func: return None @@ -76,8 +78,8 @@ def _release(self) -> None: def __init__( self, - load: Optional[Callable[[], Any]], - save: Optional[Callable[[Any], None]], + load: Callable[[], Any] | None, + save: Callable[[Any], None] | None, exists: Callable[[], bool] = None, release: Callable[[], None] = None, ): diff --git a/kedro/io/memory_dataset.py b/kedro/io/memory_dataset.py index 1e5df7b5dc..05ccef098e 100644 --- a/kedro/io/memory_dataset.py +++ b/kedro/io/memory_dataset.py @@ -1,8 +1,9 @@ """``MemoryDataSet`` is a data set implementation which handles in-memory data. """ +from __future__ import annotations import copy -from typing import Any, Dict +from typing import Any from kedro.io.core import AbstractDataSet, DataSetError @@ -66,7 +67,7 @@ def _exists(self) -> bool: def _release(self) -> None: self._data = _EMPTY - def _describe(self) -> Dict[str, Any]: + def _describe(self) -> dict[str, Any]: if self._data is not _EMPTY: return {"data": f"<{type(self._data).__name__}>"} # the string representation of datasets leaves out __init__ diff --git a/kedro/io/partitioned_dataset.py b/kedro/io/partitioned_dataset.py index fae8cfc7fa..ae5ef7b6e8 100644 --- a/kedro/io/partitioned_dataset.py +++ b/kedro/io/partitioned_dataset.py @@ -1,9 +1,11 @@ """``PartitionedDataSet`` loads and saves partitioned file-like data using the underlying dataset definition. It also uses `fsspec` for filesystem level operations. """ +from __future__ import annotations + import operator from copy import deepcopy -from typing import Any, Callable, Dict, List, Type, Union +from typing import Any, Callable from urllib.parse import urlparse from warnings import warn @@ -132,12 +134,12 @@ class PartitionedDataSet(AbstractDataSet): def __init__( # pylint: disable=too-many-arguments self, path: str, - dataset: Union[str, Type[AbstractDataSet], Dict[str, Any]], + dataset: str | type[AbstractDataSet] | dict[str, Any], filepath_arg: str = "filepath", filename_suffix: str = "", - credentials: Dict[str, Any] = None, - load_args: Dict[str, Any] = None, - fs_args: Dict[str, Any] = None, + credentials: dict[str, Any] = None, + load_args: dict[str, Any] = None, + fs_args: dict[str, Any] = None, overwrite: bool = False, ): """Creates a new instance of ``PartitionedDataSet``. 
@@ -249,7 +251,7 @@ def _normalized_path(self) -> str: return self._path @cachedmethod(cache=operator.attrgetter("_partition_cache")) - def _list_partitions(self) -> List[str]: + def _list_partitions(self) -> list[str]: return [ path for path in self._filesystem.find(self._normalized_path, **self._load_args) @@ -276,7 +278,7 @@ def _path_to_partition(self, path: str) -> str: path = path[: -len(self._filename_suffix)] return path - def _load(self) -> Dict[str, Callable[[], Any]]: + def _load(self) -> dict[str, Callable[[], Any]]: partitions = {} for partition in self._list_partitions(): @@ -292,7 +294,7 @@ def _load(self) -> Dict[str, Callable[[], Any]]: return partitions - def _save(self, data: Dict[str, Any]) -> None: + def _save(self, data: dict[str, Any]) -> None: if self._overwrite and self._filesystem.exists(self._normalized_path): self._filesystem.rm(self._normalized_path, recursive=True) @@ -307,7 +309,7 @@ def _save(self, data: Dict[str, Any]) -> None: dataset.save(partition_data) self._invalidate_caches() - def _describe(self) -> Dict[str, Any]: + def _describe(self) -> dict[str, Any]: clean_dataset_config = ( {k: v for k, v in self._dataset_config.items() if k != CREDENTIALS_KEY} if isinstance(self._dataset_config, dict) @@ -374,13 +376,13 @@ class IncrementalDataSet(PartitionedDataSet): def __init__( self, path: str, - dataset: Union[str, Type[AbstractDataSet], Dict[str, Any]], - checkpoint: Union[str, Dict[str, Any]] = None, + dataset: str | type[AbstractDataSet] | dict[str, Any], + checkpoint: str | dict[str, Any] | None = None, filepath_arg: str = "filepath", filename_suffix: str = "", - credentials: Dict[str, Any] = None, - load_args: Dict[str, Any] = None, - fs_args: Dict[str, Any] = None, + credentials: dict[str, Any] = None, + load_args: dict[str, Any] = None, + fs_args: dict[str, Any] = None, ): """Creates a new instance of ``IncrementalDataSet``. @@ -451,8 +453,8 @@ def __init__( self._comparison_func = comparison_func def _parse_checkpoint_config( - self, checkpoint_config: Union[str, Dict[str, Any], None] - ) -> Dict[str, Any]: + self, checkpoint_config: str | dict[str, Any] | None + ) -> dict[str, Any]: checkpoint_config = deepcopy(checkpoint_config) if isinstance(checkpoint_config, str): checkpoint_config = {"force_checkpoint": checkpoint_config} @@ -483,7 +485,7 @@ def _parse_checkpoint_config( return {**default_config, **checkpoint_config} @cachedmethod(cache=operator.attrgetter("_partition_cache")) - def _list_partitions(self) -> List[str]: + def _list_partitions(self) -> list[str]: checkpoint = self._read_checkpoint() checkpoint_path = ( self._filesystem._strip_protocol( # pylint: disable=protected-access @@ -513,7 +515,7 @@ def _checkpoint(self) -> AbstractDataSet: type_, kwargs = parse_dataset_definition(self._checkpoint_config) return type_(**kwargs) # type: ignore - def _read_checkpoint(self) -> Union[str, None]: + def _read_checkpoint(self) -> str | None: if self._force_checkpoint is not None: return self._force_checkpoint try: @@ -521,7 +523,7 @@ def _read_checkpoint(self) -> Union[str, None]: except DataSetError: return None - def _load(self) -> Dict[str, Callable[[], Any]]: + def _load(self) -> dict[str, Callable[[], Any]]: partitions = {} for partition in self._list_partitions(): diff --git a/kedro/ipython/__init__.py b/kedro/ipython/__init__.py index c9daac635b..276a15bc1f 100644 --- a/kedro/ipython/__init__.py +++ b/kedro/ipython/__init__.py @@ -2,10 +2,12 @@ This script creates an IPython extension to load Kedro-related variables in local scope. 
""" +from __future__ import annotations + import logging import sys from pathlib import Path -from typing import Any, Dict, Optional +from typing import Any from IPython import get_ipython from IPython.core.magic import needs_local_scope, register_line_magic @@ -60,7 +62,7 @@ def load_ipython_extension(ipython): default=None, help=PARAMS_ARG_HELP, ) -def magic_reload_kedro(line: str, local_ns: Dict[str, Any] = None): +def magic_reload_kedro(line: str, local_ns: dict[str, Any] = None): """ The `%reload_kedro` IPython line magic. See https://kedro.readthedocs.io/en/stable/notebooks_and_ipython/kedro_and_notebooks.html#reload-kedro-line-magic # pylint: disable=line-too-long @@ -73,8 +75,8 @@ def magic_reload_kedro(line: str, local_ns: Dict[str, Any] = None): def reload_kedro( path: str = None, env: str = None, - extra_params: Dict[str, Any] = None, - local_namespace: Optional[Dict[str, Any]] = None, + extra_params: dict[str, Any] = None, + local_namespace: dict[str, Any] | None = None, ) -> None: # pragma: no cover """Function that underlies the %reload_kedro Line magic. This should not be imported or run directly but instead invoked through %reload_kedro.""" @@ -111,7 +113,7 @@ def reload_kedro( def _resolve_project_path( - path: Optional[str] = None, local_namespace: Optional[Dict[str, Any]] = None + path: str | None = None, local_namespace: dict[str, Any] | None = None ) -> Path: """ Resolve the project path to use with reload_kedro, updating or adding it diff --git a/kedro/pipeline/modular_pipeline.py b/kedro/pipeline/modular_pipeline.py index fa83e37c1d..b9dbe9b403 100644 --- a/kedro/pipeline/modular_pipeline.py +++ b/kedro/pipeline/modular_pipeline.py @@ -1,6 +1,8 @@ """Helper to integrate modular pipelines into a master pipeline.""" +from __future__ import annotations + import copy -from typing import AbstractSet, Dict, Iterable, List, Set, Union +from typing import AbstractSet, Iterable from kedro.pipeline.node import Node from kedro.pipeline.pipeline import ( @@ -75,16 +77,16 @@ def _validate_datasets_exist( def _get_dataset_names_mapping( - names: Union[str, Set[str], Dict[str, str]] = None -) -> Dict[str, str]: + names: str | set[str] | dict[str, str] | None = None +) -> dict[str, str]: """Take a name or a collection of dataset names and turn it into a mapping from the old dataset names to the provided ones if necessary. Args: names: A dataset name or collection of dataset names. - When str or Set[str] is provided, the listed names will stay + When str or set[str] is provided, the listed names will stay the same as they are named in the provided pipeline. - When Dict[str, str] is provided, current names will be + When dict[str, str] is provided, current names will be mapped to new names in the resultant pipeline. Returns: A dictionary that maps the old dataset names to the provided ones. 
@@ -92,9 +94,9 @@ def _get_dataset_names_mapping( >>> _get_dataset_names_mapping("dataset_name") {"dataset_name": "dataset_name"} # a str name will stay the same >>> _get_dataset_names_mapping(set(["ds_1", "ds_2"])) - {"ds_1": "ds_1", "ds_2": "ds_2"} # a Set[str] of names will stay the same + {"ds_1": "ds_1", "ds_2": "ds_2"} # a set[str] of names will stay the same >>> _get_dataset_names_mapping({"ds_1": "new_ds_1_name"}) - {"ds_1": "new_ds_1_name"} # a Dict[str, str] of names will map key to value + {"ds_1": "new_ds_1_name"} # a dict[str, str] of names will map key to value """ if names is None: return {} @@ -112,8 +114,8 @@ def _normalize_param_name(name: str) -> str: def _get_param_names_mapping( - names: Union[str, Set[str], Dict[str, str]] = None -) -> Dict[str, str]: + names: str | set[str] | dict[str, str] | None = None +) -> dict[str, str]: """Take a parameter or a collection of parameter names and turn it into a mapping from existing parameter names to new ones if necessary. It follows the same rule as `_get_dataset_names_mapping` and @@ -121,9 +123,9 @@ Args: names: A parameter name or collection of parameter names. - When str or Set[str] is provided, the listed names will stay + When str or set[str] is provided, the listed names will stay the same as they are named in the provided pipeline. - When Dict[str, str] is provided, current names will be + When dict[str, str] is provided, current names will be mapped to new names in the resultant pipeline. Returns: A dictionary that maps the old parameter names to the provided ones. @@ -131,10 +133,10 @@ >>> _get_param_names_mapping("param_name") {"params:param_name": "params:param_name"} # a str name will stay the same >>> _get_param_names_mapping(set(["param_1", "param_2"])) - # a Set[str] of names will stay the same + # a set[str] of names will stay the same {"params:param_1": "params:param_1", "params:param_2": "params:param_2"} >>> _get_param_names_mapping({"param_1": "new_name_for_param_1"}) - # a Dict[str, str] of names will map key to valu + # a dict[str, str] of names will map key to value {"params:param_1": "params:new_name_for_param_1"} """ params = {} @@ -149,12 +151,12 @@ def _get_param_names_mapping( def pipeline( - pipe: Union[Iterable[Union[Node, Pipeline]], Pipeline], + pipe: Iterable[Node | Pipeline] | Pipeline, *, - inputs: Union[str, Set[str], Dict[str, str]] = None, - outputs: Union[str, Set[str], Dict[str, str]] = None, - parameters: Union[str, Set[str], Dict[str, str]] = None, - tags: Union[str, Iterable[str]] = None, + inputs: str | set[str] | dict[str, str] | None = None, + outputs: str | set[str] | dict[str, str] | None = None, + parameters: str | set[str] | dict[str, str] | None = None, + tags: str | Iterable[str] | None = None, namespace: str = None, ) -> Pipeline: r"""Create a ``Pipeline`` from a collection of nodes and/or ``Pipeline``\s. @@ -167,24 +169,24 @@ def pipeline( inputs: A name or collection of input names to be exposed as connection points to other pipelines upstream. This is optional; if not provided, the pipeline inputs are automatically inferred from the pipeline structure. - When str or Set[str] is provided, the listed input names will stay + When str or set[str] is provided, the listed input names will stay the same as they are named in the provided pipeline. - When Dict[str, str] is provided, current input names will be + When dict[str, str] is provided, current input names will be mapped to new names.
Must only refer to the pipeline's free inputs. outputs: A name or collection of names to be exposed as connection points to other pipelines downstream. This is optional; if not provided, the pipeline inputs are automatically inferred from the pipeline structure. - When str or Set[str] is provided, the listed output names will stay + When str or set[str] is provided, the listed output names will stay the same as they are named in the provided pipeline. - When Dict[str, str] is provided, current output names will be + When dict[str, str] is provided, current output names will be mapped to new names. Can refer to both the pipeline's free outputs, as well as intermediate results that need to be exposed. parameters: A name or collection of parameters to namespace. - When str or Set[str] are provided, the listed parameter names will stay + When str or set[str] are provided, the listed parameter names will stay the same as they are named in the provided pipeline. - When Dict[str, str] is provided, current parameter names will be + When dict[str, str] is provided, current parameter names will be mapped to new names. The parameters can be specified without the `params:` prefix. tags: Optional set of tags to be applied to all the pipeline nodes. @@ -257,8 +259,8 @@ def _rename(name: str): return name def _process_dataset_names( - datasets: Union[None, str, List[str], Dict[str, str]] - ) -> Union[None, str, List[str], Dict[str, str]]: + datasets: None | str | list[str] | dict[str, str] + ) -> None | str | list[str] | dict[str, str]: if datasets is None: return None if isinstance(datasets, str): diff --git a/kedro/pipeline/node.py b/kedro/pipeline/node.py index 2f948bb446..e7a3963aba 100644 --- a/kedro/pipeline/node.py +++ b/kedro/pipeline/node.py @@ -1,12 +1,14 @@ """This module provides user-friendly functions for creating nodes as parts of Kedro pipelines. """ +from __future__ import annotations + import copy import inspect import logging import re from collections import Counter -from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Union +from typing import Any, Callable, Iterable from warnings import warn from more_itertools import spy, unzip @@ -20,12 +22,12 @@ class Node: def __init__( self, func: Callable, - inputs: Union[None, str, List[str], Dict[str, str]], - outputs: Union[None, str, List[str], Dict[str, str]], + inputs: None | str | list[str] | dict[str, str], + outputs: None | str | list[str] | dict[str, str], *, name: str = None, - tags: Union[str, Iterable[str]] = None, - confirms: Union[str, List[str]] = None, + tags: str | Iterable[str] | None = None, + confirms: str | list[str] | None = None, namespace: str = None, ): """Create a node in the pipeline by providing a function to be called @@ -37,12 +39,12 @@ def __init__( inputs: The name or the list of the names of variables used as inputs to the function. The number of names should match the number of arguments in the definition of the provided - function. When Dict[str, str] is provided, variable names + function. When dict[str, str] is provided, variable names will be mapped to function argument names. outputs: The name or the list of the names of variables used as outputs to the function. The number of names should match the number of outputs returned by the provided function. - When Dict[str, str] is provided, variable names will be mapped + When dict[str, str] is provided, variable names will be mapped to the named outputs the function returns. 
name: Optional node name to be used when displaying the node in logs or any other visualisations. @@ -175,7 +177,7 @@ def __repr__(self): # pragma: no cover f"{repr(self._name)})" ) - def __call__(self, **kwargs) -> Dict[str, Any]: + def __call__(self, **kwargs) -> dict[str, Any]: return self.run(inputs=kwargs) @property @@ -209,7 +211,7 @@ def func(self, func: Callable): self._func = func @property - def tags(self) -> Set[str]: + def tags(self) -> set[str]: """Return the tags assigned to the node. Returns: @@ -218,7 +220,7 @@ def tags(self) -> Set[str]: """ return set(self._tags) - def tag(self, tags: Union[str, Iterable[str]]) -> "Node": + def tag(self, tags: str | Iterable[str]) -> Node: """Create a new ``Node`` which is an exact copy of the current one, but with more tags added to it. @@ -257,7 +259,7 @@ def short_name(self) -> str: return self._func_name.replace("_", " ").title() @property - def namespace(self) -> Optional[str]: + def namespace(self) -> str | None: """Node's namespace. Returns: @@ -266,7 +268,7 @@ def namespace(self) -> Optional[str]: return self._namespace @property - def inputs(self) -> List[str]: + def inputs(self) -> list[str]: """Return node inputs as a list, in the order required to bind them properly to the node's function. @@ -279,7 +281,7 @@ def inputs(self) -> List[str]: return _to_list(self._inputs) @property - def outputs(self) -> List[str]: + def outputs(self) -> list[str]: """Return node outputs as a list preserving the original order if possible. @@ -290,7 +292,7 @@ def outputs(self) -> List[str]: return _to_list(self._outputs) @property - def confirms(self) -> List[str]: + def confirms(self) -> list[str]: """Return dataset names to confirm as a list. Returns: @@ -298,7 +300,7 @@ def confirms(self) -> List[str]: """ return _to_list(self._confirms) - def run(self, inputs: Dict[str, Any] = None) -> Dict[str, Any]: + def run(self, inputs: dict[str, Any] = None) -> dict[str, Any]: """Run this node using the provided inputs and return its results in a dictionary. 
@@ -354,7 +356,7 @@ def run(self, inputs: Dict[str, Any] = None) -> Dict[str, Any]: self._logger.error("Node '%s' failed with error: \n%s", str(self), str(exc)) raise exc - def _run_with_no_inputs(self, inputs: Dict[str, Any]): + def _run_with_no_inputs(self, inputs: dict[str, Any]): if inputs: raise ValueError( f"Node {str(self)} expected no inputs, " @@ -364,7 +366,7 @@ def _run_with_no_inputs(self, inputs: Dict[str, Any]): return self._func() - def _run_with_one_input(self, inputs: Dict[str, Any], node_input: str): + def _run_with_one_input(self, inputs: dict[str, Any], node_input: str): if len(inputs) != 1 or node_input not in inputs: raise ValueError( f"Node {str(self)} expected one input named '{node_input}', " @@ -374,7 +376,7 @@ def _run_with_one_input(self, inputs: Dict[str, Any], node_input: str): return self._func(inputs[node_input]) - def _run_with_list(self, inputs: Dict[str, Any], node_inputs: List[str]): + def _run_with_list(self, inputs: dict[str, Any], node_inputs: list[str]): # Node inputs and provided run inputs should completely overlap if set(node_inputs) != set(inputs.keys()): raise ValueError( @@ -385,7 +387,7 @@ def _run_with_list(self, inputs: Dict[str, Any], node_inputs: List[str]): # Ensure the function gets the inputs in the correct order return self._func(*(inputs[item] for item in node_inputs)) - def _run_with_dict(self, inputs: Dict[str, Any], node_inputs: Dict[str, str]): + def _run_with_dict(self, inputs: dict[str, Any], node_inputs: dict[str, str]): # Node inputs and provided run inputs should completely overlap if set(node_inputs.values()) != set(inputs.keys()): raise ValueError( @@ -495,11 +497,11 @@ def _validate_inputs_dif_than_outputs(self): ) @staticmethod - def _process_inputs_for_bind(inputs: Union[None, str, List[str], Dict[str, str]]): + def _process_inputs_for_bind(inputs: None | str | list[str] | dict[str, str]): # Safeguard that we do not mutate list inputs inputs = copy.copy(inputs) - args: List[str] = [] - kwargs: Dict[str, str] = {} + args: list[str] = [] + kwargs: dict[str, str] = {} if isinstance(inputs, str): args = [inputs] elif isinstance(inputs, list): @@ -518,12 +520,12 @@ def _node_error_message(msg) -> str: def node( func: Callable, - inputs: Union[None, str, List[str], Dict[str, str]], - outputs: Union[None, str, List[str], Dict[str, str]], + inputs: None | str | list[str] | dict[str, str], + outputs: None | str | list[str] | dict[str, str], *, name: str = None, - tags: Union[str, Iterable[str]] = None, - confirms: Union[str, List[str]] = None, + tags: str | Iterable[str] | None = None, + confirms: str | list[str] | None = None, namespace: str = None, ) -> Node: """Create a node in the pipeline by providing a function to be called @@ -535,11 +537,11 @@ def node( inputs: The name or the list of the names of variables used as inputs to the function. The number of names should match the number of arguments in the definition of the provided function. When - Dict[str, str] is provided, variable names will be mapped to + dict[str, str] is provided, variable names will be mapped to function argument names. outputs: The name or the list of the names of variables used as outputs to the function. The number of names should match the number of - outputs returned by the provided function. When Dict[str, str] + outputs returned by the provided function. When dict[str, str] is provided, variable names will be mapped to the named outputs the function returns. 
name: Optional node name to be used when displaying the node in logs or @@ -562,7 +564,7 @@ def node( >>> import numpy as np >>> >>> def clean_data(cars: pd.DataFrame, - >>> boats: pd.DataFrame) -> Dict[str, pd.DataFrame]: + >>> boats: pd.DataFrame) -> dict[str, pd.DataFrame]: >>> return dict(cars_df=cars.dropna(), boats_df=boats.dropna()) >>> >>> def halve_dataframe(data: pd.DataFrame) -> List[pd.DataFrame]: @@ -592,7 +594,7 @@ def node( ) -def _dict_inputs_to_list(func: Callable[[Any], Any], inputs: Dict[str, str]): +def _dict_inputs_to_list(func: Callable[[Any], Any], inputs: dict[str, str]): """Convert a dict representation of the node inputs to a list, ensuring the appropriate order for binding them to the node's function. """ @@ -600,11 +602,11 @@ def _dict_inputs_to_list(func: Callable[[Any], Any], inputs: Dict[str, str]): return [*sig.args, *sig.kwargs.values()] -def _to_list(element: Union[None, str, Iterable[str], Dict[str, str]]) -> List[str]: +def _to_list(element: None | str | Iterable[str] | dict[str, str]) -> list[str]: """Make a list out of node inputs/outputs. Returns: - List[str]: Node input/output names as a list to standardise. + list[str]: Node input/output names as a list to standardise. """ if element is None: diff --git a/kedro/pipeline/pipeline.py b/kedro/pipeline/pipeline.py index edc59bd153..5ce57bac35 100644 --- a/kedro/pipeline/pipeline.py +++ b/kedro/pipeline/pipeline.py @@ -3,11 +3,13 @@ offers quick access to input dependencies, produced outputs and execution order. """ +from __future__ import annotations + import copy import json from collections import Counter, defaultdict from itertools import chain -from typing import Dict, Iterable, List, Set, Tuple, Union +from typing import Iterable from toposort import CircularDependencyError as ToposortCircleError from toposort import toposort @@ -18,7 +20,7 @@ TRANSCODING_SEPARATOR = "@" -def _transcode_split(element: str) -> Tuple[str, str]: +def _transcode_split(element: str) -> tuple[str, str]: """Split the name by the transcoding separator. If the transcoding part is missing, empty string will be put in. @@ -77,9 +79,9 @@ class Pipeline: # pylint: disable=too-many-public-methods def __init__( self, - nodes: Iterable[Union[Node, "Pipeline"]], + nodes: Iterable[Node | Pipeline], *, - tags: Union[str, Iterable[str]] = None, + tags: str | Iterable[str] | None = None, ): """Initialise ``Pipeline`` with a list of ``Node`` instances. @@ -150,13 +152,13 @@ def __init__( _validate_unique_confirms(nodes) # input -> nodes with input - self._nodes_by_input: Dict[str, Set[Node]] = defaultdict(set) + self._nodes_by_input: dict[str, set[Node]] = defaultdict(set) for node in nodes: for input_ in node.inputs: self._nodes_by_input[_strip_transcoding(input_)].add(node) # output -> node with output - self._nodes_by_output: Dict[str, Node] = {} + self._nodes_by_output: dict[str, Node] = {} for node in nodes: for output in node.outputs: self._nodes_by_output[_strip_transcoding(output)] = node @@ -201,7 +203,7 @@ def __or__(self, other): return NotImplemented return Pipeline(set(self.nodes + other.nodes)) - def all_inputs(self) -> Set[str]: + def all_inputs(self) -> set[str]: """All inputs for all nodes in the pipeline. Returns: @@ -210,7 +212,7 @@ def all_inputs(self) -> Set[str]: """ return set.union(set(), *(node.inputs for node in self.nodes)) - def all_outputs(self) -> Set[str]: + def all_outputs(self) -> set[str]: """All outputs of all nodes in the pipeline. 
Returns: @@ -219,13 +221,13 @@ def all_outputs(self) -> Set[str]: """ return set.union(set(), *(node.outputs for node in self.nodes)) - def _remove_intermediates(self, datasets: Set[str]) -> Set[str]: + def _remove_intermediates(self, datasets: set[str]) -> set[str]: intermediate = {_strip_transcoding(i) for i in self.all_inputs()} & { _strip_transcoding(o) for o in self.all_outputs() } return {d for d in datasets if _strip_transcoding(d) not in intermediate} - def inputs(self) -> Set[str]: + def inputs(self) -> set[str]: """The names of free inputs that must be provided at runtime so that the pipeline is runnable. Does not include intermediate inputs which are produced and consumed by the inner pipeline nodes. Resolves @@ -237,7 +239,7 @@ def inputs(self) -> Set[str]: """ return self._remove_intermediates(self.all_inputs()) - def outputs(self) -> Set[str]: + def outputs(self) -> set[str]: """The names of outputs produced when the whole pipeline is run. Does not include intermediate outputs that are consumed by other pipeline nodes. Resolves transcoded names where necessary. @@ -248,7 +250,7 @@ def outputs(self) -> Set[str]: """ return self._remove_intermediates(self.all_outputs()) - def data_sets(self) -> Set[str]: + def data_sets(self) -> set[str]: """The names of all data sets used by the ``Pipeline``, including inputs and outputs. @@ -321,7 +323,7 @@ def set_to_string(set_of_strings): ) @property - def node_dependencies(self) -> Dict[Node, Set[Node]]: + def node_dependencies(self) -> dict[Node, set[Node]]: """All dependencies of nodes where the first Node has a direct dependency on the second Node. @@ -329,7 +331,7 @@ def node_dependencies(self) -> Dict[Node, Set[Node]]: Dictionary where keys are nodes and values are sets made up of their parent nodes. Independent nodes have this as empty sets. """ - dependencies: Dict[Node, Set[Node]] = {node: set() for node in self._nodes} + dependencies: dict[Node, set[Node]] = {node: set() for node in self._nodes} for parent in self._nodes: for output in parent.outputs: for child in self._nodes_by_input[_strip_transcoding(output)]: @@ -338,7 +340,7 @@ def node_dependencies(self) -> Dict[Node, Set[Node]]: return dependencies @property - def nodes(self) -> List[Node]: + def nodes(self) -> list[Node]: """Return a list of the pipeline nodes in topological order, i.e. if node A needs to be run before node B, it will appear earlier in the list. @@ -350,7 +352,7 @@ def nodes(self) -> List[Node]: return list(chain.from_iterable(self._topo_sorted_nodes)) @property - def grouped_nodes(self) -> List[List[Node]]: + def grouped_nodes(self) -> list[list[Node]]: """Return a list of the pipeline nodes in topologically ordered groups, i.e. if node A needs to be run before node B, it will appear in an earlier group. @@ -361,7 +363,7 @@ def grouped_nodes(self) -> List[List[Node]]: """ return copy.copy(self._topo_sorted_nodes) - def only_nodes(self, *node_names: str) -> "Pipeline": + def only_nodes(self, *node_names: str) -> Pipeline: """Create a new ``Pipeline`` which will contain only the specified nodes by name. @@ -400,7 +402,7 @@ def only_nodes(self, *node_names: str) -> "Pipeline": nodes = [self._nodes_by_name[name] for name in node_names] return Pipeline(nodes) - def only_nodes_with_namespace(self, node_namespace: str) -> "Pipeline": + def only_nodes_with_namespace(self, node_namespace: str) -> Pipeline: """Creates a new ``Pipeline`` containing only nodes with the specified namespace. 
@@ -425,8 +427,8 @@ def only_nodes_with_namespace(self, node_namespace: str) -> "Pipeline": return Pipeline(nodes) def _get_nodes_with_inputs_transcode_compatible( - self, datasets: Set[str] - ) -> Set[Node]: + self, datasets: set[str] + ) -> set[Node]: """Retrieves nodes that use the given `datasets` as inputs. If provided a name, but no format, for a transcoded dataset, it includes all nodes that use inputs with that name, otherwise it @@ -456,8 +458,8 @@ def _get_nodes_with_inputs_transcode_compatible( return relevant_nodes def _get_nodes_with_outputs_transcode_compatible( - self, datasets: Set[str] - ) -> Set[Node]: + self, datasets: set[str] + ) -> set[Node]: """Retrieves nodes that output to the given `datasets`. If provided a name, but no format, for a transcoded dataset, it includes the node that outputs to that name, otherwise it matches @@ -488,7 +490,7 @@ def _get_nodes_with_outputs_transcode_compatible( return relevant_nodes - def only_nodes_with_inputs(self, *inputs: str) -> "Pipeline": + def only_nodes_with_inputs(self, *inputs: str) -> Pipeline: """Create a new ``Pipeline`` object with the nodes which depend directly on the provided inputs. If provided a name, but no format, for a transcoded input, it @@ -514,7 +516,7 @@ def only_nodes_with_inputs(self, *inputs: str) -> "Pipeline": return Pipeline(nodes) - def from_inputs(self, *inputs: str) -> "Pipeline": + def from_inputs(self, *inputs: str) -> Pipeline: """Create a new ``Pipeline`` object with the nodes which depend directly or transitively on the provided inputs. If provided a name, but no format, for a transcoded input, it @@ -537,7 +539,7 @@ def from_inputs(self, *inputs: str) -> "Pipeline": """ starting = set(inputs) - result: Set[Node] = set() + result: set[Node] = set() next_nodes = self._get_nodes_with_inputs_transcode_compatible(starting) while next_nodes: @@ -554,7 +556,7 @@ def from_inputs(self, *inputs: str) -> "Pipeline": return Pipeline(result) - def only_nodes_with_outputs(self, *outputs: str) -> "Pipeline": + def only_nodes_with_outputs(self, *outputs: str) -> Pipeline: """Create a new ``Pipeline`` object with the nodes which are directly required to produce the provided outputs. If provided a name, but no format, for a transcoded dataset, it @@ -579,7 +581,7 @@ def only_nodes_with_outputs(self, *outputs: str) -> "Pipeline": return Pipeline(nodes) - def to_outputs(self, *outputs: str) -> "Pipeline": + def to_outputs(self, *outputs: str) -> Pipeline: """Create a new ``Pipeline`` object with the nodes which are directly or transitively required to produce the provided outputs. If provided a name, but no format, for a transcoded dataset, it @@ -602,7 +604,7 @@ def to_outputs(self, *outputs: str) -> "Pipeline": """ starting = set(outputs) - result: Set[Node] = set() + result: set[Node] = set() next_nodes = self._get_nodes_with_outputs_transcode_compatible(starting) while next_nodes: @@ -618,7 +620,7 @@ def to_outputs(self, *outputs: str) -> "Pipeline": return Pipeline(result) - def from_nodes(self, *node_names: str) -> "Pipeline": + def from_nodes(self, *node_names: str) -> Pipeline: """Create a new ``Pipeline`` object with the nodes which depend directly or transitively on the provided nodes. 
@@ -639,7 +641,7 @@ def from_nodes(self, *node_names: str) -> "Pipeline": res += self.from_inputs(*map(_strip_transcoding, res.all_outputs())) return res - def to_nodes(self, *node_names: str) -> "Pipeline": + def to_nodes(self, *node_names: str) -> Pipeline: """Create a new ``Pipeline`` object with the nodes required directly or transitively by the provided nodes. @@ -660,7 +662,7 @@ def to_nodes(self, *node_names: str) -> "Pipeline": res += self.to_outputs(*map(_strip_transcoding, res.all_inputs())) return res - def only_nodes_with_tags(self, *tags: str) -> "Pipeline": + def only_nodes_with_tags(self, *tags: str) -> Pipeline: """Creates a new ``Pipeline`` object with the nodes which contain *any* of the provided tags. The resulting ``Pipeline`` is empty if no tags are provided. @@ -687,7 +689,7 @@ def filter( from_inputs: Iterable[str] = None, to_outputs: Iterable[str] = None, node_namespace: str = None, - ) -> "Pipeline": + ) -> Pipeline: """Creates a new ``Pipeline`` object with the nodes that meet all of the specified filtering conditions. @@ -769,7 +771,7 @@ def filter( ) return filtered_pipeline - def tag(self, tags: Union[str, Iterable[str]]) -> "Pipeline": + def tag(self, tags: str | Iterable[str]) -> Pipeline: """Tags all the nodes in the pipeline. Args: @@ -800,9 +802,9 @@ def to_json(self): return json.dumps(pipeline_versioned) -def _validate_duplicate_nodes(nodes_or_pipes: Iterable[Union[Node, Pipeline]]): - seen_nodes: Set[str] = set() - duplicates: Dict[Union[Pipeline, None], Set[str]] = defaultdict(set) +def _validate_duplicate_nodes(nodes_or_pipes: Iterable[Node | Pipeline]): + seen_nodes: set[str] = set() + duplicates: dict[Pipeline | None, set[str]] = defaultdict(set) def _check_node(node_: Node, pipeline_: Pipeline = None): name = node_.name @@ -835,7 +837,7 @@ def _check_node(node_: Node, pipeline_: Pipeline = None): ) -def _validate_unique_outputs(nodes: List[Node]) -> None: +def _validate_unique_outputs(nodes: list[Node]) -> None: outputs = chain.from_iterable(node.outputs for node in nodes) outputs = map(_strip_transcoding, outputs) duplicates = [key for key, value in Counter(outputs).items() if value > 1] @@ -846,7 +848,7 @@ def _validate_unique_outputs(nodes: List[Node]) -> None: ) -def _validate_unique_confirms(nodes: List[Node]) -> None: +def _validate_unique_confirms(nodes: list[Node]) -> None: confirms = chain.from_iterable(node.confirms for node in nodes) confirms = map(_strip_transcoding, confirms) duplicates = [key for key, value in Counter(confirms).items() if value > 1] @@ -857,7 +859,7 @@ def _validate_unique_confirms(nodes: List[Node]) -> None: ) -def _validate_transcoded_inputs_outputs(nodes: List[Node]) -> None: +def _validate_transcoded_inputs_outputs(nodes: list[Node]) -> None: """Users should not be allowed to refer to a transcoded dataset both with and without the separator. """ @@ -883,7 +885,7 @@ def _validate_transcoded_inputs_outputs(nodes: List[Node]) -> None: ) -def _topologically_sorted(node_dependencies) -> List[List[Node]]: +def _topologically_sorted(node_dependencies) -> list[list[Node]]: """Topologically group and sort (order) nodes such that no node depends on a node that appears in the same or a later group. @@ -897,7 +899,7 @@ def _topologically_sorted(node_dependencies) -> List[List[Node]]: executed on the second step, etc. 
""" - def _circle_error_message(error_data: Dict[str, str]) -> str: + def _circle_error_message(error_data: dict[str, str]) -> str: """Error messages provided by the toposort library will refer to indices that are used as an intermediate step. This method can be used to replace that message with diff --git a/kedro/runner/parallel_runner.py b/kedro/runner/parallel_runner.py index b16bcf9d72..52a976aadb 100644 --- a/kedro/runner/parallel_runner.py +++ b/kedro/runner/parallel_runner.py @@ -1,6 +1,8 @@ """``ParallelRunner`` is an ``AbstractRunner`` implementation. It can be used to run the ``Pipeline`` in parallel groups formed by toposort. """ +from __future__ import annotations + import multiprocessing import os import pickle @@ -11,7 +13,7 @@ from multiprocessing.managers import BaseProxy, SyncManager # type: ignore from multiprocessing.reduction import ForkingPickler from pickle import PicklingError -from typing import Any, Dict, Iterable, Set +from typing import Any, Iterable from pluggy import PluginManager @@ -78,7 +80,7 @@ class ParallelRunnerManager(SyncManager): ) -def _bootstrap_subprocess(package_name: str, logging_config: Dict[str, Any]): +def _bootstrap_subprocess(package_name: str, logging_config: dict[str, Any]): # pylint: disable=import-outside-toplevel,cyclic-import from kedro.framework.project import configure_logging, configure_project @@ -92,7 +94,7 @@ def _run_node_synchronization( # pylint: disable=too-many-arguments is_async: bool = False, session_id: str = None, package_name: str = None, - logging_config: Dict[str, Any] = None, + logging_config: dict[str, Any] = None, ) -> Node: """Run a single `Node` with inputs from and outputs to the `catalog`. @@ -290,7 +292,7 @@ def _run( # pylint: disable=too-many-locals,useless-suppression load_counts = Counter(chain.from_iterable(n.inputs for n in nodes)) node_dependencies = pipeline.node_dependencies todo_nodes = set(node_dependencies.keys()) - done_nodes: Set[Node] = set() + done_nodes: set[Node] = set() futures = set() done = None max_workers = self._get_required_workers_count(pipeline) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 7a2444cc6d..2ae8c462d6 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -1,6 +1,7 @@ """``AbstractRunner`` is the base class for all ``Pipeline`` runner implementations. """ +from __future__ import annotations import inspect import itertools as it @@ -14,7 +15,7 @@ as_completed, wait, ) -from typing import Any, Dict, Iterable, Iterator, List, Set +from typing import Any, Iterable, Iterator from more_itertools import interleave from pluggy import PluginManager @@ -50,7 +51,7 @@ def run( catalog: DataCatalog, hook_manager: PluginManager = None, session_id: str = None, - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """Run the ``Pipeline`` using the datasets provided by ``catalog`` and save results back to the same objects. @@ -96,7 +97,7 @@ def run( def run_only_missing( self, pipeline: Pipeline, catalog: DataCatalog, hook_manager: PluginManager - ) -> Dict[str, Any]: + ) -> dict[str, Any]: """Run only the missing outputs from the ``Pipeline`` using the datasets provided by ``catalog``, and save results back to the same objects. @@ -214,7 +215,7 @@ def _suggest_resume_scenario( def _find_persistent_ancestors( pipeline: Pipeline, children: Iterable[Node], catalog: DataCatalog -) -> Set[Node]: +) -> set[Node]: """Breadth-first search approach to finding the complete set of persistent ancestors of an iterable of ``Node``s. 
Persistent ancestors exclusively have persisted ``Dataset``s as inputs. @@ -244,7 +245,7 @@ def _find_persistent_ancestors( return ancestor_nodes_to_run -def _enumerate_parents(pipeline: Pipeline, child: Node) -> List[Node]: +def _enumerate_parents(pipeline: Pipeline, child: Node) -> list[Node]: """For a given ``Node``, returns a list containing the direct parents of that ``Node`` in the given ``Pipeline``. @@ -326,11 +327,11 @@ def run_node( def _collect_inputs_from_hook( node: Node, catalog: DataCatalog, - inputs: Dict[str, Any], + inputs: dict[str, Any], is_async: bool, hook_manager: PluginManager, session_id: str = None, -) -> Dict[str, Any]: +) -> dict[str, Any]: # pylint: disable=too-many-arguments inputs = inputs.copy() # shallow copy to prevent in-place modification by the hook hook_response = hook_manager.hook.before_node_run( @@ -361,11 +362,11 @@ def _collect_inputs_from_hook( def _call_node_run( node: Node, catalog: DataCatalog, - inputs: Dict[str, Any], + inputs: dict[str, Any], is_async: bool, hook_manager: PluginManager, session_id: str = None, -) -> Dict[str, Any]: +) -> dict[str, Any]: # pylint: disable=too-many-arguments try: outputs = node.run(inputs) @@ -454,7 +455,7 @@ def _synchronous_dataset_load(dataset_name: str): return return_ds with ThreadPoolExecutor() as pool: - inputs: Dict[str, Future] = {} + inputs: dict[str, Future] = {} for name in node.inputs: inputs[name] = pool.submit(_synchronous_dataset_load, name) diff --git a/kedro/runner/thread_runner.py b/kedro/runner/thread_runner.py index 9e01517c32..0f1301c77b 100644 --- a/kedro/runner/thread_runner.py +++ b/kedro/runner/thread_runner.py @@ -2,11 +2,12 @@ be used to run the ``Pipeline`` in parallel groups formed by toposort using threads. """ +from __future__ import annotations + import warnings from collections import Counter from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait from itertools import chain -from typing import Set from pluggy import PluginManager @@ -103,7 +104,7 @@ def _run( # pylint: disable=too-many-locals,useless-suppression load_counts = Counter(chain.from_iterable(n.inputs for n in nodes)) node_dependencies = pipeline.node_dependencies todo_nodes = set(node_dependencies.keys()) - done_nodes: Set[Node] = set() + done_nodes: set[Node] = set() futures = set() done = None max_workers = self._get_required_workers_count(pipeline) diff --git a/kedro/templates/project/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipeline_registry.py b/kedro/templates/project/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipeline_registry.py index 2d4272e312..2109a75017 100644 --- a/kedro/templates/project/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipeline_registry.py +++ b/kedro/templates/project/{{ cookiecutter.repo_name }}/src/{{ cookiecutter.python_package }}/pipeline_registry.py @@ -1,11 +1,11 @@ """Project pipelines.""" -from typing import Dict +from __future__ import annotations from kedro.framework.project import find_pipelines from kedro.pipeline import Pipeline -def register_pipelines() -> Dict[str, Pipeline]: +def register_pipelines() -> dict[str, Pipeline]: """Register the project's pipelines. 
Returns: diff --git a/tests/config/test_config.py b/tests/config/test_config.py index dc841d5fd0..110d3de692 100644 --- a/tests/config/test_config.py +++ b/tests/config/test_config.py @@ -1,8 +1,9 @@ +from __future__ import annotations + import configparser import json import re from pathlib import Path -from typing import Dict import pytest import yaml @@ -14,13 +15,13 @@ _BASE_ENV = "base" -def _write_yaml(filepath: Path, config: Dict): +def _write_yaml(filepath: Path, config: dict): filepath.parent.mkdir(parents=True, exist_ok=True) yaml_str = yaml.dump(config) filepath.write_text(yaml_str) -def _write_json(filepath: Path, config: Dict): +def _write_json(filepath: Path, config: dict): filepath.parent.mkdir(parents=True, exist_ok=True) json_str = json.dumps(config) filepath.write_text(json_str) diff --git a/tests/config/test_omegaconf_config.py b/tests/config/test_omegaconf_config.py index adec03cbb1..a93b284853 100644 --- a/tests/config/test_omegaconf_config.py +++ b/tests/config/test_omegaconf_config.py @@ -1,4 +1,6 @@ # pylint: disable=expression-not-assigned, pointless-statement +from __future__ import annotations + import configparser import json import os @@ -6,7 +8,6 @@ import subprocess import zipfile from pathlib import Path -from typing import Dict import pytest import yaml @@ -20,13 +21,13 @@ _BASE_ENV = "base" -def _write_yaml(filepath: Path, config: Dict): +def _write_yaml(filepath: Path, config: dict): filepath.parent.mkdir(parents=True, exist_ok=True) yaml_str = yaml.dump(config) filepath.write_text(yaml_str) -def _write_json(filepath: Path, config: Dict): +def _write_json(filepath: Path, config: dict): filepath.parent.mkdir(parents=True, exist_ok=True) json_str = json.dumps(config) filepath.write_text(json_str) diff --git a/tests/config/test_templated_config.py b/tests/config/test_templated_config.py index 2dd4bb6b6c..9a8edbd0d4 100644 --- a/tests/config/test_templated_config.py +++ b/tests/config/test_templated_config.py @@ -1,5 +1,6 @@ +from __future__ import annotations + from pathlib import Path -from typing import Dict import pytest import yaml @@ -11,7 +12,7 @@ _BASE_ENV = "base" -def _write_yaml(filepath: Path, config: Dict): +def _write_yaml(filepath: Path, config: dict): filepath.parent.mkdir(parents=True, exist_ok=True) yaml_str = yaml.dump(config) filepath.write_text(yaml_str) @@ -440,7 +441,7 @@ class TestFormatObject: (["${a}", "X${a}"], {"a": "A"}, ["A", "XA"]), (["${b|D}"], {"a": "A"}, ["D"]), (["${b|abcDEF_.<>/@$%^&!}"], {"a": "A"}, ["abcDEF_.<>/@$%^&!"]), - # Dicts + # dicts ({"key": "${a}"}, {"a": "A"}, {"key": "A"}), ({"${a}": "value"}, {"a": "A"}, {"A": "value"}), ({"${a|D}": "value"}, {}, {"D": "value"}), diff --git a/tests/framework/cli/test_cli_hooks.py b/tests/framework/cli/test_cli_hooks.py index d8fea32b26..0f7866f45f 100644 --- a/tests/framework/cli/test_cli_hooks.py +++ b/tests/framework/cli/test_cli_hooks.py @@ -1,6 +1,7 @@ +from __future__ import annotations + import logging from collections import namedtuple -from typing import List import pytest from click.testing import CliRunner @@ -43,7 +44,7 @@ class FakeCLIHooks: def before_command_run( self, project_metadata: ProjectMetadata, - command_args: List[str], + command_args: list[str], ): print( f"Before command `{' '.join(command_args)}` run for project {project_metadata}" @@ -53,7 +54,7 @@ def before_command_run( def after_command_run( self, project_metadata: ProjectMetadata, - command_args: List[str], + command_args: list[str], exit_code: int, ): print( diff --git 
a/tests/framework/cli/test_starters.py b/tests/framework/cli/test_starters.py index 341ddfda6c..26fc6ac3e5 100644 --- a/tests/framework/cli/test_starters.py +++ b/tests/framework/cli/test_starters.py @@ -1,9 +1,9 @@ """This module contains unit test for the cli command 'kedro new' """ +from __future__ import annotations import shutil from pathlib import Path -from typing import Dict import pytest import yaml @@ -38,7 +38,7 @@ def mock_cookiecutter(mocker): return mocker.patch("cookiecutter.main.cookiecutter") -def _write_yaml(filepath: Path, config: Dict): +def _write_yaml(filepath: Path, config: dict): filepath.parent.mkdir(parents=True, exist_ok=True) yaml_str = yaml.dump(config) filepath.write_text(yaml_str) diff --git a/tests/framework/context/test_context.py b/tests/framework/context/test_context.py index b9df3dce22..71df1cbad2 100644 --- a/tests/framework/context/test_context.py +++ b/tests/framework/context/test_context.py @@ -1,10 +1,12 @@ +from __future__ import annotations + import configparser import json import logging import re import textwrap from pathlib import Path, PurePath, PurePosixPath, PureWindowsPath -from typing import Any, Dict +from typing import Any import pandas as pd import pytest @@ -39,19 +41,19 @@ class BadCatalog: # pylint: disable=too-few-public-methods """ -def _write_yaml(filepath: Path, config: Dict): +def _write_yaml(filepath: Path, config: dict): filepath.parent.mkdir(parents=True, exist_ok=True) yaml_str = yaml.dump(config) filepath.write_text(yaml_str) -def _write_toml(filepath: Path, config: Dict): +def _write_toml(filepath: Path, config: dict): filepath.parent.mkdir(parents=True, exist_ok=True) toml_str = toml.dumps(config) filepath.write_text(toml_str) -def _write_json(filepath: Path, config: Dict): +def _write_json(filepath: Path, config: dict): filepath.parent.mkdir(parents=True, exist_ok=True) json_str = json.dumps(config) filepath.write_text(json_str) @@ -373,7 +375,7 @@ def test_convert_paths_raises_error_on_relative_project_path(): ], ) def test_convert_paths_to_absolute_posix_for_all_known_filepath_keys( - project_path: Path, input_conf: Dict[str, Any], expected: Dict[str, Any] + project_path: Path, input_conf: dict[str, Any], expected: dict[str, Any] ): assert _convert_paths_to_absolute_posix(project_path, input_conf) == expected @@ -394,7 +396,7 @@ def test_convert_paths_to_absolute_posix_for_all_known_filepath_keys( ], ) def test_convert_paths_to_absolute_posix_not_changing_non_relative_path( - project_path: Path, input_conf: Dict[str, Any], expected: Dict[str, Any] + project_path: Path, input_conf: dict[str, Any], expected: dict[str, Any] ): assert _convert_paths_to_absolute_posix(project_path, input_conf) == expected @@ -410,7 +412,7 @@ def test_convert_paths_to_absolute_posix_not_changing_non_relative_path( ], ) def test_convert_paths_to_absolute_posix_converts_full_windows_path_to_posix( - project_path: Path, input_conf: Dict[str, Any], expected: Dict[str, Any] + project_path: Path, input_conf: dict[str, Any], expected: dict[str, Any] ): assert _convert_paths_to_absolute_posix(project_path, input_conf) == expected @@ -491,6 +493,6 @@ def test_validate_layers_error(layers, conflicting_datasets, mocker): ), ], ) -def test_update_nested_dict(old_dict: Dict, new_dict: Dict, expected: Dict): +def test_update_nested_dict(old_dict: dict, new_dict: dict, expected: dict): _update_nested_dict(old_dict, new_dict) # _update_nested_dict change dict in place assert old_dict == expected diff --git 
a/tests/framework/project/test_pipeline_discovery.py b/tests/framework/project/test_pipeline_discovery.py index 1ddd2b70a0..b6f23b69ca 100644 --- a/tests/framework/project/test_pipeline_discovery.py +++ b/tests/framework/project/test_pipeline_discovery.py @@ -98,12 +98,12 @@ def test_find_pipelines_skips_modules_with_unexpected_return_value_type( (pipeline_dir / "__init__.py").write_text( textwrap.dedent( """ - from typing import Dict + from __future__ import annotations from kedro.pipeline import Pipeline, node, pipeline - def create_pipeline(**kwargs) -> Dict[str, Pipeline]: + def create_pipeline(**kwargs) -> dict[str, Pipeline]: return { "pipe1": pipeline([node(lambda: 1, None, "pipe1")]), "pipe2": pipeline([node(lambda: 2, None, "pipe2")]), diff --git a/tests/framework/session/conftest.py b/tests/framework/session/conftest.py index 4c40406e7d..1ac0de6301 100644 --- a/tests/framework/session/conftest.py +++ b/tests/framework/session/conftest.py @@ -1,8 +1,10 @@ +from __future__ import annotations + import logging from logging.handlers import QueueHandler, QueueListener from multiprocessing import Queue from pathlib import Path -from typing import Any, Dict, List +from typing import Any import pandas as pd import pytest @@ -34,20 +36,20 @@ def mock_package_name() -> str: return MOCK_PACKAGE_NAME -def _write_yaml(filepath: Path, config: Dict): +def _write_yaml(filepath: Path, config: dict): filepath.parent.mkdir(parents=True, exist_ok=True) yaml_str = yaml.dump(config) filepath.write_text(yaml_str) -def _write_toml(filepath: Path, config: Dict): +def _write_toml(filepath: Path, config: dict): filepath.parent.mkdir(parents=True, exist_ok=True) toml_str = toml.dumps(config) filepath.write_text(toml_str) def _assert_hook_call_record_has_expected_parameters( - call_record: logging.LogRecord, expected_parameters: List[str] + call_record: logging.LogRecord, expected_parameters: list[str] ): """Assert the given call record has all expected parameters.""" for param in expected_parameters: @@ -168,11 +170,11 @@ class LoggingHooks: def after_catalog_created( self, catalog: DataCatalog, - conf_catalog: Dict[str, Any], - conf_creds: Dict[str, Any], - feed_dict: Dict[str, Any], + conf_catalog: dict[str, Any], + conf_creds: dict[str, Any], + feed_dict: dict[str, Any], save_version: str, - load_versions: Dict[str, str], + load_versions: dict[str, str], ): logger.info( "Catalog created", @@ -191,7 +193,7 @@ def before_node_run( self, node: Node, catalog: DataCatalog, - inputs: Dict[str, Any], + inputs: dict[str, Any], is_async: str, session_id: str, ) -> None: @@ -211,8 +213,8 @@ def after_node_run( self, node: Node, catalog: DataCatalog, - inputs: Dict[str, Any], - outputs: Dict[str, Any], + inputs: dict[str, Any], + outputs: dict[str, Any], is_async: str, session_id: str, ) -> None: @@ -234,7 +236,7 @@ def on_node_error( error: Exception, node: Node, catalog: DataCatalog, - inputs: Dict[str, Any], + inputs: dict[str, Any], is_async: bool, session_id: str, ): @@ -252,7 +254,7 @@ def on_node_error( @hook_impl def before_pipeline_run( - self, run_params: Dict[str, Any], pipeline: Pipeline, catalog: DataCatalog + self, run_params: dict[str, Any], pipeline: Pipeline, catalog: DataCatalog ) -> None: logger.info( "About to run pipeline", @@ -262,8 +264,8 @@ def before_pipeline_run( @hook_impl def after_pipeline_run( self, - run_params: Dict[str, Any], - run_result: Dict[str, Any], + run_params: dict[str, Any], + run_result: dict[str, Any], pipeline: Pipeline, catalog: DataCatalog, ) -> None: @@ -281,7 +283,7 
@@ def after_pipeline_run( def on_pipeline_error( self, error: Exception, - run_params: Dict[str, Any], + run_params: dict[str, Any], pipeline: Pipeline, catalog: DataCatalog, ) -> None: diff --git a/tests/io/test_core.py b/tests/io/test_core.py index 001ea412ca..64e0f3d6dd 100644 --- a/tests/io/test_core.py +++ b/tests/io/test_core.py @@ -1,7 +1,9 @@ +from __future__ import annotations + from decimal import Decimal from fractions import Fraction from pathlib import PurePosixPath -from typing import Any, List +from typing import Any import pytest @@ -9,7 +11,7 @@ # List sourced from https://docs.python.org/3/library/stdtypes.html#truth-value-testing. # Excludes None, as None values are not shown in the str representation. -FALSE_BUILTINS: List[Any] = [ +FALSE_BUILTINS: list[Any] = [ False, 0, 0.0, diff --git a/tests/io/test_incremental_dataset.py b/tests/io/test_incremental_dataset.py index 8177135070..bb88c73bde 100644 --- a/tests/io/test_incremental_dataset.py +++ b/tests/io/test_incremental_dataset.py @@ -1,7 +1,9 @@ +from __future__ import annotations + import os import re from pathlib import Path -from typing import Any, Dict +from typing import Any import boto3 import pandas as pd @@ -43,7 +45,7 @@ class DummyDataSet(AbstractDataSet): # pragma: no cover def __init__(self, filepath): pass - def _describe(self) -> Dict[str, Any]: + def _describe(self) -> dict[str, Any]: return {"dummy": True} def _load(self) -> Any: diff --git a/tests/runner/test_parallel_runner.py b/tests/runner/test_parallel_runner.py index 77379b7c3a..62dcc579e1 100644 --- a/tests/runner/test_parallel_runner.py +++ b/tests/runner/test_parallel_runner.py @@ -1,6 +1,8 @@ +from __future__ import annotations + import sys from concurrent.futures.process import ProcessPoolExecutor -from typing import Any, Dict +from typing import Any import pytest @@ -236,7 +238,7 @@ def _release(self) -> None: self.log.append(("release", self.name)) self.value = None - def _describe(self) -> Dict[str, Any]: + def _describe(self) -> dict[str, Any]: return {} diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index d5f2093189..d05dcdeb53 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -1,5 +1,7 @@ +from __future__ import annotations + import re -from typing import Any, Dict +from typing import Any import pandas as pd import pytest @@ -140,7 +142,7 @@ def _release(self) -> None: self.log.append(("release", self.name)) self.value = None - def _describe(self) -> Dict[str, Any]: + def _describe(self) -> dict[str, Any]: return {} diff --git a/tests/runner/test_thread_runner.py b/tests/runner/test_thread_runner.py index c9ebae79bf..5db96b1f46 100644 --- a/tests/runner/test_thread_runner.py +++ b/tests/runner/test_thread_runner.py @@ -1,5 +1,7 @@ +from __future__ import annotations + from concurrent.futures import ThreadPoolExecutor -from typing import Any, Dict +from typing import Any import pytest @@ -126,7 +128,7 @@ def _release(self) -> None: self.log.append(("release", self.name)) self.value = None - def _describe(self) -> Dict[str, Any]: + def _describe(self) -> dict[str, Any]: return {} diff --git a/tools/cli.py b/tools/cli.py index e150631abe..a978fbe919 100644 --- a/tools/cli.py +++ b/tools/cli.py @@ -1,12 +1,14 @@ -from typing import Any, Dict, Union +from __future__ import annotations + +from typing import Any import click def _recurse_cli( - cli_element: Union[click.Command, click.Group, click.CommandCollection], + cli_element: click.Command 
| click.Group | click.CommandCollection, ctx: click.Context, - io_dict: Dict[str, Any], + io_dict: dict[str, Any], get_help: bool = False, ) -> None: """ @@ -50,13 +52,13 @@ def _recurse_cli( def get_cli_structure( - cli_obj: Union[click.Command, click.Group, click.CommandCollection], + cli_obj: click.Command | click.Group | click.CommandCollection, get_help: bool = False, -) -> Dict[str, Any]: +) -> dict[str, Any]: """Convenience wrapper function for `_recurse_cli` to work within `click.Context` and return a `dict`. """ - output: Dict[str, Any] = dict() + output: dict[str, Any] = dict() with click.Context(cli_obj) as ctx: # type: ignore _recurse_cli(cli_obj, ctx, output, get_help) return output
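For reference, a minimal sketch (not part of the patch) of the convention every hunk above adopts. With `from __future__ import annotations` at the top of a module, annotations are stored as strings instead of being evaluated, which is what allows built-in generics such as `dict[str, Any]`, PEP 604 unions such as `str | None`, and unquoted forward references like `-> DataCatalog` in signatures even on Python 3.7/3.8, where those expressions cannot be evaluated at runtime. The names in the sketch are invented for illustration and are not Kedro's real helpers.

from __future__ import annotations

from typing import Any


def resolve_options(
    config: dict[str, Any],                      # replaces typing.Dict[str, Any]
    overrides: dict[str, Any] | None = None,     # replaces Optional[Dict[str, Any]]
) -> dict[str, Any]:
    """Merge a config dict with an optional overrides dict (illustrative only)."""
    return {**config, **(overrides or {})}


class Catalog:
    def shallow_copy(self) -> Catalog:  # unquoted forward reference, resolved lazily
        return Catalog()


print(resolve_options({"type": "pandas.CSVDataSet"}, {"load_args": {"sep": ","}}))

One caveat to keep in mind while reviewing: the future import only defers evaluation, so anything that resolves these annotations at runtime (for example `typing.get_type_hints`) would still fail on interpreters older than 3.9 for built-in generics and older than 3.10 for the `|` union syntax.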