From 7248d697d1b3cc6f6ff8c61322b5c62926d42bb9 Mon Sep 17 00:00:00 2001
From: Dmitry Shemetov
Date: Wed, 9 Feb 2022 15:27:31 -0800
Subject: [PATCH 1/3] Fix archiver bug treating old deletions as new deletions, add many tests

---
 _delphi_utils_python/delphi_utils/archive.py |  11 +
 _delphi_utils_python/tests/test_archive.py   | 214 +++++++++++++------
 2 files changed, 160 insertions(+), 65 deletions(-)

diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py
index a23f92516..597dc13f4 100644
--- a/_delphi_utils_python/delphi_utils/archive.py
+++ b/_delphi_utils_python/delphi_utils/archive.py
@@ -107,6 +107,17 @@ def diff_export_csv(
     deleted_df[["val", "se", "sample_size"]] = np.nan
     if "missing_val" in after_df_cmn.columns:
         deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED
+    
+    # Remove deleted entries that were already present
+    if deleted_idx.size > 0:
+        deleted_same_mask = deleted_df == before_df.loc[deleted_idx, :]
+        deleted_same_mask |= pd.isna(deleted_df) & pd.isna(before_df.loc[deleted_idx, :])
+        deleted_df = deleted_df.loc[~(deleted_same_mask.all(axis=1)), :]
+
+    # If the new file has no missing columns, then we should remove them from
+    # the deletions too
+    if "missing_val" not in after_df_cmn.columns:
+        deleted_df = deleted_df[["val", "se", "sample_size"]]
 
     return (
         deleted_df,
diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py
index 3e30a3264..679a4b7ba 100644
--- a/_delphi_utils_python/tests/test_archive.py
+++ b/_delphi_utils_python/tests/test_archive.py
@@ -2,6 +2,7 @@
 from io import StringIO, BytesIO
 from os import listdir, mkdir
 from os.path import join
+from typing import List
 
 from boto3 import Session
 from git import Repo, exc
@@ -19,7 +20,10 @@
 CSV_DTYPES = {
     "geo_id": str, "val": float, "se": float, "sample_size": float,
     "missing_val": int, "missing_se": int, "missing_sample_size": int
-    }
+}
+CSV_DTYPES_OLD = {
+    "geo_id": str, "val": float, "se": float, "sample_size": float
+}
 
 CSVS_BEFORE = {
@@ -74,29 +78,29 @@
         "missing_val": [Nans.NOT_MISSING],
         "missing_se": [Nans.NOT_MISSING],
         "missing_sample_size": [Nans.NOT_MISSING],
     }),
 
-    # All rows common, but no missing columns
+    # Row 1 same, row 2 deleted, row 3 added, row 4 deleted previously
+    # and present again as nan, row 5 deleted previously and absent from file,
+    # row 6 deleted previously, but new missing has different NA code
+    # (missing columns present)
     "csv6": pd.DataFrame({
-        "geo_id": ["1"],
-        "val": [1.0],
-        "se": [0.1],
-        "sample_size": [10.0]
-    }),
-
-    # Row deleted and row added, but no missing columns (will not be uploaded)
+        "geo_id": ["1", "2", "4", "5", "6"],
+        "val": [1.0, 2.0, np.nan, np.nan, np.nan],
+        "se": [0.1, 0.2, np.nan, np.nan, np.nan],
+        "sample_size": [10.0, 20.0, np.nan, np.nan, np.nan],
+        "missing_val": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] * 3,
+        "missing_se": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] * 3,
+        "missing_sample_size": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] * 3,
+    }),
+
+    # Row 1 same, row 2 deleted, row 3 added, row 4 deleted previously
+    # and present again as nan, row 5 deleted previously and absent from file
+    # (no missing columns)
     "csv7": pd.DataFrame({
-        "geo_id": ["1", "2"],
-        "val": [1.0, 2.0],
-        "se": [0.1, 0.2],
-        "sample_size": [10.0, 20.0]
-    }),
-
-    # Row deleted and row added, but no missing columns
-    "csv8": pd.DataFrame({
-        "geo_id": ["1", "2"],
-        "val": [1.0, 2.0],
-        "se": [0.1, 0.2],
-        "sample_size": [10.0, 20.0]
-    }),
+        "geo_id": ["1", "2", "4", "5"],
+        "val": [1.0, 2.0, np.nan, np.nan],
+        "se": [0.1, 0.2, np.nan, np.nan],
+        "sample_size": [10.0, 20.0, np.nan, np.nan]
+    })
 }
 
 CSVS_AFTER = {
@@ -152,31 +156,41 @@
         "sample_size": [10.0]
     }),
 
-    # All rows common, but no missing columns
+    # Row 1 same, row 2 deleted, row 3 added, row 4 deleted previously
+    # and present again as nan, row 5 deleted previously and absent from file,
+    # row 6 deleted previously, but new missing has different NA code
+    # (missing columns present)
     "csv6": pd.DataFrame({
-        "geo_id": ["1"],
-        "val": [1.0],
-        "se": [0.1],
-        "sample_size": [10.0]
-    }),
-
-    # Row deleted and row added, but no missing columns (will not be uploaded)
+        "geo_id": ["1", "3", "4", "6"],
+        "val": [1.0, 3.0, np.nan, np.nan],
+        "se": [0.1, 0.3, np.nan, np.nan],
+        "sample_size": [10.0, 30.0, np.nan, np.nan],
+        "missing_val": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] + [Nans.CENSORED],
+        "missing_se": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] + [Nans.CENSORED],
+        "missing_sample_size": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] + [Nans.CENSORED],
+    }),
+
+    # Row 1 same, row 2 deleted, row 3 added, row 4 deleted previously
+    # and present again as nan, row 5 deleted previously and absent from file
+    # (no missing columns)
     "csv7": pd.DataFrame({
-        "geo_id": ["1"],
-        "val": [1.0],
-        "se": [0.1],
-        "sample_size": [10.0]
-    }),
-
-    # Row deleted and row added, but no missing columns
-    "csv8": pd.DataFrame({
-        "geo_id": ["1", "3"],
-        "val": [1.0, 3.0],
-        "se": [0.1, 0.3],
-        "sample_size": [10.0, 30.0]
-    }),
+        "geo_id": ["1", "3", "4"],
+        "val": [1.0, 3.0, np.nan],
+        "se": [0.1, 0.3, np.nan],
+        "sample_size": [10.0, 30.0, np.nan]
+    })
 }
 
+def _assert_frames_equal_ignore_row_order(df1, df2, index_cols: List[str] = None):
+    return assert_frame_equal(df1.set_index(index_cols).sort_index(), df2.set_index(index_cols).sort_index())
+
 
 class TestArchiveDiffer:
 
     def test_stubs(self):
@@ -194,14 +208,32 @@ def test_diff_and_filter_exports(self, tmp_path):
         mkdir(cache_dir)
         mkdir(export_dir)
 
-        csv1_diff = pd.DataFrame({
-            "geo_id": ["3", "2", "4"],
-            "val": [np.nan, 2.1, 4.0],
-            "se": [np.nan, 0.21, np.nan],
-            "sample_size": [np.nan, 21.0, 40.0],
-            "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
-            "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
-            "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
+        expected_csv1_diff = pd.DataFrame({
+            "geo_id": ["2", "3", "4"],
+            "val": [2.1, np.nan, 4.0],
+            "se": [0.21, np.nan, np.nan],
+            "sample_size": [21.0, np.nan, 40.0],
+            "missing_val": [Nans.NOT_MISSING, Nans.DELETED, Nans.NOT_MISSING],
+            "missing_se": [Nans.NOT_MISSING, Nans.DELETED, Nans.NOT_MISSING],
+            "missing_sample_size": [Nans.NOT_MISSING, Nans.DELETED, Nans.NOT_MISSING],
         })
+        expected_csv3 = CSVS_AFTER["csv3"]
+        expected_csv4_diff = CSVS_AFTER["csv4"]
+        expected_csv5_diff = CSVS_AFTER["csv5"]
+        expected_csv6_diff = pd.DataFrame({
+            "geo_id": ["2", "3", "6"],
+            "val": [np.nan, 3.0, np.nan],
+            "se": [np.nan, 0.3, np.nan],
+            "sample_size": [np.nan, 30.0, np.nan],
+            "missing_val": [Nans.DELETED, Nans.NOT_MISSING, Nans.CENSORED],
+            "missing_se": [Nans.DELETED, Nans.NOT_MISSING, Nans.CENSORED],
+            "missing_sample_size": [Nans.DELETED, Nans.NOT_MISSING, Nans.CENSORED],
+        })
+        expected_csv7_diff = pd.DataFrame({
+            "geo_id": ["2", "3"],
+            "val": [np.nan, 3.0],
+            "se": [np.nan, 0.3],
+            "sample_size": [np.nan, 30.0]
+        })
 
         arch_diff = ArchiveDiffer(cache_dir, export_dir)
 
@@ -225,7 +257,7 @@ def test_diff_and_filter_exports(self, tmp_path):
         # Check return values
         assert set(deleted_files) == {join(cache_dir, "csv2.csv")}
         assert set(common_diffs.keys()) == {
-            join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv", "csv8.csv"]}
+            join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv"]}
         assert set(new_files) == {join(export_dir, "csv3.csv")}
         assert common_diffs[join(export_dir, "csv0.csv")] is None
         assert common_diffs[join(export_dir, "csv1.csv")] == join(
             export_dir, "csv1.csv.diff")
@@ -238,13 +270,34 @@ def test_diff_and_filter_exports(self, tmp_path):
             "csv3.csv",
             "csv4.csv", "csv4.csv.diff",
             "csv5.csv", "csv5.csv.diff",
-            "csv6.csv",
+            "csv6.csv", "csv6.csv.diff",
             "csv7.csv", "csv7.csv.diff",
-            "csv8.csv", "csv8.csv.diff"
         }
-        assert_frame_equal(
+        _assert_frames_equal_ignore_row_order(
             pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES),
-            csv1_diff)
+            expected_csv1_diff,
+            index_cols=["geo_id"]
+        )
+        _assert_frames_equal_ignore_row_order(
+            pd.read_csv(join(export_dir, "csv4.csv.diff"), dtype=CSV_DTYPES_OLD),
+            expected_csv4_diff,
+            index_cols=["geo_id"]
+        )
+        _assert_frames_equal_ignore_row_order(
+            pd.read_csv(join(export_dir, "csv5.csv.diff"), dtype=CSV_DTYPES_OLD),
+            expected_csv5_diff,
+            index_cols=["geo_id"]
+        )
+        _assert_frames_equal_ignore_row_order(
+            pd.read_csv(join(export_dir, "csv6.csv.diff"), dtype=CSV_DTYPES_OLD),
+            expected_csv6_diff,
+            index_cols=["geo_id"]
+        )
+        _assert_frames_equal_ignore_row_order(
+            pd.read_csv(join(export_dir, "csv7.csv.diff"), dtype=CSV_DTYPES_OLD),
+            expected_csv7_diff,
+            index_cols=["geo_id"]
+        )
 
         # Test filter_exports
         # ===================
@@ -259,10 +312,37 @@ def test_diff_and_filter_exports(self, tmp_path):
         arch_diff.filter_exports(common_diffs)
 
         # Check exports directory just has incremental changes
-        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv7.csv", "csv8.csv"}
-        assert_frame_equal(
+        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv"}
+        _assert_frames_equal_ignore_row_order(
             pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
-            csv1_diff)
+            expected_csv1_diff,
+            index_cols=["geo_id"]
+        )
+        _assert_frames_equal_ignore_row_order(
+            pd.read_csv(join(export_dir, "csv3.csv"), dtype=CSV_DTYPES),
+            expected_csv3,
+            index_cols=["geo_id"]
+        )
+        _assert_frames_equal_ignore_row_order(
+            pd.read_csv(join(export_dir, "csv4.csv"), dtype=CSV_DTYPES),
+            expected_csv4_diff,
+            index_cols=["geo_id"]
+        )
+        _assert_frames_equal_ignore_row_order(
+            pd.read_csv(join(export_dir, "csv5.csv"), dtype=CSV_DTYPES),
+            expected_csv5_diff,
+            index_cols=["geo_id"]
+        )
+        _assert_frames_equal_ignore_row_order(
+            pd.read_csv(join(export_dir, "csv6.csv"), dtype=CSV_DTYPES),
+            expected_csv6_diff,
+            index_cols=["geo_id"]
+        )
+        _assert_frames_equal_ignore_row_order(
+            pd.read_csv(join(export_dir, "csv7.csv"), dtype=CSV_DTYPES),
+            expected_csv7_diff,
+            index_cols=["geo_id"]
+        )
 
 
 AWS_CREDENTIALS = {
@@ -384,7 +464,7 @@ def test_run(self, tmp_path, s3_client):
         assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df)
 
         # Check exports directory just has incremental changes
-        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv7.csv", "csv8.csv"}
+        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv"}
         csv1_diff = pd.DataFrame({
             "geo_id": ["3", "2", "4"],
             "val": [np.nan, 2.1, 4.0],
             "se": [np.nan, 0.21, np.nan],
             "sample_size": [np.nan, 21.0, 40.0],
             "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
"missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, }) - assert_frame_equal( + _assert_frames_equal_ignore_row_order( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), - csv1_diff) + csv1_diff, + index_cols=["geo_id"] + ) class TestGitArchiveDiffer: @@ -596,7 +678,7 @@ def test_run(self, tmp_path): original_branch.checkout() # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv7.csv", "csv8.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv"} csv1_diff = pd.DataFrame({ "geo_id": ["3", "2", "4"], "val": [np.nan, 2.1, 4.0], @@ -606,9 +688,11 @@ def test_run(self, tmp_path): "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, }) - assert_frame_equal( + _assert_frames_equal_ignore_row_order( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), - csv1_diff) + csv1_diff, + index_cols=["geo_id"] + ) class TestFromParams: From 54ef6d42f3360b8795f66c39a57b41b0c0b9911e Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Wed, 9 Feb 2022 15:33:29 -0800 Subject: [PATCH 2/3] Happy linter --- _delphi_utils_python/delphi_utils/archive.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py index 597dc13f4..02f2134a3 100644 --- a/_delphi_utils_python/delphi_utils/archive.py +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -107,7 +107,7 @@ def diff_export_csv( deleted_df[["val", "se", "sample_size"]] = np.nan if "missing_val" in after_df_cmn.columns: deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED - + # Remove deleted entries that were already present if deleted_idx.size > 0: deleted_same_mask = deleted_df == before_df.loc[deleted_idx, :] From 3e4df0461ba11d0975e31b32bd7b965bf8e5b04f Mon Sep 17 00:00:00 2001 From: Dmitry Shemetov Date: Mon, 28 Feb 2022 12:18:59 -0800 Subject: [PATCH 3/3] Code deletions properly, remove redundant tests --- _delphi_utils_python/delphi_utils/archive.py | 35 ++-- _delphi_utils_python/tests/test_archive.py | 205 ++++++------------- 2 files changed, 72 insertions(+), 168 deletions(-) diff --git a/_delphi_utils_python/delphi_utils/archive.py b/_delphi_utils_python/delphi_utils/archive.py index 02f2134a3..c805434bf 100644 --- a/_delphi_utils_python/delphi_utils/archive.py +++ b/_delphi_utils_python/delphi_utils/archive.py @@ -49,6 +49,10 @@ Files = List[str] FileDiffMap = Dict[str, Optional[str]] +EXPORT_CSV_DTYPES = { + "geo_id": str, "val": float, "se": float, "sample_size": float, + "missing_val": "Int64", "missing_se": "Int64", "missing_sample_size": "Int64" +} def diff_export_csv( before_csv: str, @@ -75,15 +79,10 @@ def diff_export_csv( changed_df is the pd.DataFrame of common rows from after_csv with changed values. added_df is the pd.DataFrame of added rows from after_csv. 
""" - export_csv_dtypes = { - "geo_id": str, "val": float, "se": float, "sample_size": float, - "missing_val": int, "missing_se": int, "missing_sample_size": int - } - - before_df = pd.read_csv(before_csv, dtype=export_csv_dtypes) + before_df = pd.read_csv(before_csv, dtype=EXPORT_CSV_DTYPES) before_df.set_index("geo_id", inplace=True) before_df = before_df.round({"val": 7, "se": 7}) - after_df = pd.read_csv(after_csv, dtype=export_csv_dtypes) + after_df = pd.read_csv(after_csv, dtype=EXPORT_CSV_DTYPES) after_df.set_index("geo_id", inplace=True) after_df = after_df.round({"val": 7, "se": 7}) deleted_idx = before_df.index.difference(after_df.index) @@ -93,8 +92,8 @@ def diff_export_csv( before_df_cmn = before_df.reindex(common_idx) after_df_cmn = after_df.reindex(common_idx) - # If CSVs have different columns (no missingness), mark all values as new - if ("missing_val" in before_df_cmn.columns) ^ ("missing_val" in after_df_cmn.columns): + # If new CSV has missingness columns, but old doesn't, mark all values as new + if ("missing_val" not in before_df_cmn.columns) & ("missing_val" in after_df_cmn.columns): same_mask = after_df_cmn.copy() same_mask.loc[:] = False else: @@ -102,22 +101,12 @@ def diff_export_csv( same_mask = before_df_cmn == after_df_cmn same_mask |= pd.isna(before_df_cmn) & pd.isna(after_df_cmn) - # Code deleted entries as nans with the deleted missing code + # Any deleted entries become rows with nans and the deleted missing code deleted_df = before_df.loc[deleted_idx, :].copy() deleted_df[["val", "se", "sample_size"]] = np.nan - if "missing_val" in after_df_cmn.columns: - deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED - - # Remove deleted entries that were already present - if deleted_idx.size > 0: - deleted_same_mask = deleted_df == before_df.loc[deleted_idx, :] - deleted_same_mask |= pd.isna(deleted_df) & pd.isna(before_df.loc[deleted_idx, :]) - deleted_df = deleted_df.loc[~(deleted_same_mask.all(axis=1)), :] - - # If the new file has no missing columns, then we should remove them from - # the deletions too - if "missing_val" not in after_df_cmn.columns: - deleted_df = deleted_df[["val", "se", "sample_size"]] + # If the new file doesn't have missing columsn, then when the deleted, changed, and added + # rows are concatenated (in diff_exports), they will default to NA + deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED return ( deleted_df, diff --git a/_delphi_utils_python/tests/test_archive.py b/_delphi_utils_python/tests/test_archive.py index 679a4b7ba..68f05454d 100644 --- a/_delphi_utils_python/tests/test_archive.py +++ b/_delphi_utils_python/tests/test_archive.py @@ -2,7 +2,7 @@ from io import StringIO, BytesIO from os import listdir, mkdir from os.path import join -from typing import List +from typing import Any, Dict, List from boto3 import Session from git import Repo, exc @@ -19,10 +19,7 @@ CSV_DTYPES = { "geo_id": str, "val": float, "se": float, "sample_size": float, - "missing_val": int, "missing_se": int, "missing_sample_size": int -} -CSV_DTYPES_OLD = { - "geo_id": str, "val": float, "se": float, "sample_size": float + "missing_val": "Int64", "missing_se": "Int64", "missing_sample_size": "Int64" } CSVS_BEFORE = { @@ -35,7 +32,7 @@ "missing_val": [Nans.NOT_MISSING] * 3, "missing_se": [Nans.NOT_MISSING] * 3, "missing_sample_size": [Nans.NOT_MISSING] * 3, - }), + }), # One row deleted and one row added "csv1": pd.DataFrame({ @@ -44,9 +41,9 @@ "se": [np.nan, 0.20000002, 0.30000003], "sample_size": 
[10.0, 20.0, 30.0], "missing_val": [Nans.NOT_MISSING] * 3, - "missing_se": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.CENSORED] + [Nans.NOT_MISSING] * 2, "missing_sample_size": [Nans.NOT_MISSING] * 3, - }), + }), # File deleted "csv2": pd.DataFrame({ @@ -57,7 +54,7 @@ "missing_val": [Nans.NOT_MISSING], "missing_se": [Nans.NOT_MISSING], "missing_sample_size": [Nans.NOT_MISSING], - }), + }), # All rows common, but missing columns added "csv4": pd.DataFrame({ @@ -65,42 +62,15 @@ "val": [1.0], "se": [0.1], "sample_size": [10.0] - }), - - # All rows common, but missing columns removed - "csv5": pd.DataFrame({ - "geo_id": ["1"], - "val": [1.0], - "se": [0.1], - "sample_size": [10.0], - "missing_val": [Nans.NOT_MISSING], - "missing_se": [Nans.NOT_MISSING], - "missing_sample_size": [Nans.NOT_MISSING], - }), - - # Row 1 same, row 2 deleted, row 3 added, row 4 deleted previously - # and present again as nan, row 5 deleted previously and absent from file, - # row 6 deleted previously, but new missing has different NA code - # (missing columns present) - "csv6": pd.DataFrame({ - "geo_id": ["1", "2", "4", "5", "6"], - "val": [1.0, 2.0, np.nan, np.nan, np.nan], - "se": [0.1, 0.2, np.nan, np.nan, np.nan], - "sample_size": [10.0, 20.0, np.nan, np.nan, np.nan], - "missing_val": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] * 3, - "missing_se": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] * 3, - "missing_sample_size": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] * 3, }), - # Row 1 same, row 2 deleted, row 3 added, row 4 deleted previously - # and present again as nan, row 5 deleted previously and absent from file - # (no missing columns) - "csv7": pd.DataFrame({ - "geo_id": ["1", "2", "4", "5"], - "val": [1.0, 2.0, np.nan, np.nan], - "se": [0.1, 0.2, np.nan, np.nan], - "sample_size": [10.0, 20.0, np.nan, np.nan] - }) + # Same as 1, but no missing columns ("old-style" file) + "csv5": pd.DataFrame({ + "geo_id": ["1", "2", "3"], + "val": [1.0, 2.0, 3.0], + "se": [np.nan, 0.20000002, 0.30000003], + "sample_size": [10.0, 20.0, 30.0] + }) } CSVS_AFTER = { @@ -113,7 +83,7 @@ "missing_val": [Nans.NOT_MISSING] * 3, "missing_se": [Nans.NOT_MISSING] * 3, "missing_sample_size": [Nans.NOT_MISSING] * 3, - }), + }), # One row deleted and one row added "csv1": pd.DataFrame({ @@ -122,9 +92,9 @@ "se": [np.nan, 0.21, np.nan], "sample_size": [10.0, 21.0, 40.0], "missing_val": [Nans.NOT_MISSING] * 3, - "missing_se": [Nans.NOT_MISSING] * 3, + "missing_se": [Nans.CENSORED] + [Nans.NOT_MISSING] * 2, "missing_sample_size": [Nans.NOT_MISSING] * 3, - }), + }), # File added "csv3": pd.DataFrame({ @@ -135,7 +105,7 @@ "missing_val": [Nans.NOT_MISSING], "missing_se": [Nans.NOT_MISSING], "missing_sample_size": [Nans.NOT_MISSING], - }), + }), # All rows common, but missing columns added "csv4": pd.DataFrame({ @@ -146,51 +116,30 @@ "missing_val": [Nans.NOT_MISSING], "missing_se": [Nans.NOT_MISSING], "missing_sample_size": [Nans.NOT_MISSING], - }), - - # All rows common, but missing columns removed - "csv5": pd.DataFrame({ - "geo_id": ["1"], - "val": [1.0], - "se": [0.1], - "sample_size": [10.0] - }), - - # Row 1 same, row 2 deleted, row 3 added, row 4 deleted previously - # and present again as nan, row 5 deleted previously and absent from file, - # row 6 deleted previously, but new missing has different NA code - # (missing columns present) - "csv6": pd.DataFrame({ - "geo_id": ["1", "3", "4", "6"], - "val": [1.0, 3.0, np.nan, np.nan], - "se": [0.1, 0.3, np.nan, np.nan], - "sample_size": [10.0, 30.0, np.nan, np.nan], - "missing_val": [Nans.NOT_MISSING] 
-        "missing_se": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] + [Nans.CENSORED],
-        "missing_sample_size": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] + [Nans.CENSORED],
     }),
 
-    # Row 1 same, row 2 deleted, row 3 added, row 4 deleted previously
-    # and present again as nan, row 5 deleted previously and absent from file
-    # (no missing columns)
-    "csv7": pd.DataFrame({
-        "geo_id": ["1", "3", "4"],
-        "val": [1.0, 3.0, np.nan],
-        "se": [0.1, 0.3, np.nan],
-        "sample_size": [10.0, 30.0, np.nan]
+    # Same as csv1 (one row deleted and one row added), but no missing columns
+    "csv5": pd.DataFrame({
+        "geo_id": ["1", "2", "4"],
+        "val": [1.0, 2.1, 4.0],
+        "se": [np.nan, 0.21, np.nan],
+        "sample_size": [10.0, 21.0, 40.0]
     })
 }
 
 def _assert_frames_equal_ignore_row_order(df1, df2, index_cols: List[str] = None):
     return assert_frame_equal(df1.set_index(index_cols).sort_index(), df2.set_index(index_cols).sort_index())
 
+def _set_df_datatypes(df: pd.DataFrame, dtypes: Dict[str, Any]) -> pd.DataFrame:
+    df = df.copy()
+    for k, v in dtypes.items():
+        if k in df.columns:
+            df[k] = df[k].astype(v)
+    return df
+
 
 class TestArchiveDiffer:
 
     def test_stubs(self):
@@ -208,7 +157,7 @@ def test_diff_and_filter_exports(self, tmp_path):
         mkdir(cache_dir)
         mkdir(export_dir)
 
-        expected_csv1_diff = pd.DataFrame({
+        expected_csv1_diff = _set_df_datatypes(pd.DataFrame({
             "geo_id": ["2", "3", "4"],
             "val": [2.1, np.nan, 4.0],
             "se": [0.21, np.nan, np.nan],
             "sample_size": [21.0, np.nan, 40.0],
             "missing_val": [Nans.NOT_MISSING, Nans.DELETED, Nans.NOT_MISSING],
             "missing_se": [Nans.NOT_MISSING, Nans.DELETED, Nans.NOT_MISSING],
             "missing_sample_size": [Nans.NOT_MISSING, Nans.DELETED, Nans.NOT_MISSING],
-        })
-        expected_csv3 = CSVS_AFTER["csv3"]
-        expected_csv4_diff = CSVS_AFTER["csv4"]
-        expected_csv5_diff = CSVS_AFTER["csv5"]
-        expected_csv6_diff = pd.DataFrame({
-            "geo_id": ["2", "3", "6"],
-            "val": [np.nan, 3.0, np.nan],
-            "se": [np.nan, 0.3, np.nan],
-            "sample_size": [np.nan, 30.0, np.nan],
-            "missing_val": [Nans.DELETED, Nans.NOT_MISSING, Nans.CENSORED],
-            "missing_se": [Nans.DELETED, Nans.NOT_MISSING, Nans.CENSORED],
-            "missing_sample_size": [Nans.DELETED, Nans.NOT_MISSING, Nans.CENSORED],
-        })
-        expected_csv7_diff = pd.DataFrame({
-            "geo_id": ["2", "3"],
-            "val": [np.nan, 3.0],
-            "se": [np.nan, 0.3],
-            "sample_size": [np.nan, 30.0]
-        })
+        }), dtypes=CSV_DTYPES)
+        expected_csv4_diff = _set_df_datatypes(CSVS_AFTER["csv4"], dtypes=CSV_DTYPES)
+        expected_csv5_diff = _set_df_datatypes(pd.DataFrame({
+            "geo_id": ["2", "3", "4"],
+            "val": [2.1, np.nan, 4.0],
+            "se": [0.21, np.nan, np.nan],
+            "sample_size": [21.0, np.nan, 40.0],
+            "missing_val": [np.nan, Nans.DELETED, np.nan],
+            "missing_se": [np.nan, Nans.DELETED, np.nan],
+            "missing_sample_size": [np.nan, Nans.DELETED, np.nan],
+        }), dtypes=CSV_DTYPES)
 
         arch_diff = ArchiveDiffer(cache_dir, export_dir)
 
@@ -257,7 +198,7 @@ def test_diff_and_filter_exports(self, tmp_path):
         # Check return values
         assert set(deleted_files) == {join(cache_dir, "csv2.csv")}
         assert set(common_diffs.keys()) == {
-            join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv"]}
+            join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv", "csv5.csv"]}
         assert set(new_files) == {join(export_dir, "csv3.csv")}
         assert common_diffs[join(export_dir, "csv0.csv")] is None
         assert common_diffs[join(export_dir, "csv1.csv")] == join(
             export_dir, "csv1.csv.diff")
@@ -270,24 +211,15 @@ def test_diff_and_filter_exports(self, tmp_path):
             "csv3.csv",
             "csv4.csv", "csv4.csv.diff",
             "csv5.csv", "csv5.csv.diff",
-            "csv6.csv", "csv6.csv.diff",
-            "csv7.csv", "csv7.csv.diff",
         }
+        # Check that the files look as expected
         _assert_frames_equal_ignore_row_order(
             pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES),
             expected_csv1_diff,
             index_cols=["geo_id"]
         )
         _assert_frames_equal_ignore_row_order(
-            pd.read_csv(join(export_dir, "csv4.csv.diff"), dtype=CSV_DTYPES_OLD),
+            pd.read_csv(join(export_dir, "csv4.csv.diff"), dtype=CSV_DTYPES),
             expected_csv4_diff,
             index_cols=["geo_id"]
         )
         _assert_frames_equal_ignore_row_order(
-            pd.read_csv(join(export_dir, "csv5.csv.diff"), dtype=CSV_DTYPES_OLD),
+            pd.read_csv(join(export_dir, "csv5.csv.diff"), dtype=CSV_DTYPES),
             expected_csv5_diff,
             index_cols=["geo_id"]
         )
-        _assert_frames_equal_ignore_row_order(
-            pd.read_csv(join(export_dir, "csv6.csv.diff"), dtype=CSV_DTYPES_OLD),
-            expected_csv6_diff,
-            index_cols=["geo_id"]
-        )
-        _assert_frames_equal_ignore_row_order(
-            pd.read_csv(join(export_dir, "csv7.csv.diff"), dtype=CSV_DTYPES_OLD),
-            expected_csv7_diff,
-            index_cols=["geo_id"]
-        )
 
         # Test filter_exports
         # ===================
@@ -312,37 +243,12 @@ def test_diff_and_filter_exports(self, tmp_path):
         arch_diff.filter_exports(common_diffs)
 
         # Check exports directory just has incremental changes
-        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv"}
+        assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"}
         _assert_frames_equal_ignore_row_order(
             pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
             expected_csv1_diff,
             index_cols=["geo_id"]
         )
-        _assert_frames_equal_ignore_row_order(
-            pd.read_csv(join(export_dir, "csv3.csv"), dtype=CSV_DTYPES),
-            expected_csv3,
-            index_cols=["geo_id"]
-        )
         _assert_frames_equal_ignore_row_order(
             pd.read_csv(join(export_dir, "csv4.csv"), dtype=CSV_DTYPES),
             expected_csv4_diff,
             index_cols=["geo_id"]
         )
         _assert_frames_equal_ignore_row_order(
             pd.read_csv(join(export_dir, "csv5.csv"), dtype=CSV_DTYPES),
             expected_csv5_diff,
             index_cols=["geo_id"]
         )
-        _assert_frames_equal_ignore_row_order(
-            pd.read_csv(join(export_dir, "csv6.csv"), dtype=CSV_DTYPES),
-            expected_csv6_diff,
-            index_cols=["geo_id"]
-        )
-        _assert_frames_equal_ignore_row_order(
-            pd.read_csv(join(export_dir, "csv7.csv"), dtype=CSV_DTYPES),
-            expected_csv7_diff,
-            index_cols=["geo_id"]
-        )
 
 
 AWS_CREDENTIALS = {
@@ -406,7 +322,7 @@ def test_archive_exports(self, tmp_path, s3_client):
         mkdir(cache_dir)
         mkdir(export_dir)
 
-        csv1 = CSVS_BEFORE["csv1"]
+        csv1 = _set_df_datatypes(CSVS_BEFORE["csv1"], dtypes=CSV_DTYPES)
         csv1.to_csv(join(export_dir, "csv1.csv"), index=False)
 
         s3_client.create_bucket(Bucket=self.bucket_name)
@@ -461,11 +377,11 @@ def test_run(self, tmp_path, s3_client):
 
         # Check that the buckets now contain the exported files.
for csv_name, df in CSVS_AFTER.items(): body = s3_client.get_object(Bucket=self.bucket_name, Key=f"{self.indicator_prefix}/{csv_name}.csv")["Body"] - assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df) + assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), _set_df_datatypes(df, dtypes=CSV_DTYPES)) # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv"} - csv1_diff = pd.DataFrame({ + assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"} + csv1_diff = _set_df_datatypes(pd.DataFrame({ "geo_id": ["3", "2", "4"], "val": [np.nan, 2.1, 4.0], "se": [np.nan, 0.21, np.nan], @@ -473,7 +389,7 @@ def test_run(self, tmp_path, s3_client): "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, - }) + }), dtypes=CSV_DTYPES) _assert_frames_equal_ignore_row_order( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), csv1_diff, @@ -561,7 +477,7 @@ def test_diff_exports(self, tmp_path): "missing_val": [Nans.NOT_MISSING] * 3, "missing_se": [Nans.NOT_MISSING] * 3, "missing_sample_size": [Nans.NOT_MISSING] * 3, - }) + }) # Write exact same CSV into cache and export, so no diffs expected csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -602,7 +518,7 @@ def test_archive_exports(self, tmp_path): "missing_val": [Nans.NOT_MISSING] * 3, "missing_se": [Nans.NOT_MISSING] * 3, "missing_sample_size": [Nans.NOT_MISSING] * 3, - }) + }) # csv1.csv is now a dirty edit in the repo, and to be exported too csv1.to_csv(join(cache_dir, "csv1.csv"), index=False) @@ -673,12 +589,11 @@ def test_run(self, tmp_path): # Check that the archive branch contains `CSVS_AFTER`. arch_diff.get_branch(branch_name).checkout() for csv_name, df in CSVS_AFTER.items(): - assert_frame_equal( - pd.read_csv(join(cache_dir, f"{csv_name}.csv"), dtype=CSV_DTYPES), df) + assert_frame_equal(pd.read_csv(join(cache_dir, f"{csv_name}.csv"), dtype=CSV_DTYPES), _set_df_datatypes(df, dtypes=CSV_DTYPES)) original_branch.checkout() # Check exports directory just has incremental changes - assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv"} + assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv"} csv1_diff = pd.DataFrame({ "geo_id": ["3", "2", "4"], "val": [np.nan, 2.1, 4.0], @@ -687,10 +602,10 @@ def test_run(self, tmp_path): "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2, - }) + }) _assert_frames_equal_ignore_row_order( pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES), - csv1_diff, + _set_df_datatypes(csv1_diff, dtypes=CSV_DTYPES), index_cols=["geo_id"] )
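
A note for reviewers, since the deletion semantics change between PATCH 1 and PATCH 3: the sketch below is illustrative only and not part of the patches (the file names and toy values are made up). It shows what diff_export_csv returns after this series when a row disappears from an "old-style" export that has no missing_* columns.

    import pandas as pd
    from delphi_utils.archive import diff_export_csv

    # An "old-style" archived export: three rows, no missing_* columns.
    pd.DataFrame({
        "geo_id": ["1", "2", "3"],
        "val": [1.0, 2.0, 3.0],
        "se": [0.1, 0.2, 0.3],
        "sample_size": [10.0, 20.0, 30.0],
    }).to_csv("before.csv", index=False)

    # The new export: row "3" dropped, row "2" revised.
    pd.DataFrame({
        "geo_id": ["1", "2"],
        "val": [1.0, 2.1],
        "se": [0.1, 0.21],
        "sample_size": [10.0, 21.0],
    }).to_csv("after.csv", index=False)

    deleted_df, changed_df, added_df = diff_export_csv("before.csv", "after.csv")

    # deleted_df: geo_id "3" with val/se/sample_size all NaN and the
    # missing_* columns set to Nans.DELETED -- an explicit deletion record.
    # changed_df: geo_id "2" with the revised values.
    # added_df:   empty.

In other words, after PATCH 3 a dropped row is always re-emitted as a NaN row coded Nans.DELETED, even for files that never had missingness columns, instead of old deletions resurfacing as new ones on later diffs.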