Skip to content

Commit b5825a8

Browse files
authored
Merge pull request #1252 from dshemetov/nancodes
Update archiver and export utils for nancodes and deletion-handling
2 parents 0e76a43 + 0ddf2aa commit b5825a8

File tree

4 files changed

+335
-55
lines changed

4 files changed

+335
-55
lines changed

_delphi_utils_python/delphi_utils/archive.py

+25-8
Original file line number | Diff line number | Diff line change
@@ -40,9 +40,11 @@
4040
from git import Repo
4141
from git.refs.head import Head
4242
import pandas as pd
43+
import numpy as np
4344

4445
from .utils import read_params
4546
from .logger import get_structured_logger
47+
from .nancodes import Nans
4648

4749
Files = List[str]
4850
FileDiffMap = Dict[str, Optional[str]]
@@ -73,8 +75,10 @@ def diff_export_csv(
7375
changed_df is the pd.DataFrame of common rows from after_csv with changed values.
7476
added_df is the pd.DataFrame of added rows from after_csv.
7577
"""
76-
export_csv_dtypes = {"geo_id": str, "val": float,
77-
"se": float, "sample_size": float}
78+
export_csv_dtypes = {
79+
"geo_id": str, "val": float, "se": float, "sample_size": float,
80+
"missing_val": int, "missing_se": int, "missing_sample_size": int
81+
}
7882

7983
before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
8084
before_df.set_index("geo_id", inplace=True)
@@ -89,12 +93,22 @@ def diff_export_csv(
8993
before_df_cmn = before_df.reindex(common_idx)
9094
after_df_cmn = after_df.reindex(common_idx)
9195

92-
# Exact comparisons, treating NA == NA as True
93-
same_mask = before_df_cmn == after_df_cmn
94-
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
96+
# If CSVs have different columns (no missingness), mark all values as new
97+
if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns):
98+
same_mask = after_df_cmn.copy()
99+
same_mask.loc[:] = False
100+
else:
101+
# Exact comparisons, treating NA == NA as True
102+
same_mask = before_df_cmn == after_df_cmn
103+
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
104+
105+
# Code deleted entries as nans with the deleted missing code
106+
deleted_df = before_df.loc[deleted_idx, :].copy()
107+
deleted_df[["val", "se", "sample_size"]] = np.nan
108+
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
95109

96110
return (
97-
before_df.loc[deleted_idx, :],
111+
deleted_df,
98112
after_df_cmn.loc[~(same_mask.all(axis=1)), :],
99113
after_df.loc[added_idx, :])
100114

@@ -227,11 +241,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
227241

228242
deleted_df, changed_df, added_df = diff_export_csv(
229243
before_file, after_file)
230-
new_issues_df = pd.concat([changed_df, added_df], axis=0)
244+
new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0)
231245

232246
if len(deleted_df) > 0:
233247
print(
234-
f"Warning, diff has deleted indices in {after_file} that will be ignored")
248+
f"Diff has deleted indices in {after_file} that have been coded as nans.")
235249

236250
# Write the diffs to diff_file, if applicable
237251
if len(new_issues_df) > 0:
@@ -414,6 +428,9 @@ def archive_exports(self, # pylint: disable=arguments-differ
414428
archive_success.append(exported_file)
415429
except FileNotFoundError:
416430
archive_fail.append(exported_file)
431+
except shutil.SameFileError:
432+
# no need to copy if the cached file is the same
433+
archive_success.append(exported_file)
417434

418435
self._exports_archived = True
419436

_delphi_utils_python/delphi_utils/export.py

+39-1
Original file line number | Diff line number | Diff line change
@@ -3,11 +3,33 @@
33
from datetime import datetime
44
from os.path import join
55
from typing import Optional
6+
import logging
67

78
from epiweeks import Week
89
import numpy as np
910
import pandas as pd
1011

12+
from .nancodes import Nans
13+
14+
def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
15+
"""Find values with contradictory missingness codes, filter them, and log."""
16+
columns = ["val", "se", "sample_size"]
17+
# Get indices where the XNOR is true (i.e. both are true or both are false).
18+
masks = [
19+
~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING))
20+
for column in columns
21+
]
22+
for mask in masks:
23+
if not logger is None and df.loc[mask].size > 0:
24+
logger.info(
25+
"Filtering contradictory missing code in " +
26+
"{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
27+
)
28+
df = df.loc[~mask]
29+
elif logger is None and df.loc[mask].size > 0:
30+
df = df.loc[~mask]
31+
return df
32+
1133
def create_export_csv(
1234
df: pd.DataFrame,
1335
export_dir: str,
@@ -18,6 +40,7 @@ def create_export_csv(
1840
end_date: Optional[datetime] = None,
1941
remove_null_samples: Optional[bool] = False,
2042
write_empty_days: Optional[bool] = False,
43+
logger: Optional[logging.Logger] = None,
2144
weekly_dates = False,
2245
):
2346
"""Export data in the format expected by the Delphi API.
@@ -45,6 +68,8 @@ def create_export_csv(
4568
write_empty_days: Optional[bool]
4669
If true, every day in between start_date and end_date will have a CSV file written
4770
even if there is no data for the day. If false, only the days present are written.
71+
logger: Optional[logging.Logger]
72+
Pass a logger object here to log information about contradictory missing codes.
4873
4974
Returns
5075
---------
@@ -77,7 +102,20 @@ def create_export_csv(
77102
else:
78103
export_filename = f"{date_str}_{geo_res}_{metric}_{sensor}.csv"
79104
export_file = join(export_dir, export_filename)
80-
export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]]
105+
expected_columns = [
106+
"geo_id",
107+
"val",
108+
"se",
109+
"sample_size",
110+
"missing_val",
111+
"missing_se",
112+
"missing_sample_size"
113+
]
114+
export_df = df[df["timestamp"] == date].filter(items=expected_columns)
115+
if "missing_val" in export_df.columns:
116+
export_df = filter_contradicting_missing_codes(
117+
export_df, sensor, metric, date, logger=logger
118+
)
81119
if remove_null_samples:
82120
export_df = export_df[export_df["sample_size"].notnull()]
83121
export_df = export_df.round({"val": 7, "se": 7})

_delphi_utils_python/tests/test_archive.py

+115-27
Original file line number | Diff line number | Diff line change
@@ -14,29 +14,64 @@
1414

1515
from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\
1616
archiver_from_params
17+
from delphi_utils.nancodes import Nans
1718

18-
CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float}
19+
CSV_DTYPES = {
20+
"geo_id": str, "val": float, "se": float, "sample_size": float,
21+
"missing_val": int, "missing_se":int, "missing_sample_size": int
22+
}
1923

