@@ -188,38 +188,76 @@ daily_to_weekly <- function(epi_df, agg_method = c("sum", "mean"), day_of_week =
    select(-epiweek, -year)
}

+#' Aggregate a daily archive to a weekly archive.
+#'
+#' @param epi_arch the archive to aggregate.
+#' @param agg_columns the columns to aggregate.
+#' @param agg_method the method to use to aggregate the data, one of "sum" or "mean".
+#' @param day_of_week the day of the week to use as the reference day.
+#' @param day_of_week_end the day of the week to use as the end of the week.
daily_to_weekly_archive <- function(epi_arch,
                                    agg_columns,
                                    agg_method = c("sum", "mean"),
                                    day_of_week = 4L,
                                    day_of_week_end = 7L) {
+  # How to aggregate the windowed data.
  agg_method <- arg_match(agg_method)
+  # The columns we will later group by when aggregating.
  keys <- key_colnames(epi_arch, exclude = c("time_value", "version"))
+  # The versions we will slide over.
  ref_time_values <- epi_arch$DT$version %>%
    unique() %>%
    sort()
+  # Choose a fast function to use to slide and aggregate.
  if (agg_method == "sum") {
    slide_fun <- epi_slide_sum
  } else if (agg_method == "mean") {
    slide_fun <- epi_slide_mean
  }
-  too_many_tibbles <- epix_slide(
+  # Slide over the versions and aggregate.
+  epix_slide(
    epi_arch,
-    .before = 99999999L,
    .versions = ref_time_values,
-    function(x, group, ref_time) {
-      ref_time_last_week_end <-
-        floor_date(ref_time, "week", day_of_week_end - 1) # this is over by 1
+    function(x, group_keys, ref_time) {
+      # The last day of the week we will slide over.
+      ref_time_last_week_end <- floor_date(ref_time, "week", day_of_week_end - 1)
+
+      # To find the days we will slide over, we need to find the first and last
+      # complete weeks of data. Get the max and min times, and then find the
+      # first and last complete weeks of data.
+      min_time <- min(x$time_value)
      max_time <- max(x$time_value)
-      valid_slide_days <- seq.Date(
-        from = ceiling_date(min(x$time_value), "week", week_start = day_of_week_end - 1),
-        to = floor_date(max(x$time_value), "week", week_start = day_of_week_end - 1),
-        by = 7L
-      )
+
+      # Let's determine if the min and max times are in the same week.
+      ceil_min_time <- ceiling_date(min_time, "week", week_start = day_of_week_end - 1)
+      ceil_max_time <- ceiling_date(max_time, "week", week_start = day_of_week_end - 1)
+
+      # If they're not in the same week, this means we have at least one
+      # complete week of data to slide over.
+      if (ceil_min_time < ceil_max_time) {
+        valid_slide_days <- seq.Date(
+          from = ceiling_date(min_time, "week", week_start = day_of_week_end - 1),
+          to = floor_date(max_time, "week", week_start = day_of_week_end - 1),
+          by = 7L
+        )
+      } else {
+        # This is the degenerate case, where we have about 1 week or less of
+        # data. In this case, we opt to return nothing for two reasons:
+        # 1. in most cases here, the data is incomplete for a single week,
+        # 2. if the data is complete, a single week of data is not enough to
+        #    reasonably perform any kind of aggregation.
+        return(tibble())
+      }
+
+      # If the last day of the week is not the end of the week, add it to the
+      # list of valid slide days (this will produce an incomplete slide, but
+      # that's fine for us, since it should only be 1 day, historically.)
      if (wday(max_time) != day_of_week_end) {
        valid_slide_days <- c(valid_slide_days, max_time)
      }
-      slid_result <- x %>%
+
+      # Slide over the days and aggregate.
+      x %>%
        group_by(across(all_of(keys))) %>%
        slide_fun(
          agg_columns,
@@ -229,18 +267,13 @@ daily_to_weekly_archive <- function(epi_arch,
        ) %>%
        select(-all_of(agg_columns)) %>%
        rename_with(~ gsub("slide_value_", "", .x)) %>%
-        # only keep 1/week
-        # group_by week, keep the largest in each week
-        # alternatively
-        # switch time_value to the designated day of the week
+        rename_with(~ gsub("_7dsum", "", .x)) %>%
+        # Round all dates to reference day of the week. These will get
+        # de-duplicated by compactify in as_epi_archive below.
        mutate(time_value = round_date(time_value, "week", day_of_week - 1)) %>%
        as_tibble()
    }
-  )
-  too_many_tibbles %>%
-    pull(time_value) %>%
-    max()
-  too_many_tibbles %>%
+  ) %>%
    as_epi_archive(compactify = TRUE)
}

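For orientation, here is a hedged sketch of how the new `daily_to_weekly_archive` could be called. The `daily_admissions_archive` object and its `hhs` column are hypothetical stand-ins for a daily `epi_archive` built elsewhere in this pipeline; only the function name and its arguments come from the diff above.

```r
# Usage sketch (assumes `daily_admissions_archive` is a daily epi_archive with an `hhs` column).
# Each column in agg_columns is summed over a trailing window at every version, the
# "slide_value_"/"_7dsum" affixes are stripped, and time_values are snapped to one
# reference day per week; duplicate rows are then dropped by compactify in as_epi_archive().
weekly_admissions_archive <- daily_to_weekly_archive(
  epi_arch = daily_admissions_archive,
  agg_columns = "hhs",
  agg_method = "sum"
)
```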
@@ -313,9 +346,8 @@ get_health_data <- function(as_of, disease = c("covid", "flu")) {

  most_recent_row <- meta_data %>%
    # update_date is actually a time, so we need to filter for the day after.
-    filter(update_date <= as_of + 1) %>%
-    arrange(desc(update_date)) %>%
-    slice(1)
+    filter(update_date <= as.Date(as_of) + 1) %>%
+    slice_max(update_date)

  if (nrow(most_recent_row) == 0) {
    cli::cli_abort("No data available for the given date.")
@@ -331,9 +363,7 @@ get_health_data <- function(as_of, disease = c("covid", "flu")) {
  if (disease == "covid") {
    data %<>% mutate(
      hhs = previous_day_admission_adult_covid_confirmed +
-        previous_day_admission_adult_covid_suspected +
-        previous_day_admission_pediatric_covid_confirmed +
-        previous_day_admission_pediatric_covid_suspected
+        previous_day_admission_pediatric_covid_confirmed
    )
  } else if (disease == "flu") {
    data %<>% mutate(hhs = previous_day_admission_influenza_confirmed)
@@ -594,15 +624,16 @@ gen_ili_data <- function(default_day_of_week = 1) {
    as_epi_archive(compactify = TRUE)
}

+#' Process Raw NHSN Data
+#'
+#' Turns the raw NHSN data into a tidy format with the following columns:
+#' - geo_value: the jurisdiction of the data
+#' - disease: the disease of the data
+#' - time_value: the date of the data
+#' - version: the version of the data
+#' - value: the value of the data
+#'
process_nhsn_data <- function(raw_nhsn_data) {
-  # These are exception dates when the data was available on a different day
-  # than usual. In these two cases, it was the Thursday after. But to keep
-  # the rest of the pipeline the same, we pretend it was available on Wednesday.
-  remap_exceptions <- list(
-    "2024-12-26" = "2024-12-25",
-    "2025-01-02" = "2025-01-01"
-  )
-  fixed_version <- remap_exceptions[[as.character(Sys.Date())]] %||% Sys.Date()
  raw_nhsn_data %>%
    mutate(
      geo_value = tolower(jurisdiction),
@@ -614,15 +645,58 @@ process_nhsn_data <- function(raw_nhsn_data) {
    select(-weekendingdate, -jurisdiction, -starts_with("totalconf")) %>%
    pivot_longer(cols = starts_with("nhsn"), names_to = "disease") %>%
    filter(!is.na(value)) %>%
-    mutate(version = fixed_version) %>%
+    mutate(version = Sys.Date()) %>%
    relocate(geo_value, disease, time_value, version)
}


# for filenames of the form nhsn_data_2024-11-19_16-29-43.191649.rds
get_version_timestamp <- function(filename) ymd_hms(str_match(filename, "[0-9]{4}-..-.._..-..-..\\.[^.^_]*"))

-#' all in one function to get and cache a nhsn archive from raw files
+#' Remove duplicate files from S3
+#'
+#' Removes duplicate files from S3 by keeping only the earliest timestamp file for each ETag.
+#' You can modify keep_df, if this doesn't suit your needs.
+#'
+#' @param bucket The name of the S3 bucket.
+#' @param prefix The prefix of the files to remove duplicates from.
+#' @param dry_run Whether to actually delete the files.
+#' @param .progress Whether to show a progress bar.
+delete_duplicates_from_s3_by_etag <- function(bucket, prefix, dry_run = TRUE, .progress = TRUE) {
+  # Get a list of all new dataset snapshots from S3
+  files_df <- aws.s3::get_bucket_df(bucket = bucket, prefix = prefix) %>% as_tibble()
+
+  # Create a list of all the files to keep by keeping the earliest timestamp file for each ETag
+  keep_df <- files_df %>%
+    group_by(ETag) %>%
+    slice_min(LastModified) %>%
+    ungroup()
+  delete_df <- files_df %>%
+    anti_join(keep_df, by = "Key")
+  if (nrow(delete_df) > 0) {
+    if (dry_run) {
+      cli::cli_alert_info("Would delete {nrow(delete_df)} files from {bucket} with prefix {prefix}")
+      print(delete_df)
+      return(invisible(delete_df))
+    } else {
+      delete_files_from_s3(bucket = bucket, keys = delete_df$Key, .progress = .progress)
+    }
+  }
+}
+
+#' Delete files from S3
+#'
+#' Faster than aws.s3::delete_object, when there are many files to delete (thousands).
+#'
+#' @param bucket The name of the S3 bucket.
+#' @param keys The keys of the files to delete, as a character vector.
+#' @param batch_size The number of files to delete in each batch.
+#' @param .progress Whether to show a progress bar.
+delete_files_from_s3 <- function(bucket, keys, batch_size = 500, .progress = TRUE) {
+  split(keys, ceiling(seq_along(keys) / batch_size)) %>%
+    purrr::walk(~ aws.s3::delete_object(bucket = bucket, object = .x), .progress = .progress)
+}
+
#' @description
#' This takes in all of the raw data files for the nhsn data, creates a
#' quasi-archive (it keeps one example per version-day, rather than one per
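The two S3 helpers added in this hunk are new de-duplication utilities. Below is a hedged usage sketch; the bucket name and prefix are made-up placeholders rather than values from this repository's configuration, and the first call keeps `dry_run = TRUE` so nothing is deleted.

```r
# Dry run with placeholder bucket/prefix: reports and (invisibly) returns the objects
# that would be removed, keeping only the earliest-timestamped copy of each ETag.
would_delete <- delete_duplicates_from_s3_by_etag(
  bucket = "example-forecasting-bucket",
  prefix = "nhsn_data_",
  dry_run = TRUE
)

# After inspecting `would_delete`, do the actual deletion; delete_files_from_s3()
# batches the aws.s3::delete_object calls 500 keys at a time.
delete_duplicates_from_s3_by_etag(
  bucket = "example-forecasting-bucket",
  prefix = "nhsn_data_",
  dry_run = FALSE
)
```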
@@ -709,10 +783,12 @@ create_nhsn_data_archive <- function(disease_name) {
    as_epi_archive(compactify = TRUE)
}

-
up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) {
  disease <- arg_match(disease)
-  nssp_state <- pub_covidcast(
+  nssp_state <- retry_fn(
+    max_attempts = 10,
+    wait_seconds = 1,
+    fn = pub_covidcast,
    source = "nssp",
    signal = glue::glue("pct_ed_visits_{disease}"),
    time_type = "week",
@@ -728,3 +804,26 @@ up_to_date_nssp_state_archive <- function(disease = c("covid", "influenza")) {
    mutate(time_value = time_value + 3) %>%
    as_epi_archive(compactify = TRUE)
}
+
+# Get the last time the signal was updated.
+get_covidcast_signal_last_update <- function(source, signal, geo_type) {
+  pub_covidcast_meta() %>%
+    filter(source == !!source, signal == !!signal, geo_type == !!geo_type) %>%
+    pull(last_update) %>%
+    as.POSIXct()
+}
+
+# Get the last time the Socrata dataset was updated.
+get_socrata_updated_at <- function(dataset_url) {
+  httr::GET(dataset_url) %>%
+    httr::content() %>%
+    pluck("rowsUpdatedAt") %>%
+    as.POSIXct()
+}
+
+get_s3_object_last_modified <- function(bucket, key) {
+  # Format looks like "Fri, 31 Jan 2025 22:01:16 GMT"
+  attr(aws.s3::head_object(key, bucket = bucket), "last-modified") %>%
+    str_replace_all("GMT", "") %>%
+    as.POSIXct(format = "%a, %d %b %Y %H:%M:%S")
+}
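The three "last updated" helpers at the end are thin wrappers around the covidcast metadata endpoint, the Socrata view metadata, and S3 object headers. Here is a hedged sketch of how they might be combined into a freshness check; the signal, dataset URL, bucket, and key below are illustrative placeholders, not values taken from this diff.

```r
# Sketch: has any upstream source been updated since the pipeline last ran?
# All identifiers below are placeholders.
last_run <- as.POSIXct("2025-01-01 00:00:00", tz = "UTC")

nssp_updated_at <- get_covidcast_signal_last_update("nssp", "pct_ed_visits_covid", "state")
nhsn_updated_at <- get_socrata_updated_at("https://data.cdc.gov/api/views/example-id")
s3_updated_at <- get_s3_object_last_modified("example-bucket", "example_archive.parquet")

if (any(c(nssp_updated_at, nhsn_updated_at, s3_updated_at) > last_run, na.rm = TRUE)) {
  message("At least one upstream source has new data; rerun the pipeline.")
}
```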