Skip to content

Commit

Permalink
Merge pull request #1322 from ropensci/1314
Browse files Browse the repository at this point in the history
Content-addressable storage
  • Loading branch information
wlandau authored Aug 27, 2024
2 parents 409f991 + 8226e88 commit d8393be
Show file tree
Hide file tree
Showing 92 changed files with 2,377 additions and 451 deletions.
39 changes: 29 additions & 10 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ S3method(print,tar_resources_gcp)
S3method(print,tar_resources_network)
S3method(print,tar_resources_parquet)
S3method(print,tar_resources_qs)
S3method(print,tar_resources_repository_cas)
S3method(print,tar_resources_url)
S3method(print,tar_stem)
S3method(resources_validate,default)
Expand All @@ -37,6 +38,7 @@ S3method(resources_validate,tar_resources_gcp)
S3method(resources_validate,tar_resources_network)
S3method(resources_validate,tar_resources_parquet)
S3method(resources_validate,tar_resources_qs)
S3method(resources_validate,tar_resources_repository_cas)
S3method(resources_validate,tar_resources_url)
S3method(store_assert_format,default)
S3method(store_assert_format,tar_feather)
Expand Down Expand Up @@ -65,6 +67,7 @@ S3method(store_assert_repository_setting,aws)
S3method(store_assert_repository_setting,default)
S3method(store_assert_repository_setting,gcp)
S3method(store_assert_repository_setting,local)
S3method(store_assert_repository_setting,repository_cas)
S3method(store_cache_path,default)
S3method(store_cache_path,tar_cloud)
S3method(store_cache_path,tar_external)
Expand All @@ -86,23 +89,26 @@ S3method(store_class_format,url)
S3method(store_class_repository,aws)
S3method(store_class_repository,default)
S3method(store_class_repository,gcp)
S3method(store_class_repository,repository_cas)
S3method(store_convert_object,tar_feather)
S3method(store_convert_object,tar_fst)
S3method(store_convert_object,tar_fst_dt)
S3method(store_convert_object,tar_fst_tbl)
S3method(store_convert_object,tar_parquet)
S3method(store_convert_object,tar_store_custom)
S3method(store_convert_object,tar_store_file)
S3method(store_convert_object,tar_store_format_custom)
S3method(store_convert_object,tar_url)
S3method(store_copy_object,tar_fst_dt)
S3method(store_copy_object,tar_store_custom)
S3method(store_copy_object,tar_store_format_custom)
S3method(store_delete_object,default)
S3method(store_delete_object,tar_gcp)
S3method(store_delete_objects,default)
S3method(store_delete_objects,tar_aws)
S3method(store_delete_objects,tar_gcp)
S3method(store_delete_objects,tar_repository_cas)
S3method(store_ensure_correct_hash,default)
S3method(store_ensure_correct_hash,tar_cloud)
S3method(store_ensure_correct_hash,tar_repository_cas)
S3method(store_ensure_correct_hash,tar_store_file)
S3method(store_ensure_correct_hash,tar_url)
S3method(store_exist_object,default)
Expand All @@ -124,26 +130,29 @@ S3method(store_get_packages,tar_url)
S3method(store_has_correct_hash,default)
S3method(store_has_correct_hash,tar_aws)
S3method(store_has_correct_hash,tar_gcp)
S3method(store_has_correct_hash,tar_repository_cas)
S3method(store_has_correct_hash,tar_url)
S3method(store_hash_early,default)
S3method(store_hash_early,tar_aws_file)
S3method(store_hash_early,tar_gcp_file)
S3method(store_hash_early,tar_repository_cas)
S3method(store_hash_early,tar_repository_cas_file)
S3method(store_hash_early,tar_store_file)
S3method(store_hash_early,tar_url)
S3method(store_hash_late,default)
S3method(store_hash_late,tar_aws_file)
S3method(store_hash_late,tar_cloud)
S3method(store_hash_late,tar_gcp_file)
S3method(store_hash_late,tar_repository_cas)
S3method(store_hash_late,tar_repository_cas_file)
S3method(store_hash_late,tar_store_file)
S3method(store_hash_late,tar_url)
S3method(store_marshal_object,default)
S3method(store_marshal_object,tar_keras)
S3method(store_marshal_object,tar_store_custom)
S3method(store_marshal_object,tar_store_format_custom)
S3method(store_marshal_object,tar_torch)
S3method(store_marshal_value,default)
S3method(store_marshal_value,tar_nonexportable)
S3method(store_new,default)
S3method(store_new,format_custom)
S3method(store_path_from_record,default)
S3method(store_path_from_record,tar_external)
S3method(store_produce_path,default)
Expand All @@ -162,6 +171,8 @@ S3method(store_read_object,tar_aws)
S3method(store_read_object,tar_aws_file)
S3method(store_read_object,tar_gcp)
S3method(store_read_object,tar_gcp_file)
S3method(store_read_object,tar_repository_cas)
S3method(store_read_object,tar_repository_cas_file)
S3method(store_read_path,tar_feather)
S3method(store_read_path,tar_fst)
S3method(store_read_path,tar_fst_dt)
Expand All @@ -171,13 +182,14 @@ S3method(store_read_path,tar_null)
S3method(store_read_path,tar_parquet)
S3method(store_read_path,tar_qs)
S3method(store_read_path,tar_rds)
S3method(store_read_path,tar_store_custom)
S3method(store_read_path,tar_store_file)
S3method(store_read_path,tar_store_format_custom)
S3method(store_read_path,tar_torch)
S3method(store_read_path,tar_url)
S3method(store_row_path,default)
S3method(store_row_path,tar_external)
S3method(store_set_timestamp_trust,default)
S3method(store_set_timestamp_trust,tar_cloud)
S3method(store_set_timestamp_trust,tar_external)
S3method(store_set_timestamp_trust,tar_store_file_fast)
S3method(store_sync_file_meta,default)
Expand All @@ -190,7 +202,7 @@ S3method(store_unload,tar_aws_file)
S3method(store_unload,tar_gcp_file)
S3method(store_unmarshal_object,default)
S3method(store_unmarshal_object,tar_keras)
S3method(store_unmarshal_object,tar_store_custom)
S3method(store_unmarshal_object,tar_store_format_custom)
S3method(store_unmarshal_object,tar_torch)
S3method(store_unmarshal_value,default)
S3method(store_unmarshal_value,tar_nonexportable)
Expand All @@ -202,8 +214,8 @@ S3method(store_upload_object,tar_aws)
S3method(store_upload_object,tar_aws_file)
S3method(store_upload_object,tar_gcp)
S3method(store_upload_object,tar_gcp_file)
S3method(store_validate,default)
S3method(store_validate,tar_store_custom)
S3method(store_upload_object,tar_repository_cas)
S3method(store_upload_object,tar_repository_cas_file)
S3method(store_write_object,default)
S3method(store_write_object,tar_cloud)
S3method(store_write_object,tar_store_file)
Expand All @@ -215,8 +227,8 @@ S3method(store_write_path,tar_null)
S3method(store_write_path,tar_parquet)
S3method(store_write_path,tar_qs)
S3method(store_write_path,tar_rds)
S3method(store_write_path,tar_store_custom)
S3method(store_write_path,tar_store_file)
S3method(store_write_path,tar_store_format_custom)
S3method(store_write_path,tar_torch)
S3method(store_write_path,tar_url)
S3method(tar_make_interactive_load_target,tar_bud)
Expand Down Expand Up @@ -384,6 +396,9 @@ export(tar_callr_args_default)
export(tar_callr_inner_try)
export(tar_cancel)
export(tar_canceled)
export(tar_cas_d)
export(tar_cas_e)
export(tar_cas_u)
export(tar_completed)
export(tar_condition_traced)
export(tar_config_get)
Expand Down Expand Up @@ -474,6 +489,9 @@ export(tar_random_port)
export(tar_read)
export(tar_read_raw)
export(tar_renv)
export(tar_repository_cas)
export(tar_repository_cas_local)
export(tar_repository_cas_local_gc)
export(tar_reprex)
export(tar_resources)
export(tar_resources_aws)
Expand All @@ -487,6 +505,7 @@ export(tar_resources_gcp)
export(tar_resources_network)
export(tar_resources_parquet)
export(tar_resources_qs)
export(tar_resources_repository_cas)
export(tar_resources_url)
export(tar_runtime_object)
export(tar_script)
Expand Down
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* Avoid saving a file in `_targets/objects` for `error = "null"`. Instead, switch to a special `"null"` storage format class if `error` is `"null"` the target throws an error. This should allow users to more freely create new formats with `tar_format()` without worrying about how to handle `NULL` objects created by `error = "null"`.
* Implement `format = "auto"` (#1311, @hadley).
* Replace `pingr` dependency with `base::socketConnection()` for local URL utilities (#1317, #1318, @Adafede).
* Implement `tar_repository_cas()`, `tar_repository_cas_local()`, and `tar_repository_cas_local_gc()` for content-addressable storage (#1232, #1314, @noamross).

# targets 1.7.1

Expand Down
10 changes: 5 additions & 5 deletions R/class_builder.R
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ target_run.tar_builder <- function(target, envir, path_store) {
target_gc(target)
builder_ensure_deps(target, target$subpipeline, "worker")
frames <- frames_produce(envir, target, target$subpipeline)
builder_set_tar_runtime(target, frames, path_store)
builder_set_tar_runtime(target, frames)
store_update_stage_early(target$store, target$settings$name, path_store)
builder_update_build(target, frames_get_envir(frames))
builder_ensure_paths(target, path_store)
Expand Down Expand Up @@ -167,9 +167,7 @@ target_skip.tar_builder <- function(
active
) {
target_update_queue(target, scheduler)
path <- meta$get_record(target_get_name(target))$path
file <- target$store$file
file$path <- path
file_repopulate(target$store$file, meta$get_record(target_get_name(target)))
if (active) {
builder_ensure_workspace(
target = target,
Expand All @@ -188,6 +186,8 @@ target_skip.tar_builder <- function(

#' @export
target_conclude.tar_builder <- function(target, pipeline, scheduler, meta) {
on.exit(builder_unset_tar_runtime())
builder_set_tar_runtime(target, NULL)
target_update_queue(target, scheduler)
builder_handle_warnings(target, scheduler)
builder_ensure_workspace(
Expand Down Expand Up @@ -498,7 +498,7 @@ builder_wait_correct_hash <- function(target) {
store_ensure_correct_hash(target$store, storage, deployment)
}

builder_set_tar_runtime <- function(target, frames, path_store) {
builder_set_tar_runtime <- function(target, frames) {
tar_runtime$target <- target
tar_runtime$frames <- frames
}
Expand Down
2 changes: 1 addition & 1 deletion R/class_cloud.R
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#' @export
store_set_timestamp_trust.default <- function(store) {
store_set_timestamp_trust.tar_cloud <- function(store) {
store$file$trust_timestamps <- FALSE
}

Expand Down
9 changes: 8 additions & 1 deletion R/class_counter.R
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@ counter_exists_name <- function(counter, name) {
}

counter_exist_names <- function(counter, names) {
unlist(lapply(names, exists, envir = counter$envir, inherits = FALSE))
as.logical(
lapply(
names,
exists,
envir = counter$envir,
inherits = FALSE
)
)
}

counter_filter_exists <- function(counter, names) {
Expand Down
5 changes: 5 additions & 0 deletions R/class_file.R
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ file_should_rehash <- function(file, time, size) {
)
}

file_repopulate <- function(file, record) {
file$path <- record$path
file$hash <- record$data
}

file_ensure_hash <- function(file) {
files <- file_list_files(file$path)
info <- file_info(files)
Expand Down
10 changes: 8 additions & 2 deletions R/class_options.R
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ options_class <- R6::R6Class(
self$repository %|||% "local"
},
get_repository_meta = function() {
(self$repository_meta %|||% self$repository) %|||% "local"
default <- self$repository_meta %|||%
if_any(is_repository_cas(self$repository), "local", self$repository)
default %|||% "local"
},
get_iteration = function() {
self$iteration %|||% "vector"
Expand Down Expand Up @@ -501,7 +503,11 @@ options_class <- R6::R6Class(
tar_assert_repository(repository)
},
validate_repository_meta = function(repository_meta) {
tar_assert_repository(repository_meta)
tar_assert_in(
repository_meta,
choices = c("local", "aws", "gcp"),
msg = "repository_meta must be one of \"local\", \"aws\", or \"gcp\"."
)
},
validate_iteration = function(iteration) {
tar_assert_flag(iteration, c("vector", "list", "group"))
Expand Down
2 changes: 1 addition & 1 deletion R/class_record.R
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ record_bootstrap_store <- function(record) {
repository = record$repository,
resources = tar_options$get_resources()
)
store$file$path <- record$path
file_repopulate(store$file, record)
store
}

Expand Down
31 changes: 31 additions & 0 deletions R/class_resources_repository_cas.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
resources_repository_cas_init <- function(
envvars = NULL
) {
resources_repository_cas_new(
envvars = envvars
)
}

resources_repository_cas_new <- function(
envvars = NULL
) {
force(envvars)
enclass(environment(), c("tar_resources_repository_cas", "tar_resources"))
}

#' @export
resources_validate.tar_resources_repository_cas <- function(resources) {
if (!is.null(resources$envvars)) {
tar_assert_chr(resources$envvars)
tar_assert_none_na(resources$envvars)
tar_assert_named(resources$controller)
}
}

#' @export
print.tar_resources_repository_cas <- function(x, ...) {
cat(
"<tar_resources_repository_cas>\n ",
paste0(paste_list(as.list(x)), collapse = "\n ")
)
}
Loading

0 comments on commit d8393be

Please sign in to comment.