2024
CSVS_BEFORE = {
2125
# Common
2226
"csv0": pd.DataFrame({
2327
"geo_id": ["1", "2", "3"],
2428
"val": [1.000000001, 2.00000002, 3.00000003],
2529
"se": [0.1, 0.2, 0.3],
26-
"sample_size": [10.0, 20.0, 30.0]}),
30+
"sample_size": [10.0, 20.0, 30.0],
31+
"missing_val": [Nans.NOT_MISSING] * 3,
32+
"missing_se": [Nans.NOT_MISSING] * 3,
33+
"missing_sample_size": [Nans.NOT_MISSING] * 3,
34+
}),
2735

2836
"csv1": pd.DataFrame({
2937
"geo_id": ["1", "2", "3"],
3038
"val": [1.0, 2.0, 3.0],
3139
"se": [np.nan, 0.20000002, 0.30000003],
32-
"sample_size": [10.0, 20.0, 30.0]}),
40+
"sample_size": [10.0, 20.0, 30.0],
41+
"missing_val": [Nans.NOT_MISSING] * 3,
42+
"missing_se": [Nans.NOT_MISSING] * 3,
43+
"missing_sample_size": [Nans.NOT_MISSING] * 3,
44+
}),
3345

3446
# Deleted
3547
"csv2": pd.DataFrame({
3648
"geo_id": ["1"],
3749
"val": [1.0],
3850
"se": [0.1],
39-
"sample_size": [10.0]}),
51+
"sample_size": [10.0],
52+
"missing_val": [Nans.NOT_MISSING],
53+
"missing_se": [Nans.NOT_MISSING],
54+
"missing_sample_size": [Nans.NOT_MISSING],
55+
}),
56+
57+
# Common, but updated with missing columns
58+
"csv4": pd.DataFrame({
59+
"geo_id": ["1"],
60+
"val": [1.0],
61+
"se": [0.1],
62+
"sample_size": [10.0]
63+
}),
64+
65+
# Common, but missing columns removed
66+
"csv5": pd.DataFrame({
67+
"geo_id": ["1"],
68+
"val": [1.0],
69+
"se": [0.1],
70+
"sample_size": [10.0],
71+
"missing_val": [Nans.NOT_MISSING],
72+
"missing_se": [Nans.NOT_MISSING],
73+
"missing_sample_size": [Nans.NOT_MISSING],
74+
}),
4075
}
4176

