Skip to content

Add NCHS mortality geo aggregation at the HHS and nation levels #1243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion nchs_mortality/delphi_nchs_mortality/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,11 @@
"prop"
]
INCIDENCE_BASE = 100000
GEO_RES = "state"
GEO_RES = [
"nation",
"hhs",
"state"
]

# this is necessary as a delimiter in the f-string expressions we use to
# construct detailed error reports
Expand Down
58 changes: 38 additions & 20 deletions nchs_mortality/delphi_nchs_mortality/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@
import time
from datetime import datetime, date, timedelta
from typing import Dict, Any
from itertools import product

import numpy as np
from delphi_utils import S3ArchiveDiffer, get_structured_logger
from delphi_utils import S3ArchiveDiffer, get_structured_logger, GeoMapper

from .archive_diffs import arch_diffs
from .constants import (METRICS, SENSOR_NAME_MAP,
Expand All @@ -18,7 +19,7 @@
from .pull import pull_nchs_mortality_data


def run_module(params: Dict[str, Any]):
def run_module(params: Dict[str, Any]): # pylint: disable=too-many-branches, too-many-statements
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this better than splitting it up into more-specific functions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Definitely not, just pushed some changes to pull out some helper methods!

"""Run module for processing NCHS mortality data.

The `params` argument is expected to have the following structure:
Expand All @@ -43,13 +44,14 @@ def run_module(params: Dict[str, Any]):
__name__, filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True))
export_start_date = params["indicator"]["export_start_date"]
if export_start_date == "latest": # Find the previous Saturday
if export_start_date == "latest": # Find the previous Saturday
export_start_date = date.today() - timedelta(
days=date.today().weekday() + 2)
days=date.today().weekday() + 2)
export_start_date = export_start_date.strftime('%Y-%m-%d')
daily_export_dir = params["common"]["daily_export_dir"]
token = params["indicator"]["token"]
test_file = params["indicator"].get("test_file", None)
gmpr = GeoMapper()

if "archive" in params:
daily_arch_diff = S3ArchiveDiffer(
Expand All @@ -60,7 +62,7 @@ def run_module(params: Dict[str, Any]):

stats = []
df_pull = pull_nchs_mortality_data(token, test_file)
for metric in METRICS:
for metric, geo in product(METRICS, GEO_RES):
if metric == 'percent_of_expected_deaths':
print(metric)
df = df_pull.copy()
Expand All @@ -69,9 +71,20 @@ def run_module(params: Dict[str, Any]):
df["sample_size"] = np.nan
df = df[~df["val"].isnull()]
sensor_name = "_".join([SENSOR_NAME_MAP[metric]])

if geo in ["hhs", "nation"]:
df = gmpr.replace_geocode(
df, "state_id", "state_code", from_col="geo_id", date_col="timestamp")
# Weight by population when aggregating across geocodes
df["weight"] = df["population"]
df = gmpr.replace_geocode(
df, "state_code", geo, data_cols=["val"], date_col="timestamp").rename(
columns={geo: "geo_id"})
df["val"] = df["val"] / df["population"]

dates = export_csv(
df,
geo_name=GEO_RES,
geo_name=geo,
export_dir=daily_export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=sensor_name,
Expand All @@ -82,30 +95,35 @@ def run_module(params: Dict[str, Any]):
for sensor in SENSORS:
print(metric, sensor)
df = df_pull.copy()
df["se"] = np.nan
df["sample_size"] = np.nan
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this can be pulled out of the for loop

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually it can probably be pulled out of the if block as well

sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor])
if geo in ["hhs", "nation"]:
df = gmpr.replace_geocode(
df, "state_id", "state_code", from_col="geo_id", date_col="timestamp")
df = gmpr.replace_geocode(
df, "state_code", geo, date_col="timestamp").rename(columns={geo: "geo_id"})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we reduce some of the duplication with percent_of_expected_deaths here at all?

if sensor == "num":
df["val"] = df[metric]
else:
df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
df["se"] = np.nan
df["sample_size"] = np.nan
df = df[~df["val"].isnull()]
sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor])
dates = export_csv(
df,
geo_name=GEO_RES,
geo_name=geo,
export_dir=daily_export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=sensor_name,
)
if len(dates) > 0:
stats.append((max(dates), len(dates)))

# Weekly run of archive utility on Monday
# - Does not upload to S3, that is handled by daily run of archive utility
# - Exports issues into receiving for the API
# Daily run of archiving utility
# - Uploads changed files to S3
# - Does not export any issues into receiving
# Weekly run of archive utility on Monday
# - Does not upload to S3, that is handled by daily run of archive utility
# - Exports issues into receiving for the API
# Daily run of archiving utility
# - Uploads changed files to S3
# - Does not export any issues into receiving
if "archive" in params:
arch_diffs(params, daily_arch_diff)

Expand All @@ -115,7 +133,7 @@ def run_module(params: Dict[str, Any]):
max_lag_in_days = min_max_date and (datetime.now() - min_max_date).days
formatted_min_max_date = min_max_date and min_max_date.strftime("%Y-%m-%d")
logger.info("Completed indicator run",
elapsed_time_in_seconds = elapsed_time_in_seconds,
csv_export_count = csv_export_count,
max_lag_in_days = max_lag_in_days,
oldest_final_export_date = formatted_min_max_date)
elapsed_time_in_seconds=elapsed_time_in_seconds,
csv_export_count=csv_export_count,
max_lag_in_days=max_lag_in_days,
oldest_final_export_date=formatted_min_max_date)
20 changes: 16 additions & 4 deletions nchs_mortality/tests/test_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,17 +36,29 @@ def test_output_files_exist(self, run_as_module, date):
'deaths_pneumonia_or_flu_or_covid_incidence']
sensors = ["num", "prop"]

expected_files = []
expected_files_nation = []
expected_files_state=[]
expected_files_hhs=[]
for d in dates:
for metric in metrics:
if metric == "deaths_percent_of_expected":
expected_files += ["weekly_" + d + "_state_" \
expected_files_nation += ["weekly_" + d + "_nation_" \
+ metric + ".csv"]
expected_files_state += ["weekly_" + d + "_state_" \
+ metric + ".csv"]
expected_files_hhs += ["weekly_" + d + "_hhs_" \
+ metric + ".csv"]
else:
for sensor in sensors:
expected_files += ["weekly_" + d + "_state_" \
expected_files_nation += ["weekly_" + d + "_nation_" \
+ metric + "_" + sensor + ".csv"]
expected_files_state += ["weekly_" + d + "_state_" \
+ metric + "_" + sensor + ".csv"]
expected_files_hhs += ["weekly_" + d + "_hhs_" \
+ metric + "_" + sensor + ".csv"]
assert set(expected_files).issubset(set(csv_files))
assert set(expected_files_nation).issubset(set(csv_files))
assert set(expected_files_state).issubset(set(csv_files))
assert set(expected_files_hhs).issubset(set(csv_files))

@pytest.mark.parametrize("date", ["2020-09-14", "2020-09-18"])
def test_output_file_format(self, run_as_module, date):
Expand Down