From 4f0c6d928c80b45e640d4e29092492860e9c90bf Mon Sep 17 00:00:00 2001 From: Sajid Alam Date: Wed, 3 Jan 2024 15:47:45 +0000 Subject: [PATCH 1/6] add memorydataset to free_outputs Signed-off-by: Sajid Alam --- kedro/runner/runner.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 0570f64444..c4039ea10c 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -94,9 +94,13 @@ def run( f"Pipeline input(s) {unsatisfied} not found in the DataCatalog" ) + # Identify MemoryDataset in the catalog + memory_datasets = {ds_name for ds_name, ds in catalog._datasets.items() + if isinstance(ds, MemoryDataset)} + # Check if there's any output datasets that aren't in the catalog and don't match a pattern - # in the catalog. - free_outputs = pipeline.outputs() - set(registered_ds) + # in the catalog and include MemoryDataset. + free_outputs = (pipeline.outputs() - set(registered_ds)) | memory_datasets # Register the default dataset pattern with the catalog catalog = catalog.shallow_copy( From 1981f3dd25b85a4c603f106f5083740c4ad0ba3c Mon Sep 17 00:00:00 2001 From: Sajid Alam Date: Wed, 3 Jan 2024 16:47:51 +0000 Subject: [PATCH 2/6] add tests Signed-off-by: Sajid Alam --- kedro/runner/runner.py | 7 ++++-- tests/runner/conftest.py | 10 ++++++++ tests/runner/test_sequential_runner.py | 34 +++++++++++++++++++++++++- 3 files changed, 48 insertions(+), 3 deletions(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index c4039ea10c..8360f8ff37 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -95,8 +95,11 @@ def run( ) # Identify MemoryDataset in the catalog - memory_datasets = {ds_name for ds_name, ds in catalog._datasets.items() - if isinstance(ds, MemoryDataset)} + memory_datasets = { + ds_name + for ds_name, ds in catalog._datasets.items() + if isinstance(ds, MemoryDataset) + } # Check if there's any output datasets that aren't in the catalog and don't match a pattern # in the catalog and include MemoryDataset. diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py index 0ce581e624..25ca233e97 100644 --- a/tests/runner/conftest.py +++ b/tests/runner/conftest.py @@ -165,3 +165,13 @@ def two_branches_crossed_pipeline(): node(identity, "ds3_B", "ds4_B", name="node4_B"), ] ) + + +@pytest.fixture +def pipeline_with_memory_datasets(): + return pipeline( + [ + node(func=identity, inputs="Input1", outputs="MemOutput1", name="node1"), + node(func=identity, inputs="Input2", outputs="MemOutput2", name="node2"), + ] + ) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 3ee297c0a7..7b4ee8e809 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -7,7 +7,13 @@ import pytest from kedro.framework.hooks import _create_hook_manager -from kedro.io import AbstractDataset, DataCatalog, DatasetError, LambdaDataset +from kedro.io import ( + AbstractDataset, + DataCatalog, + DatasetError, + LambdaDataset, + MemoryDataset, +) from kedro.pipeline import node from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline from kedro.runner import SequentialRunner @@ -279,3 +285,29 @@ def test_suggest_resume_scenario( hook_manager=_create_hook_manager(), ) assert re.search(expected_pattern, caplog.text) + + +class TestMemoryDatasetBehaviour: + def test_run_includes_memory_datasets(self, pipeline_with_memory_datasets): + # Create a catalog with MemoryDataSet entries and inputs for the pipeline + catalog = DataCatalog( + { + "Input1": LambdaDataset(load=lambda: "data1", save=lambda data: None), + "Input2": LambdaDataset(load=lambda: "data2", save=lambda data: None), + "MemOutput1": MemoryDataset(), + "MemOutput2": MemoryDataset(), + } + ) + + # Add a regular dataset to the catalog + catalog.add("RegularOutput", LambdaDataset(None, None, lambda: True)) + + # Run the pipeline + output = SequentialRunner().run(pipeline_with_memory_datasets, catalog) + + # Check that MemoryDataSet outputs are included in the run results + assert "MemOutput1" in output + assert "MemOutput2" in output + assert ( + "RegularOutput" not in output + ) # This output is registered in DataCatalog and so should not be in free outputs From 18824fb97ca7fd4444f8d883b962a2d87f92a148 Mon Sep 17 00:00:00 2001 From: Sajid Alam Date: Wed, 3 Jan 2024 16:49:13 +0000 Subject: [PATCH 3/6] Update RELEASE.md Signed-off-by: Sajid Alam --- RELEASE.md | 1 + 1 file changed, 1 insertion(+) diff --git a/RELEASE.md b/RELEASE.md index 6273792449..77d568d36e 100644 --- a/RELEASE.md +++ b/RELEASE.md @@ -6,6 +6,7 @@ * Removed example pipeline requirements when examples are not selected in `tools`. * Allowed modern versions of JupyterLab and Jupyter Notebooks. * Removed setuptools dependency +* `MemoryDataset` entries are now included in free outputs. ## Breaking changes to the API * Added logging about not using async mode in `SequentiallRunner` and `ParallelRunner`. From 0f0dfb6c4737a5c4e036a424897253e407c46034 Mon Sep 17 00:00:00 2001 From: Sajid Alam Date: Thu, 4 Jan 2024 14:45:29 +0000 Subject: [PATCH 4/6] changes based on review Signed-off-by: Sajid Alam --- kedro/runner/runner.py | 2 +- tests/runner/conftest.py | 23 +++++++++++++++++++++++ tests/runner/test_sequential_runner.py | 19 +++++++++++++++++++ 3 files changed, 43 insertions(+), 1 deletion(-) diff --git a/kedro/runner/runner.py b/kedro/runner/runner.py index 8360f8ff37..0aa9f07e13 100644 --- a/kedro/runner/runner.py +++ b/kedro/runner/runner.py @@ -103,7 +103,7 @@ def run( # Check if there's any output datasets that aren't in the catalog and don't match a pattern # in the catalog and include MemoryDataset. - free_outputs = (pipeline.outputs() - set(registered_ds)) | memory_datasets + free_outputs = pipeline.outputs() - (set(registered_ds) - memory_datasets) # Register the default dataset pattern with the catalog catalog = catalog.shallow_copy( diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py index 25ca233e97..8eb021a948 100644 --- a/tests/runner/conftest.py +++ b/tests/runner/conftest.py @@ -40,6 +40,11 @@ def multi_input_list_output(arg1, arg2): return [arg1, arg2] +def process_data(data): + # Dummy function + return data + + @pytest.fixture def conflicting_feed_dict(pandas_df_feed_dict): ds1 = MemoryDataset({"data": 0}) @@ -175,3 +180,21 @@ def pipeline_with_memory_datasets(): node(func=identity, inputs="Input2", outputs="MemOutput2", name="node2"), ] ) + + +@pytest.fixture +def pipeline_with_intermediate_memory_dataset(): + return pipeline( + [ + node( + func=process_data, + inputs="input_data", + outputs="intermediate_memory_data", + ), + node( + func=process_data, + inputs="intermediate_memory_data", + outputs="final_output", + ), + ] + ) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 7b4ee8e809..043af327e7 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -311,3 +311,22 @@ def test_run_includes_memory_datasets(self, pipeline_with_memory_datasets): assert ( "RegularOutput" not in output ) # This output is registered in DataCatalog and so should not be in free outputs + + def test_intermediate_memory_dataset( + self, pipeline_with_intermediate_memory_dataset + ): + catalog = DataCatalog( + { + "input_data": MemoryDataset(data="initial data"), + "intermediate_memory_data": MemoryDataset(), # Intermediate dataset + "final_output": MemoryDataset(), + } + ) + + runner = SequentialRunner() + output = runner.run(pipeline_with_intermediate_memory_dataset, catalog) + + assert ( + "intermediate_memory_data" not in output + ), "Intermediate MemoryDataset should not be in free_outputs" + assert "final_output" in output, "Final output should be in free_outputs" From 9d7a018075ab67cf81c4ae9df652ce85c5fc6dfd Mon Sep 17 00:00:00 2001 From: Sajid Alam Date: Thu, 4 Jan 2024 14:59:07 +0000 Subject: [PATCH 5/6] remove redundant test Signed-off-by: Sajid Alam --- tests/runner/conftest.py | 23 ----------------------- tests/runner/test_sequential_runner.py | 19 ------------------- 2 files changed, 42 deletions(-) diff --git a/tests/runner/conftest.py b/tests/runner/conftest.py index 8eb021a948..25ca233e97 100644 --- a/tests/runner/conftest.py +++ b/tests/runner/conftest.py @@ -40,11 +40,6 @@ def multi_input_list_output(arg1, arg2): return [arg1, arg2] -def process_data(data): - # Dummy function - return data - - @pytest.fixture def conflicting_feed_dict(pandas_df_feed_dict): ds1 = MemoryDataset({"data": 0}) @@ -180,21 +175,3 @@ def pipeline_with_memory_datasets(): node(func=identity, inputs="Input2", outputs="MemOutput2", name="node2"), ] ) - - -@pytest.fixture -def pipeline_with_intermediate_memory_dataset(): - return pipeline( - [ - node( - func=process_data, - inputs="input_data", - outputs="intermediate_memory_data", - ), - node( - func=process_data, - inputs="intermediate_memory_data", - outputs="final_output", - ), - ] - ) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 043af327e7..7b4ee8e809 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -311,22 +311,3 @@ def test_run_includes_memory_datasets(self, pipeline_with_memory_datasets): assert ( "RegularOutput" not in output ) # This output is registered in DataCatalog and so should not be in free outputs - - def test_intermediate_memory_dataset( - self, pipeline_with_intermediate_memory_dataset - ): - catalog = DataCatalog( - { - "input_data": MemoryDataset(data="initial data"), - "intermediate_memory_data": MemoryDataset(), # Intermediate dataset - "final_output": MemoryDataset(), - } - ) - - runner = SequentialRunner() - output = runner.run(pipeline_with_intermediate_memory_dataset, catalog) - - assert ( - "intermediate_memory_data" not in output - ), "Intermediate MemoryDataset should not be in free_outputs" - assert "final_output" in output, "Final output should be in free_outputs" From 5395a6b6466c88f90d457eca85f00e64df6ad0d5 Mon Sep 17 00:00:00 2001 From: Sajid Alam Date: Fri, 5 Jan 2024 01:04:04 +0000 Subject: [PATCH 6/6] lint Signed-off-by: Sajid Alam --- tests/runner/test_sequential_runner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/runner/test_sequential_runner.py b/tests/runner/test_sequential_runner.py index 7b4ee8e809..0e28feed6d 100644 --- a/tests/runner/test_sequential_runner.py +++ b/tests/runner/test_sequential_runner.py @@ -289,7 +289,7 @@ def test_suggest_resume_scenario( class TestMemoryDatasetBehaviour: def test_run_includes_memory_datasets(self, pipeline_with_memory_datasets): - # Create a catalog with MemoryDataSet entries and inputs for the pipeline + # Create a catalog with MemoryDataset entries and inputs for the pipeline catalog = DataCatalog( { "Input1": LambdaDataset(load=lambda: "data1", save=lambda data: None), @@ -305,7 +305,7 @@ def test_run_includes_memory_datasets(self, pipeline_with_memory_datasets): # Run the pipeline output = SequentialRunner().run(pipeline_with_memory_datasets, catalog) - # Check that MemoryDataSet outputs are included in the run results + # Check that MemoryDataset outputs are included in the run results assert "MemOutput1" in output assert "MemOutput2" in output assert (