Skip to content

Commit 7a87d04

Browse files
Merge pull request #16 from great-expectations/m/CORE-751/configure_file_data_context_cleanup
[FEATURE] Allow configure_file_data_context to return a generator
2 parents e30a388 + 0e9edf7 commit 7a87d04

File tree

2 files changed

+159
-6
lines changed

2 files changed

+159
-6
lines changed

great_expectations_provider/operators/validate_checkpoint.py

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,16 @@
11
from __future__ import annotations
22

3-
from typing import TYPE_CHECKING, Callable, Literal
3+
import inspect
4+
from typing import TYPE_CHECKING, Callable, Generator, Literal, cast
45

56
from airflow.models import BaseOperator
7+
from great_expectations.data_context import AbstractDataContext, FileDataContext
68

79
if TYPE_CHECKING:
810
from airflow.utils.context import Context
911
from great_expectations import Checkpoint
1012
from great_expectations.checkpoint.checkpoint import CheckpointDescriptionDict
1113
from great_expectations.core.batch import BatchParameters
12-
from great_expectations.data_context import AbstractDataContext, FileDataContext
1314

1415

1516
class GXValidateCheckpointOperator(BaseOperator):
@@ -18,7 +19,9 @@ def __init__(
1819
configure_checkpoint: Callable[[AbstractDataContext], Checkpoint],
1920
batch_parameters: BatchParameters | None = None,
2021
context_type: Literal["ephemeral", "cloud", "file"] = "ephemeral",
21-
configure_file_data_context: Callable[[], FileDataContext] | None = None,
22+
configure_file_data_context: Callable[[], FileDataContext]
23+
| Callable[[], Generator[FileDataContext, None, None]]
24+
| None = None,
2225
*args,
2326
**kwargs,
2427
) -> None:
@@ -40,15 +43,50 @@ def execute(self, context: Context) -> CheckpointDescriptionDict:
4043
import great_expectations as gx
4144

4245
gx_context: AbstractDataContext
46+
file_context_generator: Generator[FileDataContext, None, None] | None = None
4347

4448
if self.context_type == "file":
4549
if not self.configure_file_data_context:
4650
raise ValueError(
4751
"Parameter `configure_file_data_context` must be specified if `context_type` is `file`"
4852
)
49-
gx_context = self.configure_file_data_context()
53+
elif inspect.isgeneratorfunction(self.configure_file_data_context):
54+
file_context_generator = self.configure_file_data_context()
55+
gx_context = self._get_value_from_generator(file_context_generator)
56+
else:
57+
file_context_fn = cast(
58+
Callable[[], FileDataContext], self.configure_file_data_context
59+
)
60+
gx_context = file_context_fn()
5061
else:
5162
gx_context = gx.get_context(mode=self.context_type)
5263
checkpoint = self.configure_checkpoint(gx_context)
5364
result = checkpoint.run(batch_parameters=self.batch_parameters)
65+
66+
if file_context_generator:
67+
self._allow_generator_teardown(file_context_generator)
68+
5469
return result.describe_dict()
70+
71+
def _get_value_from_generator(
72+
self, generator: Generator[FileDataContext, None, None]
73+
) -> FileDataContext:
74+
try:
75+
return next(generator)
76+
except StopIteration:
77+
raise RuntimeError("Generator must yield exactly once; did not yield")
78+
79+
def _allow_generator_teardown(self, generator: Generator) -> None:
80+
"""Run the generator to completion to allow for any cleanup/teardown.
81+
82+
Also does some error handling to ensure the generator doesn't yield more than once.
83+
"""
84+
try:
85+
# Check if we have another yield (this is an error case)
86+
next(generator)
87+
except StopIteration:
88+
pass
89+
else:
90+
raise RuntimeError(
91+
"Generator must yield exactly once; yielded more than once"
92+
)

tests/unit/test_validate_checkpoint_operator.py

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
import json
2-
from typing import Literal
2+
from typing import Generator, Literal
33
from unittest.mock import Mock
44

