From 1c6c7292aef145d21cbd64ab8c15c30af46e035b Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Wed, 12 Feb 2025 19:15:21 +0000 Subject: [PATCH 01/11] Added rank_catalog_factories to the session Signed-off-by: Elena Khaustova --- kedro/framework/cli/catalog.py | 3 +-- kedro/framework/session/catalog.py | 13 +++++++++++++ kedro/framework/session/session.py | 7 ++++++- 3 files changed, 20 insertions(+), 3 deletions(-) create mode 100644 kedro/framework/session/catalog.py diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py index 7f139248d4..243b8e5a0c 100644 --- a/kedro/framework/cli/catalog.py +++ b/kedro/framework/cli/catalog.py @@ -209,9 +209,8 @@ def _add_missing_datasets_to_catalog(missing_ds: list[str], catalog_path: Path) def rank_catalog_factories(metadata: ProjectMetadata, env: str) -> None: """List all dataset factories in the catalog, ranked by priority by which they are matched.""" session = _create_session(metadata.package_name, env=env) - context = session.load_context() + catalog_factories = session.rank_catalog_factories() - catalog_factories = context.catalog.config_resolver.list_patterns() if catalog_factories: click.echo(yaml.dump(catalog_factories)) else: diff --git a/kedro/framework/session/catalog.py b/kedro/framework/session/catalog.py new file mode 100644 index 0000000000..1676c9e0cc --- /dev/null +++ b/kedro/framework/session/catalog.py @@ -0,0 +1,13 @@ +class CatalogCommandsMixin: + context = None + + def rank_catalog_factories(self) -> list[str] | str: + """List all dataset factories in the catalog, ranked by priority + by which they are matched. + """ + + catalog_factories = self.context.catalog.config_resolver.list_patterns() + if catalog_factories: + return catalog_factories + else: + return "There are no dataset factories in the catalog." diff --git a/kedro/framework/session/session.py b/kedro/framework/session/session.py index 6e39646185..ff690c49dc 100644 --- a/kedro/framework/session/session.py +++ b/kedro/framework/session/session.py @@ -23,6 +23,7 @@ settings, validate_settings, ) +from kedro.framework.session.catalog import CatalogCommandsMixin from kedro.io.core import generate_timestamp from kedro.runner import AbstractRunner, SequentialRunner from kedro.utils import _find_kedro_project @@ -78,7 +79,7 @@ class KedroSessionError(Exception): pass -class KedroSession: +class KedroSession(CatalogCommandsMixin): """``KedroSession`` is the object that is responsible for managing the lifecycle of a Kedro run. Use `KedroSession.create()` as a context manager to construct a new KedroSession with session data @@ -128,6 +129,10 @@ def __init__( self._project_path / settings.CONF_SOURCE ) + @property + def context(self): + return self.load_context() + @classmethod def create( cls, From cfe866284c65e5d681b716f091a0a04d905cf1f6 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Wed, 12 Feb 2025 19:18:44 +0000 Subject: [PATCH 02/11] Simplified rank_catalog_factories method Signed-off-by: Elena Khaustova --- kedro/framework/session/catalog.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/kedro/framework/session/catalog.py b/kedro/framework/session/catalog.py index 1676c9e0cc..5c66c11709 100644 --- a/kedro/framework/session/catalog.py +++ b/kedro/framework/session/catalog.py @@ -1,13 +1,9 @@ class CatalogCommandsMixin: context = None - def rank_catalog_factories(self) -> list[str] | str: + def rank_catalog_factories(self) -> list[str]: """List all dataset factories in the catalog, ranked by priority by which they are matched. """ - catalog_factories = self.context.catalog.config_resolver.list_patterns() - if catalog_factories: - return catalog_factories - else: - return "There are no dataset factories in the catalog." + return self.context.catalog.config_resolver.list_patterns() From 52deaee687ef03c8c6e8d26ebcbd3ffd0730ef99 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Thu, 13 Feb 2025 12:44:29 +0000 Subject: [PATCH 03/11] Implemented resolve_patterns Signed-off-by: Elena Khaustova --- kedro/framework/session/catalog.py | 47 ++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/kedro/framework/session/catalog.py b/kedro/framework/session/catalog.py index 5c66c11709..148e318778 100644 --- a/kedro/framework/session/catalog.py +++ b/kedro/framework/session/catalog.py @@ -1,5 +1,12 @@ +from typing import Any + +from kedro.framework.project import pipelines +from kedro.io import KedroDataCatalog + + class CatalogCommandsMixin: context = None + _logger = None def rank_catalog_factories(self) -> list[str]: """List all dataset factories in the catalog, ranked by priority @@ -7,3 +14,43 @@ def rank_catalog_factories(self) -> list[str]: """ return self.context.catalog.config_resolver.list_patterns() + + def resolve_patterns(self, include_default: bool = False) -> dict[str, Any]: + """Resolve catalog factories against pipeline datasets.""" + catalog = self.context.catalog + + # TODO: remove after moving to new catalog + if not isinstance(catalog, KedroDataCatalog): + self._logger.watring("This method is available for KedroDataCatalog only.") + return {} + + # TODO: revise setting default pattern logic based on https://github.com/kedro-org/kedro/issues/4475 + runtime_pattern = {"{default}": {"type": "MemoryDataset"}} + if include_default: + catalog.config_resolver.add_runtime_patterns(runtime_pattern) + + explicit_datasets = catalog.config_resolver.config + pipeline_datasets = set() + + for pipe in pipelines.keys(): + pl_obj = pipelines.get(pipe) + if pl_obj: + pipeline_datasets.update(pl_obj.datasets()) + + for ds_name in pipeline_datasets: + # TODO: when breaking change replace with is_parameter from kedro/io/core.py + if ( + ds_name in catalog.config_resolver.config + or ds_name.startswith("params:") + or ds_name == "parameters" + ): + continue + + ds_config = catalog.config_resolver.resolve_pattern(ds_name) + if ds_config: + explicit_datasets[ds_name] = ds_config + + if include_default: + catalog.config_resolver.remove_runtime_patterns(runtime_pattern) + + return explicit_datasets From 79d96297d9d163d0f7fb865f92202502926858e7 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Thu, 13 Feb 2025 12:53:31 +0000 Subject: [PATCH 04/11] Fixed logger message Signed-off-by: Elena Khaustova --- kedro/framework/session/catalog.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kedro/framework/session/catalog.py b/kedro/framework/session/catalog.py index 148e318778..fd45cda7c1 100644 --- a/kedro/framework/session/catalog.py +++ b/kedro/framework/session/catalog.py @@ -21,7 +21,9 @@ def resolve_patterns(self, include_default: bool = False) -> dict[str, Any]: # TODO: remove after moving to new catalog if not isinstance(catalog, KedroDataCatalog): - self._logger.watring("This method is available for KedroDataCatalog only.") + self._logger.warning( + "This method is available for `KedroDataCatalog` only." + ) return {} # TODO: revise setting default pattern logic based on https://github.com/kedro-org/kedro/issues/4475 From 5ed8b48c20fb12ff6de33bc705b182e9689cd48c Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Thu, 13 Feb 2025 14:34:53 +0000 Subject: [PATCH 05/11] Please mypy Signed-off-by: Elena Khaustova --- kedro/framework/session/catalog.py | 10 +++++++--- kedro/framework/session/session.py | 2 +- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/kedro/framework/session/catalog.py b/kedro/framework/session/catalog.py index fd45cda7c1..e36b0467a9 100644 --- a/kedro/framework/session/catalog.py +++ b/kedro/framework/session/catalog.py @@ -1,18 +1,22 @@ +import logging from typing import Any +from kedro.framework.context import KedroContext from kedro.framework.project import pipelines from kedro.io import KedroDataCatalog class CatalogCommandsMixin: - context = None - _logger = None + @property + def context(self) -> KedroContext: ... # type: ignore[empty-body] + + @property + def _logger(self) -> logging.Logger: ... # type: ignore[empty-body] def rank_catalog_factories(self) -> list[str]: """List all dataset factories in the catalog, ranked by priority by which they are matched. """ - return self.context.catalog.config_resolver.list_patterns() def resolve_patterns(self, include_default: bool = False) -> dict[str, Any]: diff --git a/kedro/framework/session/session.py b/kedro/framework/session/session.py index ff690c49dc..6c915d014b 100644 --- a/kedro/framework/session/session.py +++ b/kedro/framework/session/session.py @@ -130,7 +130,7 @@ def __init__( ) @property - def context(self): + def context(self) -> KedroContext: return self.load_context() @classmethod From 5a672c321c7debcbfd6646b2fb231c007b573294 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Thu, 13 Feb 2025 17:39:48 +0000 Subject: [PATCH 06/11] Made session retain context Signed-off-by: Elena Khaustova --- kedro/framework/cli/catalog.py | 2 +- kedro/framework/context/context.py | 14 ++++++++++++-- kedro/framework/session/catalog.py | 19 +++++++++++++++---- kedro/framework/session/session.py | 13 +++++++++---- kedro/io/core.py | 6 ++++-- 5 files changed, 41 insertions(+), 13 deletions(-) diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py index 243b8e5a0c..8c99fa8ff2 100644 --- a/kedro/framework/cli/catalog.py +++ b/kedro/framework/cli/catalog.py @@ -209,7 +209,7 @@ def _add_missing_datasets_to_catalog(missing_ds: list[str], catalog_path: Path) def rank_catalog_factories(metadata: ProjectMetadata, env: str) -> None: """List all dataset factories in the catalog, ranked by priority by which they are matched.""" session = _create_session(metadata.package_name, env=env) - catalog_factories = session.rank_catalog_factories() + catalog_factories = session.list_patterns() if catalog_factories: click.echo(yaml.dump(catalog_factories)) diff --git a/kedro/framework/context/context.py b/kedro/framework/context/context.py index 0b44056374..130733cb9e 100644 --- a/kedro/framework/context/context.py +++ b/kedro/framework/context/context.py @@ -176,9 +176,10 @@ class KedroContext: _extra_params: dict[str, Any] | None = field( init=True, default=None, converter=deepcopy ) + _catalog: CatalogProtocol | None = None @property - def catalog(self) -> CatalogProtocol: + def catalog(self) -> CatalogProtocol | None: """Read-only property referring to Kedro's catalog` for this context. Returns: @@ -187,7 +188,8 @@ def catalog(self) -> CatalogProtocol: KedroContextError: Incorrect catalog registered for the project. """ - return self._get_catalog() + self._catalog = self._catalog or self._get_catalog() + return self._catalog @property def params(self) -> dict[str, Any]: @@ -209,6 +211,14 @@ def params(self) -> dict[str, Any]: return OmegaConf.to_container(params) if OmegaConf.is_config(params) else params # type: ignore[return-value] + def get_catalog( + self, + save_version: str | None = None, + load_versions: dict[str, str] | None = None, + ) -> CatalogProtocol: + self._catalog = self._catalog or self._get_catalog(save_version, load_versions) + return self._catalog + def _get_catalog( self, save_version: str | None = None, diff --git a/kedro/framework/session/catalog.py b/kedro/framework/session/catalog.py index e36b0467a9..e3d0a3118b 100644 --- a/kedro/framework/session/catalog.py +++ b/kedro/framework/session/catalog.py @@ -13,7 +13,7 @@ def context(self) -> KedroContext: ... # type: ignore[empty-body] @property def _logger(self) -> logging.Logger: ... # type: ignore[empty-body] - def rank_catalog_factories(self) -> list[str]: + def list_patterns(self) -> list[str]: """List all dataset factories in the catalog, ranked by priority by which they are matched. """ @@ -35,7 +35,6 @@ def resolve_patterns(self, include_default: bool = False) -> dict[str, Any]: if include_default: catalog.config_resolver.add_runtime_patterns(runtime_pattern) - explicit_datasets = catalog.config_resolver.config pipeline_datasets = set() for pipe in pipelines.keys(): @@ -43,10 +42,22 @@ def resolve_patterns(self, include_default: bool = False) -> dict[str, Any]: if pl_obj: pipeline_datasets.update(pl_obj.datasets()) + # We need to include datasets defined in the catalog.yaml and datasets added manually to the catalog + explicit_datasets = {} + for ds_name, ds in catalog.items(): + # TODO: when breaking change replace with is_parameter() from kedro/io/core.py + if ds_name.startswith("params:") or ds_name == "parameters": + continue + + unresolved_config, _ = catalog.config_resolver.unresolve_credentials( + ds_name, ds.to_config() + ) + explicit_datasets[ds_name] = unresolved_config + for ds_name in pipeline_datasets: - # TODO: when breaking change replace with is_parameter from kedro/io/core.py + # TODO: when breaking change replace with is_parameter() from kedro/io/core.py if ( - ds_name in catalog.config_resolver.config + ds_name in explicit_datasets or ds_name.startswith("params:") or ds_name == "parameters" ): diff --git a/kedro/framework/session/session.py b/kedro/framework/session/session.py index 6c915d014b..5e0ff85d3e 100644 --- a/kedro/framework/session/session.py +++ b/kedro/framework/session/session.py @@ -119,6 +119,7 @@ def __init__( self._package_name = package_name self._store = self._init_store() self._run_called = False + self._context = None hook_manager = _create_hook_manager() _register_hooks(hook_manager, settings.HOOKS) @@ -131,7 +132,8 @@ def __init__( @property def context(self) -> KedroContext: - return self.load_context() + self._context = self._context or self.load_context() + return self._context @classmethod def create( @@ -238,6 +240,8 @@ def store(self) -> dict[str, Any]: def load_context(self) -> KedroContext: """An instance of the project context.""" + if self._context: + return self._context env = self.store.get("env") extra_params = self.store.get("extra_params") config_loader = self._get_config_loader() @@ -252,6 +256,8 @@ def load_context(self) -> KedroContext: ) self._hook_manager.hook.after_context_created(context=context) + self._context = context + return context # type: ignore[no-any-return] def _get_config_loader(self) -> AbstractConfigLoader: @@ -343,7 +349,6 @@ def run( # noqa: PLR0913 session_id = self.store["session_id"] save_version = session_id extra_params = self.store.get("extra_params") or {} - context = self.load_context() name = pipeline_name or "__default__" @@ -369,7 +374,7 @@ def run( # noqa: PLR0913 record_data = { "session_id": session_id, "project_path": self._project_path.as_posix(), - "env": context.env, + "env": self.context.env, "kedro_version": kedro_version, "tags": tags, "from_nodes": from_nodes, @@ -384,7 +389,7 @@ def run( # noqa: PLR0913 "runner": getattr(runner, "__name__", str(runner)), } - catalog = context._get_catalog( + catalog = self.context.get_catalog( save_version=save_version, load_versions=load_versions, ) diff --git a/kedro/io/core.py b/kedro/io/core.py index 03e0d4d853..2eb9054814 100644 --- a/kedro/io/core.py +++ b/kedro/io/core.py @@ -252,16 +252,18 @@ def to_config(self) -> dict[str, Any]: return_config[VERSIONED_FLAG_KEY] = cached_ds_return_config.pop( VERSIONED_FLAG_KEY ) - # Pop metadata from configuration + # Pop metadata and data from configuration cached_ds_return_config.pop("metadata", None) + cached_ds_return_config.pop("data", None) return_config["dataset"] = cached_ds_return_config # Set `versioned` key if version present in the dataset if return_config.pop(VERSION_KEY, None): return_config[VERSIONED_FLAG_KEY] = True - # Pop metadata from configuration + # Pop metadata and data from configuration return_config.pop("metadata", None) + return_config.pop("data", None) return return_config From 3473ab341e5b7767d6f4b4bf4ba1a22679fb0b9c Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Fri, 14 Feb 2025 14:55:37 +0000 Subject: [PATCH 07/11] Added catalog to function names Signed-off-by: Elena Khaustova --- kedro/framework/cli/catalog.py | 2 +- kedro/framework/session/catalog.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kedro/framework/cli/catalog.py b/kedro/framework/cli/catalog.py index 8c99fa8ff2..9bedade8d4 100644 --- a/kedro/framework/cli/catalog.py +++ b/kedro/framework/cli/catalog.py @@ -209,7 +209,7 @@ def _add_missing_datasets_to_catalog(missing_ds: list[str], catalog_path: Path) def rank_catalog_factories(metadata: ProjectMetadata, env: str) -> None: """List all dataset factories in the catalog, ranked by priority by which they are matched.""" session = _create_session(metadata.package_name, env=env) - catalog_factories = session.list_patterns() + catalog_factories = session.list_catalog_patterns() if catalog_factories: click.echo(yaml.dump(catalog_factories)) diff --git a/kedro/framework/session/catalog.py b/kedro/framework/session/catalog.py index e3d0a3118b..26cc5ede4a 100644 --- a/kedro/framework/session/catalog.py +++ b/kedro/framework/session/catalog.py @@ -13,13 +13,13 @@ def context(self) -> KedroContext: ... # type: ignore[empty-body] @property def _logger(self) -> logging.Logger: ... # type: ignore[empty-body] - def list_patterns(self) -> list[str]: + def list_catalog_patterns(self) -> list[str]: """List all dataset factories in the catalog, ranked by priority by which they are matched. """ return self.context.catalog.config_resolver.list_patterns() - def resolve_patterns(self, include_default: bool = False) -> dict[str, Any]: + def resolve_catalog_patterns(self, include_default: bool = False) -> dict[str, Any]: """Resolve catalog factories against pipeline datasets.""" catalog = self.context.catalog From 3be042d94b9106874bb6791a9b5c132c984ed5ab Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Fri, 14 Feb 2025 17:15:17 +0000 Subject: [PATCH 08/11] Maded a stub for list_catalog_datasets Signed-off-by: Elena Khaustova --- kedro/framework/session/catalog.py | 75 ++++++++++++++++++++++++++++-- 1 file changed, 72 insertions(+), 3 deletions(-) diff --git a/kedro/framework/session/catalog.py b/kedro/framework/session/catalog.py index 26cc5ede4a..fbc371463f 100644 --- a/kedro/framework/session/catalog.py +++ b/kedro/framework/session/catalog.py @@ -1,8 +1,9 @@ import logging +from collections import defaultdict from typing import Any from kedro.framework.context import KedroContext -from kedro.framework.project import pipelines +from kedro.framework.project import pipelines as _pipelines from kedro.io import KedroDataCatalog @@ -13,6 +14,74 @@ def context(self) -> KedroContext: ... # type: ignore[empty-body] @property def _logger(self) -> logging.Logger: ... # type: ignore[empty-body] + def list_catalog_datasets(self, pipelines: list[str] | None = None) -> dict: + """Show datasets per type.""" + catalog = self.context.catalog + # TODO: remove after moving to new catalog + if not isinstance(catalog, KedroDataCatalog): + self._logger.warning( + "This method is available for `KedroDataCatalog` only." + ) + return {} + + # TODO: revise setting default pattern logic based on https://github.com/kedro-org/kedro/issues/4475 + runtime_pattern = {"{default}": {"type": "MemoryDataset"}} + catalog.config_resolver.add_runtime_patterns(runtime_pattern) + + # title = "Datasets in '{}' pipeline" + # not_mentioned = "Datasets not mentioned in pipeline" + # mentioned = "Datasets mentioned in pipeline" + # factories = "Datasets generated from factories" + + target_pipelines = pipelines or _pipelines.keys() + + result = {} + for pipe in target_pipelines: + pl_obj = _pipelines.get(pipe) + if pl_obj: + pipeline_ds = pl_obj.datasets() + else: + existing_pls = ", ".join(sorted(_pipelines.keys())) + raise ValueError( + f"'{pipe}' pipeline not found! Existing pipelines: {existing_pls}" + ) + + catalog_ds = set(catalog.keys()) + # unused_ds = catalog_ds - pipeline_ds + default_ds = pipeline_ds - catalog_ds + # used_ds = catalog_ds - unused_ds + + # resolve any factory datasets in the pipeline + factory_ds_by_type = defaultdict(list) + + for ds_name in default_ds: + if catalog.config_resolver.match_pattern(ds_name): + ds_config = catalog.config_resolver.resolve_pattern(ds_name) + factory_ds_by_type[ds_config.get("type", "MemoryDataset")].append( + ds_name + ) + + # default_ds = default_ds - set( + # chain.from_iterable(factory_ds_by_type.values()) + # ) + + # unused_by_type = _map_type_to_datasets(unused_ds, datasets_meta) + # used_by_type = _map_type_to_datasets(used_ds, datasets_meta) + # + # if default_ds: + # used_by_type["MemoryDataset"].extend(default_ds) + # + # data = ( + # (mentioned, dict(used_by_type)), + # (factories, dict(factory_ds_by_type)), + # (not_mentioned, dict(unused_by_type)), + # ) + # result[title.format(pipe)] = {key: value for key, value in data if value} + # + # catalog.config_resolver.remove_runtime_patterns(runtime_pattern) + + return result + def list_catalog_patterns(self) -> list[str]: """List all dataset factories in the catalog, ranked by priority by which they are matched. @@ -37,8 +106,8 @@ def resolve_catalog_patterns(self, include_default: bool = False) -> dict[str, A pipeline_datasets = set() - for pipe in pipelines.keys(): - pl_obj = pipelines.get(pipe) + for pipe in _pipelines.keys(): + pl_obj = _pipelines.get(pipe) if pl_obj: pipeline_datasets.update(pl_obj.datasets()) From 14587843114712f2dd497251d5564ddd44ebe353 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Thu, 20 Feb 2025 15:58:52 +0000 Subject: [PATCH 09/11] Implemented list_catalog_datasets Signed-off-by: Elena Khaustova --- kedro/framework/session/catalog.py | 66 ++++++++++++++---------------- 1 file changed, 30 insertions(+), 36 deletions(-) diff --git a/kedro/framework/session/catalog.py b/kedro/framework/session/catalog.py index fbc371463f..e6af99490b 100644 --- a/kedro/framework/session/catalog.py +++ b/kedro/framework/session/catalog.py @@ -1,5 +1,4 @@ import logging -from collections import defaultdict from typing import Any from kedro.framework.context import KedroContext @@ -24,14 +23,9 @@ def list_catalog_datasets(self, pipelines: list[str] | None = None) -> dict: ) return {} - # TODO: revise setting default pattern logic based on https://github.com/kedro-org/kedro/issues/4475 - runtime_pattern = {"{default}": {"type": "MemoryDataset"}} - catalog.config_resolver.add_runtime_patterns(runtime_pattern) - - # title = "Datasets in '{}' pipeline" - # not_mentioned = "Datasets not mentioned in pipeline" - # mentioned = "Datasets mentioned in pipeline" - # factories = "Datasets generated from factories" + not_mentioned = "Datasets not mentioned in pipeline" + mentioned = "Datasets mentioned in pipeline" + factories = "Datasets generated from factories" target_pipelines = pipelines or _pipelines.keys() @@ -46,39 +40,35 @@ def list_catalog_datasets(self, pipelines: list[str] | None = None) -> dict: f"'{pipe}' pipeline not found! Existing pipelines: {existing_pls}" ) + # TODO: revise setting default pattern logic based on https://github.com/kedro-org/kedro/issues/4475 + runtime_pattern = {"{default}": {"type": "MemoryDataset"}} + catalog.config_resolver.add_runtime_patterns(runtime_pattern) + catalog_ds = set(catalog.keys()) - # unused_ds = catalog_ds - pipeline_ds + unused_ds = catalog_ds - pipeline_ds default_ds = pipeline_ds - catalog_ds - # used_ds = catalog_ds - unused_ds + used_ds = catalog_ds - unused_ds - # resolve any factory datasets in the pipeline - factory_ds_by_type = defaultdict(list) + catalog.config_resolver.remove_runtime_patterns(runtime_pattern) + patterns_ds = set() for ds_name in default_ds: if catalog.config_resolver.match_pattern(ds_name): - ds_config = catalog.config_resolver.resolve_pattern(ds_name) - factory_ds_by_type[ds_config.get("type", "MemoryDataset")].append( - ds_name - ) - - # default_ds = default_ds - set( - # chain.from_iterable(factory_ds_by_type.values()) - # ) - - # unused_by_type = _map_type_to_datasets(unused_ds, datasets_meta) - # used_by_type = _map_type_to_datasets(used_ds, datasets_meta) - # - # if default_ds: - # used_by_type["MemoryDataset"].extend(default_ds) - # - # data = ( - # (mentioned, dict(used_by_type)), - # (factories, dict(factory_ds_by_type)), - # (not_mentioned, dict(unused_by_type)), - # ) - # result[title.format(pipe)] = {key: value for key, value in data if value} - # - # catalog.config_resolver.remove_runtime_patterns(runtime_pattern) + patterns_ds.add(ds_name) + + default_ds -= patterns_ds + used_ds.update(default_ds) + + used_ds_by_type = _group_ds_by_type(used_ds, catalog) + patterns_ds_by_type = _group_ds_by_type(patterns_ds, catalog) + unused_ds_by_type = _group_ds_by_type(unused_ds, catalog) + + data = ( + (mentioned, used_ds_by_type), + (factories, patterns_ds_by_type), + (not_mentioned, unused_ds_by_type), + ) + result[pipe] = {key: value for key, value in data if value} return result @@ -140,3 +130,7 @@ def resolve_catalog_patterns(self, include_default: bool = False) -> dict[str, A catalog.config_resolver.remove_runtime_patterns(runtime_pattern) return explicit_datasets + + +def _group_ds_by_type(datasets, catalog) -> dict[str, dict]: + pass From 1cec0781f4d9d67d46c7c59ac8f7208cfc9bb02e Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Thu, 20 Feb 2025 17:42:53 +0000 Subject: [PATCH 10/11] Implemented _group_ds_by_type function Signed-off-by: Elena Khaustova --- kedro/framework/session/catalog.py | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/kedro/framework/session/catalog.py b/kedro/framework/session/catalog.py index e6af99490b..f6b5bb7e70 100644 --- a/kedro/framework/session/catalog.py +++ b/kedro/framework/session/catalog.py @@ -1,9 +1,11 @@ import logging +from collections import defaultdict from typing import Any from kedro.framework.context import KedroContext from kedro.framework.project import pipelines as _pipelines from kedro.io import KedroDataCatalog +from kedro.io.core import TYPE_KEY class CatalogCommandsMixin: @@ -132,5 +134,18 @@ def resolve_catalog_patterns(self, include_default: bool = False) -> dict[str, A return explicit_datasets -def _group_ds_by_type(datasets, catalog) -> dict[str, dict]: - pass +def _group_ds_by_type(datasets: set[str], catalog: KedroDataCatalog) -> dict[str, list]: + mapping = defaultdict(list) + for ds_name in datasets: + # TODO: when breaking change replace with is_parameter() from kedro/io/core.py + if ds_name.startswith("params:") or ds_name == "parameters": + continue + + ds = catalog[ds_name] + unresolved_config, _ = catalog.config_resolver.unresolve_credentials( + ds_name, ds.to_config() + ) + ds_type = unresolved_config.get(TYPE_KEY) + mapping[ds_type].append({ds_name: unresolved_config}) + + return mapping From bf9bfd2513c884939841b74536fbd86aeb94c7f9 Mon Sep 17 00:00:00 2001 From: Elena Khaustova Date: Thu, 20 Feb 2025 19:33:12 +0000 Subject: [PATCH 11/11] Updated list_catalog_datasets Signed-off-by: Elena Khaustova --- kedro/framework/session/catalog.py | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/kedro/framework/session/catalog.py b/kedro/framework/session/catalog.py index f6b5bb7e70..7fe2075534 100644 --- a/kedro/framework/session/catalog.py +++ b/kedro/framework/session/catalog.py @@ -1,11 +1,9 @@ import logging -from collections import defaultdict from typing import Any from kedro.framework.context import KedroContext from kedro.framework.project import pipelines as _pipelines from kedro.io import KedroDataCatalog -from kedro.io.core import TYPE_KEY class CatalogCommandsMixin: @@ -25,6 +23,9 @@ def list_catalog_datasets(self, pipelines: list[str] | None = None) -> dict: ) return {} + # TODO: revise setting default pattern logic based on https://github.com/kedro-org/kedro/issues/4475 + runtime_pattern = {"{default}": {"type": "MemoryDataset"}} + not_mentioned = "Datasets not mentioned in pipeline" mentioned = "Datasets mentioned in pipeline" factories = "Datasets generated from factories" @@ -42,16 +43,11 @@ def list_catalog_datasets(self, pipelines: list[str] | None = None) -> dict: f"'{pipe}' pipeline not found! Existing pipelines: {existing_pls}" ) - # TODO: revise setting default pattern logic based on https://github.com/kedro-org/kedro/issues/4475 - runtime_pattern = {"{default}": {"type": "MemoryDataset"}} - catalog.config_resolver.add_runtime_patterns(runtime_pattern) - catalog_ds = set(catalog.keys()) unused_ds = catalog_ds - pipeline_ds default_ds = pipeline_ds - catalog_ds used_ds = catalog_ds - unused_ds - catalog.config_resolver.remove_runtime_patterns(runtime_pattern) patterns_ds = set() for ds_name in default_ds: @@ -61,10 +57,14 @@ def list_catalog_datasets(self, pipelines: list[str] | None = None) -> dict: default_ds -= patterns_ds used_ds.update(default_ds) + catalog.config_resolver.add_runtime_patterns(runtime_pattern) + used_ds_by_type = _group_ds_by_type(used_ds, catalog) patterns_ds_by_type = _group_ds_by_type(patterns_ds, catalog) unused_ds_by_type = _group_ds_by_type(unused_ds, catalog) + catalog.config_resolver.remove_runtime_patterns(runtime_pattern) + data = ( (mentioned, used_ds_by_type), (factories, patterns_ds_by_type), @@ -134,8 +134,8 @@ def resolve_catalog_patterns(self, include_default: bool = False) -> dict[str, A return explicit_datasets -def _group_ds_by_type(datasets: set[str], catalog: KedroDataCatalog) -> dict[str, list]: - mapping = defaultdict(list) +def _group_ds_by_type(datasets: set[str], catalog: KedroDataCatalog) -> dict[str, dict]: + mapping = {} for ds_name in datasets: # TODO: when breaking change replace with is_parameter() from kedro/io/core.py if ds_name.startswith("params:") or ds_name == "parameters": @@ -145,7 +145,6 @@ def _group_ds_by_type(datasets: set[str], catalog: KedroDataCatalog) -> dict[str unresolved_config, _ = catalog.config_resolver.unresolve_credentials( ds_name, ds.to_config() ) - ds_type = unresolved_config.get(TYPE_KEY) - mapping[ds_type].append({ds_name: unresolved_config}) + mapping[ds_name] = unresolved_config return mapping