v0.0.73 Introduce the conditional processor
akariv committed May 25, 2020
1 parent dff6a37 commit eb69a10
Showing 6 changed files with 78 additions and 3 deletions.
33 changes: 33 additions & 0 deletions PROCESSORS.md
@@ -220,6 +220,39 @@ def dump_to_sql(tables,
- `batch_size` - Maximum number of rows to write to the DB at the same time (default 1000)
- `use_bloom_filter` - Preprocess existing DB data to improve update performance (default: True)
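
For illustration, a minimal sketch of these two options in use (the table mapping, engine URL and file name below are placeholders, not part of this commit):

```python
from dataflows import Flow, load, dump_to_sql

Flow(
    load('data.csv'),
    dump_to_sql(
        {'my_table': {'resource-name': 'data'}},
        engine='sqlite:///data.db',
        batch_size=500,          # write up to 500 rows to the DB at a time
        use_bloom_filter=True,   # preprocess existing DB data to speed up updates
    ),
).process()
```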

### Flow Control

#### conditional

Run part of the flow conditionally, based on the structure of the datapackage at that point in the pipeline.

```python
def conditional(predicate, flow):
    pass
```

- `predicate` - a boolean function receiving a single parameter (the current `Package.datapackage`) and returning `True` or `False`
- `flow` - a `Flow` to chain into the processing pipeline if the predicate returns `True`.

Example - add a field if it doesn't exist in the first resource in the data package:

```python
def no_such_field(field_name):
    def func(dp):
        return not any(field_name == f.name for f in dp.resources[0].schema.fields)
    return func

Flow(
    # ...
    conditional(
        no_such_field('my-field'),
        Flow(
            add_field('my-field', 'string', 'default-value')
        )
    ),
    # ...
)
```
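
The predicate can also be a plain lambda. As a hypothetical sketch (using the built-in `printer` processor, not part of this commit), the inner flow below runs only when the package contains more than one resource:

```python
from dataflows import Flow, conditional, printer

Flow(
    # ...
    conditional(
        lambda dp: len(dp.resources) > 1,   # predicate on the datapackage
        Flow(printer())
    ),
    # ...
)
```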

#### checkpoint

Save results from running a series of steps; if a checkpoint already exists, load from it instead of re-running the steps.
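
As a minimal usage sketch (the URL and checkpoint name are placeholders):

```python
from dataflows import Flow, load, checkpoint, printer

Flow(
    load('https://example.com/data.csv'),  # potentially expensive step
    checkpoint('my-checkpoint'),           # later runs load from this checkpoint
    printer(),
).process()
```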
2 changes: 1 addition & 1 deletion dataflows/VERSION
@@ -1 +1 @@
0.0.72
0.0.73
1 change: 1 addition & 0 deletions dataflows/processors/__init__.py
@@ -8,6 +8,7 @@
from .add_field import add_field
from .checkpoint import checkpoint
from .concatenate import concatenate
from .conditional import conditional
from .delete_fields import delete_fields
from .deduplicate import deduplicate
from .duplicate import duplicate
16 changes: 16 additions & 0 deletions dataflows/processors/conditional.py
@@ -0,0 +1,16 @@
from .. import DataStreamProcessor


class conditional(DataStreamProcessor):

    def __init__(self, predicate, flow):
        super().__init__()
        self.predicate = predicate
        self.flow = flow

    def _process(self):
        # Process the upstream source first to obtain its datastream, then
        # chain the inner flow onto it only if the predicate accepts the
        # resulting datapackage.
        ds = self.source._process()
        if self.predicate(ds.dp):
            return self.flow.datastream(ds)
        else:
            return ds
2 changes: 1 addition & 1 deletion setup.py
@@ -24,7 +24,7 @@ def read(*paths):
    'tabulator>=1.23.0',
    'datapackage>=1.5.0',
    'tableschema>=1.5',
    'kvfile>=0.0.6',
    'kvfile>=0.0.8',
    'click',
    'jinja2',
    'awesome-slugify',
27 changes: 26 additions & 1 deletion tests/test_lib.py
@@ -1532,7 +1532,6 @@ def test_force_temporal_format():
        }
    ]]


# Extract missing values

def test_extract_missing_values():
@@ -1632,3 +1631,29 @@ def test_extract_missing_values_options_source_is_list():
        {'col1': None, 'col2': None, 'missingValues': {'col1': 'mis1', 'col2': 'mis2'}},
        {'col1': 7, 'col2': 7, 'missingValues': {}},
    ]]


def test_conditional():
    from dataflows import Flow, conditional, add_field

    # predicate: True when any resource in the package has a field named 'b'
    tester = lambda dp: 'b' in [f.name for r in dp.resources for f in r.schema.fields]

    data1 = [
        dict(a=i, b=i) for i in range(3)
    ]
    data2 = [
        dict(a=i, c=i) for i in range(3)
    ]

    result1, _, _ = Flow(
        data1, conditional(tester, Flow(add_field('d', 'integer', lambda r: r['a'])))
    ).results()
    result2, _, _ = Flow(
        data2, conditional(tester, Flow(add_field('d', 'integer', lambda r: r['a'])))
    ).results()
    assert result1[0] == [
        dict(a=i, b=i, d=i) for i in range(3)
    ]
    assert result2[0] == [
        dict(a=i, c=i) for i in range(3)
    ]
