covid: evaluate current season #168

Merged 20 commits on Feb 21, 2025
Changes from 6 commits
1 change: 1 addition & 0 deletions .renvignore
@@ -1,2 +1,3 @@
tmp.R
node_modules/*
local_logs/
3 changes: 2 additions & 1 deletion Makefile
@@ -63,9 +63,10 @@ submit: submit-covid submit-flu
get_nwss:
mkdir -p aux_data/nwss_covid_data; \
mkdir -p aux_data/nwss_flu_data; \
. .venv/bin/activate; \
cd scripts/nwss_export_tool/; \
python nwss_covid_export.py; \
python nwss_covid_export.py
python nwss_influenza_export.py

run-nohup:
nohup Rscript scripts/run.R &
111 changes: 83 additions & 28 deletions R/aux_data_utils.R
@@ -188,38 +188,76 @@ daily_to_weekly <- function(epi_df, agg_method = c("sum", "mean"), day_of_week =
select(-epiweek, -year)
}

#' Aggregate a daily archive to a weekly archive.
#'
#' @param epi_arch the archive to aggregate.
#' @param agg_columns the columns to aggregate.
#' @param agg_method the method to use to aggregate the data, one of "sum" or "mean".
#' @param day_of_week the day of the week to use as the reference day.
#' @param day_of_week_end the day of the week to use as the end of the week.
daily_to_weekly_archive <- function(epi_arch,
agg_columns,
agg_method = c("sum", "mean"),
day_of_week = 4L,
day_of_week_end = 7L) {
# How to aggregate the windowed data.
agg_method <- arg_match(agg_method)
# The columns we will later group by when aggregating.
keys <- key_colnames(epi_arch, exclude = c("time_value", "version"))
# The versions we will slide over.
ref_time_values <- epi_arch$DT$version %>%
unique() %>%
sort()
# Choose a fast function to use to slide and aggregate.
if (agg_method == "sum") {
slide_fun <- epi_slide_sum
} else if (agg_method == "mean") {
slide_fun <- epi_slide_mean
}
too_many_tibbles <- epix_slide(
# Slide over the versions and aggregate.
epix_slide(
epi_arch,
.before = 99999999L,
.versions = ref_time_values,
function(x, group, ref_time) {
ref_time_last_week_end <-
floor_date(ref_time, "week", day_of_week_end - 1) # this is over by 1
function(x, group_keys, ref_time) {
# The last day of the week we will slide over.
ref_time_last_week_end <- floor_date(ref_time, "week", day_of_week_end - 1)

# To find the days we will slide over, we need to find the first and last
# complete weeks of data. Get the max and min times, and then find the
# first and last complete weeks of data.
min_time <- min(x$time_value)
max_time <- max(x$time_value)
valid_slide_days <- seq.Date(
from = ceiling_date(min(x$time_value), "week", week_start = day_of_week_end - 1),
to = floor_date(max(x$time_value), "week", week_start = day_of_week_end - 1),
by = 7L
)

# Let's determine if the min and max times are in the same week.
ceil_min_time <- ceiling_date(min_time, "week", week_start = day_of_week_end - 1)
ceil_max_time <- ceiling_date(max_time, "week", week_start = day_of_week_end - 1)

# If they're not in the same week, this means we have at least one
# complete week of data to slide over.
if (ceil_min_time < ceil_max_time) {
valid_slide_days <- seq.Date(
from = ceiling_date(min_time, "week", week_start = day_of_week_end - 1),
to = floor_date(max_time, "week", week_start = day_of_week_end - 1),
by = 7L
)
} else {
# This is the degenerate case, where we have about 1 week or less of
# data. In this case, we opt to return nothing for two reasons:
# 1. in most cases here, the data is incomplete for a single week,
# 2. if the data is complete, a single week of data is not enough to
# reasonably perform any kind of aggregation.
return(tibble())
}

# If the last day of the week is not the end of the week, add it to the
# list of valid slide days (this will produce an incomplete slide, but
# that's fine for us, since it should only be 1 day, historically.)
if (wday(max_time) != day_of_week_end) {
valid_slide_days <- c(valid_slide_days, max_time)
}
slid_result <- x %>%

# Slide over the days and aggregate.
x %>%
group_by(across(all_of(keys))) %>%
slide_fun(
agg_columns,
@@ -229,18 +267,13 @@ daily_to_weekly_archive <- function(epi_arch,
) %>%
select(-all_of(agg_columns)) %>%
rename_with(~ gsub("slide_value_", "", .x)) %>%
# only keep 1/week
# group_by week, keep the largest in each week
# alternatively
# switch time_value to the designated day of the week
rename_with(~ gsub("_7dsum", "", .x)) %>%
# Round all dates to reference day of the week. These will get
# de-duplicated by compactify in as_epi_archive below.
mutate(time_value = round_date(time_value, "week", day_of_week - 1)) %>%
as_tibble()
}
)
too_many_tibbles %>%
pull(time_value) %>%
max()
too_many_tibbles %>%
) %>%
as_epi_archive(compactify = TRUE)
}
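A usage sketch of the new helper (hypothetical objects: `daily_arch` and the `hhs` column are placeholders, not names from this PR):

# Aggregate a daily epi_archive to weekly by summing the signal within each
# week; weeks end on day_of_week_end and are labeled by day_of_week.
weekly_arch <- daily_to_weekly_archive(
  daily_arch,
  agg_columns = "hhs",
  agg_method = "sum",
  day_of_week = 4L,
  day_of_week_end = 7L
)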

@@ -313,9 +346,8 @@ get_health_data <- function(as_of, disease = c("covid", "flu")) {

most_recent_row <- meta_data %>%
# update_date is actually a time, so we need to filter for the day after.
filter(update_date <= as_of + 1) %>%
arrange(desc(update_date)) %>%
slice(1)
filter(update_date <= as.Date(as_of) + 1) %>%
slice_max(update_date)

if (nrow(most_recent_row) == 0) {
cli::cli_abort("No data available for the given date.")
@@ -331,9 +363,7 @@
if (disease == "covid") {
data %<>% mutate(
hhs = previous_day_admission_adult_covid_confirmed +
previous_day_admission_adult_covid_suspected +
previous_day_admission_pediatric_covid_confirmed +
previous_day_admission_pediatric_covid_suspected
previous_day_admission_pediatric_covid_confirmed
)
} else if (disease == "flu") {
data %<>% mutate(hhs = previous_day_admission_influenza_confirmed)
@@ -709,10 +739,12 @@ create_nhsn_data_archive <- function(disease_name) {
as_epi_archive(compactify = TRUE)
}


up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) {
disease <- arg_match(disease)
nssp_state <- pub_covidcast(
nssp_state <- retry_fn(
max_attempts = 10,
wait_seconds = 1,
fn = pub_covidcast,
source = "nssp",
signal = glue::glue("pct_ed_visits_{disease}"),
time_type = "week",
@@ -728,3 +760,26 @@ up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) {
mutate(time_value = time_value + 3) %>%
as_epi_archive(compactify = TRUE)
}
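`retry_fn` is not defined in this diff; a minimal sketch of the retry pattern the call site above implies, with the signature inferred from the named arguments (an assumption, not the repo's actual implementation):

retry_fn <- function(fn, ..., max_attempts = 10, wait_seconds = 1) {
  for (attempt in seq_len(max_attempts)) {
    # Capture errors instead of aborting, so we can retry.
    result <- tryCatch(fn(...), error = function(e) e)
    if (!inherits(result, "error")) {
      return(result)
    }
    if (attempt == max_attempts) {
      stop(result)
    }
    Sys.sleep(wait_seconds)
  }
}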

# Get the last time the signal was updated.
get_covidcast_signal_last_update <- function(source, signal) {
pub_covidcast_meta() %>%
filter(source == !!source, signal == !!signal) %>%
pull(last_update) %>%
as.POSIXct()
}

# Get the last time the Socrata dataset was updated.
get_socrata_updated_at <- function(dataset_url) {
httr::GET(dataset_url) %>%
httr::content() %>%
pluck("rowsUpdatedAt") %>%
as.POSIXct()
}

get_s3_object_last_modified <- function(bucket, key) {
# Format looks like "Fri, 31 Jan 2025 22:01:16 GMT"
attr(aws.s3::head_object(key, bucket = bucket), "last-modified") %>%
str_replace_all(" GMT", "") %>%
as.POSIXct(format = "%a, %d %b %Y %H:%M:%S")
}
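The three helpers above answer the same question (when was the upstream source last updated?) for the covidcast API, Socrata, and S3. A hypothetical freshness check built on them (the bucket, key, and dataset URL are placeholders):

cache_time <- get_s3_object_last_modified("example-bucket", "example_archive.parquet")
upstream_time <- get_socrata_updated_at("https://data.cdc.gov/api/views/xxxx-xxxx")
if (upstream_time > cache_time) {
  # The upstream source is newer than the cached copy: re-download and rebuild.
}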
31 changes: 12 additions & 19 deletions R/forecasters/data_transforms.R
@@ -40,15 +40,10 @@ get_nonkey_names <- function(epi_data) {
#' @export
rolling_mean <- function(epi_data, width = 7L, cols_to_mean = NULL) {
cols_to_mean <- get_trainable_names(epi_data, cols_to_mean)
epi_data %<>% group_by(across(key_colnames(epi_data, exclude = "time_value")))
for (col in cols_to_mean) {
mean_name <- paste0("slide_", col, "_m", width)
epi_data %<>%
epi_slide_mean(all_of(col), .window_size = width) %>%
rename(!!mean_name := paste0("slide_value_", col))
}
epi_data %<>% ungroup()
return(epi_data)
epi_data %>%
group_by(across(key_colnames(epi_data, exclude = "time_value"))) %>%
epi_slide_mean(all_of(cols_to_mean), .window_size = width, .new_col_names = paste0("slide_", cols_to_mean, "_m", width)) %>%
ungroup()
}
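The refactor replaces the per-column loop with a single `epi_slide_mean()` call, using `.new_col_names` to name the outputs directly. A usage sketch (column names hypothetical):

# Adds slide_hhs_m7 and slide_nssp_m7 alongside the original columns.
epi_df %>% rolling_mean(width = 7L, cols_to_mean = c("hhs", "nssp"))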

#' Get a rolling standard deviation for the named columns
@@ -75,20 +70,18 @@ rolling_sd <- function(epi_data, sd_width = 29L, mean_width = NULL, cols_to_sd =
cols_to_sd <- get_trainable_names(epi_data, cols_to_sd)
result <- epi_data
result %<>% group_by(across(key_colnames(epi_data, exclude = "time_value")))
for (col in cols_to_sd) {
mean_name <- glue::glue("slide_{col}_m{mean_width}")
sd_name <- glue::glue("slide_{col}_sd{sd_width}")
for (col_name in cols_to_sd) {
mean_name <- glue::glue("slide_{col_name}_m{mean_width}")
sd_name <- glue::glue("slide_{col_name}_sd{sd_width}")

result %<>%
epi_slide_mean(all_of(col), .window_size = mean_width) %>%
rename(!!mean_name := paste0("slide_value_", col))
epi_slide_mean(all_of(col_name), .window_size = mean_width, .new_col_names = mean_name)

result %<>%
mutate(.temp = (.data[[mean_name]] - .data[[col]])^2) %>%
epi_slide_mean(all_of(".temp"), .window_size = sd_width) %>%
select(-.temp) %>%
rename(!!sd_name := "slide_value_.temp") %>%
mutate(!!sd_name := sqrt(.data[[sd_name]]))
mutate(.temp = (.data[[mean_name]] - .data[[col_name]])^2) %>%
epi_slide_mean(all_of(".temp"), .window_size = sd_width, .new_col_names = sd_name) %>%
mutate(!!sd_name := sqrt(.data[[sd_name]])) %>%
select(-.temp)

if (!keep_mean) {
result %<>% select(-{{ mean_name }})
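For reviewers tracing the math: before and after the refactor, the "sd" here is the square root of an sd_width-day rolling mean of squared deviations from a mean_width-day rolling mean. A standalone sketch of that estimate on a plain vector (zoo used for brevity; not how this package computes it):

library(zoo)
x <- rnorm(60)
m <- rollmeanr(x, k = 14, fill = NA)                     # mean_width analogue
sd_est <- sqrt(rollmeanr((x - m)^2, k = 28, fill = NA))  # sd_width analogue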
4 changes: 2 additions & 2 deletions R/forecasters/ensemble_average.R
@@ -32,12 +32,12 @@ ensemble_average <- function(epi_data,
# their names are separated for obscure target-related reasons
names(ensemble_args) <- ensemble_args_names %||% names(ensemble_args)
average_type <- ensemble_args$average_type %||% median
join_columns <- ensemble_args$join_columns %||% setdiff(names(forecasts[[1]]), "prediction")
join_columns <- ensemble_args$join_columns %||% setdiff(names(forecasts[[1]]), "value")

# begin actual analysis
forecasts %>%
bind_rows(.id = "id") %>%
group_by(across(all_of(join_columns))) %>%
summarize(prediction = average_type(prediction, na.rm = TRUE), .groups = "drop") %>%
summarize(value = average_type(value, na.rm = TRUE), .groups = "drop") %>%
ungroup()
}
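With the rename, forecast tibbles are expected to carry a `value` column rather than `prediction`. The core of the ensemble on two toy forecasts (all names illustrative; assumes dplyr and tibble are loaded):

f1 <- tibble(geo_value = "ca", quantile = 0.5, value = 100)
f2 <- tibble(geo_value = "ca", quantile = 0.5, value = 120)
list(f1, f2) %>%
  bind_rows(.id = "id") %>%
  group_by(geo_value, quantile) %>%
  summarize(value = median(value), .groups = "drop")
# -> a single row with value = 110, the median across forecasters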
9 changes: 6 additions & 3 deletions R/forecasters/epipredict_utilities.R
@@ -114,6 +114,11 @@ run_workflow_and_format <- function(preproc,
if (is.null(as_of)) {
as_of <- max(train_data$time_value)
}

# Look at the train data (uncomment for debugging).
# df <- preproc %>% prep(train_data) %>% bake(train_data)
# browser()

workflow <- epi_workflow(preproc, trainer) %>%
fit(train_data) %>%
add_frosting(postproc)
@@ -125,9 +130,7 @@
# keeping only the last time_value for any given location/key
pred %<>%
group_by(across(all_of(key_colnames(train_data, exclude = "time_value")))) %>%
# TODO: slice_max(time_value)?
arrange(time_value) %>%
filter(row_number() == n()) %>%
slice_max(time_value) %>%
ungroup()
return(format_storage(pred, as_of))
}
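A note on the `slice_max()` swap: unlike `arrange(time_value) %>% filter(row_number() == n())`, `slice_max(time_value)` keeps every row tied at the latest time_value. Assuming each group holds at most one row per time_value (as these predictions should), the two are equivalent. A toy check:

tibble(g = c("a", "a", "b"), time_value = as.Date("2025-02-01") + c(0, 1, 0)) %>%
  group_by(g) %>%
  slice_max(time_value) %>%
  ungroup()
# -> one row per group: ("a", 2025-02-02) and ("b", 2025-02-01)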
1 change: 1 addition & 0 deletions R/forecasters/forecaster_baseline_linear.R
@@ -1,5 +1,6 @@
#' epi_data is expected to have: geo_value, time_value, and value columns.
forecaster_baseline_linear <- function(epi_data, ahead, log = FALSE, sort = FALSE, residual_tail = 0.85, residual_center = 0.085, no_intercept = FALSE) {
epi_data <- validate_epi_data(epi_data)
forecast_date <- attributes(epi_data)$metadata$as_of
population_data <- get_population_data() %>%
rename(geo_value = state_id) %>%
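`validate_epi_data()` is called at the top of every forecaster touched by this PR, but its definition is outside this diff. A hypothetical sketch of the kind of checks such a validator might perform (all names and messages assumed, not taken from the repo):

validate_epi_data <- function(epi_data) {
  if (!inherits(epi_data, "epi_df")) {
    cli::cli_abort("epi_data must be an epi_df, not {.cls {class(epi_data)}}.")
  }
  missing_cols <- setdiff(c("geo_value", "time_value"), names(epi_data))
  if (length(missing_cols) > 0) {
    cli::cli_abort("epi_data is missing {length(missing_cols)} required column{?s}: {.field {missing_cols}}.")
  }
  epi_data
}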
3 changes: 3 additions & 0 deletions R/forecasters/forecaster_climatological.R
@@ -16,6 +16,9 @@ climate_linear_ensembled <- function(epi_data,
scale_method <- arg_match(scale_method)
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)

epi_data <- validate_epi_data(epi_data)

args_list <- list(...)
ahead <- as.integer(ahead / 7)
epi_data %<>% filter_extraneous(filter_source, filter_agg_level)
1 change: 1 addition & 0 deletions R/forecasters/forecaster_flatline.R
@@ -17,6 +17,7 @@ flatline_fc <- function(epi_data,
filter_source = "",
filter_agg_level = "",
...) {
epi_data <- validate_epi_data(epi_data)
# perform any preprocessing not supported by epipredict
epi_data %<>% filter_extraneous(filter_source, filter_agg_level)
# this is a temp fix until a real fix gets put into epipredict
3 changes: 3 additions & 0 deletions R/forecasters/forecaster_flusion.R
@@ -22,6 +22,9 @@ flusion <- function(epi_data,
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)
derivative_estimator <- arg_match(derivative_estimator)

epi_data <- validate_epi_data(epi_data)

# perform any preprocessing not supported by epipredict
args_input <- list(...)
# this next part is basically unavoidable boilerplate you'll want to copy
3 changes: 3 additions & 0 deletions R/forecasters/forecaster_no_recent_outcome.R
@@ -22,6 +22,9 @@ no_recent_outcome <- function(epi_data,
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)
week_method <- arg_match(week_method)

epi_data <- validate_epi_data(epi_data)

# this is for the case where there are multiple sources in the same column
epi_data %<>% filter_extraneous(filter_source, filter_agg_level)
args_input <- list(...)
3 changes: 3 additions & 0 deletions R/forecasters/forecaster_scaled_pop.R
@@ -62,6 +62,9 @@ scaled_pop <- function(epi_data,
scale_method <- arg_match(scale_method)
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)

epi_data <- validate_epi_data(epi_data)

# perform any preprocessing not supported by epipredict
#
# this is for the case where there are multiple sources in the same column
3 changes: 3 additions & 0 deletions R/forecasters/forecaster_scaled_pop_seasonal.R
@@ -56,6 +56,9 @@ scaled_pop_seasonal <- function(epi_data,
scale_method <- arg_match(scale_method)
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)

epi_data <- validate_epi_data(epi_data)

if (typeof(seasonal_method) == "list") {
seasonal_method <- seasonal_method[[1]]
}
4 changes: 4 additions & 0 deletions R/forecasters/forecaster_smoothed_scaled.R
@@ -71,6 +71,9 @@ smoothed_scaled <- function(epi_data,
scale_method <- arg_match(scale_method)
center_method <- arg_match(center_method)
nonlin_method <- arg_match(nonlin_method)

epi_data <- validate_epi_data(epi_data)

# perform any preprocessing not supported by epipredict
#
# this is for the case where there are multiple sources in the same column
@@ -170,6 +173,7 @@ smoothed_scaled <- function(epi_data,
keep_mean = keep_mean
)
}

# need to make a version with the non seasonal and problematic flu seasons removed
if (drop_non_seasons) {
season_data <- epi_data %>% drop_non_seasons()
1 change: 1 addition & 0 deletions R/imports.R
@@ -29,6 +29,7 @@ library(parsnip)
library(paws.storage)
library(plotly)
library(purrr)
library(qs2)
library(quantreg)
library(readr)
library(recipes)