Update archiver and export utils for nancodes and deletion-handling #1252

Merged: 13 commits, Sep 28, 2021
33 changes: 25 additions & 8 deletions _delphi_utils_python/delphi_utils/archive.py
@@ -40,9 +40,11 @@
from git import Repo
from git.refs.head import Head
import pandas as pd
import numpy as np

from .utils import read_params
from .logger import get_structured_logger
from .nancodes import Nans

Files = List[str]
FileDiffMap = Dict[str, Optional[str]]
@@ -73,8 +75,10 @@ def diff_export_csv(
changed_df is the pd.DataFrame of common rows from after_csv with changed values.
added_df is the pd.DataFrame of added rows from after_csv.
"""
export_csv_dtypes = {"geo_id": str, "val": float,
"se": float, "sample_size": float}
export_csv_dtypes = {
"geo_id": str, "val": float, "se": float, "sample_size": float,
"missing_val": int, "missing_se": int, "missing_sample_size": int
}

before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes)
before_df.set_index("geo_id", inplace=True)
@@ -89,12 +93,22 @@
before_df_cmn = before_df.reindex(common_idx)
after_df_cmn = after_df.reindex(common_idx)

# Exact comparisons, treating NA == NA as True
same_mask = before_df_cmn == after_df_cmn
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)
# If CSVs have different columns (no missingness), mark all values as new
if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns):
same_mask = after_df_cmn.copy()
same_mask.loc[:] = False
else:
# Exact comparisons, treating NA == NA as True
same_mask = before_df_cmn == after_df_cmn
same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn)

# Code deleted entries as nans with the deleted missing code
deleted_df = before_df.loc[deleted_idx, :].copy()
deleted_df[["val", "se", "sample_size"]] = np.nan
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED

return (
before_df.loc[deleted_idx, :],
deleted_df,
after_df_cmn.loc[~(same_mask.all(axis=1)), :],
after_df.loc[added_idx, :])
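
With this change, a row that disappears between the cached and current exports is no longer dropped from the diff: diff_export_csv returns it with its values set to NaN and its missing_* columns set to Nans.DELETED. A minimal, self-contained sketch of the new behavior (file names and data invented for illustration):

import pandas as pd
from delphi_utils.archive import diff_export_csv
from delphi_utils.nancodes import Nans

before = pd.DataFrame({
    "geo_id": ["1", "2"],
    "val": [1.0, 2.0],
    "se": [0.1, 0.2],
    "sample_size": [10.0, 20.0],
    "missing_val": [Nans.NOT_MISSING] * 2,
    "missing_se": [Nans.NOT_MISSING] * 2,
    "missing_sample_size": [Nans.NOT_MISSING] * 2,
})
after = before.iloc[[0]]  # geo_id "2" vanishes in the new export
before.to_csv("before.csv", index=False)
after.to_csv("after.csv", index=False)

deleted_df, changed_df, added_df = diff_export_csv("before.csv", "after.csv")
# deleted_df carries geo_id "2" with NaN values and Nans.DELETED codes:
# an explicit tombstone instead of a silent disappearance.
print(deleted_df[["val", "missing_val"]])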

@@ -227,11 +241,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:

deleted_df, changed_df, added_df = diff_export_csv(
before_file, after_file)
new_issues_df = pd.concat([changed_df, added_df], axis=0)
new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0)

if len(deleted_df) > 0:
print(
f"Warning, diff has deleted indices in {after_file} that will be ignored")
f"Diff has deleted indices in {after_file} that have been coded as nans.")

# Write the diffs to diff_file, if applicable
if len(new_issues_df) > 0:
@@ -414,6 +428,9 @@ def archive_exports(self, # pylint: disable=arguments-differ
archive_success.append(exported_file)
except FileNotFoundError:
archive_fail.append(exported_file)
except shutil.SameFileError:
# no need to copy if the cached file is the same
archive_success.append(exported_file)

self._exports_archived = True
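
The new except clause means an export whose cached copy is byte-identical is counted as archived rather than failed. A toy illustration of the exception this handles (path and contents invented):

import shutil
from pathlib import Path

path = Path("csv0.csv")
path.write_text("geo_id,val\n1,1.0\n")  # ensure the file exists

try:
    shutil.copyfile(path, path)  # copying a file onto itself
except FileNotFoundError:
    print("archive failed:", path)
except shutil.SameFileError:
    # The destination is already the same file, so treat it as a success.
    print("already archived:", path)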

40 changes: 39 additions & 1 deletion _delphi_utils_python/delphi_utils/export.py
@@ -3,11 +3,33 @@
from datetime import datetime
from os.path import join
from typing import Optional
import logging

from epiweeks import Week
import numpy as np
import pandas as pd

from .nancodes import Nans

def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
"""Find values with contradictory missingness codes, filter them, and log."""
columns = ["val", "se", "sample_size"]
# Get indices where the XNOR is true (i.e. both are true or both are false).
masks = [
~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING))
for column in columns
]
for mask in masks:
if logger is not None and df.loc[mask].size > 0:
logger.info(
"Filtering contradictory missing code in " +
"{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d"))
)
df = df.loc[~mask]
elif logger is None and df.loc[mask].size > 0:
df = df.loc[~mask]
return df
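
To make the XNOR check concrete: a value and its missingness code must agree, so a row whose val is NaN but whose missing_val still claims NOT_MISSING is contradictory and gets dropped. A hypothetical example (data invented; assumes the Nans.OTHER member of delphi_utils.nancodes):

from datetime import datetime

import numpy as np
import pandas as pd
from delphi_utils.export import filter_contradicting_missing_codes
from delphi_utils.nancodes import Nans

df = pd.DataFrame({
    "geo_id": ["1", "2"],
    "val": [1.0, np.nan],
    "se": [0.1, np.nan],
    "sample_size": [10.0, np.nan],
    # Second row is contradictory: val is NaN yet coded NOT_MISSING.
    "missing_val": [Nans.NOT_MISSING, Nans.NOT_MISSING],
    "missing_se": [Nans.NOT_MISSING, Nans.OTHER],
    "missing_sample_size": [Nans.NOT_MISSING, Nans.OTHER],
})
filtered = filter_contradicting_missing_codes(df, "sensor", "metric", datetime(2021, 9, 28))
# Only the consistent first row survives.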

def create_export_csv(
df: pd.DataFrame,
export_dir: str,
@@ -18,6 +40,7 @@ def create_export_csv(
end_date: Optional[datetime] = None,
remove_null_samples: Optional[bool] = False,
write_empty_days: Optional[bool] = False,
logger: Optional[logging.Logger] = None,
weekly_dates = False,
):
"""Export data in the format expected by the Delphi API.
@@ -45,6 +68,8 @@ def create_export_csv(
write_empty_days: Optional[bool]
If true, every day in between start_date and end_date will have a CSV file written
even if there is no data for the day. If false, only the days present are written.
logger: Optional[logging.Logger]
Pass a logger object here to log information about contradictory missing codes.

Returns
---------
@@ -77,7 +102,20 @@ def create_export_csv(
else:
export_filename = f"{date_str}_{geo_res}_{metric}_{sensor}.csv"
export_file = join(export_dir, export_filename)
export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]]
expected_columns = [
"geo_id",
"val",
"se",
"sample_size",
"missing_val",
"missing_se",
"missing_sample_size"
]
export_df = df[df["timestamp"] == date].filter(items=expected_columns)
if "missing_val" in export_df.columns:
export_df = filter_contradicting_missing_codes(
export_df, sensor, metric, date, logger=logger
)
if remove_null_samples:
export_df = export_df[export_df["sample_size"].notnull()]
export_df = export_df.round({"val": 7, "se": 7})
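
Indicators opt in to the new check by passing a logger to create_export_csv; contradictory rows are then filtered (and logged) before each day's CSV is written. A minimal sketch (all values invented):

import logging
from datetime import datetime

import pandas as pd
from delphi_utils.export import create_export_csv
from delphi_utils.nancodes import Nans

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("example_indicator")

df = pd.DataFrame({
    "timestamp": [datetime(2021, 9, 28)],
    "geo_id": ["1"],
    "val": [1.0],
    "se": [0.1],
    "sample_size": [10.0],
    "missing_val": [Nans.NOT_MISSING],
    "missing_se": [Nans.NOT_MISSING],
    "missing_sample_size": [Nans.NOT_MISSING],
})
create_export_csv(df, export_dir=".", geo_res="county", sensor="smoothed_cli", logger=logger)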
142 changes: 115 additions & 27 deletions _delphi_utils_python/tests/test_archive.py
@@ -14,29 +14,64 @@

from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\
archiver_from_params
from delphi_utils.nancodes import Nans

CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float}
CSV_DTYPES = {
"geo_id": str, "val": float, "se": float, "sample_size": float,
"missing_val": int, "missing_se":int, "missing_sample_size": int
}

CSVS_BEFORE = {
# Common
"csv0": pd.DataFrame({
"geo_id": ["1", "2", "3"],
"val": [1.000000001, 2.00000002, 3.00000003],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

"csv1": pd.DataFrame({
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [np.nan, 0.20000002, 0.30000003],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

# Deleted
"csv2": pd.DataFrame({
"geo_id": ["1"],
"val": [1.0],
"se": [0.1],
"sample_size": [10.0]}),
"sample_size": [10.0],
"missing_val": [Nans.NOT_MISSING],
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING],
}),

# Common, but updated with missing columns
"csv4": pd.DataFrame({
"geo_id": ["1"],
"val": [1.0],
"se": [0.1],
"sample_size": [10.0]
}),

# Common, but missing columns removed
"csv5": pd.DataFrame({
"geo_id": ["1"],
"val": [1.0],
"se": [0.1],
"sample_size": [10.0],
"missing_val": [Nans.NOT_MISSING],
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING],
}),
}

CSVS_AFTER = {
@@ -45,23 +80,53 @@
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.10000001, 0.20000002, 0.30000003],
"sample_size": [10.0, 20.0, 30.0]}),
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

"csv1": pd.DataFrame({
"geo_id": ["1", "2", "4"],
"val": [1.0, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [10.0, 21.0, 40.0]}),
"sample_size": [10.0, 21.0, 40.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
}),

# Added
"csv3": pd.DataFrame({
"geo_id": ["2"],
"val": [2.0000002],
"se": [0.2],
"sample_size": [20.0]}),
"sample_size": [20.0],
"missing_val": [Nans.NOT_MISSING],
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING],
}),

# Common, but updated with missing columns
"csv4": pd.DataFrame({
"geo_id": ["1"],
"val": [1.0],
"se": [0.1],
"sample_size": [10.0],
"missing_val": [Nans.NOT_MISSING],
"missing_se": [Nans.NOT_MISSING],
"missing_sample_size": [Nans.NOT_MISSING],
}),

# Common, but missing columns removed
"csv5": pd.DataFrame({
"geo_id": ["1"],
"val": [1.0],
"se": [0.1],
"sample_size": [10.0]
}),
}


class TestArchiveDiffer:

def test_stubs(self):
@@ -80,10 +145,14 @@ def test_diff_and_filter_exports(self, tmp_path):
mkdir(export_dir)

csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})

arch_diff = ArchiveDiffer(cache_dir, export_dir)

@@ -106,15 +175,18 @@ def test_diff_and_filter_exports(self, tmp_path):
# Check return values
assert set(deleted_files) == {join(cache_dir, "csv2.csv")}
assert set(common_diffs.keys()) == {
join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]}
join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv", "csv5.csv"]}
assert set(new_files) == {join(export_dir, "csv3.csv")}
assert common_diffs[join(export_dir, "csv0.csv")] is None
assert common_diffs[join(export_dir, "csv1.csv")] == join(
export_dir, "csv1.csv.diff")

# Check filesystem for actual files
assert set(listdir(export_dir)) == {
"csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"}
"csv0.csv", "csv1.csv", "csv1.csv.diff",
"csv3.csv", "csv4.csv", "csv4.csv.diff",
"csv5.csv", "csv5.csv.diff"
}
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES),
csv1_diff)
Expand All @@ -132,7 +204,7 @@ def test_diff_and_filter_exports(self, tmp_path):
arch_diff.filter_exports(common_diffs)

# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"}
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
csv1_diff)
@@ -259,12 +331,16 @@ def test_run(self, tmp_path, s3_client):
assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df)

# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"}
csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
csv1_diff)
@@ -346,7 +422,11 @@ def test_diff_exports(self, tmp_path):
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]})
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
})

# Write exact same CSV into cache and export, so no diffs expected
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -383,7 +463,11 @@ def test_archive_exports(self, tmp_path):
"geo_id": ["1", "2", "3"],
"val": [1.0, 2.0, 3.0],
"se": [0.1, 0.2, 0.3],
"sample_size": [10.0, 20.0, 30.0]})
"sample_size": [10.0, 20.0, 30.0],
"missing_val": [Nans.NOT_MISSING] * 3,
"missing_se": [Nans.NOT_MISSING] * 3,
"missing_sample_size": [Nans.NOT_MISSING] * 3,
})

# csv1.csv is now a dirty edit in the repo, and to be exported too
csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -460,12 +544,16 @@ def test_run(self, tmp_path):
original_branch.checkout()

# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"}
csv1_diff = pd.DataFrame({
"geo_id": ["2", "4"],
"val": [2.1, 4.0],
"se": [0.21, np.nan],
"sample_size": [21.0, 40.0]})
"geo_id": ["3", "2", "4"],
"val": [np.nan, 2.1, 4.0],
"se": [np.nan, 0.21, np.nan],
"sample_size": [np.nan, 21.0, 40.0],
"missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})
assert_frame_equal(
pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
csv1_diff)