
Commit 0792e19

Merge pull request #1814 from cmu-delphi/ndefries/backfill/speed2
[Backfill corrections] Incorporate changes from 1806 and 1808
2 parents (7567309 + bf3cd22), commit 0792e19

File tree: 6 files changed, +45 -20 lines changed


ansible/templates/backfillcorr-params-prod.json.j2

Lines changed: 3 additions & 1 deletion

@@ -4,12 +4,14 @@
     "cache_dir": "./cache",
     "testing_window": 1,
     "training_days": 270,
-    "lag_pad":2,
+    "lag_pad": 2,
     "export_dir": "./receiving",
     "geo_levels": ["state"],
     "value_types": ["count", "fraction"],
     "num_col": "num",
     "denom_col": "den",
+    "parallel": true,
+    "parallel_max_cores": 2,
     "post": {
         "aws_credentials": {
             "aws_access_key_id": "{{ backfillcorr_aws_access_key_id }}",

backfill_corrections/delphiBackfillCorrection/DESCRIPTION

Lines changed: 2 additions & 1 deletion

@@ -27,7 +27,8 @@ Imports:
     parallel,
     purrr,
     vctrs,
-    RcppRoll
+    RcppRoll,
+    bettermc
 Suggests:
     knitr (>= 1.15),
     rmarkdown (>= 1.4),

backfill_corrections/delphiBackfillCorrection/NAMESPACE

Lines changed: 6 additions & 0 deletions

@@ -23,6 +23,7 @@ export(run_backfill)
 import(covidcast)
 importFrom(RcppRoll,roll_mean)
 importFrom(arrow,read_parquet)
+importFrom(bettermc,mclapply)
 importFrom(dplyr,"%>%")
 importFrom(dplyr,across)
 importFrom(dplyr,arrange)
@@ -31,9 +32,11 @@ importFrom(dplyr,bind_rows)
 importFrom(dplyr,desc)
 importFrom(dplyr,everything)
 importFrom(dplyr,filter)
+importFrom(dplyr,full_join)
 importFrom(dplyr,group_by)
 importFrom(dplyr,group_split)
 importFrom(dplyr,if_else)
+importFrom(dplyr,left_join)
 importFrom(dplyr,pull)
 importFrom(dplyr,select)
 importFrom(dplyr,starts_with)
@@ -47,7 +50,10 @@ importFrom(lubridate,make_date)
 importFrom(lubridate,month)
 importFrom(lubridate,year)
 importFrom(parallel,detectCores)
+importFrom(purrr,map)
 importFrom(purrr,map_dfc)
+importFrom(purrr,map_dfr)
+importFrom(purrr,reduce)
 importFrom(quantgen,quantile_lasso)
 importFrom(readr,write_csv)
 importFrom(stats,coef)

backfill_corrections/delphiBackfillCorrection/R/main.R

Lines changed: 20 additions & 10 deletions

@@ -11,6 +11,8 @@
 #'
 #' @importFrom dplyr %>% filter group_by summarize across everything group_split ungroup
 #' @importFrom tidyr drop_na
+#' @importFrom purrr map map_dfr
+#' @importFrom bettermc mclapply
 #'
 #' @export
 run_backfill <- function(df, params,
@@ -61,7 +63,12 @@ run_backfill <- function(df, params,
   group_dfs <- group_split(df, geo_value)

   # Build model for each location
-  for (subdf in group_dfs) {
+  apply_fn <- ifelse(params$parallel, mclapply, lapply)
+  result <- apply_fn(group_dfs, function(subdf) {
+    # Make a copy with the same structure.
+    state_test_data_list <- test_data_list
+    state_coef_list <- coef_list
+
     geo <- subdf$geo_value[1]

     msg_ts("Processing ", geo, " geo group")
@@ -163,7 +170,7 @@ run_backfill <- function(df, params,
           test_data <- filtered_data[[2]]

           if (nrow(train_data) == 0 || nrow(test_data) == 0) {
-            msg_ts("Not enough data to either train or test for test_lag ",
+            msg_ts("Not enough data to either train or test for test lag ",
                    test_lag, ", skipping")
            next
          }
@@ -180,7 +187,6 @@ run_backfill <- function(df, params,
          params_list <- c(YITL, as.vector(unlist(covariates)))

          # Model training and testing
-          msg_ts("Training or loading models")
          prediction_results <- model_training_and_testing(
            train_data, test_data, taus = params$taus, covariates = params_list,
            lp_solver = params$lp_solver,
@@ -197,28 +203,32 @@
          # Model objects are saved during training, so only need to export
          # output if making predictions/corrections
          if (params$make_predictions) {
-            msg_ts("Generating predictions")
            test_data <- prediction_results[[1]]
            coefs <- prediction_results[[2]]
            test_data <- evaluate(test_data, params$taus) %>%
              exponentiate_preds(params$taus)

            key <- make_key(value_type, signal_suffix)
-            idx <- length(test_data_list[[key]]) + 1
-            test_data_list[[key]][[idx]] <- test_data
-            coef_list[[key]][[idx]] <- coefs
+            idx <- length(state_test_data_list[[key]]) + 1
+            state_test_data_list[[key]][[idx]] <- test_data
+            state_coef_list[[key]][[idx]] <- coefs
          }
        }# End for test lags
      }# End for value types
    }# End for signal suffixes
-  }# End for geo list
+
+    return(list(coefs = state_coef_list, test_data = state_test_data_list))
+  }) # End for geo list
+
+  test_data_list <- map(result, ~.x$test_data)
+  coef_list <- map(result, ~.x$coefs)

   if (params$make_predictions) {
     for (value_type in params$value_types) {
       for (signal_suffix in signal_suffixes) {
         key <- make_key(value_type, signal_suffix)
-        test_combined <- bind_rows(test_data_list[[key]])
-        coef_combined <- bind_rows(coef_list[[key]])
+        test_combined <- map_dfr(test_data_list, ~.x[[key]])
+        coef_combined <- map_dfr(coef_list, ~.x[[key]])
         export_test_result(test_combined, coef_combined,
                            indicator=indicator, signal=signal,
                            signal_suffix=signal_suffix,
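
With this change, each geo group's work happens inside a function passed to mclapply/lapply, which returns that geo's coefs and test_data lists instead of mutating shared state; purrr::map and map_dfr then stitch the per-geo results back together. A standalone toy of that collect-and-recombine pattern (toy data and names, not code from this commit):

    library(purrr)

    # Each worker builds and returns its own results; nothing is written to
    # shared state, which is what makes the loop safe to parallelize.
    per_geo <- lapply(c("ak", "al"), function(geo) {
      list(coefs     = data.frame(geo = geo, coef = runif(1)),
           test_data = data.frame(geo = geo, pred = runif(2)))
    })

    # Recombine afterwards: one row-bind per output type.
    coef_combined <- map_dfr(per_geo, ~.x$coefs)
    test_combined <- map_dfr(per_geo, ~.x$test_data)

One caveat on the dispatch line: base ifelse() cannot return functions (it tries to replicate the closure), so choosing between mclapply and lapply is normally written as `if (params$parallel) mclapply else lapply`.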

backfill_corrections/delphiBackfillCorrection/R/preprocessing.R

Lines changed: 11 additions & 7 deletions

@@ -187,6 +187,8 @@ add_weekofmonth <- function(df, time_col, wm = WEEK_ISSUES) {
 #' @template lag_col-template
 #' @template ref_lag-template
 #'
+#' @importFrom dplyr full_join left_join
+#' @importFrom purrr reduce
 #' @importFrom tidyr pivot_wider drop_na
 #'
 #' @export
@@ -203,16 +205,21 @@ add_7davs_and_target <- function(df, value_col, refd_col, lag_col, ref_lag) {
   names(avg_df)[names(avg_df) == value_col] <- 'value_7dav'
   avg_df_prev7 <- add_shift(avg_df, 7, refd_col)
   names(avg_df_prev7)[names(avg_df_prev7) == 'value_7dav'] <- 'value_prev_7dav'
-
-  backfill_df <- Reduce(function(x, y) merge(x, y, all=TRUE),
-                        list(df, avg_df, avg_df_prev7))
+
+  backfill_df <- reduce(
+    list(df, avg_df, avg_df_prev7),
+    full_join, by=c(refd_col, "issue_date")
+  )

   # Add target
   target_df <- df[df$lag==ref_lag, c(refd_col, value_col, "issue_date")]
   names(target_df)[names(target_df) == value_col] <- 'value_target'
   names(target_df)[names(target_df) == 'issue_date'] <- 'target_date'

-  backfill_df <- merge(backfill_df, target_df, by=refd_col, all.x=TRUE)
+  backfill_df <- left_join(backfill_df, target_df, by=c(refd_col))
+
+  # Remove invalid rows
+  backfill_df <- drop_na(backfill_df, c(lag_col))

   # Add log values
   backfill_df$log_value_raw = log(backfill_df$value_raw + 1)
@@ -221,9 +228,6 @@ add_7davs_and_target <- function(df, value_col, refd_col, lag_col, ref_lag) {
   backfill_df$log_value_prev_7dav = log(backfill_df$value_prev_7dav + 1)
   backfill_df$log_7dav_slope = backfill_df$log_value_7dav - backfill_df$log_value_prev_7dav

-  # Remove invalid rows
-  backfill_df <- drop_na(backfill_df, c(lag_col))
-
   return (as.data.frame(backfill_df))
 }
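
The averaging frames are now combined with purrr::reduce plus dplyr::full_join on explicit keys, rather than Reduce over base merge, which by default joins on every column name the frames share. A self-contained toy illustrating the pattern (toy columns and values, not data from this commit):

    library(dplyr)
    library(purrr)

    # Three frames sharing a key column; full_join keeps rows from all of them.
    df1 <- data.frame(time_value = as.Date("2022-01-01") + 0:1, x = 1:2)
    df2 <- data.frame(time_value = as.Date("2022-01-01") + 1:2, y = 3:4)
    df3 <- data.frame(time_value = as.Date("2022-01-01") + 0:2, z = 5:7)

    # Join only on the named key, not on every shared column name as merge()
    # would by default; reduce() applies the join pairwise across the list.
    combined <- reduce(list(df1, df2, df3), full_join, by = "time_value")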

backfill_corrections/params.json.template

Lines changed: 3 additions & 1 deletion

@@ -9,5 +9,7 @@
     "geo_levels": ["state", "county"],
     "value_types": ["count", "fraction"],
     "num_col": "num",
-    "denom_col": "den"
+    "denom_col": "den",
+    "parallel": false,
+    "parallel_max_cores": 2
 }
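
The template defaults to "parallel": false, so parallelism stays opt-in for local runs while the production config above turns it on. A hypothetical validation sketch (not part of this commit) for checking the new settings before any work is dispatched:

    # Hypothetical check (not in this commit): fail fast on bad settings and
    # warn where mclapply's fork-based parallelism is not available.
    validate_parallel_params <- function(params) {
      stopifnot(is.logical(params$parallel), length(params$parallel) == 1)
      if (isTRUE(params$parallel)) {
        stopifnot(is.numeric(params$parallel_max_cores),
                  params$parallel_max_cores >= 1)
        if (.Platform$OS.type == "windows") {
          warning("fork-based parallelism is not available on Windows")
        }
      }
      invisible(params)
    }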