4277
CSVS_AFTER = {
@@ -45,23 +80,53 @@
4580
"geo_id": ["1", "2", "3"],
4681
"val": [1.0, 2.0, 3.0],
4782
"se": [0.10000001, 0.20000002, 0.30000003],
48-
"sample_size": [10.0, 20.0, 30.0]}),
83+
"sample_size": [10.0, 20.0, 30.0],
84+
"missing_val": [Nans.NOT_MISSING] * 3,
85+
"missing_se": [Nans.NOT_MISSING] * 3,
86+
"missing_sample_size": [Nans.NOT_MISSING] * 3,
87+
}),
4988

5089
"csv1": pd.DataFrame({
5190
"geo_id": ["1", "2", "4"],
5291
"val": [1.0, 2.1, 4.0],
5392
"se": [np.nan, 0.21, np.nan],
54-
"sample_size": [10.0, 21.0, 40.0]}),
93+
"sample_size": [10.0, 21.0, 40.0],
94+
"missing_val": [Nans.NOT_MISSING] * 3,
95+
"missing_se": [Nans.NOT_MISSING] * 3,
96+
"missing_sample_size": [Nans.NOT_MISSING] * 3,
97+
}),
5598

5699
# Added
57100
"csv3": pd.DataFrame({
58101
"geo_id": ["2"],
59102
"val": [2.0000002],
60103
"se": [0.2],
61-
"sample_size": [20.0]}),
104+
"sample_size": [20.0],
105+
"missing_val": [Nans.NOT_MISSING],
106+
"missing_se": [Nans.NOT_MISSING],
107+
"missing_sample_size": [Nans.NOT_MISSING],
108+
}),
109+
110+
# Common, but updated with missing columns
111+
"csv4": pd.DataFrame({
112+
"geo_id": ["1"],
113+
"val": [1.0],
114+
"se": [0.1],
115+
"sample_size": [10.0],
116+
"missing_val": [Nans.NOT_MISSING],
117+
"missing_se": [Nans.NOT_MISSING],
118+
"missing_sample_size": [Nans.NOT_MISSING],
119+
}),
120+
121+
# Common, but missing columns removed
122+
"csv5": pd.DataFrame({
123+
"geo_id": ["1"],
124+
"val": [1.0],
125+
"se": [0.1],
126+
"sample_size": [10.0]
127+
}),
62128
}
63129

64-
65130
class TestArchiveDiffer:
66131

67132
def test_stubs(self):
@@ -80,10 +145,14 @@ def test_diff_and_filter_exports(self, tmp_path):
80145
mkdir(export_dir)
81146

82147
csv1_diff = pd.DataFrame({
83-
"geo_id": ["2", "4"],
84-
"val": [2.1, 4.0],
85-
"se": [0.21, np.nan],
86-
"sample_size": [21.0, 40.0]})
148+
"geo_id": ["3", "2", "4"],
149+
"val": [np.nan, 2.1, 4.0],
150+
"se": [np.nan, 0.21, np.nan],
151+
"sample_size": [np.nan, 21.0, 40.0],
152+
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
153+
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
154+
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
155+
})
87156

88157
arch_diff = ArchiveDiffer(cache_dir, export_dir)
89158

