
Commit f3899e4

Commit message: wip
1 parent 80702b3 commit f3899e4

File tree: 3 files changed, +146 -72 lines

R/aux_data_utils.R

Lines changed: 66 additions & 47 deletions
@@ -697,6 +697,8 @@ delete_files_from_s3 <- function(bucket, keys, batch_size = 500, .progress = TRU
     purrr::walk(~aws.s3::delete_object(bucket = bucket, object = .x), .progress = .progress)
 }

+#' All-in-one function to get and cache an nhsn archive from raw files.
+#'
 #' @description
 #' This takes in all of the raw data files for the nhsn data, creates a
 #' quasi-archive (it keeps one example per version-day, rather than one per
@@ -707,72 +709,89 @@ delete_files_from_s3 <- function(bucket, keys, batch_size = 500, .progress = TRU
 #' containing the data for `disease_name`.
 create_nhsn_data_archive <- function(disease_name) {
   if (aws.s3::head_object("archive_timestamped.parquet", bucket = "forecasting-team-data")) {
+    # Load the previous archive from S3
     aws.s3::save_object("archive_timestamped.parquet", bucket = "forecasting-team-data", file = here::here("cache/archive_timestamped.parquet"))
     previous_archive <- qs::qread(here::here("cache/archive_timestamped.parquet"))
     last_timestamp <- max(previous_archive$version_timestamp)
   } else {
-    # there is no remote
-    previous_archive <- NULL
+    # Remote archive does not exist, so start from scratch
+    previous_archive <- tibble()
     last_timestamp <- as.Date("1000-01-01")
   }
-  new_data <- aws.s3::get_bucket_df(bucket = "forecasting-team-data", prefix = "nhsn_data_") %>%
-    filter(get_version_timestamp(Key) > last_timestamp) %>%
-    pull(Key) %>%
-    lapply(
-      function(filename) {
-        version_timestamp <- get_version_timestamp(filename)
-        res <- NULL
-        tryCatch(
-          {
-            s3load(object = filename, bucket = "forecasting-team-data")
-            if (grepl("prelim", filename)) {
-              res <- epi_data_raw_prelim
-              endpoint_val <- "prelim"
-            } else {
-              res <- epi_data_raw
-              endpoint_val <- "basic"
-            }
-            res <- res %>%
-              process_nhsn_data() %>%
-              select(geo_value, disease, time_value, value) %>%
-              mutate(version_timestamp = version_timestamp, endpoint = endpoint_val)
-          },
-          error = function(cond) {}
-        )
-        res
-      }
-    )
-  # drop any duplicates on the same day
-  compactified <-
-    new_data %>%
-    bind_rows()
-  if (nrow(compactified) == 0) {
+
+  # Get a list of all new dataset snapshots from S3
+  new_data_files <- aws.s3::get_bucket_df(bucket = "forecasting-team-data", prefix = "nhsn_data_") %>%
+    mutate(version_timestamp = get_version_timestamp(Key), version = as.Date(version_timestamp)) %>%
+    filter(version_timestamp > last_timestamp) %>%
+    as_tibble()
+  # Filter to just the latest version_timestamp for each version date.
+  new_data_files_latest_per_day <- new_data_files %>%
+    group_by(version) %>%
+    slice_max(version_timestamp) %>%
+    ungroup()
+
+  if (nrow(new_data_files_latest_per_day) == 0) {
+    # No new data, so just use the previous archive
     one_per_day <- previous_archive
   } else {
-    compactified <-
-      compactified %>%
-      arrange(geo_value, time_value, disease, endpoint, version_timestamp) %>%
+    # Process each new dataset snapshot
+    new_data <- new_data_files_latest_per_day$Key %>%
+      map(
+        function(filename) {
+          version_timestamp <- get_version_timestamp(filename)
+          tryCatch(
+            {
+              s3load(object = filename, bucket = "forecasting-team-data")
+              if (grepl("prelim", filename)) {
+                res <- epi_data_raw_prelim
+                endpoint_val <- "prelim"
+              } else {
+                res <- epi_data_raw
+                endpoint_val <- "basic"
+              }
+              res %>%
+                process_nhsn_data() %>%
+                select(geo_value, disease, time_value, value) %>%
+                mutate(version_timestamp = version_timestamp, endpoint = endpoint_val)
+            },
+            error = function(cond) {
+              # Return NULL (dropped by bind_rows()) rather than a half-processed result
+              cli::cli_warn("Error processing {filename}: {cond}")
+              NULL
+            }
+          )
+        },
+        .progress = TRUE
+      )
+
+    compactified <- new_data %>%
+      bind_rows() %>%
       mutate(version = as.Date(version_timestamp)) %>%
-      filter(if_any(
-        c(everything(), -endpoint, -version_timestamp), # all non-version, non-endpoint columns
-        ~ !epiprocess:::is_locf(., .Machine$double.eps^0.5)
-      ))
+      group_by(version, disease, geo_value, time_value) %>%
+      slice_max(version_timestamp) %>%
+      ungroup()

+    # Only keep the values with the latest version_timestamp for a given version date.
+    # We only need to do this for the versions in compactified, as the other versions can't possibly change.
     unchanged <- previous_archive %>% filter(!(version %in% unique(compactified$version)))
-    # only keep the last value for a given version (so across version_timestamps)
-    # we only need to do this for the versions in compactified, as the other versions can't possibly change
-    one_per_day <-
-      previous_archive %>%
+    one_per_day <- previous_archive %>%
       filter(version %in% unique(compactified$version)) %>%
       bind_rows(compactified) %>%
       group_by(geo_value, disease, time_value, version) %>%
-      arrange(version_timestamp) %>%
-      filter(row_number() == n()) %>%
+      slice_max(version_timestamp) %>%
       ungroup() %>%
       bind_rows(unchanged)
     qs::qsave(one_per_day, here::here("cache/archive_timestamped.parquet"))
     aws.s3::put_object(here::here("cache/archive_timestamped.parquet"), "archive_timestamped.parquet", bucket = "forecasting-team-data")
   }
+
+  if (nrow(one_per_day) == 0) {
+    cli::cli_warn("No data found for {disease_name}")
+    return(NULL)
+  }
+
+  # Return the archive for the disease of interest.
   one_per_day %>%
     filter(disease == disease_name) %>%
     select(-version_timestamp, -endpoint, -disease) %>%
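
To make the compaction rule concrete, here is a minimal dplyr sketch (toy data invented for illustration, not from the repo): within each (geo_value, time_value, version) group, slice_max() keeps only the row from the latest pull, so each version day retains exactly one snapshot.

library(dplyr)

snapshots <- tibble::tribble(
  ~geo_value, ~time_value,           ~value, ~version_timestamp,
  "ca",       as.Date("2024-11-02"), 10,     as.POSIXct("2024-11-06 04:31:00"),
  "ca",       as.Date("2024-11-02"), 12,     as.POSIXct("2024-11-06 21:31:00"), # later pull, same version day
  "ca",       as.Date("2024-11-02"), 12,     as.POSIXct("2024-11-07 04:31:00")
) %>%
  mutate(version = as.Date(version_timestamp))

# Keep one row per version day: the one with the latest version_timestamp.
snapshots %>%
  group_by(geo_value, time_value, version) %>%
  slice_max(version_timestamp) %>%
  ungroup()
# Two rows remain: the 21:31 pull for version 2024-11-06 and the single 2024-11-07 pull.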

R/utils.R

Lines changed: 1 addition & 0 deletions
@@ -326,6 +326,7 @@ update_site <- function(sync_to_s3 = TRUE) {
   if (!file_exists(template_path)) {
     stop("Template file does not exist.")
   }
+
   report_md_content <- readLines(template_path)
   # Get the list of files in the reports directory
   report_files <- dir_ls(reports_dir, regexp = ".*_prod_on_.*.html")

scripts/nhsn_download.R

Lines changed: 79 additions & 25 deletions
@@ -1,28 +1,82 @@
-print("########################################")
-print("Starting at")
-print(Sys.time())
-print("########################################")
-# the crontab that is used to run this is:
+# NHSN Archive Builder
+#
+# This script downloads the latest NHSN data from the CDC website and adds it
+# to the archive. The crontab below runs it at minute 31 of every hour on
+# Tuesdays, Wednesdays, and Fridays.
+#
+# It is meant to be general and usable for other data source projects. To adapt
+# it to another data source, you will need to consider:
+# 1. The new data source URL
+# 2. A way to capture the last-updated timestamp on the new data source
+#
+# The crontab:
 # 31 0-23 * * 2,3,5 cd /path/to/root/of/this/project && direnv exec /path/to/root/of/this/project /usr/bin/Rscript scripts/nhsn_download.R >> cache/nhsn_download.log 2>&1
-suppressPackageStartupMessages(source(here::here("R", "load_all.R")))
+
+library(tidyverse)
+library(here)
+library(httr)
 library(readr)
-library(epiprocess)
+library(purrr)
 library(qs)
+library(aws.s3) # s3save(); assumed needed now that the script attaches packages itself
+# create_nhsn_data_archive() is defined in R/aux_data_utils.R, so keep sourcing
+# the project helpers:
+suppressPackageStartupMessages(source(here::here("R", "load_all.R")))
-save_folder <- here::here("cache")
-dir.create(save_folder)
-dir.create(file.path(save_folder, "raw_data"))
-
-# read and immediately save the raw version
-epi_data_raw <- readr::read_csv("https://data.cdc.gov/resource/ua7e-t2fy.csv?$limit=20000&$select=weekendingdate,jurisdiction,totalconfc19newadm,totalconfflunewadm")
-epi_data_raw_prelim <- readr::read_csv("https://data.cdc.gov/resource/mpgq-jmmr.csv?$limit=20000&$select=weekendingdate,jurisdiction,totalconfc19newadm,totalconfflunewadm")
-raw_file <- glue::glue("nhsn_data_{Sys.time()}") %>%
-  gsub(" ", "_", .) %>%
-  gsub(":", "-", .)
-raw_path <- raw_file %>%
-  file.path(save_folder, "raw_data", .) %>%
-  paste0(".parquet")
-qs::qsave(epi_data_raw, raw_path)
-s3save(epi_data_raw, object = paste0(raw_file, ".rds"), bucket = "forecasting-team-data")
-s3save(epi_data_raw_prelim, object = paste0(raw_file, "_prelim", ".rds"), bucket = "forecasting-team-data")
-
-create_nhsn_data_archive()
+
+
+config <- list(
+  raw_url = "https://data.cdc.gov/resource/ua7e-t2fy.csv",
+  prelim_url = "https://data.cdc.gov/resource/mpgq-jmmr.csv",
+  raw_metadata_url = "https://data.cdc.gov/api/views/ua7e-t2fy",
+  prelim_metadata_url = "https://data.cdc.gov/api/views/mpgq-jmmr",
+  save_folder = here::here("cache"),
+  raw_file_name = "nhsn_data",
+  last_updated_at_file_name = "last_updated_at.rds",
+  s3_bucket = "forecasting-team-data",
+  s3_key = "nhsn_data.parquet"
+)
+
+get_socrata_updated_at <- function(dataset_url) {
+  httr::GET(dataset_url) %>%
+    httr::content() %>%
+    pluck("rowsUpdatedAt") %>%
+    as.POSIXct()
+}
+
+main <- function() {
+  # Create the save folders if they don't exist
+  if (!dir.exists(config$save_folder)) {
+    dir.create(config$save_folder)
+  }
+  if (!dir.exists(file.path(config$save_folder, "raw_data"))) {
+    dir.create(file.path(config$save_folder, "raw_data"))
+  }
+  if (file.exists(file.path(config$save_folder, config$last_updated_at_file_name))) {
+    last_updated_at <- read_rds(file.path(config$save_folder, config$last_updated_at_file_name))
+  } else {
+    last_updated_at <- tibble(
+      raw = NA_POSIXct_,
+      prelim = NA_POSIXct_
+    )
+  }
+
+  raw_update_at <- get_socrata_updated_at(config$raw_metadata_url)
+  prelim_update_at <- get_socrata_updated_at(config$prelim_metadata_url)
+
+  raw_file_name <- glue::glue("{config$raw_file_name}_{Sys.time()}") %>%
+    gsub(" ", "_", .) %>%
+    gsub(":", "-", .)
+  raw_path <- file.path(config$save_folder, "raw_data", raw_file_name) %>%
+    paste0(".parquet")
+  prelim_path <- file.path(config$save_folder, "raw_data", paste0(raw_file_name, "_prelim")) %>%
+    paste0(".parquet")
+
+  # is.na() guards the first run, before any timestamp has been recorded
+  if (is.na(last_updated_at$raw) || raw_update_at > last_updated_at$raw) {
+    # read and immediately save the raw version
+    epi_data_raw <- readr::read_csv(paste0(config$raw_url, "?$limit=20000&$select=weekendingdate,jurisdiction,totalconfc19newadm,totalconfflunewadm"))
+    last_updated_at$raw <- raw_update_at
+    qs::qsave(epi_data_raw, raw_path)
+    s3save(epi_data_raw, object = paste0(raw_file_name, ".rds"), bucket = config$s3_bucket)
+  }
+
+  if (is.na(last_updated_at$prelim) || prelim_update_at > last_updated_at$prelim) {
+    epi_data_raw_prelim <- readr::read_csv(paste0(config$prelim_url, "?$limit=20000&$select=weekendingdate,jurisdiction,totalconfc19newadm,totalconfflunewadm"))
+    last_updated_at$prelim <- prelim_update_at
+    qs::qsave(epi_data_raw_prelim, prelim_path)
+    s3save(epi_data_raw_prelim, object = paste0(raw_file_name, "_prelim", ".rds"), bucket = config$s3_bucket)
+  }
+
+  # Persist the timestamps so the next run can skip unchanged data
+  write_rds(last_updated_at, file.path(config$save_folder, config$last_updated_at_file_name))
+
+  create_nhsn_data_archive()
+}
+
+main()
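
The update check above leans on Socrata's views metadata: rowsUpdatedAt is a Unix epoch in seconds, so comparing it with a locally cached timestamp avoids re-downloading an unchanged CSV. A minimal standalone sketch of the pattern (socrata_updated_at is an illustrative variant of the script's get_socrata_updated_at(); the explicit origin is a portability assumption for R versions older than 4.3, where as.POSIXct() on a number requires one):

library(httr)
library(purrr)

# Ask Socrata when the dataset last changed, without downloading any data.
socrata_updated_at <- function(metadata_url) {
  resp <- httr::GET(metadata_url)
  httr::stop_for_status(resp) # fail loudly on HTTP errors
  httr::content(resp) %>%
    purrr::pluck("rowsUpdatedAt") %>%
    as.POSIXct(origin = "1970-01-01", tz = "UTC")
}

# Usage: only fetch when the remote is newer than what we last recorded.
last_seen <- as.POSIXct(NA)
updated_at <- socrata_updated_at("https://data.cdc.gov/api/views/ua7e-t2fy")
if (is.na(last_seen) || updated_at > last_seen) {
  message("New NHSN snapshot available as of ", updated_at)
}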
