Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MemoryDataset entries to free_outputs #3475

Merged
merged 9 commits into from
Jan 8, 2024
1 change: 1 addition & 0 deletions RELEASE.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* Allowed modern versions of JupyterLab and Jupyter Notebooks.
* Removed setuptools dependency
* Added `source_dir` explicitly in `pyproject.toml` for non-src layout project.
* `MemoryDataset` entries are now included in free outputs.

## Breaking changes to the API
* Added logging about not using async mode in `SequentiallRunner` and `ParallelRunner`.
Expand Down
11 changes: 9 additions & 2 deletions kedro/runner/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,16 @@ def run(
f"Pipeline input(s) {unsatisfied} not found in the DataCatalog"
)

# Identify MemoryDataset in the catalog
memory_datasets = {
ds_name
for ds_name, ds in catalog._datasets.items()
if isinstance(ds, MemoryDataset)
}

# Check if there's any output datasets that aren't in the catalog and don't match a pattern
# in the catalog.
free_outputs = pipeline.outputs() - set(registered_ds)
# in the catalog and include MemoryDataset.
free_outputs = pipeline.outputs() - (set(registered_ds) - memory_datasets)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: The definition of free_outputs has always been confusing to me, I think what are returned here is "in_memory_dataset" as we are trying to return the dataset as long as there are no I/O penalties.

Feel free to come up with other names.


# Register the default dataset pattern with the catalog
catalog = catalog.shallow_copy(
Expand Down
10 changes: 10 additions & 0 deletions tests/runner/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,13 @@ def two_branches_crossed_pipeline():
node(identity, "ds3_B", "ds4_B", name="node4_B"),
]
)


@pytest.fixture
def pipeline_with_memory_datasets():
return pipeline(
[
node(func=identity, inputs="Input1", outputs="MemOutput1", name="node1"),
node(func=identity, inputs="Input2", outputs="MemOutput2", name="node2"),
]
)
34 changes: 33 additions & 1 deletion tests/runner/test_sequential_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@
import pytest

from kedro.framework.hooks import _create_hook_manager
from kedro.io import AbstractDataset, DataCatalog, DatasetError, LambdaDataset
from kedro.io import (
AbstractDataset,
DataCatalog,
DatasetError,
LambdaDataset,
MemoryDataset,
)
from kedro.pipeline import node
from kedro.pipeline.modular_pipeline import pipeline as modular_pipeline
from kedro.runner import SequentialRunner
Expand Down Expand Up @@ -279,3 +285,29 @@ def test_suggest_resume_scenario(
hook_manager=_create_hook_manager(),
)
assert re.search(expected_pattern, caplog.text)


class TestMemoryDatasetBehaviour:
def test_run_includes_memory_datasets(self, pipeline_with_memory_datasets):
# Create a catalog with MemoryDataset entries and inputs for the pipeline
catalog = DataCatalog(
{
"Input1": LambdaDataset(load=lambda: "data1", save=lambda data: None),
"Input2": LambdaDataset(load=lambda: "data2", save=lambda data: None),
"MemOutput1": MemoryDataset(),
"MemOutput2": MemoryDataset(),
}
)

# Add a regular dataset to the catalog
catalog.add("RegularOutput", LambdaDataset(None, None, lambda: True))

# Run the pipeline
output = SequentialRunner().run(pipeline_with_memory_datasets, catalog)

# Check that MemoryDataset outputs are included in the run results
assert "MemOutput1" in output
assert "MemOutput2" in output
assert (
"RegularOutput" not in output
) # This output is registered in DataCatalog and so should not be in free outputs