diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py index 5d1036bcd..eb8aac8d2 100644 --- a/_delphi_utils_python/delphi_utils/archive.py +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -40,9 +40,11 @@ from git import Repo from git.refs.head import Head import pandas as pd +import numpy as np from .utils import read_params from .logger import get_structured_logger +from .nancodes import Nans Files = List[str] FileDiffMap = Dict[str, Optional[str]] @@ -73,8 +75,10 @@ def diff_export_csv( changed_df is the pd.DataFrame of common rows from after_csv with changed values. added_df is the pd.DataFrame of added rows from after_csv. """ - export_csv_dtypes = {"geo_id": str, "val": float, - "se": float, "sample_size": float} + export_csv_dtypes = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se": int, "missing_sample_size": int + } before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes) before_df.set_index("geo_id", inplace=True) @@ -89,12 +93,22 @@ def diff_export_csv( before_df_cmn = before_df.reindex(common_idx) after_df_cmn = after_df.reindex(common_idx) - # Exact comparisons, treating NA == NA as True - same_mask = before_df_cmn == after_df_cmn - same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + # If CSVs have different columns (no missingness), mark all values as new + if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns): + same_mask = after_df_cmn.copy() + same_mask.loc[:] = False + else: + # Exact comparisons, treating NA == NA as True + same_mask = before_df_cmn == after_df_cmn + same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + + # Code deleted entries as nans with the deleted missing code + deleted_df = before_df.loc[deleted_idx, :].copy() + deleted_df[["val", "se", "sample_size"]] = np.nan + deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED return ( - 
def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None):
    """Drop rows whose missingness codes contradict their values, and log if any are dropped.

    For each of "val", "se" and "sample_size", a row is contradictory when
    the value is NaN while its "missing_<column>" code equals
    Nans.NOT_MISSING, or when the value is present while the code is anything
    other than Nans.NOT_MISSING.

    Parameters
    ----------
    df: pd.DataFrame
        Frame containing "val", "se", "sample_size" and the matching
        "missing_val", "missing_se", "missing_sample_size" columns.
    sensor, metric:
        Only used to build the log message.
    date:
        Object with a `strftime` method; only used to build the log message.
    logger: Optional[logging.Logger]
        If given, one info message is logged when at least one row is dropped.

    Returns
    -------
    pd.DataFrame
        `df` itself when nothing is contradictory, otherwise `df` without the
        contradictory rows.
    """
    columns = ["val", "se", "sample_size"]

    # Build one combined mask on the unfiltered frame, so the boolean indexer
    # always shares the frame's index instead of relying on implicit
    # realignment after earlier rows have already been removed.
    contradictory = pd.Series(False, index=df.index)
    for column in columns:
        value_is_nan = df[column].isna()
        coded_not_missing = df["missing_" + column].eq(Nans.NOT_MISSING)
        # XNOR: both true (NaN but coded as present) or both false
        # (present but coded as missing) means the code contradicts the value.
        contradictory |= ~(value_is_nan ^ coded_not_missing)

    if not contradictory.any():
        return df

    if logger is not None:
        # Log once per export file, not once per offending column.
        logger.info(
            "Filtering contradictory missing code in " +
            "{0}_{1}_{2}.".format(sensor, metric, date.strftime("%Y-%m-%d"))
        )
    return df.loc[~contradictory]
Returns --------- @@ -77,7 +102,20 @@ def create_export_csv( else: export_filename = f"{date_str}_{geo_res}_{metric}_{sensor}.csv" export_file = join(export_dir, export_filename) - export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]] + expected_columns = [ + "geo_id", + "val", + "se", + "sample_size", + "missing_val", + "missing_se", + "missing_sample_size" + ] + export_df = df[df["timestamp"] == date].filter(items=expected_columns) + if "missing_val" in export_df.columns: + export_df = filter_contradicting_missing_codes( + export_df, sensor, metric, date, logger=logger + ) if remove_null_samples: export_df = export_df[export_df["sample_size"].notnull()] export_df = export_df.round({"val": 7, "se": 7}) diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py index 1b068f898..3050908f2 100644 --- a/_delphi_utils_python/tests/test_archive.py +++ b/_delphi_utils_python/tests/test_archive.py @@ -14,8 +14,12 @@ from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\ archiver_from_params +from delphi_utils.nancodes import Nans -CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float} +CSV_DTYPES = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se":int, "missing_sample_size": int + } CSVS_BEFORE = { # Common @@ -23,20 +27,51 @@ "geo_id": ["1", "2", "3"], "val": [1.000000001, 2.00000002, 3.00000003], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [np.nan, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + 
"missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Deleted "csv2": pd.DataFrame({ "geo_id": ["1"], "val": [1.0], "se": [0.1], - "sample_size": [10.0]}), + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0] + }), + + # Common, but missing columns removed + "csv5": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), } CSVS_AFTER = { @@ -45,23 +80,53 @@ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.10000001, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "4"], "val": [1.0, 2.1, 4.0], "se": [np.nan, 0.21, np.nan], - "sample_size": [10.0, 21.0, 40.0]}), + "sample_size": [10.0, 21.0, 40.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Added "csv3": pd.DataFrame({ "geo_id": ["2"], "val": [2.0000002], "se": [0.2], - "sample_size": [20.0]}), + "sample_size": [20.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but missing columns removed + "csv5": pd.DataFrame({ + "geo_id": ["1"], + 
"val": [1.0], + "se": [0.1], + "sample_size": [10.0] + }), } - class TestArchiveDiffer: def test_stubs(self): @@ -80,10 +145,14 @@ def test_diff_and_filter_exports(self, tmp_path): mkdir(export_dir) csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) arch_diff = ArchiveDiffer(cache_dir, export_dir) @@ -106,7 +175,7 @@ def test_diff_and_filter_exports(self, tmp_path): # Check return values assert set(deleted_files) == {join(cache_dir, "csv2.csv")} assert set(common_diffs.keys()) == { - join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]} + join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv", "csv5.csv"]} assert set(new_files) == {join(export_dir, "csv3.csv")} assert common_diffs[join(export_dir, "csv0.csv")] is None assert common_diffs[join(export_dir, "csv1.csv")] == join( @@ -114,7 +183,10 @@ def test_diff_and_filter_exports(self, tmp_path): # Check filesystem for actual files assert set(listdir(export_dir)) == { - "csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"} + "csv0.csv", "csv1.csv", "csv1.csv.diff", + "csv3.csv", "csv4.csv", "csv4.csv.diff", + "csv5.csv", "csv5.csv.diff" + } assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES), csv1_diff) @@ -132,7 +204,7 @@ def test_diff_and_filter_exports(self, tmp_path): arch_diff.filter_exports(common_diffs) # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"} assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), 
csv1_diff) @@ -259,12 +331,16 @@ def test_run(self, tmp_path, s3_client): assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df) # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) @@ -346,7 +422,11 @@ def test_diff_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # Write exact same CSV into cache and export, so no diffs expected csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -383,7 +463,11 @@ def test_archive_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # csv1.csv is now a dirty edit in the repo, and to be exported too csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -460,12 +544,16 @@ def test_run(self, tmp_path): original_branch.checkout() # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", 
"csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py index 31ec5c113..d9906300d 100644 --- a/_delphi_utils_python/tests/test_export.py +++ b/_delphi_utils_python/tests/test_export.py @@ -3,8 +3,12 @@ from os import listdir, remove from os.path import join +import mock +import numpy as np import pandas as pd -from delphi_utils import create_export_csv + +from delphi_utils import create_export_csv, Nans + def _clean_directory(directory): """Clean files out of a directory.""" @@ -26,6 +30,7 @@ def _non_ignored_files_set(directory): class TestExport: """Tests for exporting CSVs.""" + # List of times for data points. TIMES = [ datetime.strptime(x, "%Y-%m-%d") @@ -43,6 +48,54 @@ class TestExport: } ) + # A sample data frame with missingness. + DF2 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [3.12345678910, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [ + Nans.NOT_MISSING, + Nans.OTHER, + Nans.NOT_MISSING, + Nans.NOT_MISSING, + ], + "missing_se": [ + Nans.NOT_MISSING, + Nans.NOT_MISSING, + Nans.OTHER, + Nans.NOT_MISSING, + ], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.OTHER], + } + ) + + # A sample data frame with contradictory missing codes. 
+ DF3 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [np.nan, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [ + Nans.NOT_MISSING, + Nans.OTHER, + Nans.NOT_MISSING, + Nans.NOT_MISSING, + ], + "missing_se": [ + Nans.NOT_MISSING, + Nans.NOT_MISSING, + Nans.OTHER, + Nans.NOT_MISSING, + ], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.OTHER], + } + ) + # Directory in which to store tests. TEST_DIR = "test_dir" @@ -85,10 +138,14 @@ def test_export_rounding(self): ) pd.testing.assert_frame_equal( pd.read_csv(join(self.TEST_DIR, "20200215_county_deaths_test.csv")), - pd.DataFrame({"geo_id": [51093, 51175], - "val": [round(3.12345678910, 7), 2.1], - "se": [0.15, 0.22], - "sample_size": [100, 100]}) + pd.DataFrame( + { + "geo_id": [51093, 51175], + "val": [round(3.12345678910, 7), 2.1], + "se": [0.15, 0.22], + "sample_size": [100, 100], + } + ), ) def test_export_without_metric(self): @@ -180,13 +237,16 @@ def test_export_with_null_removal(self): """Test that `remove_null_samples = True` removes entries with null samples.""" _clean_directory(self.TEST_DIR) - df_with_nulls = self.DF.copy().append({ - "geo_id": "66666", - "timestamp": datetime(2020, 6, 6), - "val": 10, - "se": 0.2, - "sample_size": pd.NA}, - ignore_index=True) + df_with_nulls = self.DF.copy().append( + { + "geo_id": "66666", + "timestamp": datetime(2020, 6, 6), + "val": 10, + "se": 0.2, + "sample_size": pd.NA, + }, + ignore_index=True, + ) create_export_csv( df=df_with_nulls, @@ -210,13 +270,16 @@ def test_export_without_null_removal(self): """Test that `remove_null_samples = False` does not remove entries with null samples.""" _clean_directory(self.TEST_DIR) - df_with_nulls = self.DF.copy().append({ - "geo_id": "66666", - "timestamp": datetime(2020, 6, 6), - "val": 10, - "se": 0.2, - "sample_size": pd.NA}, - ignore_index=True) + df_with_nulls = self.DF.copy().append( + { + 
"geo_id": "66666", + "timestamp": datetime(2020, 6, 6), + "val": 10, + "se": 0.2, + "sample_size": pd.NA, + }, + ignore_index=True, + ) create_export_csv( df=df_with_nulls, @@ -235,3 +298,77 @@ def test_export_without_null_removal(self): ] ) assert pd.read_csv(join(self.TEST_DIR, "20200606_state_test.csv")).size > 0 + + def test_export_df_without_missingness(self): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF.copy(), export_dir=self.TEST_DIR, geo_res="county", sensor="test" + ) + df = pd.read_csv(join(self.TEST_DIR, "20200215_county_test.csv")).astype( + {"geo_id": str, "sample_size": int} + ) + expected_df = pd.DataFrame( + { + "geo_id": ["51093", "51175"], + "val": [3.12345678910, 2.1], + "se": [0.15, 0.22], + "sample_size": [100, 100], + } + ).astype({"geo_id": str, "sample_size": int}) + pd.testing.assert_frame_equal(df, expected_df) + + def test_export_df_with_missingness(self): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF2.copy(), + export_dir=self.TEST_DIR, + geo_res="county", + sensor="test", + ) + assert _non_ignored_files_set(self.TEST_DIR) == set( + [ + "20200215_county_test.csv", + "20200301_county_test.csv", + "20200315_county_test.csv", + ] + ) + df = pd.read_csv(join(self.TEST_DIR, "20200215_county_test.csv")).astype( + {"geo_id": str, "sample_size": int} + ) + expected_df = pd.DataFrame( + { + "geo_id": ["51093", "51175"], + "val": [3.12345678910, np.nan], + "se": [0.15, 0.22], + "sample_size": [100, 100], + "missing_val": [Nans.NOT_MISSING, Nans.OTHER], + "missing_se": [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.NOT_MISSING] * 2, + } + ).astype({"geo_id": str, "sample_size": int}) + pd.testing.assert_frame_equal(df, expected_df) + + @mock.patch("delphi_utils.logger") + def test_export_df_with_contradictory_missingness(self, mock_logger): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF3.copy(), + export_dir=self.TEST_DIR, + geo_res="state", + sensor="test", + 
logger=mock_logger + ) + assert _non_ignored_files_set(self.TEST_DIR) == set( + [ + "20200215_state_test.csv", + "20200301_state_test.csv", + "20200315_state_test.csv", + ] + ) + assert pd.read_csv(join(self.TEST_DIR, "20200315_state_test.csv")).size > 0 + mock_logger.info.assert_called_once_with( + "Filtering contradictory missing code in test_None_2020-02-15." + )