|
9 | 9 | from itertools import product
|
10 | 10 | from typing import Dict, Any
|
11 | 11 |
|
| 12 | +import pandas as pd |
12 | 13 | import numpy as np
|
13 | 14 | from delphi_utils import (
|
14 | 15 | create_export_csv,
|
|
64 | 65 | ]
|
65 | 66 |
|
66 | 67 |
|
| 68 | +def add_nancodes(df, smoother): |
| 69 | + """Add nancodes to the dataframe.""" |
| 70 | + idx = pd.IndexSlice |
| 71 | + |
| 72 | + # Default nancodes |
| 73 | + df["missing_val"] = Nans.NOT_MISSING |
| 74 | + df["missing_se"] = Nans.NOT_APPLICABLE |
| 75 | + df["missing_sample_size"] = Nans.NOT_APPLICABLE |
| 76 | + |
| 77 | + # Mark early smoothing entries as data insufficient |
| 78 | + if smoother == "seven_day_average": |
| 79 | + df.sort_index(inplace=True) |
| 80 | + min_time_value = df.index.min()[0] + 6 * pd.Timedelta(days=1) |
| 81 | +        df.loc[idx[:min_time_value, :], "missing_val"] = Nans.DATA_INSUFFICIENT |
| 82 | + |
| 83 | + # Mark any remaining nans with unknown |
| 84 | + remaining_nans_mask = df["val"].isnull() & (df["missing_val"] == Nans.NOT_MISSING) |
| 85 | + df.loc[remaining_nans_mask, "missing_val"] = Nans.UNKNOWN |
| 86 | + return df |
| 87 | + |
| 88 | + |
67 | 89 | def run_module(params: Dict[str, Dict[str, Any]]):
|
68 | 90 | """Run the usafacts indicator.
|
69 | 91 |
|
@@ -112,37 +134,28 @@ def run_module(params: Dict[str, Dict[str, Any]]):
|
112 | 134 | df = dfs[metric]
|
113 | 135 | # Aggregate to appropriate geographic resolution
|
114 | 136 | df = geo_map(df, geo_res, sensor)
|
115 |
| - df["val"] = df[["geo_id", sensor]].groupby("geo_id")[sensor].transform( |
116 |
| - SMOOTHERS_MAP[smoother][0].smooth |
117 |
| - ) |
118 |
| - df["se"] = np.nan |
119 |
| - df["sample_size"] = np.nan |
| 137 | + df.set_index(["timestamp", "geo_id"], inplace=True) |
120 | 138 |
|
121 |
| - # Default missing code |
122 |
| - df["missing_val"] = Nans.NOT_MISSING |
123 |
| - df["missing_se"] = Nans.NOT_APPLICABLE |
124 |
| - df["missing_sample_size"] = Nans.NOT_APPLICABLE |
| 139 | + # Smooth |
| 140 | + smooth_obj, smoother_prefix, _, smoother_lag = SMOOTHERS_MAP[smoother] |
| 141 | + df["val"] = df[sensor].groupby(level=1).transform(smooth_obj.smooth) |
125 | 142 |
|
126 |
| - # Mark early smoothing entries as data insufficient |
127 |
| - if smoother == "seven_day_average": |
128 |
| - df.sort_index(inplace=True) |
129 |
| - min_time_value = df.index.min()[0] + 6 * pd.Timedelta(days=1) |
130 |
| - df.loc[idx[:min_time_value, :], "missing_val"] = Nans.DATA_INSUFFICIENT |
| 143 | + # USAFacts is not a survey indicator |
| 144 | + df["se"] = np.nan |
| 145 | + df["sample_size"] = np.nan |
131 | 146 |
|
132 |
| - # Mark any remaining nans with unknown |
133 |
| - remaining_nans_mask = df["val"].isnull() & (df["missing_val"] == Nans.NOT_MISSING) |
134 |
| - df.loc[remaining_nans_mask, "missing_val"] = Nans.UNKNOWN |
| 147 | + df = add_nancodes(df, smoother) |
135 | 148 |
|
136 | 149 | df.reset_index(inplace=True)
|
137 | 150 | sensor_name = SENSOR_NAME_MAP[sensor][0]
|
138 |
| - # if (SENSOR_NAME_MAP[sensor][1] or SMOOTHERS_MAP[smoother][2]): |
| 151 | + # if (SENSOR_NAME_MAP[sensor][1] or is_smooth_wip): |
139 | 152 | # metric = f"wip_{metric}"
|
140 | 153 | # sensor_name = WIP_SENSOR_NAME_MAP[sensor][0]
|
141 |
| - sensor_name = SMOOTHERS_MAP[smoother][1] + sensor_name |
| 154 | + sensor_name = smoother_prefix + sensor_name |
142 | 155 | exported_csv_dates = create_export_csv(
|
143 | 156 | df,
|
144 | 157 | export_dir=export_dir,
|
145 |
| - start_date=SMOOTHERS_MAP[smoother][3](export_start_date), |
| 158 | + start_date=smoother_lag(export_start_date), |
146 | 159 | metric=metric,
|
147 | 160 | geo_res=geo_res,
|
148 | 161 | sensor=sensor_name,
|
|
0 commit comments