55
import pandas as pd
66
import pytest
77
from great_expectations import Checkpoint, ExpectationSuite, ValidationDefinition
8-
from great_expectations.data_context import AbstractDataContext
8+
from great_expectations.data_context import AbstractDataContext, FileDataContext
99
from great_expectations.expectations import ExpectColumnValuesToBeInSet
1010
from pytest_mock import MockerFixture
1111

@@ -191,3 +191,118 @@ def test_batch_parameters(self) -> None:
191191

192192
# assert
193193
mock_checkpoint.run.assert_called_once_with(batch_parameters=batch_parameters)
194+
195+
def test_configure_file_data_context_with_without_generator(self) -> None:
    """A plain (non-generator) configure_file_data_context may return the context directly."""
    # arrange
    fake_context = Mock(spec=AbstractDataContext)
    on_setup = Mock()
    on_teardown = Mock()
    checkpoint_factory = Mock()

    def build_context() -> FileDataContext:
        # Regular function: no yield, so there is no teardown phase.
        on_setup()
        return fake_context

    operator = GXValidateCheckpointOperator(
        task_id="validate_checkpoint_success",
        configure_checkpoint=checkpoint_factory,
        context_type="file",
        configure_file_data_context=build_context,
    )

    # act
    operator.execute(context={})

    # assert
    on_setup.assert_called_once()
    checkpoint_factory.assert_called_once_with(fake_context)
    on_teardown.assert_not_called()
221+
222+
def test_configure_file_data_context_with_generator(self) -> None:
    """A generator configure_file_data_context may yield the context, then run teardown."""
    # arrange
    fake_context = Mock(spec=AbstractDataContext)
    on_setup = Mock()
    on_teardown = Mock()
    checkpoint_factory = Mock()

    def build_context() -> Generator[FileDataContext, None, None]:
        on_setup()
        yield fake_context
        # Code after the yield is the teardown phase.
        on_teardown()

    operator = GXValidateCheckpointOperator(
        task_id="validate_checkpoint_success",
        configure_checkpoint=checkpoint_factory,
        context_type="file",
        configure_file_data_context=build_context,
    )

    # act
    operator.execute(context={})

    # assert
    on_setup.assert_called_once()
    checkpoint_factory.assert_called_once_with(fake_context)
    on_teardown.assert_called_once()
249+
250+
def test_configure_file_data_context_with_generator_no_yield(self) -> None:
    """A generator configure_file_data_context that never yields must raise an error."""
    # arrange
    fake_context = Mock(spec=AbstractDataContext)
    on_setup = Mock()
    on_teardown = Mock()
    checkpoint_factory = Mock()

    def build_context() -> Generator[FileDataContext, None, None]:
        on_setup()
        if False:
            # Never executed; the yield only makes this a generator function.
            yield fake_context
        on_teardown()

    operator = GXValidateCheckpointOperator(
        task_id="validate_checkpoint_success",
        configure_checkpoint=checkpoint_factory,
        context_type="file",
        configure_file_data_context=build_context,
    )

    # act
    with pytest.raises(RuntimeError, match="did not yield"):
        operator.execute(context={})

    # assert
    on_setup.assert_called_once()
    checkpoint_factory.assert_not_called()
    # The generator body ran to completion, so the teardown call was reached.
    on_teardown.assert_called_once()
280+
281+
def test_configure_file_data_context_with_generator_multiple_yields(self) -> None:
    """A generator configure_file_data_context that yields twice must raise an error."""
    # arrange
    fake_context = Mock(spec=AbstractDataContext)
    on_setup = Mock()
    on_teardown = Mock()
    checkpoint_factory = Mock()

    def build_context() -> Generator[FileDataContext, None, None]:
        on_setup()
        yield fake_context
        yield fake_context  # second yield is the error under test
        on_teardown()

    operator = GXValidateCheckpointOperator(
        task_id="validate_checkpoint_success",
        configure_checkpoint=checkpoint_factory,
        context_type="file",
        configure_file_data_context=build_context,
    )

    # act
    with pytest.raises(RuntimeError, match="yielded more than once"):
        operator.execute(context={})

    # assert
    on_setup.assert_called_once()
    # The generator stopped at its second yield, so teardown was never reached.
    on_teardown.assert_not_called()

0 commit comments

Comments
 (0)