@@ -106,15 +175,18 @@ def test_diff_and_filter_exports(self, tmp_path):
106175
# Check return values
107176
assert set(deleted_files) == {join(cache_dir, "csv2.csv")}
108177
assert set(common_diffs.keys()) == {
109-
join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]}
178+
join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv", "csv5.csv"]}
110179
assert set(new_files) == {join(export_dir, "csv3.csv")}
111180
assert common_diffs[join(export_dir, "csv0.csv")] is None
112181
assert common_diffs[join(export_dir, "csv1.csv")] == join(
113182
export_dir, "csv1.csv.diff")
114183

115184
# Check filesystem for actual files
116185
assert set(listdir(export_dir)) == {
117-
"csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"}
186+
"csv0.csv", "csv1.csv", "csv1.csv.diff",
187+
"csv3.csv", "csv4.csv", "csv4.csv.diff",
188+
"csv5.csv", "csv5.csv.diff"
189+
}
118190
assert_frame_equal(
119191
pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES),
120192
csv1_diff)
@@ -132,7 +204,7 @@ def test_diff_and_filter_exports(self, tmp_path):
132204
arch_diff.filter_exports(common_diffs)
133205

134206
# Check exports directory just has incremental changes
135-
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
207+
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"}
136208
assert_frame_equal(
137209
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
138210
csv1_diff)
@@ -259,12 +331,16 @@ def test_run(self, tmp_path, s3_client):
259331
assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df)
260332

261333
# Check exports directory just has incremental changes
262-
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
334+
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"}
263335
csv1_diff = pd.DataFrame({
264-
"geo_id": ["2", "4"],
265-
"val": [2.1, 4.0],
266-
"se": [0.21, np.nan],
267-
"sample_size": [21.0, 40.0]})
336+
"geo_id": ["3", "2", "4"],
337+
"val": [np.nan, 2.1, 4.0],
338+
"se": [np.nan, 0.21, np.nan],
339+
"sample_size": [np.nan, 21.0, 40.0],
340+
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
341+
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
342+
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
343+
})
268344
assert_frame_equal(
269345
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
270346
csv1_diff)
@@ -346,7 +422,11 @@ def test_diff_exports(self, tmp_path):
346422
"geo_id": ["1", "2", "3"],
347423
"val": [1.0, 2.0, 3.0],
348424
"se": [0.1, 0.2, 0.3],
349-
"sample_size": [10.0, 20.0, 30.0]})
425+
"sample_size": [10.0, 20.0, 30.0],
426+
"missing_val": [Nans.NOT_MISSING] * 3,
427+
"missing_se": [Nans.NOT_MISSING] * 3,
428+
"missing_sample_size": [Nans.NOT_MISSING] * 3,
429+
})
350430

351431
# Write exact same CSV into cache and export, so no diffs expected
352432
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -383,7 +463,11 @@ def test_archive_exports(self, tmp_path):
383463
"geo_id": ["1", "2", "3"],
384464
"val": [1.0, 2.0, 3.0],
385465
"se": [0.1, 0.2, 0.3],
386-
"sample_size": [10.0, 20.0, 30.0]})
466+
"sample_size": [10.0, 20.0, 30.0],
467+
"missing_val": [Nans.NOT_MISSING] * 3,
468+
"missing_se": [Nans.NOT_MISSING] * 3,
469+
"missing_sample_size": [Nans.NOT_MISSING] * 3,
470+
})
387471

388472
# csv1.csv is now a dirty edit in the repo, and to be exported too
389473
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -460,12 +544,16 @@ def test_run(self, tmp_path):
460544
original_branch.checkout()
461545

462546
# Check exports directory just has incremental changes
463-
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
547+
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"}
464548
csv1_diff = pd.DataFrame({
465-
"geo_id": ["2", "4"],
466-
"val": [2.1, 4.0],
467-
"se": [0.21, np.nan],
468-
"sample_size": [21.0, 40.0]})
549+
"geo_id": ["3", "2", "4"],
550+
"val": [np.nan, 2.1, 4.0],
551+
"se": [np.nan, 0.21, np.nan],
552+
"sample_size": [np.nan, 21.0, 40.0],
553+
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
554+
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
555+
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
556+
})
469557
assert_frame_equal(
470558
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
471559
csv1_diff)

0 commit comments

Comments (0)