Skip to content

Commit dff6a37

Browse files
authored
[WIP] Extract missing values (to data) (#124)
* Added a test stub * Implemented extract_missing_values * Added a test * Added documentation * Fixed typo
1 parent 6515b09 commit dff6a37

File tree

4 files changed

+152
-0
lines changed

4 files changed

+152
-0
lines changed

PROCESSORS.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ Relevant only when _not_ loading data from a datapackage:
7777
- `load.CAST_DO_NOTHING` - Data will be passed as-is without modifications or validation
7878
- `load.CAST_WITH_SCHEMA` - Data will be parsed and casted using the schema and will error in case of faulty data
7979
- `override_schema` - Provided dictionary will be merged into the inferred schema. If `fields` key is set its contents will fully replace the inferred fields array. The same behavior will be applied for all other nested structures.
80+
- `extract_missing_values (bool|dict)` - If `True` it will extract missing values defined in the schema and place them into a new field called `missingValues` with type `object`, in the form of `{field1: value1, field2: value2}`. If a row doesn't have any missing values the field will get an empty object. This option can also be a hash with 3 optional keys `source`, `target` and `values` where:
81+
- `source (str|str[])` - a field or list of fields to extract missing values from (default: all fields)
82+
- `target (str)` - the field in which to place the missing values mapping (default: `missingValues`)
83+
- `values (str[])` - an alternative list of missing values (default: `schema['missingValues']`)
8084
- `override_fields` - Provided mapping will patch the inferred `schema.fields` array. In the mapping keys must be field names and values must be dictionaries intended to be merged into the corresponding field's metadata.
8185
- `deduplicate_headers` - (default `False`) If there are duplicate headers and the flag is set to `True` it will rename them using a `header (1), header (2), etc` approach. If there are duplicate headers and the flag is set to `False` it will raise an error.
8286
- `on_error` - Dictates how `load` will behave in case of a validation error.

data/missing_values.csv

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
col1,col2
2+
1,1
3+
err1,2
4+
3,3
5+
4,err2
6+
5,5
7+
mis1,mis2
8+
7,7

dataflows/processors/load.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ class load(DataStreamProcessor):
108108
def __init__(self, load_source, name=None, resources=None, strip=True, limit_rows=None,
109109
infer_strategy=None, cast_strategy=None,
110110
override_schema=None, override_fields=None,
111+
extract_missing_values=None,
111112
deduplicate_headers=False,
112113
on_error=raise_exception,
113114
**options):
@@ -123,6 +124,18 @@ def __init__(self, load_source, name=None, resources=None, strip=True, limit_row
123124
self.override_fields = override_fields
124125
self.deduplicate_headers = deduplicate_headers
125126

127+
# Extract missing values
128+
self.extract_missing_values = None
129+
if extract_missing_values is not None:
130+
if isinstance(extract_missing_values, bool):
131+
extract_missing_values = {}
132+
extract_missing_values.setdefault('source', None)
133+
extract_missing_values.setdefault('target', 'missingValues')
134+
extract_missing_values.setdefault('values', [])
135+
if isinstance(extract_missing_values.get('source'), str):
136+
extract_missing_values['source'] = [extract_missing_values['source']]
137+
self.extract_missing_values = extract_missing_values
138+
126139
self.load_dp = None
127140
self.resource_descriptors = []
128141
self.iterators = []
@@ -223,6 +236,16 @@ def safe_process_datapackage(self, dp: Package):
223236
fields = schema.get('fields', [])
224237
for field in fields:
225238
field.update(self.override_fields.get(field['name'], {}))
239+
if self.extract_missing_values:
240+
missing_values = schema.get('missingValues', [])
241+
if not self.extract_missing_values['values']:
242+
self.extract_missing_values['values'] = missing_values
243+
schema['fields'].append({
244+
'name': self.extract_missing_values['target'],
245+
'type': 'object',
246+
'format': 'default',
247+
'values': self.extract_missing_values['values'],
248+
})
226249
descriptor['schema'] = schema
227250
descriptor['format'] = self.options.get('format', stream.format)
228251
descriptor['path'] += '.{}'.format(stream.format)
@@ -252,9 +275,25 @@ def stringer(self, iterator):
252275
for k, v in r.items()
253276
)
254277

278+
def missing_values_extractor(self, iterator):
    """Yield every row with one extra entry added under the configured
    target key: a dict mapping field names to the raw missing values
    found in them (an empty dict when the row has none, or when no
    missing values are configured).
    """
    options = self.extract_missing_values
    watched_fields = options['source']   # None means: inspect all fields
    target_name = options['target']
    missing = options['values']
    for row in iterator:
        extracted = {}
        if missing:
            extracted = {
                name: cell
                for name, cell in row.items()
                if (not watched_fields or name in watched_fields) and cell in missing
            }
        row[target_name] = extracted
        yield row
291+
255292
def process_resources(self, resources):
256293
yield from super(load, self).process_resources(resources)
257294
for descriptor, it in zip(self.resource_descriptors, self.iterators):
295+
if self.extract_missing_values:
296+
it = self.missing_values_extractor(it)
258297
it = self.caster(descriptor, it)
259298
if self.strip:
260299
it = self.stripper(it)

tests/test_lib.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1531,3 +1531,104 @@ def test_force_temporal_format():
15311531
'time': datetime.time(8, 10, 4),
15321532
}
15331533
]]
1534+
1535+
1536+
# Extract missing values
1537+
1538+
def test_extract_missing_values():
    """With extract_missing_values=True, load() appends a `missingValues`
    object field holding the raw missing value found in each row."""
    from dataflows import load
    descriptor = {
        'missingValues': ['err1', 'err2', 'mis1', 'mis2'],
        'fields': [
            {'name': 'col1', 'type': 'number', 'format': 'default'},
            {'name': 'col2', 'type': 'number', 'format': 'default'},
        ]
    }
    data, package, stats = Flow(
        load('data/missing_values.csv', override_schema=descriptor, extract_missing_values=True),
    ).results()
    result_fields = package.descriptor['resources'][0]['schema']['fields']
    # Original fields are untouched; the extracted-values field is appended.
    assert result_fields[0] == descriptor['fields'][0]
    assert result_fields[1] == descriptor['fields'][1]
    assert result_fields[2] == {
        'name': 'missingValues',
        'type': 'object',
        'format': 'default',
        'values': descriptor['missingValues'],
    }
    expected_rows = [
        {'col1': 1, 'col2': 1, 'missingValues': {}},
        {'col1': None, 'col2': 2, 'missingValues': {'col1': 'err1'}},
        {'col1': 3, 'col2': 3, 'missingValues': {}},
        {'col1': 4, 'col2': None, 'missingValues': {'col2': 'err2'}},
        {'col1': 5, 'col2': 5, 'missingValues': {}},
        {'col1': None, 'col2': None, 'missingValues': {'col1': 'mis1', 'col2': 'mis2'}},
        {'col1': 7, 'col2': 7, 'missingValues': {}},
    ]
    assert data == [expected_rows]
