Add NAN code support to changehc #898

Open · wants to merge 1 commit into base: main

56 changes: 42 additions & 14 deletions changehc/delphi_changehc/update_sensor.py
@@ -11,27 +11,57 @@
# third party
import numpy as np
import pandas as pd
from delphi_utils import GeoMapper, add_prefix, create_export_csv, Weekday
from delphi_utils import GeoMapper, add_prefix, create_export_csv, Weekday, Nans

# first party
from .config import Config
from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI, NA
from .constants import SMOOTHED, SMOOTHED_ADJ, SMOOTHED_CLI, SMOOTHED_ADJ_CLI
from .sensor import CHCSensor


def censor_columns(df, cols, inplace=False):
"""Replace values with nans in the specified columns."""
df = df if inplace else df.copy()
df.loc[:, cols] = np.nan
return df

def add_nancodes(df, write_se, inplace=False):
"""Add nancodes to the dataframe."""
df = df if inplace else df.copy()

# Default missingness codes
df["missing_val"] = Nans.NOT_MISSING
df["missing_se"] = Nans.CENSORED if not write_se else Nans.NOT_MISSING
df["missing_sample_size"] = Nans.CENSORED

# Censor those that weren't included
df.loc[~df['incl'], ["val", "se"]] = np.nan  # censor values for rows that were not included
df.loc[~df['incl'], ["missing_val", "missing_se"]] = Nans.CENSORED

# Mark any remaining nans with unknown
remaining_nans_mask = df["val"].isnull() & df["missing_val"].eq(Nans.NOT_MISSING)
df.loc[remaining_nans_mask, "missing_val"] = Nans.OTHER

remaining_nans_mask = df["se"].isnull() & df["missing_se"].eq(Nans.NOT_MISSING)
df.loc[remaining_nans_mask, "missing_se"] = Nans.OTHER

return df

def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_path=".", start_date=None, end_date=None):
"""Write sensor values to csv.

Args:
df: dataframe containing unique timestamp, unique geo_id, val, se, sample_size
geo_level: the geographic level being written, e.g. county, state
write_se: boolean; if True, write out standard errors and use an obfuscated output name
day_shift: a timedelta specifying the time shift to apply to the dates
out_name: name of the output file
output_path: outfile path to write the csv (default is current directory)
start_date: the first date of the dates to be written
end_date: the last date of the dates to be written
logger: a logger object to log events while writing
"""
logger = logging if logger is None else logger

df = df.copy()

# shift dates forward for labeling
@@ -40,13 +70,12 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa
# suspicious value warnings
suspicious_se_mask = df["se"].gt(5)
assert df[suspicious_se_mask].empty, " se contains suspiciously large values"
assert not df["se"].isna().any(), " se contains nan values"

if write_se:
logger.info("========= WARNING: WRITING SEs TO {0} =========".format(out_name))
else:
df["se"] = np.nan

assert not df["val"].isna().any(), " val contains nan values"
suspicious_val_mask = df["val"].gt(90)
if not df[suspicious_val_mask].empty:
for geo in df.loc[suspicious_val_mask, "geo_id"]:
@@ -61,7 +90,8 @@ def write_to_csv(df, geo_level, write_se, day_shift, out_name, logger, output_pa
start_date=start_date,
end_date=end_date,
sensor=out_name,
write_empty_days=True
write_empty_days=True,
logger=logger
)
logger.debug("wrote {0} rows for {1} {2}".format(
df.size, df["geo_id"].unique().size, geo_level
@@ -231,14 +261,12 @@ def update_sensor(self,
res = pd.DataFrame(res).loc[final_sensor_idxs]
dfs.append(res)

# Form the output dataframe
df = pd.concat(dfs)
# sample size is never shared
df["sample_size"] = np.nan
# conform to naming expected by create_export_csv()
df = df.reset_index().rename(columns={"rate": "val"})
# df.loc[~df['incl'], ["val", "se"]] = np.nan # update to this line after nancodes get merged in
df = df[df["incl"]]
# Form the output dataframe and conform to expected naming
df = pd.concat(dfs).reset_index().rename(columns={"date": "timestamp", "rate": "val"})

# sample size is never shared; standard error might be shared
df = censor_columns(df, ["sample_size"] if self.se else ["sample_size", "se"])
df = add_nancodes(df, self.se)

# write out results
dates = write_to_csv(
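
Taken together, update_sensor now censors the never-shared columns and stamps missing_* codes instead of silently dropping excluded rows. A minimal sketch of how the two new helpers compose (import paths mirror the tests below; the toy frame and expected codes follow the logic of add_nancodes as written above):

# Sketch: compose censor_columns and add_nancodes as update_sensor now does.
import numpy as np
import pandas as pd

from delphi_changehc.update_sensor import add_nancodes, censor_columns

df = pd.DataFrame({
    "geo_id": ["a", "b", "c"],
    "timestamp": pd.to_datetime(["2020-05-01"] * 3),
    "val": [0.1, 0.2, np.nan],            # "c" is missing for an unknown reason
    "se": [0.01, 0.02, np.nan],
    "sample_size": [100.0, 200.0, 300.0], # never shared downstream
    "incl": [True, False, True],          # "b" was excluded upstream
})

# se is censored alongside sample_size because write_se is False here
df = censor_columns(df, ["sample_size", "se"])
df = add_nancodes(df, write_se=False)

# Expected codes: "a" -> NOT_MISSING, "b" -> CENSORED (incl is False),
# "c" -> OTHER (val is nan with no recorded cause).
print(df[["geo_id", "val", "missing_val", "missing_se", "missing_sample_size"]])
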
149 changes: 92 additions & 57 deletions changehc/tests/test_update_sensor.py
@@ -11,10 +11,12 @@
from boto3 import Session
from moto import mock_s3
import pytest
import mock

# first party
from delphi_changehc.config import Config
from delphi_changehc.update_sensor import write_to_csv, CHCSensorUpdater
from delphi_changehc.update_sensor import add_nancodes, censor_columns, write_to_csv, CHCSensorUpdater
from delphi_utils.nancodes import Nans

CONFIG = Config()
PARAMS = {
@@ -96,7 +98,8 @@ def test_geo_reindex(self):
def test_update_sensor(self):
"""Tests that the sensors are properly updated."""
outputs = {}
for geo in ["county", "state", "hhs", "nation"]:
geos = ["county", "state", "hhs", "nation"]
for geo in geos:
td = TemporaryDirectory()
su_inst = CHCSensorUpdater(
"03-01-2020",
Expand Down Expand Up @@ -127,11 +130,10 @@ def test_update_sensor(self):
assert len(os.listdir(td.name)) == len(su_inst.sensor_dates),\
f"failed {geo} update sensor test"
td.cleanup()
assert outputs["20200319_county_smoothed_outpatient_covid.csv"].empty
assert outputs["20200319_state_smoothed_outpatient_covid.csv"].empty
assert outputs["20200319_hhs_smoothed_outpatient_covid.csv"].empty
assert outputs["20200319_nation_smoothed_outpatient_covid.csv"].empty

value_columns = ["val", "se", "sample_size"]
for geo in geos:
assert np.isnan(outputs["20200319_" + geo + "_smoothed_outpatient_covid.csv"][value_columns]).all().all()
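# 3 is the integer value of Nans.CENSORED in delphi_utils: the frames exist but every value is censored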
assert outputs["20200319_" + geo + "_smoothed_outpatient_covid.csv"]["missing_val"].eq(3).all()

class TestWriteToCsv:
"""Tests for writing output files to CSV."""
@@ -142,16 +144,19 @@ def test_write_to_csv_results(self):
"se": [0.1, 1, 1.1] + [0.5, np.nan, 0.5],
"sample_size": [np.nan] * 6,
"timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
"include": [True, True, True] + [True, False, True],
"incl": [True, True, True] + [True, False, True],
"geo_id": ["a"] * 3 + ["b"] * 3,
})

td = TemporaryDirectory()

res0 = censor_columns(res0, ["sample_size", "se"])
res0 = add_nancodes(res0, write_se=False)

write_to_csv(
res0[res0['include']],
res0,
geo_level="geography",
write_se=False,
write_se=True,
day_shift=CONFIG.DAY_SHIFT,
out_name="name_of_signal",
output_path=td.name,
@@ -162,7 +167,10 @@ def test_write_to_csv_results(self):
expected_name = "20200502_geography_name_of_signal.csv"
assert exists(join(td.name, expected_name))
output_data = pd.read_csv(join(td.name, expected_name))
expected_columns = ["geo_id", "val", "se", "sample_size"]
expected_columns = [
"geo_id", "val", "se", "sample_size",
"missing_val", "missing_se", "missing_sample_size"
]
assert (output_data.columns == expected_columns).all()
assert (output_data.geo_id == ["a", "b"]).all()
assert np.array_equal(output_data.val.values, np.array([0.1, 1]))
@@ -175,8 +183,8 @@ def test_write_to_csv_results(self):
assert exists(join(td.name, expected_name))
output_data = pd.read_csv(join(td.name, expected_name))
assert (output_data.columns == expected_columns).all()
assert (output_data.geo_id == ["a"]).all()
assert np.array_equal(output_data.val.values, np.array([0.5]))
assert (output_data.geo_id == ["a", "b"]).all()
assert np.array_equal(output_data.val.values, np.array([0.5, np.nan]), equal_nan=True)
assert np.isnan(output_data.se.values).all()
assert np.isnan(output_data.sample_size.values).all()

@@ -198,13 +206,15 @@ def test_write_to_csv_with_se_results(self):
"se": [0.1, 1, 1.1] + [0.5, np.nan, 0.5],
"sample_size": [np.nan] * 6,
"timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
"include": [True, True, True] + [True, False, True],
"incl": [True, True, True] + [True, False, True],
"geo_id": ["a"] * 3 + ["b"] * 3,
})

res0 = add_nancodes(res0, write_se=True)

td = TemporaryDirectory()
write_to_csv(
res0[res0['include']],
res0,
geo_level="geography",
write_se=True,
day_shift=CONFIG.DAY_SHIFT,
@@ -215,64 +225,46 @@

# check outputs
expected_name = "20200502_geography_name_of_signal.csv"
expected_columns = [
"geo_id", "val", "se", "sample_size",
"missing_val", "missing_se", "missing_sample_size"
]
assert exists(join(td.name, expected_name))
output_data = pd.read_csv(join(td.name, expected_name))
expected_columns = ["geo_id", "val", "se", "sample_size"]
assert (output_data.columns == expected_columns).all()
assert (output_data.geo_id == ["a", "b"]).all()
assert np.array_equal(output_data.val.values, np.array([0.1, 1]))
assert np.array_equal(output_data.se.values, np.array([0.1, 0.5]))
assert np.isnan(output_data.sample_size.values).all()
td.cleanup()

def test_write_to_csv_wrong_results(self):
"""Tests that nonsensical inputs trigger exceptions."""
def test_suspicious_value_logging(self):
res0 = pd.DataFrame({
"val": [0.1, 0.5, 1.5] + [1, 2, 3],
"se": [0.1, 1, 1.1] + [0.5, 0.5, 0.5],
"val": [91, 0.5, 1.5] + [1, 2, 3],
"se": [0.1, 1, 1.1] + [0.5, np.nan, 0.5],
"sample_size": [np.nan] * 6,
"timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
"include": [True, True, True] + [True, False, True],
"incl": [True, True, True] + [True, False, True],
"geo_id": ["a"] * 3 + ["b"] * 3,
}).set_index(["timestamp", "geo_id"]).sort_index()

td = TemporaryDirectory()
})

# nan value for included loc-date
res1 = res0.copy()
res1 = res1[res1['include']]
res1.loc[("2020-05-01", "a"), "val"] = np.nan
res1.reset_index(inplace=True)
with pytest.raises(AssertionError):
write_to_csv(
res1,
geo_level="geography",
write_se=False,
day_shift=CONFIG.DAY_SHIFT,
out_name="name_of_signal",
output_path=td.name,
logger=TEST_LOGGER
)
res0 = add_nancodes(res0, write_se=True)

# nan se for included loc-date
res2 = res0.copy()
res2 = res2[res2['include']]
res2.loc[("2020-05-01", "a"), "se"] = np.nan
res2.reset_index(inplace=True)
with pytest.raises(AssertionError):
write_to_csv(
res2,
geo_level="geography",
write_se=True,
day_shift=CONFIG.DAY_SHIFT,
out_name="name_of_signal",
output_path=td.name,
logger=TEST_LOGGER
)
mock_logger = mock.Mock()
td = TemporaryDirectory()
write_to_csv(
res0,
geo_level="geography",
write_se=True,
day_shift=CONFIG.DAY_SHIFT,
out_name="name_of_signal",
output_path=td.name,
logger=mock_logger
)

# large se value
res3 = res0.copy()
res3 = res3[res3['include']]
res3 = res0.copy().set_index(["timestamp", "geo_id"])
res3 = res3[res3['incl']]
res3.loc[("2020-05-01", "a"), "se"] = 10
res3.reset_index(inplace=True)
with pytest.raises(AssertionError):
@@ -286,4 +278,47 @@ def test_write_to_csv_wrong_results(self):
logger=TEST_LOGGER
)

td.cleanup()
mock_logger.warning.assert_called_once_with(
"value suspiciously high, {0}: {1}".format("a", "name_of_signal")
)

def test_add_nancodes(self):
"""Tests that nancodes are correctly addded."""
res0 = pd.DataFrame({
"val": [np.nan, 0.5, 1.5] + [1, 2, 3],
"se": [np.nan, 1, 1.1] + [np.nan, np.nan, 0.5],
"sample_size": [np.nan] * 6,
"timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
"incl": [True, True, True] + [True, False, True],
"geo_id": ["a"] * 3 + ["b"] * 3,
}).set_index(["timestamp", "geo_id"]).sort_index()

expected_df = pd.DataFrame({
"val": [np.nan, 0.5, 1.5] + [1, np.nan, 3],
"se": [np.nan, 1, 1.1] + [np.nan, np.nan, 0.5],
"sample_size": [np.nan] * 6,
"timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
"incl": [True, True, True] + [True, False, True],
"geo_id": ["a"] * 3 + ["b"] * 3,
"missing_val": [Nans.OTHER] + [Nans.NOT_MISSING] * 3 + [Nans.CENSORED, Nans.NOT_MISSING],
"missing_se": [Nans.OTHER] + [Nans.NOT_MISSING] * 2 + [Nans.OTHER, Nans.CENSORED, Nans.NOT_MISSING],
"missing_sample_size": [Nans.CENSORED] * 6,
}).set_index(["timestamp", "geo_id"]).sort_index()

res = censor_columns(res0, ["sample_size"])
pd.testing.assert_frame_equal(expected_df, add_nancodes(res, write_se=True))

expected_df = pd.DataFrame({
"val": [np.nan, 0.5, 1.5] + [1, np.nan, 3],
"se": [np.nan] * 6,
"sample_size": [np.nan] * 6,
"timestamp": pd.to_datetime(["2020-05-01", "2020-05-02", "2020-05-04"] * 2),
"incl": [True, True, True] + [True, False, True],
"geo_id": ["a"] * 3 + ["b"] * 3,
"missing_val": [Nans.OTHER] + [Nans.NOT_MISSING] * 3 + [Nans.CENSORED, Nans.NOT_MISSING],
"missing_se": [Nans.CENSORED] * 6,
"missing_sample_size": [Nans.CENSORED] * 6,
}).set_index(["timestamp", "geo_id"]).sort_index()

res = censor_columns(res0, ["sample_size", "se"])
pd.testing.assert_frame_equal(expected_df, add_nancodes(res, write_se=False))
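
Downstream consumers can now branch on the missing_* columns instead of guessing why a value is nan. A short sketch (the filename is illustrative, borrowed from the tests above):

# Sketch: interpret the missing_* columns of an exported CSV.
import pandas as pd
from delphi_utils import Nans

output = pd.read_csv("20200502_geography_name_of_signal.csv")

reported = output[output["missing_val"] == Nans.NOT_MISSING]  # genuinely observed
censored = output[output["missing_val"] == Nans.CENSORED]     # withheld by the pipeline
print(f"{len(reported)} reported rows, {len(censored)} censored rows")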