diff --git a/nchs_mortality/.pylintrc b/nchs_mortality/.pylintrc
index f30837c7e..02339e190 100644
--- a/nchs_mortality/.pylintrc
+++ b/nchs_mortality/.pylintrc
@@ -4,6 +4,7 @@
 disable=logging-format-interpolation,
         too-many-locals,
         too-many-arguments,
+        fixme,
         # Allow pytest functions to be part of a class.
         no-self-use,
         # Allow pytest classes to have one test.
diff --git a/nchs_mortality/delphi_nchs_mortality/constants.py b/nchs_mortality/delphi_nchs_mortality/constants.py
index 164b84307..783227369 100644
--- a/nchs_mortality/delphi_nchs_mortality/constants.py
+++ b/nchs_mortality/delphi_nchs_mortality/constants.py
@@ -25,7 +25,11 @@
     "prop"
 ]
 INCIDENCE_BASE = 100000
-GEO_RES = "state"
+GEO_RES = [
+    "nation",
+    "hhs",
+    "state"
+]
 
 # this is necessary as a delimiter in the f-string expressions we use to
 # construct detailed error reports
diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py
index fa0226fcb..b7a841a3e 100644
--- a/nchs_mortality/delphi_nchs_mortality/run.py
+++ b/nchs_mortality/delphi_nchs_mortality/run.py
@@ -7,9 +7,10 @@
 import time
 from datetime import datetime, date, timedelta
 from typing import Dict, Any
+from itertools import product
 
 import numpy as np
-from delphi_utils import S3ArchiveDiffer, get_structured_logger
+from delphi_utils import S3ArchiveDiffer, get_structured_logger, GeoMapper
 
 from .archive_diffs import arch_diffs
 from .constants import (METRICS, SENSOR_NAME_MAP,
@@ -43,13 +44,14 @@ def run_module(params: Dict[str, Any]):
         __name__, filename=params["common"].get("log_filename"),
         log_exceptions=params["common"].get("log_exceptions", True))
     export_start_date = params["indicator"]["export_start_date"]
-    if export_start_date == "latest": # Find the previous Saturday
+    if export_start_date == "latest":  # Find the previous Saturday
         export_start_date = date.today() - timedelta(
-                days=date.today().weekday() + 2)
+            days=date.today().weekday() + 2)
         export_start_date = export_start_date.strftime('%Y-%m-%d')
     daily_export_dir = params["common"]["daily_export_dir"]
     token = params["indicator"]["token"]
     test_file = params["indicator"].get("test_file", None)
+    gmpr = GeoMapper()
 
     if "archive" in params:
         daily_arch_diff = S3ArchiveDiffer(
@@ -60,52 +62,54 @@
     stats = []
     df_pull = pull_nchs_mortality_data(token, test_file)
-    for metric in METRICS:
+    for metric, geo, sensor in product(METRICS, GEO_RES, SENSORS):
         if metric == 'percent_of_expected_deaths':
-            print(metric)
-            df = df_pull.copy()
-            df["val"] = df[metric]
-            df["se"] = np.nan
-            df["sample_size"] = np.nan
-            df = df[~df["val"].isnull()]
-            sensor_name = "_".join([SENSOR_NAME_MAP[metric]])
-            dates = export_csv(
-                df,
-                geo_name=GEO_RES,
-                export_dir=daily_export_dir,
-                start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
-                sensor=sensor_name,
-            )
-            if len(dates) > 0:
-                stats.append((max(dates), len(dates)))
-        else:
-            for sensor in SENSORS:
-                print(metric, sensor)
-                df = df_pull.copy()
-                if sensor == "num":
-                    df["val"] = df[metric]
-                else:
-                    df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
-                df["se"] = np.nan
-                df["sample_size"] = np.nan
-                df = df[~df["val"].isnull()]
-                sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor])
-                dates = export_csv(
-                    df,
-                    geo_name=GEO_RES,
-                    export_dir=daily_export_dir,
-                    start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
-                    sensor=sensor_name,
-                )
-                if len(dates) > 0:
-                    stats.append((max(dates), len(dates)))
-
-# Weekly run of archive utility on Monday
-# - Does not upload to S3, that is handled by daily run of archive utility
-# - Exports issues into receiving for the API
-# Daily run of archiving utility
-# - Uploads changed files to S3
-# - Does not export any issues into receiving
+            continue
+        print(metric, sensor)
+        sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor])
+        df = _safe_copy_df(df_pull, metric)
+
+        if geo in ["hhs", "nation"]:
+            df = _map_from_state(df, geo, gmpr)
+
+        if sensor == "prop":
+            df["val"] = df["val"] / df["population"] * INCIDENCE_BASE
+
+        dates = export_csv(
+            df,
+            geo_name=geo,
+            export_dir=daily_export_dir,
+            start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
+            sensor=sensor_name,
+        )
+        if len(dates) > 0:
+            stats.append((max(dates), len(dates)))
+
+    for geo in GEO_RES:
+        metric = 'percent_of_expected_deaths'
+        print(metric)
+        sensor_name = "_".join([SENSOR_NAME_MAP[metric]])
+        df = _safe_copy_df(df_pull, metric)
+
+        if geo in ["hhs", "nation"]:
+            df = _map_from_state(df, geo, gmpr, weighted=True)
+
+        dates = export_csv(
+            df,
+            geo_name=geo,
+            export_dir=daily_export_dir,
+            start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
+            sensor=sensor_name,
+        )
+        if len(dates) > 0:
+            stats.append((max(dates), len(dates)))
+
+    # Weekly run of archive utility on Monday
+    # - Does not upload to S3, that is handled by daily run of archive utility
+    # - Exports issues into receiving for the API
+    # Daily run of archiving utility
+    # - Uploads changed files to S3
+    # - Does not export any issues into receiving
     if "archive" in params:
         arch_diffs(params, daily_arch_diff)
@@ -115,7 +119,39 @@
     max_lag_in_days = min_max_date and (datetime.now() - min_max_date).days
     formatted_min_max_date = min_max_date and min_max_date.strftime("%Y-%m-%d")
     logger.info("Completed indicator run",
-        elapsed_time_in_seconds = elapsed_time_in_seconds,
-        csv_export_count = csv_export_count,
-        max_lag_in_days = max_lag_in_days,
-        oldest_final_export_date = formatted_min_max_date)
+                elapsed_time_in_seconds=elapsed_time_in_seconds,
+                csv_export_count=csv_export_count,
+                max_lag_in_days=max_lag_in_days,
+                oldest_final_export_date=formatted_min_max_date)
+
+
+def _safe_copy_df(df, metric_col_name):
+    """Create a copy of the given df, and drop rows where the metric is nan."""
+    df_copy = df.copy()
+    df_copy["se"] = np.nan
+    df_copy["sample_size"] = np.nan
+    df_copy["val"] = df_copy[metric_col_name]
+    return df_copy[~df_copy["val"].isnull()]
+
+
+def _map_from_state(df, geo, gmpr, weighted=False):
+    """Map from state_id to another given geocode.
+
+    The weighted flag is used when aggregating metrics which come as percentages
+    rather than raw counts, and therefore need to be weighted by population when
+    combining.
+    """
+    # TODO - this first mapping from state_id to state_code is necessary because
+    # the GeoMapper does not currently support going directly from state_id to hhs or
+    # nation. See issue #1255
+    df = gmpr.replace_geocode(
+        df, "state_id", "state_code", from_col="geo_id", date_col="timestamp")
+    if weighted:
+        df["weight"] = df["population"]
+    df = gmpr.replace_geocode(
+        df, "state_code", geo, data_cols=["val"], date_col="timestamp").rename(
+            columns={geo: "geo_id"})
+    if weighted:
+        df["val"] = df["val"] / df["population"]
+
+    return df
diff --git a/nchs_mortality/tests/test_run.py b/nchs_mortality/tests/test_run.py
index 36dba6698..e8f6f7a0c 100644
--- a/nchs_mortality/tests/test_run.py
+++ b/nchs_mortality/tests/test_run.py
@@ -36,17 +36,29 @@ def test_output_files_exist(self, run_as_module, date):
                    'deaths_pneumonia_or_flu_or_covid_incidence']
         sensors = ["num", "prop"]
 
-        expected_files = []
+        expected_files_nation = []
+        expected_files_state = []
+        expected_files_hhs = []
         for d in dates:
             for metric in metrics:
                 if metric == "deaths_percent_of_expected":
-                    expected_files += ["weekly_" + d + "_state_" \
+                    expected_files_nation += ["weekly_" + d + "_nation_" \
+                                              + metric + ".csv"]
+                    expected_files_state += ["weekly_" + d + "_state_" \
+                                             + metric + ".csv"]
+                    expected_files_hhs += ["weekly_" + d + "_hhs_" \
                                            + metric + ".csv"]
                 else:
                     for sensor in sensors:
-                        expected_files += ["weekly_" + d + "_state_" \
+                        expected_files_nation += ["weekly_" + d + "_nation_" \
+                                                  + metric + "_" + sensor + ".csv"]
+                        expected_files_state += ["weekly_" + d + "_state_" \
+                                                 + metric + "_" + sensor + ".csv"]
+                        expected_files_hhs += ["weekly_" + d + "_hhs_" \
                                                + metric + "_" + sensor + ".csv"]
-        assert set(expected_files).issubset(set(csv_files))
+        assert set(expected_files_nation).issubset(set(csv_files))
+        assert set(expected_files_state).issubset(set(csv_files))
+        assert set(expected_files_hhs).issubset(set(csv_files))
 
     @pytest.mark.parametrize("date", ["2020-09-14", "2020-09-18"])
     def test_output_file_format(self, run_as_module, date):
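
Note on the weighted branch of _map_from_state: percent_of_expected_deaths is a
percentage, so state values cannot simply be summed into hhs or nation rows; the
diff instead weights each state's value by its population before aggregating and
divides by the combined population afterwards. A minimal standalone sketch of
that arithmetic (toy state values and populations, not taken from the PR; plain
pandas in place of GeoMapper):

import pandas as pd

# Two hypothetical states assumed to fall in the same HHS region.
states = pd.DataFrame({
    "geo_id": ["pa", "nj"],
    "val": [90.0, 110.0],                    # percent_of_expected_deaths
    "population": [13_000_000, 9_000_000],   # made-up populations
})

# Population-weighted average: sum(val * population) / sum(population).
weighted_val = ((states["val"] * states["population"]).sum()
                / states["population"].sum())
print(round(weighted_val, 2))  # 98.18 -- pulled toward the larger state's value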