
Fix archiver bug ignoring deletions when comparing two files with no missing columns #1522


Merged 3 commits on Mar 2, 2022
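The bug in brief: diff_export_csv compares a cached "before" CSV against a freshly exported "after" CSV and reports deleted rows as all-NaN entries. When neither file carried the missing_* columns, those deletions were dropped entirely. A minimal sketch of the scenario, assuming diff_export_csv takes the two CSV paths and returns the deleted, changed, and added frames (as the truncated return statement in the hunk below suggests); the file paths are illustrative only:

import pandas as pd
from delphi_utils.archive import diff_export_csv

# Before/after exports with no missing_* columns; row "2" disappears.
pd.DataFrame({
    "geo_id": ["1", "2"], "val": [1.0, 2.0],
    "se": [0.1, 0.2], "sample_size": [10.0, 20.0],
}).to_csv("before.csv", index=False)
pd.DataFrame({
    "geo_id": ["1"], "val": [1.0],
    "se": [0.1], "sample_size": [10.0],
}).to_csv("after.csv", index=False)

deleted, changed, added = diff_export_csv("before.csv", "after.csv")
# With this fix the deletion is reported, and because the after file has no
# missing_* columns they are stripped from the deletion frame as well.
print(deleted)                 # row "2" with NaN values
print(list(deleted.columns))   # ['val', 'se', 'sample_size']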
Changes from 2 commits
11 changes: 11 additions & 0 deletions _delphi_utils_python/delphi_utils/archive.py
@@ -108,6 +108,17 @@ def diff_export_csv(
if "missing_val" in after_df_cmn.columns:
deleted_df[["missing_val", "missing_se", "missing_sample_size"]] = Nans.DELETED

# Remove deleted entries that were already present
if deleted_idx.size > 0:
deleted_same_mask = deleted_df == before_df.loc[deleted_idx, :]
deleted_same_mask |= pd.isna(deleted_df) & pd.isna(before_df.loc[deleted_idx, :])
deleted_df = deleted_df.loc[~(deleted_same_mask.all(axis=1)), :]

# If the new file has no missing columns, then we should remove them from
# the deletions too
if "missing_val" not in after_df_cmn.columns:
deleted_df = deleted_df[["val", "se", "sample_size"]]

return (
deleted_df,
after_df_cmn.loc[~(same_mask.all(axis=1)), :],
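To illustrate the dedup mask added above, a small standalone sketch on toy frames (a single column and a plain geo_id index, simplified from the real export schema): a deletion row whose values, NaNs included, match the before file was already recorded as deleted and is dropped, while a row that previously held real values is kept as a fresh deletion.

import numpy as np
import pandas as pd

# Row "4" was already deleted before (NaN in the before file); row "5" held
# a real value and is being deleted for the first time.
before = pd.DataFrame({"val": [np.nan, 2.0]}, index=["4", "5"])
deleted = pd.DataFrame({"val": [np.nan, np.nan]}, index=["4", "5"])

same = (deleted == before) | (pd.isna(deleted) & pd.isna(before))
deleted = deleted.loc[~same.all(axis=1)]
print(deleted.index.tolist())  # ['5'] -- row "4" was already deleted before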
214 changes: 149 additions & 65 deletions _delphi_utils_python/tests/test_archive.py
@@ -2,6 +2,7 @@
from io import StringIO, BytesIO
from os import listdir, mkdir
from os.path import join
from typing import List

from boto3 import Session
from git import Repo, exc
@@ -19,7 +20,10 @@
CSV_DTYPES = {
    "geo_id": str, "val": float, "se": float, "sample_size": float,
    "missing_val": int, "missing_se": int, "missing_sample_size": int
}
CSV_DTYPES_OLD = {
    "geo_id": str, "val": float, "se": float, "sample_size": float
}

CSVS_BEFORE = {
# All rows unchanged
@@ -74,29 +78,29 @@
"missing_sample_size": [Nans.NOT_MISSING],
}),

# Row 1 same, row 2 deleted, row 3 added, row 4 deleted previously
# and present again as nan, row 5 deleted previously and absent from file,
# row 6 deleted previously, but new missing has different NA code
# (missing columns present)
"csv6": pd.DataFrame({
    "geo_id": ["1", "2", "4", "5", "6"],
    "val": [1.0, 2.0, np.nan, np.nan, np.nan],
    "se": [0.1, 0.2, np.nan, np.nan, np.nan],
    "sample_size": [10.0, 20.0, np.nan, np.nan, np.nan],
    "missing_val": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] * 3,
    "missing_se": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] * 3,
    "missing_sample_size": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] * 3,
}),

# Row 1 same, row 2 deleted, row 3 added, row 4 deleted previously
# and present again as nan, row 5 deleted previously and absent from file
# (no missing columns)
"csv7": pd.DataFrame({
    "geo_id": ["1", "2", "4", "5"],
    "val": [1.0, 2.0, np.nan, np.nan],
    "se": [0.1, 0.2, np.nan, np.nan],
    "sample_size": [10.0, 20.0, np.nan, np.nan]
})
}

CSVS_AFTER = {
@@ -152,31 +156,41 @@
"sample_size": [10.0]
}),

# Row 1 same, row 2 deleted, row 3 added, row 4 deleted previously
# and present again as nan, row 5 deleted previously and absent from file,
# row 6 deleted previously, but new missing has different NA code
# (missing columns present)
"csv6": pd.DataFrame({
    "geo_id": ["1", "3", "4", "6"],
    "val": [1.0, 3.0, np.nan, np.nan],
    "se": [0.1, 0.3, np.nan, np.nan],
    "sample_size": [10.0, 30.0, np.nan, np.nan],
    "missing_val": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] + [Nans.CENSORED],
    "missing_se": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] + [Nans.CENSORED],
    "missing_sample_size": [Nans.NOT_MISSING] * 2 + [Nans.DELETED] + [Nans.CENSORED],
}),

# Row 1 same, row 2 deleted, row 3 added, row 4 deleted previously
# and present again as nan, row 5 deleted previously and absent from file
# (no missing columns)
"csv7": pd.DataFrame({
    "geo_id": ["1", "3", "4"],
    "val": [1.0, 3.0, np.nan],
    "se": [0.1, 0.3, np.nan],
    "sample_size": [10.0, 30.0, np.nan]
})
}

import shutil

# Clear any leftover temp directories from previous test runs
shutil.rmtree("/tmp/cache", ignore_errors=True)
shutil.rmtree("/tmp/export", ignore_errors=True)

# Diff rows may be written in any order, so index on the given key columns
# and sort before comparing.
def _assert_frames_equal_ignore_row_order(df1, df2, index_cols: List[str] = None):
    return assert_frame_equal(
        df1.set_index(index_cols).sort_index(),
        df2.set_index(index_cols).sort_index()
    )

class TestArchiveDiffer:

def test_stubs(self):
@@ -194,14 +208,32 @@ def test_diff_and_filter_exports(self, tmp_path):
mkdir(cache_dir)
mkdir(export_dir)

expected_csv1_diff = pd.DataFrame({
    "geo_id": ["2", "3", "4"],
    "val": [2.1, np.nan, 4.0],
    "se": [0.21, np.nan, np.nan],
    "sample_size": [21.0, np.nan, 40.0],
    "missing_val": [Nans.NOT_MISSING, Nans.DELETED, Nans.NOT_MISSING],
    "missing_se": [Nans.NOT_MISSING, Nans.DELETED, Nans.NOT_MISSING],
    "missing_sample_size": [Nans.NOT_MISSING, Nans.DELETED, Nans.NOT_MISSING],
})
expected_csv3 = CSVS_AFTER["csv3"]
expected_csv4_diff = CSVS_AFTER["csv4"]
expected_csv5_diff = CSVS_AFTER["csv5"]
expected_csv6_diff = pd.DataFrame({
    "geo_id": ["2", "3", "6"],
    "val": [np.nan, 3.0, np.nan],
    "se": [np.nan, 0.3, np.nan],
    "sample_size": [np.nan, 30.0, np.nan],
    "missing_val": [Nans.DELETED, Nans.NOT_MISSING, Nans.CENSORED],
    "missing_se": [Nans.DELETED, Nans.NOT_MISSING, Nans.CENSORED],
    "missing_sample_size": [Nans.DELETED, Nans.NOT_MISSING, Nans.CENSORED],
})
expected_csv7_diff = pd.DataFrame({
    "geo_id": ["2", "3"],
    "val": [np.nan, 3.0],
    "se": [np.nan, 0.3],
    "sample_size": [np.nan, 30.0]
})

arch_diff = ArchiveDiffer(cache_dir, export_dir)
@@ -225,7 +257,7 @@ def test_diff_and_filter_exports(self, tmp_path):
# Check return values
assert set(deleted_files) == {join(cache_dir, "csv2.csv")}
assert set(common_diffs.keys()) == {
    join(export_dir, f) for f in ["csv0.csv", "csv1.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv"]}
assert set(new_files) == {join(export_dir, "csv3.csv")}
assert common_diffs[join(export_dir, "csv0.csv")] is None
assert common_diffs[join(export_dir, "csv1.csv")] == join(
@@ -238,13 +270,34 @@ def test_diff_and_filter_exports(self, tmp_path):
"csv3.csv",
"csv4.csv", "csv4.csv.diff",
"csv5.csv", "csv5.csv.diff",
"csv6.csv",
"csv6.csv", "csv6.csv.diff",
"csv7.csv", "csv7.csv.diff",
"csv8.csv", "csv8.csv.diff"
}
_assert_frames_equal_ignore_row_order(
    pd.read_csv(join(export_dir, "csv1.csv.diff"), dtype=CSV_DTYPES),
    expected_csv1_diff,
    index_cols=["geo_id"]
)
_assert_frames_equal_ignore_row_order(
    pd.read_csv(join(export_dir, "csv4.csv.diff"), dtype=CSV_DTYPES_OLD),
    expected_csv4_diff,
    index_cols=["geo_id"]
)
_assert_frames_equal_ignore_row_order(
    pd.read_csv(join(export_dir, "csv5.csv.diff"), dtype=CSV_DTYPES_OLD),
    expected_csv5_diff,
    index_cols=["geo_id"]
)
_assert_frames_equal_ignore_row_order(
    pd.read_csv(join(export_dir, "csv6.csv.diff"), dtype=CSV_DTYPES_OLD),
    expected_csv6_diff,
    index_cols=["geo_id"]
)
_assert_frames_equal_ignore_row_order(
    pd.read_csv(join(export_dir, "csv7.csv.diff"), dtype=CSV_DTYPES_OLD),
    expected_csv7_diff,
    index_cols=["geo_id"]
)

# Test filter_exports
# ===================
@@ -259,10 +312,37 @@ def test_diff_and_filter_exports(self, tmp_path):
arch_diff.filter_exports(common_diffs)

# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv"}
_assert_frames_equal_ignore_row_order(
    pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
    expected_csv1_diff,
    index_cols=["geo_id"]
)
_assert_frames_equal_ignore_row_order(
    pd.read_csv(join(export_dir, "csv3.csv"), dtype=CSV_DTYPES),
    expected_csv3,
    index_cols=["geo_id"]
)
_assert_frames_equal_ignore_row_order(
    pd.read_csv(join(export_dir, "csv4.csv"), dtype=CSV_DTYPES),
    expected_csv4_diff,
    index_cols=["geo_id"]
)
_assert_frames_equal_ignore_row_order(
    pd.read_csv(join(export_dir, "csv5.csv"), dtype=CSV_DTYPES),
    expected_csv5_diff,
    index_cols=["geo_id"]
)
_assert_frames_equal_ignore_row_order(
    pd.read_csv(join(export_dir, "csv6.csv"), dtype=CSV_DTYPES),
    expected_csv6_diff,
    index_cols=["geo_id"]
)
_assert_frames_equal_ignore_row_order(
    pd.read_csv(join(export_dir, "csv7.csv"), dtype=CSV_DTYPES),
    expected_csv7_diff,
    index_cols=["geo_id"]
)


AWS_CREDENTIALS = {
@@ -384,7 +464,7 @@ def test_run(self, tmp_path, s3_client):
assert_frame_equal(pd.read_csv(body, dtype=CSV_DTYPES), df)

# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv"}
csv1_diff = pd.DataFrame({
    "geo_id": ["3", "2", "4"],
    "val": [np.nan, 2.1, 4.0],
    "se": [np.nan, 0.21, np.nan],
    "sample_size": [np.nan, 21.0, 40.0],
    "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
    "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
    "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})
_assert_frames_equal_ignore_row_order(
    pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
    csv1_diff,
    index_cols=["geo_id"]
)


class TestGitArchiveDiffer:
@@ -596,7 +678,7 @@ def test_run(self, tmp_path):
original_branch.checkout()

# Check exports directory just has incremental changes
assert set(listdir(export_dir)) == {"csv1.csv", "csv3.csv", "csv4.csv", "csv5.csv", "csv6.csv", "csv7.csv"}
csv1_diff = pd.DataFrame({
    "geo_id": ["3", "2", "4"],
    "val": [np.nan, 2.1, 4.0],
    "se": [np.nan, 0.21, np.nan],
    "sample_size": [np.nan, 21.0, 40.0],
    "missing_val": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
    "missing_se": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
    "missing_sample_size": [Nans.DELETED] + [Nans.NOT_MISSING] * 2,
})
_assert_frames_equal_ignore_row_order(
    pd.read_csv(join(export_dir, "csv1.csv"), dtype=CSV_DTYPES),
    csv1_diff,
    index_cols=["geo_id"]
)


class TestFromParams: