diff --git a/PROCESSORS.md b/PROCESSORS.md
index c8244bb..0984918 100644
--- a/PROCESSORS.md
+++ b/PROCESSORS.md
@@ -220,6 +220,39 @@ def dump_to_sql(tables,
 - `batch_size` - Maximum amount of rows to write at the same time to the DB (default 1000)
 - `use_bloom_filter` - Preprocess existing DB data to improve update performance (default: True)
 
+### Flow Control
+
+#### conditional
+
+Run a part of the flow conditionally, based on the structure of the datapackage at this point in the stream.
+
+```python
+def conditional(predicate, flow):
+    pass
+```
+
+- `predicate` - a boolean function, receiving a single parameter (the current datapackage, as a `Package` object) and returning `True`/`False`
+- `flow` - a `Flow` that is chained to the processing pipeline if the predicate returns `True`.
+
+Example - add a field if it doesn't exist in the first resource in the data package:
+
+```python
+def no_such_field(field_name):
+    def func(dp):
+        return all(field_name != f.name for f in dp.resources[0].schema.fields)
+    return func
+
+Flow(
+    # ...
+    conditional(
+        no_such_field('my-field'), Flow(
+            add_field('my-field', 'string', 'default-value')
+        )
+    )
+    # ...
+)
+```
+
 #### checkpoint
 
 Save results from running a series of steps, if checkpoint exists - loads from checkpoint instead of running the steps.
diff --git a/dataflows/VERSION b/dataflows/VERSION
index 36e6a20..2225cdf 100644
--- a/dataflows/VERSION
+++ b/dataflows/VERSION
@@ -1 +1 @@
-0.0.72
+0.0.73
diff --git a/dataflows/processors/__init__.py b/dataflows/processors/__init__.py
index 3008426..62b857e 100644
--- a/dataflows/processors/__init__.py
+++ b/dataflows/processors/__init__.py
@@ -8,6 +8,7 @@
 from .add_field import add_field
 from .checkpoint import checkpoint
 from .concatenate import concatenate
+from .conditional import conditional
 from .delete_fields import delete_fields
 from .deduplicate import deduplicate
 from .duplicate import duplicate
diff --git a/dataflows/processors/conditional.py b/dataflows/processors/conditional.py
new file mode 100644
index 0000000..1811bea
--- /dev/null
+++ b/dataflows/processors/conditional.py
@@ -0,0 +1,18 @@
+from .. import DataStreamProcessor
+
+
+class conditional(DataStreamProcessor):
+
+    def __init__(self, predicate, flow):
+        super().__init__()
+        self.predicate = predicate
+        self.flow = flow
+
+    def _process(self):
+        ds = self.source._process()
+        # Chain the inner flow onto the stream only when the predicate
+        # approves the current datapackage; otherwise pass it through as-is.
+        if self.predicate(ds.dp):
+            return self.flow.datastream(ds)
+        else:
+            return ds
diff --git a/setup.py b/setup.py
index fed2df4..347e673 100644
--- a/setup.py
+++ b/setup.py
@@ -24,7 +24,7 @@ def read(*paths):
         'tabulator>=1.23.0',
         'datapackage>=1.5.0',
         'tableschema>=1.5',
-        'kvfile>=0.0.6',
+        'kvfile>=0.0.8',
         'click',
         'jinja2',
         'awesome-slugify',
diff --git a/tests/test_lib.py b/tests/test_lib.py
index 840053f..c6b119d 100644
--- a/tests/test_lib.py
+++ b/tests/test_lib.py
@@ -1532,7 +1532,6 @@ def test_force_temporal_format():
         }
     ]]
 
-
 # Extract missing values
 
 def test_extract_missing_values():
@@ -1632,3 +1631,29 @@ def test_extract_missing_values_options_source_is_list():
         {'col1': None, 'col2': None, 'missingValues': {'col1': 'mis1', 'col2': 'mis2'}},
         {'col1': 7, 'col2': 7, 'missingValues': {}},
     ]]
+
+
+def test_conditional():
+    from dataflows import Flow, conditional, add_field
+
+    tester = lambda dp: 'b' in [f.name for r in dp.resources for f in r.schema.fields]
+
+    data1 = [
+        dict(a=i, b=i) for i in range(3)
+    ]
+    data2 = [
+        dict(a=i, c=i) for i in range(3)
+    ]
+
+    result1, _, _ = Flow(
+        data1, conditional(tester, Flow(add_field('d', 'integer', lambda r: r['a'])))
+    ).results()
+    result2, _, _ = Flow(
+        data2, conditional(tester, Flow(add_field('d', 'integer', lambda r: r['a'])))
+    ).results()
+    assert result1[0] == [
+        dict(a=i, b=i, d=i) for i in range(3)
+    ]
+    assert result2[0] == [
+        dict(a=i, c=i) for i in range(3)
+    ]
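
For anyone who wants to try the documented pattern end to end, here is a minimal, runnable sketch assembled from the README example above. `Flow`, `conditional`, `add_field`, and `printer` are existing dataflows APIs; the inline sample rows are illustrative additions, not part of this change:

```python
from dataflows import Flow, add_field, conditional, printer


def no_such_field(field_name):
    # Predicate: True when the first resource's schema
    # does not already contain a field named `field_name`.
    def func(dp):
        return all(field_name != f.name for f in dp.resources[0].schema.fields)
    return func


Flow(
    [{'a': 1}, {'a': 2}],           # illustrative sample rows
    conditional(
        no_such_field('my-field'),  # inner flow runs only if the field is absent
        Flow(add_field('my-field', 'string', 'default-value'))
    ),
    printer(),
).process()
```

Since the predicate only sees the datapackage as it exists at that point in the chain, `conditional` should be placed after the steps that shape the schema it inspects.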