1568+
1569+
def test_extract_missing_values_options():
    """The `source` and `target` options restrict extraction to the given
    field and rename the field receiving the mapping."""
    from dataflows import load
    descriptor = {
        'missingValues': ['err1', 'err2', 'mis1', 'mis2'],
        'fields': [
            {'name': 'col1', 'type': 'number', 'format': 'default'},
            {'name': 'col2', 'type': 'number', 'format': 'default'},
        ]
    }
    data, package, stats = Flow(
        load('data/missing_values.csv', override_schema=descriptor, extract_missing_values={
            'source': 'col1',
            'target': 'notes'
        }),
    ).results()
    result_fields = package.descriptor['resources'][0]['schema']['fields']
    assert result_fields[0] == descriptor['fields'][0]
    assert result_fields[1] == descriptor['fields'][1]
    # Target field is renamed to `notes`.
    assert result_fields[2] == {
        'name': 'notes',
        'type': 'object',
        'format': 'default',
        'values': descriptor['missingValues'],
    }
    # Only col1's missing values are collected; col2's are ignored.
    expected_rows = [
        {'col1': 1, 'col2': 1, 'notes': {}},
        {'col1': None, 'col2': 2, 'notes': {'col1': 'err1'}},
        {'col1': 3, 'col2': 3, 'notes': {}},
        {'col1': 4, 'col2': None, 'notes': {}},
        {'col1': 5, 'col2': 5, 'notes': {}},
        {'col1': None, 'col2': None, 'notes': {'col1': 'mis1'}},
        {'col1': 7, 'col2': 7, 'notes': {}},
    ]
    assert data == [expected_rows]
1602+
1603+
def test_extract_missing_values_options_source_is_list():
    """`source` may also be a list of field names; listing every field is
    equivalent to the default (all fields)."""
    from dataflows import load
    descriptor = {
        'missingValues': ['err1', 'err2', 'mis1', 'mis2'],
        'fields': [
            {'name': 'col1', 'type': 'number', 'format': 'default'},
            {'name': 'col2', 'type': 'number', 'format': 'default'},
        ]
    }
    data, package, stats = Flow(
        load('data/missing_values.csv', override_schema=descriptor, extract_missing_values={
            'source': ['col1', 'col2'],
        }),
    ).results()
    result_fields = package.descriptor['resources'][0]['schema']['fields']
    assert result_fields[0] == descriptor['fields'][0]
    assert result_fields[1] == descriptor['fields'][1]
    assert result_fields[2] == {
        'name': 'missingValues',
        'type': 'object',
        'format': 'default',
        'values': descriptor['missingValues'],
    }
    expected_rows = [
        {'col1': 1, 'col2': 1, 'missingValues': {}},
        {'col1': None, 'col2': 2, 'missingValues': {'col1': 'err1'}},
        {'col1': 3, 'col2': 3, 'missingValues': {}},
        {'col1': 4, 'col2': None, 'missingValues': {'col2': 'err2'}},
        {'col1': 5, 'col2': 5, 'missingValues': {}},
        {'col1': None, 'col2': None, 'missingValues': {'col1': 'mis1', 'col2': 'mis2'}},
        {'col1': 7, 'col2': 7, 'missingValues': {}},
    ]
    assert data == [expected_rows]

0 commit comments

Comments
 (0)