From e4f0754e03c84e66b3d93db11c902d99db74fa31 Mon Sep 17 00:00:00 2001 From: Dewey Dunnington Date: Fri, 23 Feb 2024 16:07:22 -0400 Subject: [PATCH] feat(r): Add bindings for IPC reader (#390) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR adds bindings to nanoarrow's IPC reader from R. The entrypoint for a user is `read_nanoarrow()`, which accepts raw vectors, connections, and file paths (thin wrapper around connections). It also fixes a number of compiler warnings in the IPC extension. The implementation is not particularly complicated from the R side, but the main drawback of adding IPC support is that the flatbuffers implementation (flatcc) actively does not care about gcc compiler warnings (whereas CRAN actively cares about them). These are all slower than the arrow package, which has more tools at its disposal to prevent copies. ``` r library(arrow, warn.conflicts = FALSE) library(nanoarrow) # Basic read example tf <- tempfile() write_ipc_stream(dplyr::starwars, tf) read_nanoarrow(tf) |> tibble::as_tibble() #> # A tibble: 87 × 14 #> name height mass hair_color skin_color eye_color birth_year sex gender #> #> 1 Luke Sk… 172 77 blond fair blue 19 male mascu… #> 2 C-3PO 167 75 gold yellow 112 none mascu… #> 3 R2-D2 96 32 white, bl… red 33 none mascu… #> 4 Darth V… 202 136 none white yellow 41.9 male mascu… #> 5 Leia Or… 150 49 brown light brown 19 fema… femin… #> 6 Owen La… 178 120 brown, gr… light blue 52 male mascu… #> 7 Beru Wh… 165 75 brown light blue 47 fema… femin… #> 8 R5-D4 97 32 white, red red NA none mascu… #> 9 Biggs D… 183 84 black light brown 24 male mascu… #> 10 Obi-Wan… 182 77 auburn, w… fair blue-gray 57 male mascu… #> # ℹ 77 more rows #> # ℹ 5 more variables: homeworld , species , films >, #> # vehicles >, starships > df_bigish <- nanoarrow:::vec_gen(data.frame(x = character()), n = 1e6) write_ipc_stream(df_bigish, tf) # Wrapper because mmap is apparently not passed through from read_ipc_stream() # and this is pretty significant read_ipc_stream_wrap <- function(f, ..., mmap) { arrow::read_ipc_stream( arrow:::make_readable_file(f, mmap = mmap, random_access = FALSE), ... ) } tf_raw <- brio::read_file_raw(tf) # Slower than arrow for raw vector input because of C implementation, # which doesn't currently share the global buffer (just shares buffers # between columns within a single batch) bench::mark( nanoarrow = read_nanoarrow(tf_raw) |> collect_array_stream(), arrow = read_ipc_stream(buffer(tf_raw), as_data_frame = FALSE), check = FALSE ) #> # A tibble: 2 × 6 #> expression min median `itr/sec` mem_alloc `gc/sec` #> #> 1 nanoarrow 1.27ms 1.84ms 439. 41.5KB 2.18 #> 2 arrow 509.26µs 528.65µs 1821. 3.6MB 79.3 # Slower than arrow, maybe because of C implementation, but definitely # because it uses base::readBin() which necessiates an extra copy bench::mark( nanoarrow = read_nanoarrow(tf) |> collect_array_stream(), arrow_mmap = read_ipc_stream_wrap(tf, mmap = TRUE, as_data_frame = FALSE), arrow = read_ipc_stream_wrap(tf, mmap = FALSE, as_data_frame = FALSE), check = FALSE ) #> # A tibble: 3 × 6 #> expression min median `itr/sec` mem_alloc `gc/sec` #> #> 1 nanoarrow 5.18ms 5.66ms 174. 16.2MB 189. #> 2 arrow_mmap 613.48µs 640.5µs 1526. 528.6KB 13.9 #> 3 arrow 2.18ms 2.84ms 339. 551.6KB 4.06 ``` Created on 2024-02-19 with [reprex v2.0.2](https://reprex.tidyverse.org) --- .github/workflows/r-check.yaml | 6 +- extensions/nanoarrow_ipc/CMakeLists.txt | 29 ++ .../src/nanoarrow/nanoarrow_ipc_decoder.c | 44 +-- .../src/nanoarrow/nanoarrow_ipc_reader.c | 7 + r/.covrignore | 3 + r/NAMESPACE | 5 + r/R/ipc.R | 207 ++++++++++++++ r/bootstrap.R | 45 ++- r/configure | 16 -- r/man/read_nanoarrow.Rd | 47 ++++ r/src/.gitignore | 3 + r/src/Makevars | 2 +- r/src/init.c | 8 +- r/src/ipc.c | 161 +++++++++++ r/src/util.c | 8 +- r/src/util.h | 2 + r/tests/testthat/test-ipc.R | 258 ++++++++++++++++++ 17 files changed, 797 insertions(+), 54 deletions(-) create mode 100644 r/R/ipc.R create mode 100644 r/man/read_nanoarrow.Rd create mode 100644 r/src/ipc.c create mode 100644 r/tests/testthat/test-ipc.R diff --git a/.github/workflows/r-check.yaml b/.github/workflows/r-check.yaml index d13cd580c..c3a559b5e 100644 --- a/.github/workflows/r-check.yaml +++ b/.github/workflows/r-check.yaml @@ -51,7 +51,7 @@ jobs: R_KEEP_PKG_SOURCE: yes steps: - - uses: actions/checkout@v2 + - uses: actions/checkout@v4 - uses: r-lib/actions/setup-pandoc@v2 - uses: r-lib/actions/setup-r@v2 @@ -66,14 +66,14 @@ jobs: if: matrix.config.os != 'windows-latest' env: PKG_CPPFLAGS: "-DNANOARROW_DEBUG" - PKG_CFLAGS: "-Werror -Wall -Wextra -Wpedantic -Wconversion -Wno-unused-parameter -Wno-sign-conversion -Wno-cast-function-type" + PKG_CFLAGS: "-Werror -Wall -Wextra -Wpedantic -Wconversion -Wno-unused-parameter -Wno-sign-conversion -Wno-cast-function-type -Wno-misleading-indentation -Wno-conversion -Wno-unused-const-variable" run: | R CMD INSTALL r --preclean shell: bash - uses: r-lib/actions/setup-r-dependencies@v2 with: - extra-packages: any::rcmdcheck, arrow=?ignore-before-r=4.0.0, github::r-lib/pkgbuild@v1.4.0 + extra-packages: any::rcmdcheck, arrow=?ignore-before-r=4.0.0 needs: check working-directory: r diff --git a/extensions/nanoarrow_ipc/CMakeLists.txt b/extensions/nanoarrow_ipc/CMakeLists.txt index 7a724b19f..6d4bf2086 100644 --- a/extensions/nanoarrow_ipc/CMakeLists.txt +++ b/extensions/nanoarrow_ipc/CMakeLists.txt @@ -170,6 +170,35 @@ else() endif() +# Don't add extra warning flags when bundling, since we treat flatcc +# as a part of the nanoarrow_ipc target and we have no control over the +# warnings it produces. +if(CMAKE_BUILD_TYPE STREQUAL "Debug" AND NOT NANOARROW_IPC_BUNDLE) + if(CMAKE_C_COMPILER_ID STREQUAL "GNU") + target_compile_options(nanoarrow_ipc + PRIVATE -Wall + -Werror + -Wextra + -Wpedantic + -Wno-type-limits + -Wmaybe-uninitialized + -Wunused-result + -Wconversion + -Wno-sign-conversion + -Wno-misleading-indentation) + elseif(CMAKE_C_COMPILER_ID STREQUAL "AppleClang" OR CMAKE_C_COMPILER_ID STREQUAL + "Clang") + target_compile_options(nanoarrow_ipc + PRIVATE -Wall + -Werror + -Wextra + -Wpedantic + -Wdocumentation + -Wconversion + -Wno-sign-conversion) + endif() +endif() + if(NANOARROW_IPC_BUILD_TESTS) set(MEMORYCHECK_COMMAND_OPTIONS "--leak-check=full --suppressions=${CMAKE_CURRENT_LIST_DIR}/../../valgrind.supp --error-exitcode=1" diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c index 8a5a111b6..2e19ebd28 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c @@ -46,9 +46,14 @@ #include "nanoarrow_ipc.h" #include "nanoarrow_ipc_flatcc_generated.h" +// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA +#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA) +#define ENODATA 120 +#endif + // A more readable expression way to refer to the fact that there are 8 bytes // at the beginning of every message header. -const static int64_t kMessageHeaderPrefixSize = 8; +static const int32_t kMessageHeaderPrefixSize = 8; // Internal representation of a parsed "Field" from flatbuffers. This // represents a field in a depth-first walk of column arrays and their @@ -155,6 +160,10 @@ int ArrowIpcSharedBufferIsThreadSafe(void) { return 0; } static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_t* ptr, int64_t size) { + NANOARROW_UNUSED(allocator); + NANOARROW_UNUSED(ptr); + NANOARROW_UNUSED(size); + struct ArrowIpcSharedBufferPrivate* private_data = (struct ArrowIpcSharedBufferPrivate*)allocator->private_data; @@ -455,8 +464,10 @@ static int ArrowIpcDecoderSetTypeFixedSizeBinary(struct ArrowSchema* schema, struct ArrowError* error) { ns(FixedSizeBinary_table_t) type = (ns(FixedSizeBinary_table_t))type_generic; int fixed_size = ns(FixedSizeBinary_byteWidth(type)); - return ArrowSchemaSetTypeFixedSize(schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, - fixed_size); + NANOARROW_RETURN_NOT_OK_WITH_ERROR( + ArrowSchemaSetTypeFixedSize(schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, fixed_size), + error); + return NANOARROW_OK; } static int ArrowIpcDecoderSetTypeDate(struct ArrowSchema* schema, @@ -488,7 +499,7 @@ static int ArrowIpcDecoderSetTypeTime(struct ArrowSchema* schema, case ns(TimeUnit_MILLISECOND): if (bitwidth != 32) { ArrowErrorSet(error, "Expected bitwidth of 32 for Time TimeUnit %s but found %d", - ns(TimeUnit_name(time_unit)), bitwidth); + ns(TimeUnit_name(ns(Time_unit(type)))), bitwidth); return EINVAL; } @@ -499,7 +510,7 @@ static int ArrowIpcDecoderSetTypeTime(struct ArrowSchema* schema, case ns(TimeUnit_NANOSECOND): if (bitwidth != 64) { ArrowErrorSet(error, "Expected bitwidth of 64 for Time TimeUnit %s but found %d", - ns(TimeUnit_name(time_unit)), bitwidth); + ns(TimeUnit_name(ns(Time_unit(type)))), bitwidth); return EINVAL; } @@ -644,7 +655,6 @@ static int ArrowIpcDecoderSetTypeUnion(struct ArrowSchema* schema, int format_out_size = sizeof(union_types_str); int n_chars = 0; - const char* format_prefix; switch (union_mode) { case ns(UnionMode_Sparse): n_chars = snprintf(format_cursor, format_out_size, "+us:"); @@ -826,9 +836,6 @@ static int ArrowIpcDecoderSetChildren(struct ArrowSchema* schema, ns(Field_vec_t static int ArrowIpcDecoderDecodeSchemaHeader(struct ArrowIpcDecoder* decoder, flatbuffers_generic_t message_header, struct ArrowError* error) { - struct ArrowIpcDecoderPrivate* private_data = - (struct ArrowIpcDecoderPrivate*)decoder->private_data; - ns(Schema_table_t) schema = (ns(Schema_table_t))message_header; int endianness = ns(Schema_endianness(schema)); switch (endianness) { @@ -977,9 +984,6 @@ static inline int ArrowIpcDecoderReadHeaderPrefix(struct ArrowIpcDecoder* decode ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder, struct ArrowBufferView data, struct ArrowError* error) { - struct ArrowIpcDecoderPrivate* private_data = - (struct ArrowIpcDecoderPrivate*)decoder->private_data; - ArrowIpcDecoderResetHeaderInfo(decoder); NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix( decoder, &data, &decoder->header_size_bytes, error)); @@ -1051,7 +1055,7 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder, } // Read some basic information from the message - int32_t metadata_version = ns(Message_version(message)); + decoder->metadata_version = ns(Message_version(message)); decoder->message_type = ns(Message_header_type(message)); decoder->body_size_bytes = ns(Message_bodyLength(message)); @@ -1063,7 +1067,7 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder, case ns(MetadataVersion_V3): case ns(MetadataVersion_V4): ArrowErrorSet(error, "Expected metadata version V5 but found %s", - ns(MetadataVersion_name(decoder->metadata_version))); + ns(MetadataVersion_name(ns(Message_version(message))))); break; default: ArrowErrorSet(error, "Unexpected value for Message metadata version (%d)", @@ -1085,7 +1089,7 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder, case ns(MessageHeader_Tensor): case ns(MessageHeader_SparseTensor): ArrowErrorSet(error, "Unsupported message type: '%s'", - ns(MessageHeader_type_name(decoder->message_type))); + ns(MessageHeader_type_name(ns(Message_header_type(message))))); return ENOTSUP; default: ArrowErrorSet(error, "Unknown message type: %d", (int)(decoder->message_type)); @@ -1245,7 +1249,7 @@ struct ArrowIpcBufferSource { int64_t buffer_length_bytes; enum ArrowIpcCompressionType codec; enum ArrowType data_type; - int32_t element_size_bits; + int64_t element_size_bits; int swap_endian; }; @@ -1284,6 +1288,10 @@ static ArrowErrorCode ArrowIpcMakeBufferFromView(struct ArrowIpcBufferFactory* f struct ArrowBufferView* dst_view, struct ArrowBuffer* dst, struct ArrowError* error) { + NANOARROW_UNUSED(factory); + NANOARROW_UNUSED(dst); + NANOARROW_UNUSED(error); + struct ArrowBufferView* body = (struct ArrowBufferView*)factory->private_data; dst_view->data.as_uint8 = body->data.as_uint8 + src->body_offset_bytes; dst_view->size_bytes = src->buffer_length_bytes; @@ -1303,6 +1311,8 @@ static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory* struct ArrowBufferView* dst_view, struct ArrowBuffer* dst, struct ArrowError* error) { + NANOARROW_UNUSED(error); + struct ArrowIpcSharedBuffer* shared = (struct ArrowIpcSharedBuffer*)factory->private_data; ArrowBufferReset(dst); @@ -1364,7 +1374,7 @@ static int ArrowIpcDecoderSwapEndian(struct ArrowIpcBufferSource* src, const uint64_t* ptr_src = out_view->data.as_uint64; uint64_t* ptr_dst = (uint64_t*)dst->data; uint64_t words[4]; - int n_words = src->element_size_bits / 64; + int n_words = (int)(src->element_size_bits / 64); for (int64_t i = 0; i < (dst->size_bytes / n_words / 8); i++) { for (int j = 0; j < n_words; j++) { diff --git a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c index 4c008d8b6..dcce8ea2d 100644 --- a/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c +++ b/extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c @@ -22,6 +22,11 @@ #include "nanoarrow.h" #include "nanoarrow_ipc.h" +// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA +#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA) +#define ENODATA 120 +#endif + void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src, struct ArrowIpcInputStream* dst) { memcpy(dst, src, sizeof(struct ArrowIpcInputStream)); @@ -37,6 +42,8 @@ static ArrowErrorCode ArrowIpcInputStreamBufferRead(struct ArrowIpcInputStream* uint8_t* buf, int64_t buf_size_bytes, int64_t* size_read_out, struct ArrowError* error) { + NANOARROW_UNUSED(error); + if (buf_size_bytes == 0) { *size_read_out = 0; return NANOARROW_OK; diff --git a/r/.covrignore b/r/.covrignore index 47a52d0e3..19df21ed3 100644 --- a/r/.covrignore +++ b/r/.covrignore @@ -17,3 +17,6 @@ src/nanoarrow.c src/nanoarrow.h +src/nanoarrow_ipc.h +src/nanoarrow_ipc.c +src/flatcc* diff --git a/r/NAMESPACE b/r/NAMESPACE index 98173ea58..d868f0b2c 100644 --- a/r/NAMESPACE +++ b/r/NAMESPACE @@ -108,6 +108,9 @@ S3method(print,nanoarrow_array) S3method(print,nanoarrow_array_stream) S3method(print,nanoarrow_buffer) S3method(print,nanoarrow_schema) +S3method(read_nanoarrow,character) +S3method(read_nanoarrow,connection) +S3method(read_nanoarrow,raw) S3method(str,nanoarrow_array) S3method(str,nanoarrow_array_stream) S3method(str,nanoarrow_buffer) @@ -124,6 +127,7 @@ export(convert_array) export(convert_array_extension) export(convert_array_stream) export(convert_buffer) +export(example_ipc_stream) export(infer_nanoarrow_ptype) export(infer_nanoarrow_ptype_extension) export(infer_nanoarrow_schema) @@ -188,6 +192,7 @@ export(nanoarrow_pointer_set_protected) export(nanoarrow_schema_modify) export(nanoarrow_schema_parse) export(nanoarrow_version) +export(read_nanoarrow) export(register_nanoarrow_extension) export(resolve_nanoarrow_extension) export(unregister_nanoarrow_extension) diff --git a/r/R/ipc.R b/r/R/ipc.R new file mode 100644 index 000000000..29471b0f5 --- /dev/null +++ b/r/R/ipc.R @@ -0,0 +1,207 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +#' Read serialized streams of Arrow data +#' +#' Reads connections, file paths, URLs, or raw vectors of serialized Arrow +#' data. Arrow documentation typically refers to this format as "Arrow IPC", +#' since its origin was as a means to transmit tables between processes +#' (e.g., multiple R sessions). This format can also be written to and read +#' from files or URLs and is essentially a high performance equivalent of +#' a CSV file that does a better job maintaining types. +#' +#' The nanoarrow package does not currently have the ability to write serialized +#' IPC data: use [arrow::write_ipc_stream()] to write data from R, or use +#' the equivalent writer from another Arrow implementation in Python, C++, +#' Rust, JavaScript, Julia, C#, and beyond. +#' +#' The media type of an Arrow stream is `application/vnd.apache.arrow.stream` +#' and the recommended file extension is `.arrows`. +#' +#' @param x A `raw()` vector, connection, or file path from which to read +#' binary data. Common extensions indicating compression (.gz, .bz2, .zip) +#' are automatically uncompressed. +#' @param lazy By default, `read_nanoarrow()` will read and discard a copy of +#' the reader's schema to ensure that invalid streams are discovered as +#' soon as possible. Use `lazy = TRUE` to defer this check until the reader +#' is actually consumed. +#' @param ... Currently unused. +#' +#' @return A [nanoarrow_array_stream][as_nanoarrow_array_stream] +#' @export +#' +#' @examples +#' as.data.frame(read_nanoarrow(example_ipc_stream())) +#' +read_nanoarrow <- function(x, ..., lazy = FALSE) { + UseMethod("read_nanoarrow") +} + +#' @export +read_nanoarrow.raw <- function(x, ..., lazy = FALSE) { + buffer <- as_nanoarrow_buffer(x) + reader <- .Call(nanoarrow_c_ipc_array_reader_buffer, buffer) + check_stream_if_requested(reader, lazy) +} + +#' @export +read_nanoarrow.character <- function(x, ..., lazy = FALSE) { + if (length(x) != 1) { + stop(sprintf("Can't interpret character(%d) as file path", length(x))) + } + + con_type <- guess_connection_type(x) + if (con_type == "unz") { + con <- do.call(con_type, list(x, filename = guess_zip_filename(x))) + } else { + con <- do.call(con_type, list(x)) + } + + # Helps with error reporting when reading invalid files + reader <- read_nanoarrow(con, lazy = TRUE) + check_stream_if_requested(reader, lazy) +} + +#' @export +read_nanoarrow.connection <- function(x, ..., lazy = FALSE) { + if (!isOpen(x)) { + # Unopened connections should be opened in binary mode + open(x, "rb") + + stream <- tryCatch( + .Call(nanoarrow_c_ipc_array_reader_connection, x), + error = function(e) { + close(x) + stop(e) + } + ) + + # Close the connection when the array stream is released + stream_finalizer <- function() { + close(x) + } + + finalizer_env <- new.env(parent = baseenv()) + finalizer_env$x <- x + environment(stream_finalizer) <- finalizer_env + + reader <- array_stream_set_finalizer(stream, stream_finalizer) + } else { + reader <- .Call(nanoarrow_c_ipc_array_reader_connection, x) + } + + check_stream_if_requested(reader, lazy) +} + +#' @rdname read_nanoarrow +#' @export +example_ipc_stream <- function() { + # data.frame(some_col = c(1L, 2L, 3L)) as a serialized schema/batch + schema <- as.raw(c( + 0xff, 0xff, 0xff, 0xff, 0x10, 0x01, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x0a, 0x00, 0x0e, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, + 0x00, 0x01, 0x04, 0x00, 0x10, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x0c, 0x00, + 0x00, 0x00, 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, + 0x04, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x84, 0xff, + 0xff, 0xff, 0x18, 0x00, 0x00, 0x00, 0x04, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x00, 0x00, + 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x00, 0x00, 0x08, 0x00, + 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x00, 0x00, 0x00, 0x00, + 0x01, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x12, 0x00, 0x18, 0x00, + 0x08, 0x00, 0x06, 0x00, 0x07, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x10, 0x00, 0x14, 0x00, + 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x02, 0x14, 0x00, 0x00, 0x00, 0x70, 0x00, + 0x00, 0x00, 0x08, 0x00, 0x00, 0x00, 0x18, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x08, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x63, 0x6f, 0x6c, 0x00, 0x00, + 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x0c, 0x00, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, + 0x04, 0x00, 0x08, 0x00, 0x08, 0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, 0x04, 0x00, + 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x76, 0x61, 0x6c, + 0x75, 0x65, 0x5f, 0x66, 0x69, 0x65, 0x6c, 0x64, 0x00, 0x00, 0x00, 0x00, 0x0e, 0x00, + 0x00, 0x00, 0x73, 0x6f, 0x6d, 0x65, 0x5f, 0x6b, 0x65, 0x79, 0x5f, 0x66, 0x69, 0x65, + 0x6c, 0x64, 0x00, 0x00, 0x08, 0x00, 0x0c, 0x00, 0x08, 0x00, 0x07, 0x00, 0x08, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x20, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 + )) + + batch <- as.raw(c( + 0xff, 0xff, 0xff, 0xff, 0x88, 0x00, 0x00, 0x00, 0x14, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x0c, 0x00, 0x16, 0x00, 0x06, 0x00, 0x05, 0x00, 0x08, 0x00, 0x0c, 0x00, + 0x0c, 0x00, 0x00, 0x00, 0x00, 0x03, 0x04, 0x00, 0x18, 0x00, 0x00, 0x00, 0x10, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x0a, 0x00, 0x18, 0x00, 0x0c, 0x00, + 0x04, 0x00, 0x08, 0x00, 0x0a, 0x00, 0x00, 0x00, 0x3c, 0x00, 0x00, 0x00, 0x10, 0x00, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x0c, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, + 0x00, 0x00, 0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x02, 0x00, 0x00, 0x00, 0x03, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00 + )) + + c(schema, batch) +} + +check_stream_if_requested <- function(reader, lazy) { + if (!lazy) { + # Report error as coming from read_nanoarrow() always + cnd_call <- sys.call(-1) + tryCatch( + reader$get_schema(), + error = function(e) { + reader$release() + e$call <- cnd_call + stop(e) + } + ) + } + + reader +} + +guess_connection_type <- function(x) { + is_url <- grepl("://", x) + + compressed_con <- switch( + tools::file_ext(x), + "gz" = "gzfile", + "bz2" = "bzfile", + "zip" = "unz" + ) + + if (is_url && !is.null(compressed_con)) { + stop("Reading compressed streams from URLs is not supported") + } + + if (is_url) { + "url" + } else if (is.null(compressed_con)) { + "file" + } else { + compressed_con + } +} + +guess_zip_filename <- function(x) { + files <- utils::unzip(x, list = TRUE)[[1]] + if (length(files) != 1) { + stop( + sprintf( + "Unzip only supported of archives with exactly one file (found %d)", + length(files) + ) + ) + } + + files +} diff --git a/r/bootstrap.R b/r/bootstrap.R index d13a74339..db66b3de1 100644 --- a/r/bootstrap.R +++ b/r/bootstrap.R @@ -22,8 +22,11 @@ on.exit(unlink(temp_dir, recursive = TRUE)) dir.create(temp_dir) source_dir <- normalizePath("..", winslash = "/") +ipc_source_dir <- file.path(source_dir, "extensions", "nanoarrow_ipc") build_dir <- file.path(temp_dir, "build") +ipc_build_dir <- file.path(temp_dir, "build_ipc") dist_dir <- file.path(temp_dir, "dist") +dir.create(ipc_build_dir) dir.create(build_dir) dir.create(dist_dir) @@ -48,9 +51,15 @@ file.exists("../CMakeLists.txt") && sprintf("%s -DNANOARROW_BUNDLE=ON -DNANOARROW_NAMESPACE=RPkg", source_dir), wd = build_dir ) && - run_cmake(sprintf("--build %s", shQuote(build_dir))) && run_cmake( sprintf("--install %s --prefix=%s", shQuote(build_dir), shQuote(dist_dir)) + ) && + run_cmake( + sprintf("%s -DNANOARROW_IPC_BUNDLE=ON", ipc_source_dir), + wd = ipc_build_dir + ) && + run_cmake( + sprintf("--install %s --prefix=%s", shQuote(ipc_build_dir), shQuote(dist_dir)) ) # If any of the above failed, we can also copy from ../dist. This is likely for @@ -59,26 +68,32 @@ if (!file.exists(file.path(dist_dir, "nanoarrow.h"))) { dist_dir <- "../dist" } -files_to_vendor <- file.path(dist_dir, c("nanoarrow.c", "nanoarrow.h")) +files_to_vendor <- file.path( + dist_dir, + c("nanoarrow.c", "nanoarrow.h", + "nanoarrow_ipc.c", "nanoarrow_ipc.h", + "flatcc.c", "flatcc") +) if (all(file.exists(files_to_vendor))) { files_dst <- file.path("src", basename(files_to_vendor)) - n_removed <- suppressWarnings(sum(file.remove(files_dst))) - if (n_removed > 0) { - cat(sprintf("Removed %d previously vendored files from src/\n", n_removed)) + # Clean previous files/dirs + for (f in files_dst) { + unlink(f, recursive = TRUE) } - cat( - sprintf( - "Vendoring files from arrow-nanoarrow to src/:\n%s\n", - paste("-", files_to_vendor, collapse = "\n") - ) - ) + cat("Vendoring files from arrow-nanoarrow to src/:\n") - if (all(file.copy(files_to_vendor, "src"))) { - cat("All files successfully copied to src/\n") - } else { - stop("Failed to vendor all files") + for (f in files_to_vendor) { + cat(sprintf("- %s\n", basename(f))) + if (!file.copy(f, "src", recursive = TRUE)) { + stop(sprintf("Failed to copy '%s' to src/", basename(f))) + } } + + # Post-process headers for CMD check + f <- "src/flatcc/portable/pdiagnostic.h" + lines <- readLines(f) + writeLines(gsub("^#pragma", "/**/#pragma", lines), f) } diff --git a/r/configure b/r/configure index 3998a3a3a..07777590a 100755 --- a/r/configure +++ b/r/configure @@ -26,22 +26,6 @@ if [ -f "src/nanoarrow.h" ] && [ -f "src/nanoarrow.c" ]; then exit 0 fi -# We have a situation where the package has been built via R CMD build -# but there is no vendored nanoarrow. This occurs with -# remotes::install_github() with the default arguments. In this case, pull -# the latest bundled version from GitHub. To ensure commit-level consistency, -# use remotes::install_github(build = FALSE) (which will run cmake to get -# a fresh bundle with a specific commit). -curl -L https://github.com/apache/arrow-nanoarrow/raw/main/dist/nanoarrow.h \ - --output src/nanoarrow.h --silent -curl -L https://github.com/apache/arrow-nanoarrow/raw/main/dist/nanoarrow.c \ - --output src/nanoarrow.c --silent - -if [ -f "src/nanoarrow.h" ] && [ -f "src/nanoarrow.c" ]; then - echo "Fetched bundled nanoarrow from https://github.com/apache/arrow-nanoarrow/tree/main/dist" - exit 0 -fi - echo "Vendored src/nanoarrow.h and/or src/nanoarrow.c are missing" echo "This source tarball was built incorrectly." exit 1 diff --git a/r/man/read_nanoarrow.Rd b/r/man/read_nanoarrow.Rd new file mode 100644 index 000000000..87d300c8b --- /dev/null +++ b/r/man/read_nanoarrow.Rd @@ -0,0 +1,47 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/ipc.R +\name{read_nanoarrow} +\alias{read_nanoarrow} +\alias{example_ipc_stream} +\title{Read serialized streams of Arrow data} +\usage{ +read_nanoarrow(x, ..., lazy = FALSE) + +example_ipc_stream() +} +\arguments{ +\item{x}{A \code{raw()} vector, connection, or file path from which to read +binary data. Common extensions indicating compression (.gz, .bz2, .zip) +are automatically uncompressed.} + +\item{...}{Currently unused.} + +\item{lazy}{By default, \code{read_nanoarrow()} will read and discard a copy of +the reader's schema to ensure that invalid streams are discovered as +soon as possible. Use \code{lazy = TRUE} to defer this check until the reader +is actually consumed.} +} +\value{ +A \link[=as_nanoarrow_array_stream]{nanoarrow_array_stream} +} +\description{ +Reads connections, file paths, URLs, or raw vectors of serialized Arrow +data. Arrow documentation typically refers to this format as "Arrow IPC", +since its origin was as a means to transmit tables between processes +(e.g., multiple R sessions). This format can also be written to and read +from files or URLs and is essentially a high performance equivalent of +a CSV file that does a better job maintaining types. +} +\details{ +The nanoarrow package does not currently have the ability to write serialized +IPC data: use \code{\link[arrow:write_ipc_stream]{arrow::write_ipc_stream()}} to write data from R, or use +the equivalent writer from another Arrow implementation in Python, C++, +Rust, JavaScript, Julia, C#, and beyond. + +The media type of an Arrow stream is \code{application/vnd.apache.arrow.stream} +and the recommended file extension is \code{.arrows}. +} +\examples{ +as.data.frame(read_nanoarrow(example_ipc_stream())) + +} diff --git a/r/src/.gitignore b/r/src/.gitignore index edacbf0a9..9d89d07a0 100644 --- a/r/src/.gitignore +++ b/r/src/.gitignore @@ -20,3 +20,6 @@ *.dll nanoarrow.c nanoarrow.h +nanoarrow_ipc.h +nanoarrow_ipc.c +flatcc* diff --git a/r/src/Makevars b/r/src/Makevars index de5c307f2..263ba3f6d 100644 --- a/r/src/Makevars +++ b/r/src/Makevars @@ -15,4 +15,4 @@ # specific language governing permissions and limitations # under the License. -PKG_CPPFLAGS=-I../inst/include +PKG_CPPFLAGS=-I../inst/include -I../src diff --git a/r/src/init.c b/r/src/init.c index 95a44543e..913ea77af 100644 --- a/r/src/init.c +++ b/r/src/init.c @@ -46,7 +46,7 @@ extern SEXP nanoarrow_c_array_set_schema(SEXP array_xptr, SEXP schema_xptr, extern SEXP nanoarrow_c_infer_schema_array(SEXP array_xptr); extern SEXP nanoarrow_c_array_proxy(SEXP array_xptr, SEXP array_view_xptr, SEXP recursive_sexp); -extern SEXP nanoarrow_c_as_array_default(SEXP x_sexp, SEXP schema_sexp); +extern SEXP nanoarrow_c_as_array_default(SEXP x_sexp, SEXP schema_xptr); extern SEXP nanoarrow_c_as_buffer_default(SEXP x_sexp); extern SEXP nanoarrow_c_buffer_append(SEXP buffer_xptr, SEXP new_buffer_xptr); extern SEXP nanoarrow_c_buffer_info(SEXP buffer_xptr); @@ -56,6 +56,8 @@ extern SEXP nanoarrow_c_convert_array_stream(SEXP array_stream_xptr, SEXP ptype_ SEXP size_sexp, SEXP n_sexp); extern SEXP nanoarrow_c_infer_ptype(SEXP schema_xptr); extern SEXP nanoarrow_c_convert_array(SEXP array_xptr, SEXP ptype_sexp); +extern SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP buffer_xptr); +extern SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con); extern SEXP nanoarrow_c_allocate_schema(void); extern SEXP nanoarrow_c_allocate_array(void); extern SEXP nanoarrow_c_allocate_array_stream(void); @@ -127,6 +129,10 @@ static const R_CallMethodDef CallEntries[] = { {"nanoarrow_c_convert_array_stream", (DL_FUNC)&nanoarrow_c_convert_array_stream, 4}, {"nanoarrow_c_infer_ptype", (DL_FUNC)&nanoarrow_c_infer_ptype, 1}, {"nanoarrow_c_convert_array", (DL_FUNC)&nanoarrow_c_convert_array, 2}, + {"nanoarrow_c_ipc_array_reader_buffer", (DL_FUNC)&nanoarrow_c_ipc_array_reader_buffer, + 1}, + {"nanoarrow_c_ipc_array_reader_connection", + (DL_FUNC)&nanoarrow_c_ipc_array_reader_connection, 1}, {"nanoarrow_c_allocate_schema", (DL_FUNC)&nanoarrow_c_allocate_schema, 0}, {"nanoarrow_c_allocate_array", (DL_FUNC)&nanoarrow_c_allocate_array, 0}, {"nanoarrow_c_allocate_array_stream", (DL_FUNC)&nanoarrow_c_allocate_array_stream, 0}, diff --git a/r/src/ipc.c b/r/src/ipc.c new file mode 100644 index 000000000..3039c7e6e --- /dev/null +++ b/r/src/ipc.c @@ -0,0 +1,161 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#define R_NO_REMAP +#include +#include + +#include "nanoarrow_ipc.h" + +#include "buffer.h" +#include "nanoarrow/r.h" +#include "util.h" + +static void finalize_input_stream_xptr(SEXP input_stream_xptr) { + struct ArrowIpcInputStream* input_stream = + (struct ArrowIpcInputStream*)R_ExternalPtrAddr(input_stream_xptr); + if (input_stream != NULL && input_stream->release != NULL) { + input_stream->release(input_stream); + } + + if (input_stream != NULL) { + ArrowFree(input_stream); + } +} + +static SEXP input_stream_owning_xptr(void) { + struct ArrowIpcInputStream* input_stream = + (struct ArrowIpcInputStream*)ArrowMalloc(sizeof(struct ArrowIpcInputStream)); + input_stream->release = NULL; + SEXP input_stream_xptr = + PROTECT(R_MakeExternalPtr(input_stream, R_NilValue, R_NilValue)); + R_RegisterCFinalizer(input_stream_xptr, &finalize_input_stream_xptr); + UNPROTECT(1); + return input_stream_xptr; +} + +SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP buffer_xptr) { + struct ArrowBuffer* buffer = buffer_from_xptr(buffer_xptr); + + SEXP array_stream_xptr = PROTECT(nanoarrow_array_stream_owning_xptr()); + struct ArrowArrayStream* array_stream = + nanoarrow_output_array_stream_from_xptr(array_stream_xptr); + + SEXP input_stream_xptr = PROTECT(input_stream_owning_xptr()); + struct ArrowIpcInputStream* input_stream = + (struct ArrowIpcInputStream*)R_ExternalPtrAddr(input_stream_xptr); + + int code = ArrowIpcInputStreamInitBuffer(input_stream, buffer); + if (code != NANOARROW_OK) { + Rf_error("ArrowIpcInputStreamInitBuffer() failed"); + } + + code = ArrowIpcArrayStreamReaderInit(array_stream, input_stream, NULL); + if (code != NANOARROW_OK) { + Rf_error("ArrowIpcArrayStreamReaderInit() failed"); + } + + UNPROTECT(2); + return array_stream_xptr; +} + +struct ConnectionInputStreamHandler { + SEXP con; + uint8_t* buf; + int64_t buf_size_bytes; + int64_t* size_read_out; + struct ArrowError* error; + int return_code; +}; + +static SEXP handle_readbin_error(SEXP cond, void* hdata) { + struct ConnectionInputStreamHandler* data = (struct ConnectionInputStreamHandler*)hdata; + + SEXP fun = PROTECT(Rf_install("conditionMessage")); + SEXP call = PROTECT(Rf_lang2(fun, cond)); + SEXP result = PROTECT(Rf_eval(call, R_BaseEnv)); + SEXP result0 = STRING_ELT(result, 0); + const char* cond_msg = Rf_translateCharUTF8(result0); + + ArrowErrorSet(data->error, "R execution error: %s", cond_msg); + data->return_code = EIO; + + UNPROTECT(3); + return R_NilValue; +} + +static SEXP call_readbin(void* hdata) { + struct ConnectionInputStreamHandler* data = (struct ConnectionInputStreamHandler*)hdata; + SEXP n = PROTECT(Rf_ScalarReal((double)data->buf_size_bytes)); + SEXP call = PROTECT(Rf_lang4(nanoarrow_sym_readbin, data->con, nanoarrow_ptype_raw, n)); + + SEXP result = PROTECT(Rf_eval(call, R_BaseEnv)); + R_xlen_t bytes_read = Rf_xlength(result); + memcpy(data->buf, RAW(result), bytes_read); + *(data->size_read_out) = bytes_read; + + UNPROTECT(3); + return R_NilValue; +} + +static ArrowErrorCode read_con_input_stream(struct ArrowIpcInputStream* stream, + uint8_t* buf, int64_t buf_size_bytes, + int64_t* size_read_out, + struct ArrowError* error) { + if (!nanoarrow_is_main_thread()) { + ArrowErrorSet(error, "Can't read from R connection on a non-R thread"); + return EIO; + } + + struct ConnectionInputStreamHandler data; + data.con = (SEXP)stream->private_data; + data.buf = buf; + data.buf_size_bytes = buf_size_bytes; + data.size_read_out = size_read_out; + data.error = error; + data.return_code = NANOARROW_OK; + + R_tryCatchError(&call_readbin, &data, &handle_readbin_error, &data); + return data.return_code; +} + +static void release_con_input_stream(struct ArrowIpcInputStream* stream) { + nanoarrow_release_sexp((SEXP)stream->private_data); +} + +SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con) { + SEXP array_stream_xptr = PROTECT(nanoarrow_array_stream_owning_xptr()); + struct ArrowArrayStream* array_stream = + nanoarrow_output_array_stream_from_xptr(array_stream_xptr); + + SEXP input_stream_xptr = PROTECT(input_stream_owning_xptr()); + struct ArrowIpcInputStream* input_stream = + (struct ArrowIpcInputStream*)R_ExternalPtrAddr(input_stream_xptr); + + input_stream->read = &read_con_input_stream; + input_stream->release = &release_con_input_stream; + input_stream->private_data = (SEXP)con; + nanoarrow_preserve_sexp(con); + + int code = ArrowIpcArrayStreamReaderInit(array_stream, input_stream, NULL); + if (code != NANOARROW_OK) { + Rf_error("ArrowIpcArrayStreamReaderInit() failed"); + } + + UNPROTECT(2); + return array_stream_xptr; +} diff --git a/r/src/util.c b/r/src/util.c index a278f1876..6d4035ba6 100644 --- a/r/src/util.c +++ b/r/src/util.c @@ -29,6 +29,8 @@ SEXP nanoarrow_cls_data_frame = NULL; SEXP nanoarrow_cls_schema = NULL; SEXP nanoarrow_cls_array_stream = NULL; SEXP nanoarrow_cls_buffer = NULL; +SEXP nanoarrow_sym_readbin = NULL; +SEXP nanoarrow_ptype_raw = NULL; void nanoarrow_init_cached_sexps(void) { SEXP nanoarrow_str = PROTECT(Rf_mkString("nanoarrow")); @@ -40,6 +42,8 @@ void nanoarrow_init_cached_sexps(void) { nanoarrow_cls_schema = PROTECT(Rf_mkString("nanoarrow_schema")); nanoarrow_cls_array_stream = PROTECT(Rf_mkString("nanoarrow_array_stream")); nanoarrow_cls_buffer = PROTECT(Rf_mkString("nanoarrow_buffer")); + nanoarrow_sym_readbin = PROTECT(Rf_install("readBin")); + nanoarrow_ptype_raw = PROTECT(Rf_allocVector(RAWSXP, 0)); R_PreserveObject(nanoarrow_ns_pkg); R_PreserveObject(nanoarrow_cls_array); @@ -49,8 +53,10 @@ void nanoarrow_init_cached_sexps(void) { R_PreserveObject(nanoarrow_cls_schema); R_PreserveObject(nanoarrow_cls_array_stream); R_PreserveObject(nanoarrow_cls_buffer); + R_PreserveObject(nanoarrow_sym_readbin); + R_PreserveObject(nanoarrow_ptype_raw); - UNPROTECT(9); + UNPROTECT(11); } SEXP nanoarrow_c_preserved_count(void) { diff --git a/r/src/util.h b/r/src/util.h index 86e23c232..d652330ed 100644 --- a/r/src/util.h +++ b/r/src/util.h @@ -31,6 +31,8 @@ extern SEXP nanoarrow_cls_data_frame; extern SEXP nanoarrow_cls_schema; extern SEXP nanoarrow_cls_array_stream; extern SEXP nanoarrow_cls_buffer; +extern SEXP nanoarrow_sym_readbin; +extern SEXP nanoarrow_ptype_raw; void nanoarrow_init_cached_sexps(void); diff --git a/r/tests/testthat/test-ipc.R b/r/tests/testthat/test-ipc.R new file mode 100644 index 000000000..5c95f4dc1 --- /dev/null +++ b/r/tests/testthat/test-ipc.R @@ -0,0 +1,258 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +test_that("read_nanoarrow() works for raw vectors", { + stream <- read_nanoarrow(example_ipc_stream()) + expect_s3_class(stream, "nanoarrow_array_stream") + expect_identical( + as.data.frame(stream), + data.frame(some_col = c(1L, 2L, 3L)) + ) +}) + +test_that("read_nanoarrow() works for open connections", { + con <- rawConnection(example_ipc_stream()) + on.exit(close(con)) + + stream <- read_nanoarrow(con) + expect_s3_class(stream, "nanoarrow_array_stream") + expect_identical( + as.data.frame(stream), + data.frame(some_col = c(1L, 2L, 3L)) + ) +}) + +test_that("read_nanoarrow() works for unopened connections", { + tf <- tempfile() + on.exit(unlink(tf)) + + con <- file(tf, "wb") + writeBin(example_ipc_stream(), con) + close(con) + + con <- file(tf) + # Don't close on exit, because we're supposed to do that + + stream <- read_nanoarrow(con) + expect_true(isOpen(con)) + stream$release() + expect_error( + close(con), + "invalid connection" + ) +}) + +test_that("read_nanoarrow() works for file paths", { + tf <- tempfile() + on.exit(unlink(tf)) + + con <- file(tf, "wb") + writeBin(example_ipc_stream(), con) + close(con) + + stream <- read_nanoarrow(tf) + expect_identical( + as.data.frame(stream), + data.frame(some_col = c(1L, 2L, 3L)) + ) +}) + +test_that("read_nanoarrow() works for URLs", { + tf <- tempfile() + on.exit(unlink(tf)) + + con <- file(tf, "wb") + writeBin(example_ipc_stream(), con) + close(con) + + stream <- read_nanoarrow(paste0("file://", tf)) + expect_identical( + as.data.frame(stream), + data.frame(some_col = c(1L, 2L, 3L)) + ) +}) + +test_that("read_nanoarrow() works for compressed .gz file paths", { + tf <- tempfile(fileext = ".gz") + on.exit(unlink(tf)) + + con <- gzfile(tf, "wb") + writeBin(example_ipc_stream(), con) + close(con) + + stream <- read_nanoarrow(tf) + expect_identical( + as.data.frame(stream), + data.frame(some_col = c(1L, 2L, 3L)) + ) +}) + +test_that("read_nanoarrow() works for compressed .bz2 file paths", { + tf <- tempfile(fileext = ".bz2") + on.exit(unlink(tf)) + + con <- bzfile(tf, "wb") + writeBin(example_ipc_stream(), con) + close(con) + + stream <- read_nanoarrow(tf) + expect_identical( + as.data.frame(stream), + data.frame(some_col = c(1L, 2L, 3L)) + ) +}) + +test_that("read_nanoarrow() works for compressed .zip file paths", { + tf <- tempfile(fileext = ".zip") + tdir <- tempfile() + on.exit(unlink(c(tf, tdir), recursive = TRUE)) + + dir.create(tdir) + uncompressed <- file.path(tdir, "file.arrows") + con <- file(uncompressed, "wb") + writeBin(example_ipc_stream(), con) + close(con) + + local({ + wd <- getwd() + on.exit(setwd(wd)) + setwd(tdir) + zip(tf, "file.arrows", extras = "-q") + }) + + stream <- read_nanoarrow(tf) + expect_identical( + as.data.frame(stream), + data.frame(some_col = c(1L, 2L, 3L)) + ) +}) + +test_that("read_nanoarrow() errors for compressed URL paths", { + expect_error( + read_nanoarrow("https://something.zip"), + "Reading compressed streams from URLs" + ) +}) + +test_that("read_nanoarrow() errors for input with length != 1", { + expect_error( + read_nanoarrow(character(0)), + "Can't interpret character" + ) +}) + +test_that("read_nanoarrow() errors zip archives that contain files != 1", { + tf <- tempfile(fileext = ".zip") + tdir <- tempfile() + on.exit(unlink(c(tf, tdir), recursive = TRUE)) + + dir.create(tdir) + file.create(file.path(tdir, c("file1", "file2"))) + local({ + wd <- getwd() + on.exit(setwd(wd)) + setwd(tdir) + zip(tf, c("file1", "file2"), extras = "-q") + }) + + expect_error( + read_nanoarrow(tf), + "Unzip only supported of archives with exactly one file" + ) +}) + +test_that("read_nanoarrow() reports errors from readBin", { + tf <- tempfile() + on.exit(unlink(tf)) + writeLines("this is not a binary file", tf) + + con <- file(tf, open = "r") + on.exit(close(con)) + + expect_error( + read_nanoarrow(con), + "R execution error" + ) +}) + +test_that("read_nanoarrow() respects lazy argument", { + expect_error( + read_nanoarrow(raw(0), lazy = FALSE), + "No data available on stream" + ) + + reader <- read_nanoarrow(raw(0), lazy = TRUE) + expect_error( + reader$get_next(), + "No data available on stream" + ) + + tf <- tempfile() + con <- rawConnection(raw(0)) + on.exit({ + close(con) + unlink(tf) + }) + + expect_error( + read_nanoarrow(con, lazy = FALSE), + "No data available on stream" + ) + + reader <- read_nanoarrow(con, lazy = TRUE) + expect_error( + reader$get_next(), + "No data available on stream" + ) + + file.create(tf) + expect_error( + read_nanoarrow(tf, lazy = FALSE), + "No data available on stream" + ) + + reader <- read_nanoarrow(tf, lazy = TRUE) + expect_error( + reader$get_next(), + "No data available on stream" + ) +}) + +test_that("read_nanoarrow() from connection errors when called from another thread", { + skip_if_not_installed("arrow") + skip_if_not("dataset" %in% names(arrow::arrow_info()$capabilities)) + skip_if_not_installed("dplyr") + + tf <- tempfile() + tf_out <- tempfile() + on.exit(unlink(c(tf, tf_out), recursive = TRUE)) + + con <- file(tf, "wb") + writeBin(example_ipc_stream(), con) + close(con) + + stream <- read_nanoarrow(tf) + reader <- arrow::as_record_batch_reader(stream) + + # There is an internal MakeSafeRecordBatchReader that ensures all read + # calls happen on the R thread (used in DuckDB integration), but for now + # this should at least error and not crash. + expect_error( + arrow::write_dataset(reader, tf_out), + "Can't read from R connection on a non-R thread" + ) +})