From 08c9bd2884e83360d52b5daaa00a0461007747a5 Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Wed, 10 Feb 2021 13:32:17 -0800 Subject: [PATCH 01/13] Update utilities for NAN codes: * update export utility to export, validate, and test the missing cols * add deletion coding to the archiver, make it expect missing cols, and let it handle comparisons between missing and non-missing CSVs --- _delphi_utils_python/delphi_utils/archive.py | 30 +++-- _delphi_utils_python/delphi_utils/export.py | 42 ++++++- _delphi_utils_python/tests/test_archive.py | 121 ++++++++++++++----- _delphi_utils_python/tests/test_export.py | 76 +++++++++++- usafacts/delphi_usafacts/run.py | 22 +++- 5 files changed, 249 insertions(+), 42 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py index 5d1036bcd..5c73d7a2f 100644 --- a/_delphi_utils_python/delphi_utils/archive.py +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -40,9 +40,11 @@ from git import Repo from git.refs.head import Head import pandas as pd +import numpy as np from .utils import read_params from .logger import get_structured_logger +from .nancodes import Nans Files = List[str] FileDiffMap = Dict[str, Optional[str]] @@ -73,8 +75,10 @@ def diff_export_csv( changed_df is the pd.DataFrame of common rows from after_csv with changed values. added_df is the pd.DataFrame of added rows from after_csv. """ - export_csv_dtypes = {"geo_id": str, "val": float, - "se": float, "sample_size": float} + export_csv_dtypes = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se": int, "missing_sample_size": int + } before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes) before_df.set_index("geo_id", inplace=True) @@ -89,12 +93,22 @@ def diff_export_csv( before_df_cmn = before_df.reindex(common_idx) after_df_cmn = after_df.reindex(common_idx) - # Exact comparisons, treating NA == NA as True - same_mask = before_df_cmn == after_df_cmn - same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + # If CSVs have different columns (no missingness), mark all values as new + if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns): + same_mask = after_df_cmn.copy() + same_mask.loc[:] = False + else: + # Exact comparisons, treating NA == NA as True + same_mask = before_df_cmn == after_df_cmn + same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + + # Code deleted entries as nans with the deleted missing code + deleted_df = before_df.loc[deleted_idx, :].copy() + deleted_df[["val", "se", "sample_size"]] = np.nan + deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED return ( - before_df.loc[deleted_idx, :], + deleted_df, after_df_cmn.loc[~(same_mask.all(axis=1)), :], after_df.loc[added_idx, :]) @@ -227,11 +241,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: deleted_df, changed_df, added_df = diff_export_csv( before_file, after_file) - new_issues_df = pd.concat([changed_df, added_df], axis=0) + new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0) if len(deleted_df) > 0: print( - f"Warning, diff has deleted indices in {after_file} that will be ignored") + f"Diff has deleted indices in {after_file} that have been coded as nans.") # Write the diffs to diff_file, if applicable if len(new_issues_df) > 0: diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index 18a6a1885..1b5c3733a 100644 --- 
a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -3,10 +3,32 @@ from datetime import datetime from os.path import join from typing import Optional +import logging import numpy as np import pandas as pd +from .nancodes import Nans + +def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None): + """Find values with contradictory missingness codes, filter them, and log.""" + columns = ["val", "se", "sample_size"] + # Get indices where the XNOR is true (i.e. both are true or both are false). + masks = [ + ~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING)) + for column in columns + ] + for mask in masks: + if logger is not None and df.loc[mask].size > 0: + logger.info( + "Filtering contradictory missing code in " + + "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d")) + ) + df = df.loc[~mask] + elif logger is None and df.loc[mask].size > 0: + df = df.loc[~mask] + return df + def create_export_csv( df: pd.DataFrame, export_dir: str, @@ -15,7 +37,8 @@ metric: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, - remove_null_samples: Optional[bool] = False + remove_null_samples: Optional[bool] = False, + logger: Optional[logging.Logger] = None ): """Export data in the format expected by the Delphi API. @@ -39,6 +62,8 @@ Latest date to export or None if no maximum date restrictions should be applied. remove_null_samples: Optional[bool] Whether to remove entries whose sample sizes are null. + logger: Optional[logging.Logger] + Pass a logger object here to log information about contradictory missing codes. Returns --------- @@ -64,7 +89,20 @@ else: export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv" export_file = join(export_dir, export_filename) - export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]] + expected_columns = [ + "geo_id", + "val", + "se", + "sample_size", + "missing_val", + "missing_se", + "missing_sample_size" + ] + export_df = df[df["timestamp"] == date].filter(items=expected_columns) + if "missing_val" in export_df.columns: + export_df = filter_contradicting_missing_codes( + export_df, sensor, metric, date, logger=logger + ) if remove_null_samples: export_df = export_df[export_df["sample_size"].notnull()] export_df = export_df.round({"val": 7, "se": 7}) diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py index 1b068f898..bf26da278 100644 --- a/_delphi_utils_python/tests/test_archive.py +++ b/_delphi_utils_python/tests/test_archive.py @@ -13,9 +13,12 @@ import pytest from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\ - archiver_from_params + archiver_from_params, Nans -CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float} +CSV_DTYPES = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se": int, "missing_sample_size": int + } CSVS_BEFORE = { # Common "csv0": pd.DataFrame({ "geo_id": ["1", "2", "3"], "val": [1.000000001, 2.00000002, 3.00000003], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [np.nan,
0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Deleted "csv2": pd.DataFrame({ "geo_id": ["1"], "val": [1.0], "se": [0.1], - "sample_size": [10.0]}), + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0] + }), } CSVS_AFTER = { @@ -45,23 +68,45 @@ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.10000001, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "4"], "val": [1.0, 2.1, 4.0], "se": [np.nan, 0.21, np.nan], - "sample_size": [10.0, 21.0, 40.0]}), + "sample_size": [10.0, 21.0, 40.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Added "csv3": pd.DataFrame({ "geo_id": ["2"], "val": [2.0000002], "se": [0.2], - "sample_size": [20.0]}), + "sample_size": [20.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), } - class TestArchiveDiffer: def test_stubs(self): @@ -80,10 +125,14 @@ def test_diff_and_filter_exports(self, tmp_path): mkdir(export_dir) csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) arch_diff = ArchiveDiffer(cache_dir, export_dir) @@ -106,7 +155,7 @@ def test_diff_and_filter_exports(self, tmp_path): # Check return values assert set(deleted_files) == {join(cache_dir, "csv2.csv")} assert set(common_diffs.keys()) == { - join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]} + join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv"]} assert set(new_files) == {join(export_dir, "csv3.csv")} assert common_diffs[join(export_dir, "csv0.csv")] is None assert common_diffs[join(export_dir, "csv1.csv")] == join( @@ -114,7 +163,7 @@ def test_diff_and_filter_exports(self, tmp_path): # Check filesystem for actual files assert set(listdir(export_dir)) == { - "csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"} + "csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv", "csv4.csv", "csv4.csv.diff"} assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES), csv1_diff) @@ -132,7 +181,7 @@ def test_diff_and_filter_exports(self, tmp_path): arch_diff.filter_exports(common_diffs) # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} 
+ assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"} assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) @@ -259,12 +308,16 @@ def test_run(self, tmp_path, s3_client): assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df) # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) @@ -346,7 +399,11 @@ def test_diff_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # Write exact same CSV into cache and export, so no diffs expected csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -383,7 +440,11 @@ def test_archive_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # csv1.csv is now a dirty edit in the repo, and to be exported too csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -460,12 +521,16 @@ def test_run(self, tmp_path): original_branch.checkout() # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py index 31ec5c113..aedc2933b 100644 --- a/_delphi_utils_python/tests/test_export.py +++ b/_delphi_utils_python/tests/test_export.py @@ -3,8 +3,11 @@ from os import listdir, remove from os.path import join +import mock +import numpy as np import pandas as pd -from delphi_utils import create_export_csv + +from delphi_utils import create_export_csv, Nans def _clean_directory(directory): """Clean files out of a directory.""" @@ -43,6 +46,34 @@ class TestExport: } ) + # A sample data frame with missingness. 
+ DF2 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [3.12345678910, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [Nans.NOT_MISSING, Nans.UNKNOWN, Nans.NOT_MISSING, Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.UNKNOWN, Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.UNKNOWN] + } + ) + + # A sample data frame with contradictory missing codes. + DF3 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [np.nan, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [Nans.NOT_MISSING, Nans.UNKNOWN, Nans.NOT_MISSING, Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.UNKNOWN, Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.UNKNOWN] + } + ) + # Directory in which to store tests. TEST_DIR = "test_dir" @@ -235,3 +266,46 @@ def test_export_without_null_removal(self): ] ) assert pd.read_csv(join(self.TEST_DIR, "20200606_state_test.csv")).size > 0 + + def test_export_df_with_missingness(self): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF2.copy(), + export_dir=self.TEST_DIR, + geo_res="state", + sensor="test", + remove_null_samples=False + ) + assert _non_ignored_files_set(self.TEST_DIR) == set( + [ + "20200215_state_test.csv", + "20200301_state_test.csv", + "20200315_state_test.csv", + ] + ) + assert pd.read_csv(join(self.TEST_DIR, "20200315_state_test.csv")).size > 0 + + @mock.patch("delphi_utils.logger") + def test_export_df_with_contradictory_missingness(self, mock_logger): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF3.copy(), + export_dir=self.TEST_DIR, + geo_res="state", + sensor="test", + remove_null_samples=False, + logger=mock_logger + ) + assert _non_ignored_files_set(self.TEST_DIR) == set( + [ + "20200215_state_test.csv", + "20200301_state_test.csv", + "20200315_state_test.csv", + ] + ) + assert pd.read_csv(join(self.TEST_DIR, "20200315_state_test.csv")).size > 0 + mock_logger.info.assert_called_once_with( + "Filtering contradictory missing code in test_None_2020-02-15." 
+ ) diff --git a/usafacts/delphi_usafacts/run.py b/usafacts/delphi_usafacts/run.py index adb06434c..435a8d5c6 100644 --- a/usafacts/delphi_usafacts/run.py +++ b/usafacts/delphi_usafacts/run.py @@ -14,7 +14,8 @@ create_export_csv, get_structured_logger, S3ArchiveDiffer, - Smoother + Smoother, + Nans ) from .geo import geo_map @@ -116,8 +117,23 @@ def run_module(params: Dict[str, Dict[str, Any]]): ) df["se"] = np.nan df["sample_size"] = np.nan - # Drop early entries where data insufficient for smoothing - df = df.loc[~df["val"].isnull(), :] + + # Default missing code + df["missing_val"] = Nans.NOT_MISSING + df["missing_se"] = Nans.NOT_APPLICABLE + df["missing_sample_size"] = Nans.NOT_APPLICABLE + + # Mark early smoothing entries as data insufficient + if smoother == "seven_day_average": + df.sort_index(inplace=True) + min_time_value = df.index.min()[0] + 6 * pd.Timedelta(days=1) + df.loc[idx[:min_time_value, :], "missing_val"] = Nans.DATA_INSUFFICIENT + + # Mark any remaining nans with unknown + remaining_nans_mask = df["val"].isnull() & (df["missing_val"] == Nans.NOT_MISSING) + df.loc[remaining_nans_mask, "missing_val"] = Nans.UNKNOWN + + df.reset_index(inplace=True) sensor_name = SENSOR_NAME_MAP[sensor][0] # if (SENSOR_NAME_MAP[sensor][1] or SMOOTHERS_MAP[smoother][2]): # metric = f"wip_{metric}" From 6a2c215f4539eb4d6a9bec54b55f119a2413e05d Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Thu, 18 Feb 2021 13:53:21 -0800 Subject: [PATCH 02/13] NANs USAFacts: * keep nan values in "val" column * mark early smoothing data as "data insufficient" * add missing column outputs --- usafacts/delphi_usafacts/run.py | 53 ++++++++++++++++++++------------- usafacts/tests/test_run.py | 15 ++++++++-- 2 files changed, 45 insertions(+), 23 deletions(-) diff --git a/usafacts/delphi_usafacts/run.py b/usafacts/delphi_usafacts/run.py index 435a8d5c6..f5f20013a 100644 --- a/usafacts/delphi_usafacts/run.py +++ b/usafacts/delphi_usafacts/run.py @@ -9,6 +9,7 @@ from itertools import product from typing import Dict, Any +import pandas as pd import numpy as np from delphi_utils import ( create_export_csv, @@ -64,6 +65,27 @@ ] +def add_nancodes(df, smoother): + """Add nancodes to the dataframe.""" + idx = pd.IndexSlice + + # Default nancodes + df["missing_val"] = Nans.NOT_MISSING + df["missing_se"] = Nans.NOT_APPLICABLE + df["missing_sample_size"] = Nans.NOT_APPLICABLE + + # Mark early smoothing entries as data insufficient + if smoother == "seven_day_average": + df.sort_index(inplace=True) + min_time_value = df.index.min()[0] + 6 * pd.Timedelta(days=1) + df.loc[idx[:min_time_value, :], "missing_val"] = Nans.DATA_INSUFFICIENT + + # Mark any remaining nans with unknown + remaining_nans_mask = df["val"].isnull() & (df["missing_val"] == Nans.NOT_MISSING) + df.loc[remaining_nans_mask, "missing_val"] = Nans.UNKNOWN + return df + + def run_module(params: Dict[str, Dict[str, Any]]): """Run the usafacts indicator.
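The add_nancodes helper above assigns codes in three passes: defaults first, then the warm-up window of the seven-day average, then a catch-all for anything still nan. The standalone sketch below replays that pattern on a toy frame; it is illustrative only and not part of the patch, with stand-in Nans values (the real enum is delphi_utils.nancodes.Nans) and get_level_values("timestamp") used in place of the patch's index.min()[0].

from enum import IntEnum

import numpy as np
import pandas as pd

class Nans(IntEnum):
    # Stand-in values; the real codes come from delphi_utils.nancodes.Nans.
    NOT_MISSING = 0
    NOT_APPLICABLE = 1
    DATA_INSUFFICIENT = 2
    UNKNOWN = 3

dates = pd.date_range("2020-03-01", periods=10)
df = pd.DataFrame(
    {"val": [np.nan] * 6 + [1.0, np.nan, 3.0, 4.0]},
    index=pd.MultiIndex.from_product([dates, ["pa"]], names=["timestamp", "geo_id"]),
)

# Pass 1: defaults -- values are expected; se/sample_size are structurally absent.
df["missing_val"] = Nans.NOT_MISSING
df["missing_se"] = Nans.NOT_APPLICABLE
df["missing_sample_size"] = Nans.NOT_APPLICABLE

# Pass 2: code the warm-up window of a seven-day average as data-insufficient.
df.sort_index(inplace=True)
cutoff = df.index.get_level_values("timestamp").min() + 6 * pd.Timedelta(days=1)
df.loc[pd.IndexSlice[:cutoff, :], "missing_val"] = Nans.DATA_INSUFFICIENT

# Pass 3: any nan still coded NOT_MISSING is missing for an unknown reason.
leftover = df["val"].isnull() & (df["missing_val"] == Nans.NOT_MISSING)
df.loc[leftover, "missing_val"] = Nans.UNKNOWN

On this toy frame the rows through 2020-03-07 end up coded DATA_INSUFFICIENT and the interior nan on 2020-03-08 ends up UNKNOWN, which is the behavior the patch intends for the smoothed USAFacts signals.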
@@ -112,37 +134,28 @@ def run_module(params: Dict[str, Dict[str, Any]]): df = dfs[metric] # Aggregate to appropriate geographic resolution df = geo_map(df, geo_res, sensor) - df["val"] = df[["geo_id", sensor]].groupby("geo_id")[sensor].transform( - SMOOTHERS_MAP[smoother][0].smooth - ) - df["se"] = np.nan - df["sample_size"] = np.nan + df.set_index(["timestamp", "geo_id"], inplace=True) - # Default missing code - df["missing_val"] = Nans.NOT_MISSING - df["missing_se"] = Nans.NOT_APPLICABLE - df["missing_sample_size"] = Nans.NOT_APPLICABLE + # Smooth + smooth_obj, smoother_prefix, _, smoother_lag = SMOOTHERS_MAP[smoother] + df["val"] = df[sensor].groupby(level=1).transform(smooth_obj.smooth) - # Mark early smoothing entries as data insufficient - if smoother == "seven_day_average": - df.sort_index(inplace=True) - min_time_value = df.index.min()[0] + 6 * pd.Timedelta(days=1) - df.loc[idx[:min_time_value, :], "missing_val"] = Nans.DATA_INSUFFICIENT + # USAFacts is not a survey indicator + df["se"] = np.nan + df["sample_size"] = np.nan - # Mark any remaining nans with unknown - remaining_nans_mask = df["val"].isnull() & (df["missing_val"] == Nans.NOT_MISSING) - df.loc[remaining_nans_mask, "missing_val"] = Nans.UNKNOWN + df = add_nancodes(df, smoother) df.reset_index(inplace=True) sensor_name = SENSOR_NAME_MAP[sensor][0] - # if (SENSOR_NAME_MAP[sensor][1] or SMOOTHERS_MAP[smoother][2]): + # if (SENSOR_NAME_MAP[sensor][1] or is_smooth_wip): # metric = f"wip_{metric}" # sensor_name = WIP_SENSOR_NAME_MAP[sensor][0] - sensor_name = SMOOTHERS_MAP[smoother][1] + sensor_name + sensor_name = smoother_prefix + sensor_name exported_csv_dates = create_export_csv( df, export_dir=export_dir, - start_date=SMOOTHERS_MAP[smoother][3](export_start_date), + start_date=smoother_lag(export_start_date), metric=metric, geo_res=geo_res, sensor=sensor_name, diff --git a/usafacts/tests/test_run.py b/usafacts/tests/test_run.py index d22a514ca..bb51eba4a 100644 --- a/usafacts/tests/test_run.py +++ b/usafacts/tests/test_run.py @@ -52,8 +52,6 @@ def test_output_files_exist(self): for date in dates: for geo in geos: for metric in metrics: - if "7dav" in metric and date in dates[:6]: - continue # there are no 7dav signals for first 6 days expected_files += [date + "_" + geo + "_" + metric + ".csv"] assert set(csv_files) == set(expected_files) @@ -65,4 +63,15 @@ def test_output_file_format(self): df = pd.read_csv( join("receiving", "20200310_state_confirmed_cumulative_num.csv") ) - assert (df.columns.values == ["geo_id", "val", "se", "sample_size"]).all() + assert ( + df.columns.values + == [ + "geo_id", + "val", + "se", + "sample_size", + "missing_val", + "missing_se", + "missing_sample_size", + ] + ).all() From de3abd66a94c61a06c12069b32aee426a23eac12 Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Wed, 10 Feb 2021 13:32:17 -0800 Subject: [PATCH 03/13] Update utilities for NAN codes: * update export utility to export, validate, and test the missing cols * handle deleted rows: replaced with nan values * handle deleted files: replace with an empty CSV file * handle comparisons between CSVs with/without missing cols --- _delphi_utils_python/delphi_utils/archive.py | 50 ++++-- _delphi_utils_python/delphi_utils/export.py | 42 ++++- _delphi_utils_python/tests/test_archive.py | 172 ++++++++++++++++--- _delphi_utils_python/tests/test_export.py | 76 +++++++- 4 files changed, 296 insertions(+), 44 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py index 
5d1036bcd..55b507b70 100644 --- a/_delphi_utils_python/delphi_utils/archive.py +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -40,9 +40,11 @@ from git import Repo from git.refs.head import Head import pandas as pd +import numpy as np from .utils import read_params from .logger import get_structured_logger +from .nancodes import Nans Files = List[str] FileDiffMap = Dict[str, Optional[str]] @@ -73,8 +75,10 @@ def diff_export_csv( changed_df is the pd.DataFrame of common rows from after_csv with changed values. added_df is the pd.DataFrame of added rows from after_csv. """ - export_csv_dtypes = {"geo_id": str, "val": float, - "se": float, "sample_size": float} + export_csv_dtypes = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se": int, "missing_sample_size": int + } before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes) before_df.set_index("geo_id", inplace=True) @@ -89,12 +93,22 @@ def diff_export_csv( before_df_cmn = before_df.reindex(common_idx) after_df_cmn = after_df.reindex(common_idx) - # Exact comparisons, treating NA == NA as True - same_mask = before_df_cmn == after_df_cmn - same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + # If CSVs have different columns (no missingness), mark all values as new + if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns): + same_mask = after_df_cmn.copy() + same_mask.loc[:] = False + else: + # Exact comparisons, treating NA == NA as True + same_mask = before_df_cmn == after_df_cmn + same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + + # Code deleted entries as nans with the deleted missing code + deleted_df = before_df.loc[deleted_idx, :].copy() + deleted_df[["val", "se", "sample_size"]] = np.nan + deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED return ( - before_df.loc[deleted_idx, :], + deleted_df, after_df_cmn.loc[~(same_mask.all(axis=1)), :], after_df.loc[added_idx, :]) @@ -227,11 +241,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: deleted_df, changed_df, added_df = diff_export_csv( before_file, after_file) - new_issues_df = pd.concat([changed_df, added_df], axis=0) + new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0) if len(deleted_df) > 0: print( - f"Warning, diff has deleted indices in {after_file} that will be ignored") + f"Diff has deleted indices in {after_file} that have been coded as nans.") # Write the diffs to diff_file, if applicable if len(new_issues_df) > 0: @@ -240,6 +254,15 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: new_issues_df.to_csv(diff_file, na_rep="NA") common_diffs[after_file] = diff_file + # Replace deleted files with empty versions, but only if the cached version is not + # already empty + for deleted_file in deleted_files: + deleted_df = pd.read_csv(deleted_file) + if not deleted_df.empty: + empty_df = deleted_df[0:0] + new_deleted_filename = join(self.export_dir, basename(deleted_file)) + empty_df.to_csv(new_deleted_filename, index=False) + return deleted_files, common_diffs, new_files def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]: @@ -266,9 +289,10 @@ def filter_exports(self, common_diffs: FileDiffMap): Filter export directory to only contain relevant files. Filters down the export_dir to only contain: - 1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows only. 
- Should be called after archive_exports() so we archive the raw exports before - potentially modifying them. + 1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows + only, and 3) Deleted files replaced with empty CSVs with the same name. Should + be called after archive_exports() so we archive the raw exports before potentially + modifying them. Parameters ---------- @@ -297,9 +321,9 @@ def run(self): self.update_cache() # Diff exports, and make incremental versions - _, common_diffs, new_files = self.diff_exports() + deleted_files, common_diffs, new_files = self.diff_exports() - # Archive changed and new files only + # Archive changed, new, and emptied deleted files to_archive = [f for f, diff in common_diffs.items() if diff is not None] to_archive += new_files diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index 18a6a1885..1b5c3733a 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -3,10 +3,32 @@ from datetime import datetime from os.path import join from typing import Optional +import logging import numpy as np import pandas as pd +from .nancodes import Nans + +def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None): + """Find values with contradictory missingness codes, filter them, and log.""" + columns = ["val", "se", "sample_size"] + # Get indices where the XNOR is true (i.e. both are true or both are false). + masks = [ + ~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING)) + for column in columns + ] + for mask in masks: + if logger is not None and df.loc[mask].size > 0: + logger.info( + "Filtering contradictory missing code in " + + "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d")) + ) + df = df.loc[~mask] + elif logger is None and df.loc[mask].size > 0: + df = df.loc[~mask] + return df + def create_export_csv( df: pd.DataFrame, export_dir: str, @@ -15,7 +37,8 @@ metric: Optional[str] = None, start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, - remove_null_samples: Optional[bool] = False + remove_null_samples: Optional[bool] = False, + logger: Optional[logging.Logger] = None ): """Export data in the format expected by the Delphi API. @@ -39,6 +62,8 @@ Latest date to export or None if no maximum date restrictions should be applied. remove_null_samples: Optional[bool] Whether to remove entries whose sample sizes are null. + logger: Optional[logging.Logger] + Pass a logger object here to log information about contradictory missing codes.
Returns --------- @@ -64,7 +89,20 @@ def create_export_csv( else: export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv" export_file = join(export_dir, export_filename) - export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]] + expected_columns = [ + "geo_id", + "val", + "se", + "sample_size", + "missing_val", + "missing_se", + "missing_sample_size" + ] + export_df = df[df["timestamp"] == date].filter(items=expected_columns) + if "missing_val" in export_df.columns: + export_df = filter_contradicting_missing_codes( + export_df, sensor, metric, date, logger=logger + ) if remove_null_samples: export_df = export_df[export_df["sample_size"].notnull()] export_df = export_df.round({"val": 7, "se": 7}) diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py index 1b068f898..8091eddcc 100644 --- a/_delphi_utils_python/tests/test_archive.py +++ b/_delphi_utils_python/tests/test_archive.py @@ -13,9 +13,12 @@ import pytest from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\ - archiver_from_params + archiver_from_params, Nans -CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float} +CSV_DTYPES = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se":int, "missing_sample_size": int + } CSVS_BEFORE = { # Common @@ -23,20 +26,40 @@ "geo_id": ["1", "2", "3"], "val": [1.000000001, 2.00000002, 3.00000003], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [np.nan, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Deleted "csv2": pd.DataFrame({ "geo_id": ["1"], "val": [1.0], "se": [0.1], - "sample_size": [10.0]}), + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0] + }), } CSVS_AFTER = { @@ -45,20 +68,43 @@ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.10000001, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "4"], "val": [1.0, 2.1, 4.0], "se": [np.nan, 0.21, np.nan], - "sample_size": [10.0, 21.0, 40.0]}), + "sample_size": [10.0, 21.0, 40.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Added "csv3": pd.DataFrame({ "geo_id": ["2"], "val": [2.0000002], "se": [0.2], - "sample_size": [20.0]}), + "sample_size": [20.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0], + "missing_val": 
[Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), } @@ -80,10 +126,27 @@ def test_diff_and_filter_exports(self, tmp_path): mkdir(export_dir) csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) + + csv2_deleted = pd.DataFrame( + np.empty(0, dtype=[ + ("geo_id", str), + ("val", float), + ("se", float), + ("sample_size", float), + ("missing_val", int), + ("missing_se", int), + ("missing_sample_size", int) + ]), + index=[] + ) arch_diff = ArchiveDiffer(cache_dir, export_dir) @@ -106,7 +169,7 @@ def test_diff_and_filter_exports(self, tmp_path): # Check return values assert set(deleted_files) == {join(cache_dir, "csv2.csv")} assert set(common_diffs.keys()) == { - join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]} + join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv"]} assert set(new_files) == {join(export_dir, "csv3.csv")} assert common_diffs[join(export_dir, "csv0.csv")] is None assert common_diffs[join(export_dir, "csv1.csv")] == join( @@ -114,7 +177,10 @@ def test_diff_and_filter_exports(self, tmp_path): # Check filesystem for actual files assert set(listdir(export_dir)) == { - "csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"} + "csv0.csv", "csv1.csv", "csv1.csv.diff", + "csv3.csv", "csv4.csv", "csv4.csv.diff", + "csv2.csv" + } assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES), csv1_diff) @@ -131,8 +197,11 @@ def test_diff_and_filter_exports(self, tmp_path): arch_diff.filter_exports(common_diffs) - # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + # Check exports directory just has incremental and deleted changes + assert set(listdir(export_dir)) == {"csv1.csv", "csv2.csv", "csv3.csv", "csv4.csv"} + assert_frame_equal( + pd.read_csv(join(export_dir, "csv2.csv"), dtype=CSV_DTYPES), + csv2_deleted) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) @@ -259,15 +328,34 @@ def test_run(self, tmp_path, s3_client): assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df) # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv2.csv", "csv3.csv", "csv4.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) + csv2_deleted = pd.DataFrame( + np.empty(0, dtype=[ + ("geo_id", str), + ("val", float), + ("se", float), + ("sample_size", float), + ("missing_val", int), + ("missing_se", int), + ("missing_sample_size", int) + ]), + index=[] + ) + assert_frame_equal( + pd.read_csv(join(export_dir, "csv2.csv"), 
dtype=CSV_DTYPES), + csv2_deleted) class TestGitArchiveDiffer: @@ -346,7 +434,11 @@ def test_diff_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # Write exact same CSV into cache and export, so no diffs expected csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -383,7 +475,11 @@ def test_archive_exports(self, tmp_path): "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}) + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }) # csv1.csv is now a dirty edit in the repo, and to be exported too csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -460,15 +556,35 @@ def test_run(self, tmp_path): original_branch.checkout() # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv2.csv", "csv3.csv", "csv4.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) + csv2_deleted = pd.DataFrame( + np.empty(0, dtype=[ + ("geo_id", str), + ("val", float), + ("se", float), + ("sample_size", float), + ("missing_val", int), + ("missing_se", int), + ("missing_sample_size", int) + ]), + index=[] + ) + assert_frame_equal( + pd.read_csv(join(export_dir, "csv2.csv"), dtype=CSV_DTYPES), + csv2_deleted) + class TestFromParams: diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py index 31ec5c113..aedc2933b 100644 --- a/_delphi_utils_python/tests/test_export.py +++ b/_delphi_utils_python/tests/test_export.py @@ -3,8 +3,11 @@ from os import listdir, remove from os.path import join +import mock +import numpy as np import pandas as pd -from delphi_utils import create_export_csv + +from delphi_utils import create_export_csv, Nans def _clean_directory(directory): """Clean files out of a directory.""" @@ -43,6 +46,34 @@ class TestExport: } ) + # A sample data frame with missingness. + DF2 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [3.12345678910, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [Nans.NOT_MISSING, Nans.UNKNOWN, Nans.NOT_MISSING, Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.UNKNOWN, Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.UNKNOWN] + } + ) + + # A sample data frame with contradictory missing codes. 
+ DF3 = pd.DataFrame( + { + "geo_id": ["51093", "51175", "51175", "51620"], + "timestamp": TIMES, + "val": [np.nan, np.nan, 2.2, 2.6], + "se": [0.15, 0.22, np.nan, 0.34], + "sample_size": [100, 100, 101, None], + "missing_val": [Nans.NOT_MISSING, Nans.UNKNOWN, Nans.NOT_MISSING, Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.UNKNOWN, Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.UNKNOWN] + } + ) + # Directory in which to store tests. TEST_DIR = "test_dir" @@ -235,3 +266,46 @@ def test_export_without_null_removal(self): ] ) assert pd.read_csv(join(self.TEST_DIR, "20200606_state_test.csv")).size > 0 + + def test_export_df_with_missingness(self): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF2.copy(), + export_dir=self.TEST_DIR, + geo_res="state", + sensor="test", + remove_null_samples=False + ) + assert _non_ignored_files_set(self.TEST_DIR) == set( + [ + "20200215_state_test.csv", + "20200301_state_test.csv", + "20200315_state_test.csv", + ] + ) + assert pd.read_csv(join(self.TEST_DIR, "20200315_state_test.csv")).size > 0 + + @mock.patch("delphi_utils.logger") + def test_export_df_with_contradictory_missingness(self, mock_logger): + _clean_directory(self.TEST_DIR) + + create_export_csv( + df=self.DF3.copy(), + export_dir=self.TEST_DIR, + geo_res="state", + sensor="test", + remove_null_samples=False, + logger=mock_logger + ) + assert _non_ignored_files_set(self.TEST_DIR) == set( + [ + "20200215_state_test.csv", + "20200301_state_test.csv", + "20200315_state_test.csv", + ] + ) + assert pd.read_csv(join(self.TEST_DIR, "20200315_state_test.csv")).size > 0 + mock_logger.info.assert_called_once_with( + "Filtering contradictory missing code in test_None_2020-02-15." + ) From 1cfc192f0450766fc1a43c2ea0d5a9d2e9b0b9a0 Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Mon, 3 May 2021 17:29:16 -0700 Subject: [PATCH 04/13] Fix: test archiver import --- _delphi_utils_python/tests/test_archive.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py index 8091eddcc..b3ab8d47f 100644 --- a/_delphi_utils_python/tests/test_archive.py +++ b/_delphi_utils_python/tests/test_archive.py @@ -13,7 +13,8 @@ import pytest from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\ - archiver_from_params, Nans + archiver_from_params +from delphi_utils.nancodes import Nans CSV_DTYPES = { "geo_id": str, "val": float, "se": float, "sample_size": float, From 8bec0fc55e264e6dbecee97f1812c251a4cdb32c Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Mon, 3 May 2021 17:34:14 -0700 Subject: [PATCH 05/13] Fix: archive deletion handling --- _delphi_utils_python/delphi_utils/archive.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py index 55b507b70..31b88a1d1 100644 --- a/_delphi_utils_python/delphi_utils/archive.py +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -259,6 +259,8 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: for deleted_file in deleted_files: deleted_df = pd.read_csv(deleted_file) if not deleted_df.empty: + print( + f"Diff has deleted {deleted_file} and replaced it with an empty CSV.") empty_df = deleted_df[0:0] new_deleted_filename = join(self.export_dir, basename(deleted_file)) empty_df.to_csv(new_deleted_filename, index=False) From 8c752d93dc28e617f1b6f0bda32e30a67a6e0816 Mon Sep 17 
00:00:00 2001 From: Dmitry Shemetov Date: Wed, 10 Feb 2021 13:32:17 -0800 Subject: [PATCH 06/13] Update utilities for NAN codes: * update export utility to export, validate, and test the missing cols * handle deleted rows: replaced with nan values * handle deleted files: replace with an empty CSV file * handle comparisons between CSVs with/without missing cols --- _delphi_utils_python/delphi_utils/archive.py | 52 ++++-- _delphi_utils_python/delphi_utils/export.py | 42 ++++- _delphi_utils_python/tests/test_archive.py | 172 ++++++++++++++++--- _delphi_utils_python/tests/test_export.py | 76 +++++++- 4 files changed, 298 insertions(+), 44 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py index 5d1036bcd..31b88a1d1 100644 --- a/_delphi_utils_python/delphi_utils/archive.py +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -40,9 +40,11 @@ from git import Repo from git.refs.head import Head import pandas as pd +import numpy as np from .utils import read_params from .logger import get_structured_logger +from .nancodes import Nans Files = List[str] FileDiffMap = Dict[str, Optional[str]] @@ -73,8 +75,10 @@ def diff_export_csv( changed_df is the pd.DataFrame of common rows from after_csv with changed values. added_df is the pd.DataFrame of added rows from after_csv. """ - export_csv_dtypes = {"geo_id": str, "val": float, - "se": float, "sample_size": float} + export_csv_dtypes = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se": int, "missing_sample_size": int + } before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes) before_df.set_index("geo_id", inplace=True) @@ -89,12 +93,22 @@ def diff_export_csv( before_df_cmn = before_df.reindex(common_idx) after_df_cmn = after_df.reindex(common_idx) - # Exact comparisons, treating NA == NA as True - same_mask = before_df_cmn == after_df_cmn - same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + # If CSVs have different columns (no missingness), mark all values as new + if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns): + same_mask = after_df_cmn.copy() + same_mask.loc[:] = False + else: + # Exact comparisons, treating NA == NA as True + same_mask = before_df_cmn == after_df_cmn + same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) + + # Code deleted entries as nans with the deleted missing code + deleted_df = before_df.loc[deleted_idx, :].copy() + deleted_df[["val", "se", "sample_size"]] = np.nan + deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED return ( - before_df.loc[deleted_idx, :], + deleted_df, after_df_cmn.loc[~(same_mask.all(axis=1)), :], after_df.loc[added_idx, :]) @@ -227,11 +241,11 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: deleted_df, changed_df, added_df = diff_export_csv( before_file, after_file) - new_issues_df = pd.concat([changed_df, added_df], axis=0) + new_issues_df = pd.concat([deleted_df, changed_df, added_df], axis=0) if len(deleted_df) > 0: print( - f"Warning, diff has deleted indices in {after_file} that will be ignored") + f"Diff has deleted indices in {after_file} that have been coded as nans.") # Write the diffs to diff_file, if applicable if len(new_issues_df) > 0: @@ -240,6 +254,17 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]: new_issues_df.to_csv(diff_file, na_rep="NA") common_diffs[after_file] = diff_file + # Replace deleted files with empty versions, but only if the cached version 
is not + # already empty + for deleted_file in deleted_files: + deleted_df = pd.read_csv(deleted_file) + if not deleted_df.empty: + print( + f"Diff has deleted {deleted_file} and replaced it with an empty CSV.") + empty_df = deleted_df[0:0] + new_deleted_filename = join(self.export_dir, basename(deleted_file)) + empty_df.to_csv(new_deleted_filename, index=False) + return deleted_files, common_diffs, new_files def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]: @@ -266,9 +291,10 @@ def filter_exports(self, common_diffs: FileDiffMap): Filter export directory to only contain relevant files. Filters down the export_dir to only contain: - 1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows only. - Should be called after archive_exports() so we archive the raw exports before - potentially modifying them. + 1) New files, 2) Changed files, filtered-down to the ADDED and CHANGED rows + only, and 3) Deleted files replaced with empty CSVs with the same name. Should + be called after archive_exports() so we archive the raw exports before potentially + modifying them. Parameters ---------- @@ -297,9 +323,9 @@ def run(self): self.update_cache() # Diff exports, and make incremental versions - _, common_diffs, new_files = self.diff_exports() + deleted_files, common_diffs, new_files = self.diff_exports() - # Archive changed and new files only + # Archive changed, new, and emptied deleted files to_archive = [f for f, diff in common_diffs.items() if diff is not None] to_archive += new_files diff --git a/_delphi_utils_python/delphi_utils/export.py b/_delphi_utils_python/delphi_utils/export.py index 5a3b804b2..afc1a4c8a 100644 --- a/_delphi_utils_python/delphi_utils/export.py +++ b/_delphi_utils_python/delphi_utils/export.py @@ -3,10 +3,32 @@ from datetime import datetime from os.path import join from typing import Optional +import logging import numpy as np import pandas as pd +from .nancodes import Nans + +def filter_contradicting_missing_codes(df, sensor, metric, date, logger=None): + """Find values with contradictory missingness codes, filter them, and log.""" + columns = ["val", "se", "sample_size"] + # Get indicies where the XNOR is true (i.e. both are true or both are false). + masks = [ + ~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING)) + for column in columns + ] + for mask in masks: + if not logger is None and df.loc[mask].size > 0: + logger.info( + "Filtering contradictory missing code in " + + "{0}_{1}_{2}.".format(sensor, metric, date.strftime(format="%Y-%m-%d")) + ) + df = df.loc[~mask] + elif logger is None and df.loc[mask].size > 0: + df = df.loc[~mask] + return df + def create_export_csv( df: pd.DataFrame, export_dir: str, @@ -16,7 +38,8 @@ def create_export_csv( start_date: Optional[datetime] = None, end_date: Optional[datetime] = None, remove_null_samples: Optional[bool] = False, - write_empty_days: Optional[bool] = False + write_empty_days: Optional[bool] = False, + logger: Optional[logging.Logger] = None ): """Export data in the format expected by the Delphi API. @@ -43,6 +66,8 @@ def create_export_csv( write_empty_days: Optional[bool] If true, every day in between start_date and end_date will have a CSV file written even if there is no data for the day. If false, only the days present are written. + logger: Optional[logging.Logger] + Pass a logger object here to log information about contradictory missing codes. 
Returns --------- @@ -70,7 +95,20 @@ def create_export_csv( else: export_filename = f"{date.strftime('%Y%m%d')}_{geo_res}_{metric}_{sensor}.csv" export_file = join(export_dir, export_filename) - export_df = df[df["timestamp"] == date][["geo_id", "val", "se", "sample_size",]] + expected_columns = [ + "geo_id", + "val", + "se", + "sample_size", + "missing_val", + "missing_se", + "missing_sample_size" + ] + export_df = df[df["timestamp"] == date].filter(items=expected_columns) + if "missing_val" in export_df.columns: + export_df = filter_contradicting_missing_codes( + export_df, sensor, metric, date, logger=logger + ) if remove_null_samples: export_df = export_df[export_df["sample_size"].notnull()] export_df = export_df.round({"val": 7, "se": 7}) diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py index 1b068f898..4c7d1fc57 100644 --- a/_delphi_utils_python/tests/test_archive.py +++ b/_delphi_utils_python/tests/test_archive.py @@ -14,8 +14,12 @@ from delphi_utils.archive import ArchiveDiffer, GitArchiveDiffer, S3ArchiveDiffer,\ archiver_from_params +from delphi_utils.nancodes import Nans -CSV_DTYPES = {"geo_id": str, "val": float, "se": float, "sample_size": float} +CSV_DTYPES = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": int, "missing_se":int, "missing_sample_size": int + } CSVS_BEFORE = { # Common @@ -23,20 +27,40 @@ "geo_id": ["1", "2", "3"], "val": [1.000000001, 2.00000002, 3.00000003], "se": [0.1, 0.2, 0.3], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [np.nan, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Deleted "csv2": pd.DataFrame({ "geo_id": ["1"], "val": [1.0], "se": [0.1], - "sample_size": [10.0]}), + "sample_size": [10.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0] + }), } CSVS_AFTER = { @@ -45,23 +69,45 @@ "geo_id": ["1", "2", "3"], "val": [1.0, 2.0, 3.0], "se": [0.10000001, 0.20000002, 0.30000003], - "sample_size": [10.0, 20.0, 30.0]}), + "sample_size": [10.0, 20.0, 30.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), "csv1": pd.DataFrame({ "geo_id": ["1", "2", "4"], "val": [1.0, 2.1, 4.0], "se": [np.nan, 0.21, np.nan], - "sample_size": [10.0, 21.0, 40.0]}), + "sample_size": [10.0, 21.0, 40.0], + "missing_val": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.NOT_MISSING] * 3, + "missing_sample_size": [Nans.NOT_MISSING] * 3, + }), # Added "csv3": pd.DataFrame({ "geo_id": ["2"], "val": [2.0000002], "se": [0.2], - "sample_size": [20.0]}), + "sample_size": [20.0], + "missing_val": [Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), + + # Common, but updated with missing columns + "csv4": pd.DataFrame({ + "geo_id": ["1"], + "val": [1.0], + "se": [0.1], + "sample_size": [10.0], + "missing_val": 
[Nans.NOT_MISSING], + "missing_se": [Nans.NOT_MISSING], + "missing_sample_size": [Nans.NOT_MISSING], + }), } - class TestArchiveDiffer: def test_stubs(self): @@ -80,10 +126,27 @@ def test_diff_and_filter_exports(self, tmp_path): mkdir(export_dir) csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) + + csv2_deleted = pd.DataFrame( + np.empty(0, dtype=[ + ("geo_id", str), + ("val", float), + ("se", float), + ("sample_size", float), + ("missing_val", int), + ("missing_se", int), + ("missing_sample_size", int) + ]), + index=[] + ) arch_diff = ArchiveDiffer(cache_dir, export_dir) @@ -106,7 +169,7 @@ def test_diff_and_filter_exports(self, tmp_path): # Check return values assert set(deleted_files) == {join(cache_dir, "csv2.csv")} assert set(common_diffs.keys()) == { - join(export_dir, f) for f in ["csv0.csv", "csv1.csv"]} + join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv"]} assert set(new_files) == {join(export_dir, "csv3.csv")} assert common_diffs[join(export_dir, "csv0.csv")] is None assert common_diffs[join(export_dir, "csv1.csv")] == join( @@ -114,7 +177,10 @@ def test_diff_and_filter_exports(self, tmp_path): # Check filesystem for actual files assert set(listdir(export_dir)) == { - "csv0.csv", "csv1.csv", "csv1.csv.diff", "csv3.csv"} + "csv0.csv", "csv1.csv", "csv1.csv.diff", + "csv3.csv", "csv4.csv", "csv4.csv.diff", + "csv2.csv" + } assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES), csv1_diff) @@ -131,8 +197,11 @@ def test_diff_and_filter_exports(self, tmp_path): arch_diff.filter_exports(common_diffs) - # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + # Check exports directory just has incremental and deleted changes + assert set(listdir(export_dir)) == {"csv1.csv", "csv2.csv", "csv3.csv", "csv4.csv"} + assert_frame_equal( + pd.read_csv(join(export_dir, "csv2.csv"), dtype=CSV_DTYPES), + csv2_deleted) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) @@ -259,15 +328,34 @@ def test_run(self, tmp_path, s3_client): assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df) # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv2.csv", "csv3.csv", "csv4.csv"} csv1_diff = pd.DataFrame({ - "geo_id": ["2", "4"], - "val": [2.1, 4.0], - "se": [0.21, np.nan], - "sample_size": [21.0, 40.0]}) + "geo_id": ["3", "2", "4"], + "val": [np.nan, 2.1, 4.0], + "se": [np.nan, 0.21, np.nan], + "sample_size": [np.nan, 21.0, 40.0], + "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, + }) assert_frame_equal( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff) + csv2_deleted = pd.DataFrame( + np.empty(0, dtype=[ + ("geo_id", str), + ("val", float), + ("se", float), + ("sample_size", float), + ("missing_val", int), + ("missing_se", int), + ("missing_sample_size", int) + ]), + index=[] + ) + assert_frame_equal( + 
+            pd.read_csv(join(export_dir, "csv2.csv"), dtype=CSV_DTYPES),
+            csv2_deleted)
 
 
 class TestGitArchiveDiffer:
 
@@ -346,7 +434,11 @@ def test_diff_exports(self, tmp_path):
             "geo_id": ["1", "2", "3"],
             "val": [1.0, 2.0, 3.0],
             "se": [0.1, 0.2, 0.3],
-            "sample_size": [10.0, 20.0, 30.0]})
+            "sample_size": [10.0, 20.0, 30.0],
+            "missing_val": [Nans.NOT_MISSING] * 3,
+            "missing_se": [Nans.NOT_MISSING] * 3,
+            "missing_sample_size": [Nans.NOT_MISSING] * 3,
+        })
 
         # Write exact same CSV into cache and export, so no diffs expected
         csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -383,7 +475,11 @@ def test_archive_exports(self, tmp_path):
             "geo_id": ["1", "2", "3"],
             "val": [1.0, 2.0, 3.0],
             "se": [0.1, 0.2, 0.3],
-            "sample_size": [10.0, 20.0, 30.0]})
+            "sample_size": [10.0, 20.0, 30.0],
+            "missing_val": [Nans.NOT_MISSING] * 3,
+            "missing_se": [Nans.NOT_MISSING] * 3,
+            "missing_sample_size": [Nans.NOT_MISSING] * 3,
+        })
 
         # csv1.csv is now a dirty edit in the repo, and to be exported too
         csv1.to_csv(join(cache_dir, "csv1.csv"), index=False)
@@ -460,15 +556,35 @@ def test_run(self, tmp_path):
         original_branch.checkout()
 
         # Check exports directory just has incremental changes
-        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv"}
+        assert set(listdir(export_dir)) == {"csv1.csv", "csv2.csv", "csv3.csv", "csv4.csv"}
         csv1_diff = pd.DataFrame({
-            "geo_id": ["2", "4"],
-            "val": [2.1, 4.0],
-            "se": [0.21, np.nan],
-            "sample_size": [21.0, 40.0]})
+            "geo_id": ["3", "2", "4"],
+            "val": [np.nan, 2.1, 4.0],
+            "se": [np.nan, 0.21, np.nan],
+            "sample_size": [np.nan, 21.0, 40.0],
+            "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
+            "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
+            "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
+        })
         assert_frame_equal(
             pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
             csv1_diff)
+        csv2_deleted = pd.DataFrame(
+            np.empty(0, dtype=[
+                ("geo_id", str),
+                ("val", float),
+                ("se", float),
+                ("sample_size", float),
+                ("missing_val", int),
+                ("missing_se", int),
+                ("missing_sample_size", int)
+            ]),
+            index=[]
+        )
+        assert_frame_equal(
+            pd.read_csv(join(export_dir, "csv2.csv"), dtype=CSV_DTYPES),
+            csv2_deleted)
+
 
 class TestFromParams:
 
diff --git a/_delphi_utils_python/tests/test_export.py b/_delphi_utils_python/tests/test_export.py
index 31ec5c113..b22a710cd 100644
--- a/_delphi_utils_python/tests/test_export.py
+++ b/_delphi_utils_python/tests/test_export.py
@@ -3,8 +3,11 @@
 from os import listdir, remove
 from os.path import join
 
+import mock
+import numpy as np
 import pandas as pd
-from delphi_utils import create_export_csv
+
+from delphi_utils import create_export_csv, Nans
 
 def _clean_directory(directory):
     """Clean files out of a directory."""
@@ -43,6 +46,34 @@ class TestExport:
         }
     )
 
+    # A sample data frame with missingness.
+    DF2 = pd.DataFrame(
+        {
+            "geo_id": ["51093", "51175", "51175", "51620"],
+            "timestamp": TIMES,
+            "val": [3.12345678910, np.nan, 2.2, 2.6],
+            "se": [0.15, 0.22, np.nan, 0.34],
+            "sample_size": [100, 100, 101, None],
+            "missing_val": [Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING, Nans.NOT_MISSING],
+            "missing_se": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING],
+            "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.OTHER]
+        }
+    )
+
+    # A sample data frame with contradictory missing codes.
+    DF3 = pd.DataFrame(
+        {
+            "geo_id": ["51093", "51175", "51175", "51620"],
+            "timestamp": TIMES,
+            "val": [np.nan, np.nan, 2.2, 2.6],
+            "se": [0.15, 0.22, np.nan, 0.34],
+            "sample_size": [100, 100, 101, None],
+            "missing_val": [Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING, Nans.NOT_MISSING],
+            "missing_se": [Nans.NOT_MISSING, Nans.NOT_MISSING, Nans.OTHER, Nans.NOT_MISSING],
+            "missing_sample_size": [Nans.NOT_MISSING] * 3 + [Nans.OTHER]
+        }
+    )
+
     # Directory in which to store tests.
     TEST_DIR = "test_dir"
 
@@ -235,3 +266,46 @@ def test_export_without_null_removal(self):
             ]
         )
         assert pd.read_csv(join(self.TEST_DIR, "20200606_state_test.csv")).size > 0
+
+    def test_export_df_with_missingness(self):
+        _clean_directory(self.TEST_DIR)
+
+        create_export_csv(
+            df=self.DF2.copy(),
+            export_dir=self.TEST_DIR,
+            geo_res="state",
+            sensor="test",
+            remove_null_samples=False
+        )
+        assert _non_ignored_files_set(self.TEST_DIR) == set(
+            [
+                "20200215_state_test.csv",
+                "20200301_state_test.csv",
+                "20200315_state_test.csv",
+            ]
+        )
+        assert pd.read_csv(join(self.TEST_DIR, "20200315_state_test.csv")).size > 0
+
+    @mock.patch("delphi_utils.logger")
+    def test_export_df_with_contradictory_missingness(self, mock_logger):
+        _clean_directory(self.TEST_DIR)
+
+        create_export_csv(
+            df=self.DF3.copy(),
+            export_dir=self.TEST_DIR,
+            geo_res="state",
+            sensor="test",
+            remove_null_samples=False,
+            logger=mock_logger
+        )
+        assert _non_ignored_files_set(self.TEST_DIR) == set(
+            [
+                "20200215_state_test.csv",
+                "20200301_state_test.csv",
+                "20200315_state_test.csv",
+            ]
+        )
+        assert pd.read_csv(join(self.TEST_DIR, "20200315_state_test.csv")).size > 0
+        mock_logger.info.assert_called_once_with(
+            "Filtering contradictory missing code in test_None_2020-02-15."
+        )

From 83cb333f9965ccd06afe954eba5443b84920b9b4 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Wed, 12 May 2021 14:29:50 -0700
Subject: [PATCH 07/13] Nans: update archiver deletion handling

---
 _delphi_utils_python/delphi_utils/archive.py | 6 +++++-
 _delphi_utils_python/tests/test_archive.py  | 1 +
 2 files changed, 6 insertions(+), 1 deletion(-)

diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py
index 31b88a1d1..0eb302e7d 100644
--- a/_delphi_utils_python/delphi_utils/archive.py
+++ b/_delphi_utils_python/delphi_utils/archive.py
@@ -256,7 +256,9 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
 
         # Replace deleted files with empty versions, but only if the cached version is not
         # already empty
+        new_deleted_files = []
         for deleted_file in deleted_files:
+            breakpoint()
             deleted_df = pd.read_csv(deleted_file)
             if not deleted_df.empty:
                 print(
@@ -264,8 +266,9 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
                 empty_df = deleted_df[0:0]
                 new_deleted_filename = join(self.export_dir, basename(deleted_file))
                 empty_df.to_csv(new_deleted_filename, index=False)
+                new_deleted_files.append(deleted_file)
 
-        return deleted_files, common_diffs, new_files
+        return new_deleted_files, common_diffs, new_files
 
     def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
         """
@@ -329,6 +332,7 @@ def run(self):
         to_archive = [f for f, diff in common_diffs.items()
                       if diff is not None]
         to_archive += new_files
+        to_archive += deleted_files
         _, fails = self.archive_exports(to_archive)
 
         # Filter existing exports to exclude those that failed to archive
diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py
index 4c7d1fc57..092d4d8a8 100644
--- a/_delphi_utils_python/tests/test_archive.py
+++ b/_delphi_utils_python/tests/test_archive.py
@@ -298,6 +298,7 @@ def test_run(self, tmp_path, s3_client):
         export_dir = join(str(tmp_path), "export")
         mkdir(cache_dir)
         mkdir(export_dir)
+        breakpoint()
 
         # Set up current buckets to be `CSVS_BEFORE`.
         s3_client.create_bucket(Bucket=self.bucket_name)

From cd140b153c92e2deeb39f41d1969c82e783d81bb Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Thu, 13 May 2021 15:51:13 -0700
Subject: [PATCH 08/13] Nans: update archiver deletion

---
 _delphi_utils_python/delphi_utils/archive.py | 32 ++++++----
 _delphi_utils_python/tests/test_archive.py  | 66 +++++++-----------
 2 files changed, 48 insertions(+), 50 deletions(-)

diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py
index 0eb302e7d..ce6553018 100644
--- a/_delphi_utils_python/delphi_utils/archive.py
+++ b/_delphi_utils_python/delphi_utils/archive.py
@@ -254,21 +254,26 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
                 new_issues_df.to_csv(diff_file, na_rep="NA")
                 common_diffs[after_file] = diff_file
 
+        export_csv_dtypes = {
+            "geo_id": str, "val": float, "se": float, "sample_size": float,
+            "missing_val": int, "missing_se": int, "missing_sample_size": int
+        }
+
         # Replace deleted files with empty versions, but only if the cached version is not
         # already empty
-        new_deleted_files = []
+        deleted_files_export = []
         for deleted_file in deleted_files:
-            breakpoint()
-            deleted_df = pd.read_csv(deleted_file)
-            if not deleted_df.empty:
-                print(
-                    f"Diff has deleted {deleted_file} and replaced it with an empty CSV.")
-                empty_df = deleted_df[0:0]
-                new_deleted_filename = join(self.export_dir, basename(deleted_file))
-                empty_df.to_csv(new_deleted_filename, index=False)
-                new_deleted_files.append(deleted_file)
-
-        return new_deleted_files, common_diffs, new_files
+            deleted_df = pd.read_csv(deleted_file, dtype=export_csv_dtypes)
+            print(
+                f"Diff has deleted {deleted_file}; generating a CSV with deleted rows."
+            )
+            deleted_df[["val", "se", "sample_size"]] = np.nan
+            deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
+            filename = join(self.export_dir, basename(deleted_file))
+            deleted_df.to_csv(filename, index=False)
+            deleted_files_export.append(filename)
+
+        return deleted_files_export, common_diffs, new_files
 
     def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
         """
@@ -444,6 +449,9 @@ def archive_exports(self, # pylint: disable=arguments-differ
                 archive_success.append(exported_file)
             except FileNotFoundError:
                 archive_fail.append(exported_file)
+            except shutil.SameFileError:
+                # no need to copy if the cached file is the same
+                archive_success.append(exported_file)
 
         self._exports_archived = True
 
diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py
index 092d4d8a8..0a21ecbb9 100644
--- a/_delphi_utils_python/tests/test_archive.py
+++ b/_delphi_utils_python/tests/test_archive.py
@@ -135,18 +135,15 @@ def test_diff_and_filter_exports(self, tmp_path):
             "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
             "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
             "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
             })
 
-        csv2_deleted = pd.DataFrame(
-            np.empty(0, dtype=[
-                ("geo_id", str),
-                ("val", float),
-                ("se", float),
-                ("sample_size", float),
-                ("missing_val", int),
-                ("missing_se", int),
-                ("missing_sample_size", int)
-            ]),
-            index=[]
-        )
+        csv2_deleted = pd.DataFrame({
+            "geo_id": ["1"],
+            "val": [np.nan],
+            "se": [np.nan],
+            "sample_size": [np.nan],
+            "missing_val": [Nans.DELETED],
+            "missing_se": [Nans.DELETED],
+            "missing_sample_size": [Nans.DELETED],
+        })
 
         arch_diff = ArchiveDiffer(cache_dir, export_dir)
 
@@ -167,7 +164,7 @@ def test_diff_and_filter_exports(self, tmp_path):
 
         # Check return values
-        assert set(deleted_files) == {join(cache_dir, "csv2.csv")}
+        assert set(deleted_files) == {join(export_dir, "csv2.csv")}
         assert set(common_diffs.keys()) == {
             join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv"]}
@@ -298,7 +295,6 @@ def test_run(self, tmp_path, s3_client):
         export_dir = join(str(tmp_path), "export")
         mkdir(cache_dir)
         mkdir(export_dir)
-        breakpoint()
 
         # Set up current buckets to be `CSVS_BEFORE`.
         s3_client.create_bucket(Bucket=self.bucket_name)
@@ -342,18 +338,15 @@ def test_run(self, tmp_path, s3_client):
         assert_frame_equal(
             pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
             csv1_diff)
-        csv2_deleted = pd.DataFrame(
-            np.empty(0, dtype=[
-                ("geo_id", str),
-                ("val", float),
-                ("se", float),
-                ("sample_size", float),
-                ("missing_val", int),
-                ("missing_se", int),
-                ("missing_sample_size", int)
-            ]),
-            index=[]
-        )
+        csv2_deleted = pd.DataFrame({
+            "geo_id": ["1"],
+            "val": [np.nan],
+            "se": [np.nan],
+            "sample_size": [np.nan],
+            "missing_val": [Nans.DELETED],
+            "missing_se": [Nans.DELETED],
+            "missing_sample_size": [Nans.DELETED],
+        })
         assert_frame_equal(
             pd.read_csv(join(export_dir, "csv2.csv"), dtype=CSV_DTYPES),
             csv2_deleted)
@@ -570,18 +563,15 @@ def test_run(self, tmp_path):
         assert_frame_equal(
             pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
             csv1_diff)
-        csv2_deleted = pd.DataFrame(
-            np.empty(0, dtype=[
-                ("geo_id", str),
-                ("val", float),
-                ("se", float),
-                ("sample_size", float),
-                ("missing_val", int),
-                ("missing_se", int),
-                ("missing_sample_size", int)
-            ]),
-            index=[]
-        )
+        csv2_deleted = pd.DataFrame({
+            "geo_id": ["1"],
+            "val": [np.nan],
+            "se": [np.nan],
+            "sample_size": [np.nan],
+            "missing_val": [Nans.DELETED],
+            "missing_se": [Nans.DELETED],
+            "missing_sample_size": [Nans.DELETED],
+        })
         assert_frame_equal(
             pd.read_csv(join(export_dir, "csv2.csv"), dtype=CSV_DTYPES),
             csv2_deleted)

From 7a9e45cda05151c740759d3bb65e0851e1a0a438 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Tue, 31 Aug 2021 11:50:29 -0700
Subject: [PATCH 09/13] Nancodes archiver: rename variable for clarity

---
 _delphi_utils_python/delphi_utils/archive.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py
index ce6553018..994dfd7df 100644
--- a/_delphi_utils_python/delphi_utils/archive.py
+++ b/_delphi_utils_python/delphi_utils/archive.py
@@ -261,19 +261,19 @@ def diff_exports(self) -> Tuple[Files, FileDiffMap, Files]:
 
         # Replace deleted files with empty versions, but only if the cached version is not
         # already empty
-        deleted_files_export = []
+        deleted_files_nanfilled = []
         for deleted_file in deleted_files:
             deleted_df = pd.read_csv(deleted_file, dtype=export_csv_dtypes)
             print(
-                f"Diff has deleted {deleted_file}; generating a CSV with deleted rows."
+                f"Diff has deleted {deleted_file}; generating a CSV with corresponding deleted rows."
             )
             deleted_df[["val", "se", "sample_size"]] = np.nan
             deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
             filename = join(self.export_dir, basename(deleted_file))
             deleted_df.to_csv(filename, index=False)
-            deleted_files_export.append(filename)
+            deleted_files_nanfilled.append(filename)
 
-        return deleted_files_export, common_diffs, new_files
+        return deleted_files_nanfilled, common_diffs, new_files
 
     def archive_exports(self, exported_files: Files) -> Tuple[Files, Files]:
         """

From 19e8be40494bf25e4b0e44289da93c87709af4c7 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Tue, 31 Aug 2021 12:03:22 -0700
Subject: [PATCH 10/13] Nancodes archiver: small formatting change

---
 _delphi_utils_python/tests/test_archive.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py
index 0a21ecbb9..111acf92f 100644
--- a/_delphi_utils_python/tests/test_archive.py
+++ b/_delphi_utils_python/tests/test_archive.py
@@ -133,7 +133,7 @@ def test_diff_and_filter_exports(self, tmp_path):
             "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
             "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
             "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
-            })
+        })
 
         csv2_deleted = pd.DataFrame({
             "geo_id": ["1"],

From 0e97f0341b2eec5482d2a62a6c215a25a3facde4 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Tue, 31 Aug 2021 13:27:14 -0700
Subject: [PATCH 11/13] Nancodes: update nancode names

---
 usafacts/delphi_usafacts/run.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/usafacts/delphi_usafacts/run.py b/usafacts/delphi_usafacts/run.py
index 8ee235b73..5d3bdea5a 100644
--- a/usafacts/delphi_usafacts/run.py
+++ b/usafacts/delphi_usafacts/run.py
@@ -77,11 +77,11 @@ def add_nancodes(df, smoother):
     if smoother == "seven_day_average":
         df.sort_index(inplace=True)
         min_time_value = df.index.min()[0] + 6 * pd.Timedelta(days=1)
-        df.loc[idx[:min_time_value, :], "missing_val"] = Nans.PRIVACY
+        df.loc[idx[:min_time_value, :], "missing_val"] = Nans.CENSORED
 
     # Mark any remaining nans with unknown
     remaining_nans_mask = df["val"].isnull() & (df["missing_val"] == Nans.NOT_MISSING)
-    df.loc[remaining_nans_mask, "missing_val"] = Nans.UNKNOWN
+    df.loc[remaining_nans_mask, "missing_val"] = Nans.OTHER
 
     return df

From 08bc2fe97a5662af551d15368555ec5a2e06f11a Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Tue, 31 Aug 2021 13:49:50 -0700
Subject: [PATCH 12/13] Nans usafacts: update tests

---
 usafacts/tests/test_run.py | 114 +++++++++++++++++++------------------
 1 file changed, 59 insertions(+), 55 deletions(-)

diff --git a/usafacts/tests/test_run.py b/usafacts/tests/test_run.py
index 4c3840d98..c12625b88 100644
--- a/usafacts/tests/test_run.py
+++ b/usafacts/tests/test_run.py
@@ -1,6 +1,7 @@
 """Tests for running the USAFacts indicator."""
+import unittest
 from itertools import product
-from os import listdir
+from os import listdir, remove
 from os.path import join
 from unittest.mock import patch
 
@@ -11,8 +12,14 @@ def local_fetch(url, cache):
     return pd.read_csv(url)
 
+def clean_dir(dir_path):
+    """Remove csv files from a directory."""
+    csv_files = [f for f in listdir(dir_path) if f.endswith(".csv")]
+    for f in csv_files:
+        remove(join(dir_path, f))
+
 @patch("delphi_usafacts.pull.fetch", local_fetch)
-class TestRun:
+class TestRun(unittest.TestCase):
     """Tests for the `run_module()` function."""
     PARAMS = {
         "common": {
             "export_dir": "receiving",
         }
     }
 
-    def test_output_files_exist(self):
-        """Test that the expected output files exist."""
+    def test_run_module(self):
+        """Test that run_module produces reasonable files."""
+        clean_dir(self.PARAMS["common"]["export_dir"])
         run_module(self.PARAMS)
-        csv_files = [f for f in listdir("receiving") if f.endswith(".csv")]
+        with self.subTest("Test that the expected output files exist."):
+            csv_files = [f for f in listdir("receiving") if f.endswith(".csv")]
 
-        dates = [
-            "20200229",
-            "20200301",
-            "20200302",
-            "20200303",
-            "20200304",
-            "20200305",
-            "20200306",
-            "20200307",
-            "20200308",
-            "20200309",
-            "20200310",
-        ]
-        geos = ["county", "hrr", "msa", "state", "hhs", "nation"]
+            dates = [
+                "20200229",
+                "20200301",
+                "20200302",
+                "20200303",
+                "20200304",
+                "20200305",
+                "20200306",
+                "20200307",
+                "20200308",
+                "20200309",
+                "20200310",
+            ]
+            geos = ["county", "hrr", "msa", "state", "hhs", "nation"]
 
-        # enumerate metric names.
-        metrics = []
-        for event, span, stat in product(["deaths", "confirmed"],
-                                         ["cumulative", "incidence"],
-                                         ["num", "prop"]):
-            metrics.append("_".join([event, span, stat]))
-            metrics.append("_".join([event, "7dav", span, stat]))
+            # enumerate metric names.
+            metrics = []
+            for event, span, stat in product(["deaths", "confirmed"],
+                                             ["cumulative", "incidence"],
+                                             ["num", "prop"]):
+                metrics.append("_".join([event, span, stat]))
+                metrics.append("_".join([event, "7dav", span, stat]))
 
-        expected_files = []
-        for date in dates:
-            for geo in geos:
-                for metric in metrics:
-                    if "7dav" in metric and date in dates[:6]:
-                        continue  # there are no 7dav signals for first 6 days
-                    if "7dav" in metric and "cumulative" in metric:
-                        continue
-                    expected_files += [date + "_" + geo + "_" + metric + ".csv"]
-        assert set(csv_files) == set(expected_files)
+            expected_files = []
+            for date in dates:
+                for geo in geos:
+                    for metric in metrics:
+                        if "7dav" in metric and "cumulative" in metric:
+                            continue
+                        expected_files += [date + "_" + geo + "_" + metric + ".csv"]
+            assert set(csv_files) == set(expected_files)
 
-    def test_output_file_format(self):
-        """Test that the output files have the proper format."""
-        run_module(self.PARAMS)
-
-        df = pd.read_csv(
-            join("receiving", "20200310_state_confirmed_cumulative_num.csv")
-        )
-        assert (
-            df.columns.values
-            == [
-                "geo_id",
-                "val",
-                "se",
-                "sample_size",
-                "missing_val",
-                "missing_se",
-                "missing_sample_size",
-            ]
-        ).all()
+        with self.subTest("Test that the output files have the proper format."):
+            df = pd.read_csv(
+                join("receiving", "20200310_state_confirmed_cumulative_num.csv")
+            )
+            assert (
+                df.columns.values
+                == [
+                    "geo_id",
+                    "val",
+                    "se",
+                    "sample_size",
+                    "missing_val",
+                    "missing_se",
+                    "missing_sample_size",
+                ]
+            ).all()
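Before the final patch, it is worth pinning down the two conventions the archiver and exporter patches above converge on: a row that disappears between snapshots is re-emitted as a NaN row coded DELETED rather than silently dropped, and a row whose NaN-ness disagrees with its missingness code is filtered out. The sketch below is illustrative only — it is not part of the patch series, and the Nans stand-in does not claim the canonical values from delphi_utils.nancodes:

    from enum import IntEnum

    import numpy as np
    import pandas as pd

    class Nans(IntEnum):
        """Stand-in for delphi_utils.nancodes.Nans; member values are illustrative."""
        NOT_MISSING = 0
        DELETED = 4
        OTHER = 5

    def code_deletions(before: pd.DataFrame, after: pd.DataFrame) -> pd.DataFrame:
        """Rows present in `before` but gone from `after` come back as NaN rows
        tagged DELETED, mirroring what diff_exports now writes for deleted CSVs."""
        deleted = before.loc[before.index.difference(after.index)].copy()
        deleted[["val", "se", "sample_size"]] = np.nan
        deleted[["missing_val", "missing_se", "missing_sample_size"]] = int(Nans.DELETED)
        return deleted

    def contradiction_mask(df: pd.DataFrame, column: str) -> pd.Series:
        """True where NaN-ness and the missingness code disagree: a NaN coded
        NOT_MISSING, or a concrete value coded as missing (the XNOR in export.py)."""
        return ~(df[column].isna() ^ df["missing_" + column].eq(Nans.NOT_MISSING))

    before = pd.DataFrame({
        "val": [1.0, 2.0], "se": [0.1, 0.2], "sample_size": [10.0, 20.0],
        "missing_val": int(Nans.NOT_MISSING), "missing_se": int(Nans.NOT_MISSING),
        "missing_sample_size": int(Nans.NOT_MISSING),
    }, index=pd.Index(["1", "2"], name="geo_id"))

    # geo_id "2" is re-emitted with NaN values and DELETED codes.
    print(code_deletions(before, before.loc[["1"]]))

Under these conventions a downstream consumer can distinguish "value retracted" from "value never reported", which is the point of carrying the DELETED code through the diff.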
"missing_val"] = Nans.CENSORED # Mark any remaining nans with unknown