Skip to content

Commit

Permalink
feat(r): Add bindings for IPC reader (#390)
Browse files Browse the repository at this point in the history
This PR adds bindings to nanoarrow's IPC reader from R. The entrypoint
for a user is `read_nanoarrow()`, which accepts raw vectors,
connections, and file paths (thin wrapper around connections). It also
fixes a number of compiler warnings in the IPC extension.

The implementation is not particularly complicated from the R side, but
the main drawback of adding IPC support is that the flatbuffers
implementation (flatcc) actively does not care about gcc compiler
warnings (whereas CRAN actively cares about them).

All of the supported input types (raw vectors, connections, and file
paths) are currently slower to read than with the arrow package, which
has more tools at its disposal to prevent copies.

``` r
library(arrow, warn.conflicts = FALSE)
library(nanoarrow)

# Basic read example
tf <- tempfile()
write_ipc_stream(dplyr::starwars, tf)
read_nanoarrow(tf) |> tibble::as_tibble()
#> # A tibble: 87 × 14
#>    name     height  mass hair_color skin_color eye_color birth_year sex   gender
#>    <chr>     <int> <dbl> <chr>      <chr>      <chr>          <dbl> <chr> <chr> 
#>  1 Luke Sk…    172    77 blond      fair       blue            19   male  mascu…
#>  2 C-3PO       167    75 <NA>       gold       yellow         112   none  mascu…
#>  3 R2-D2        96    32 <NA>       white, bl… red             33   none  mascu…
#>  4 Darth V…    202   136 none       white      yellow          41.9 male  mascu…
#>  5 Leia Or…    150    49 brown      light      brown           19   fema… femin…
#>  6 Owen La…    178   120 brown, gr… light      blue            52   male  mascu…
#>  7 Beru Wh…    165    75 brown      light      blue            47   fema… femin…
#>  8 R5-D4        97    32 <NA>       white, red red             NA   none  mascu…
#>  9 Biggs D…    183    84 black      light      brown           24   male  mascu…
#> 10 Obi-Wan…    182    77 auburn, w… fair       blue-gray       57   male  mascu…
#> # ℹ 77 more rows
#> # ℹ 5 more variables: homeworld <chr>, species <chr>, films <list<chr>>,
#> #   vehicles <list<chr>>, starships <list<chr>>

df_bigish <- nanoarrow:::vec_gen(data.frame(x = character()), n = 1e6)
write_ipc_stream(df_bigish, tf)

# Wrapper because mmap is apparently not passed through from read_ipc_stream()
# and this is pretty significant
read_ipc_stream_wrap <- function(f, ..., mmap) {
  # Build the input file ourselves so that `mmap` is set explicitly, then
  # forward any remaining arguments to the reader.
  input_file <- arrow:::make_readable_file(
    f,
    mmap = mmap,
    random_access = FALSE
  )

  arrow::read_ipc_stream(input_file, ...)
}

tf_raw <- brio::read_file_raw(tf)

# Slower than arrow for raw vector input because of C implementation,
# which doesn't currently share the global buffer (just shares buffers
# between columns within a single batch)
bench::mark(
  nanoarrow = read_nanoarrow(tf_raw) |> collect_array_stream(),
  arrow = read_ipc_stream(buffer(tf_raw), as_data_frame = FALSE),
  check = FALSE
)
#> # A tibble: 2 × 6
#>   expression      min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr> <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 nanoarrow    1.27ms   1.84ms      439.    41.5KB     2.18
#> 2 arrow      509.26µs 528.65µs     1821.     3.6MB    79.3


# Slower than arrow, maybe because of C implementation, but definitely
# because it uses base::readBin() which necessitates an extra copy
bench::mark(
  nanoarrow = read_nanoarrow(tf) |> collect_array_stream(),
  arrow_mmap = read_ipc_stream_wrap(tf, mmap = TRUE, as_data_frame = FALSE),
  arrow = read_ipc_stream_wrap(tf, mmap = FALSE, as_data_frame = FALSE),
  check = FALSE
)
#> # A tibble: 3 × 6
#>   expression      min   median `itr/sec` mem_alloc `gc/sec`
#>   <bch:expr> <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
#> 1 nanoarrow    5.18ms   5.66ms      174.    16.2MB   189.  
#> 2 arrow_mmap 613.48µs  640.5µs     1526.   528.6KB    13.9 
#> 3 arrow        2.18ms   2.84ms      339.   551.6KB     4.06
```

<sup>Created on 2024-02-19 with [reprex
v2.0.2](https://reprex.tidyverse.org)</sup>
  • Loading branch information
paleolimbot authored Feb 23, 2024
1 parent c66ddc3 commit e4f0754
Show file tree
Hide file tree
Showing 17 changed files with 797 additions and 54 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/r-check.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ jobs:
R_KEEP_PKG_SOURCE: yes

steps:
- uses: actions/checkout@v2
- uses: actions/checkout@v4

- uses: r-lib/actions/setup-pandoc@v2
- uses: r-lib/actions/setup-r@v2
Expand All @@ -66,14 +66,14 @@ jobs:
if: matrix.config.os != 'windows-latest'
env:
PKG_CPPFLAGS: "-DNANOARROW_DEBUG"
PKG_CFLAGS: "-Werror -Wall -Wextra -Wpedantic -Wconversion -Wno-unused-parameter -Wno-sign-conversion -Wno-cast-function-type"
PKG_CFLAGS: "-Werror -Wall -Wextra -Wpedantic -Wconversion -Wno-unused-parameter -Wno-sign-conversion -Wno-cast-function-type -Wno-misleading-indentation -Wno-conversion -Wno-unused-const-variable"
run: |
R CMD INSTALL r --preclean
shell: bash

- uses: r-lib/actions/setup-r-dependencies@v2
with:
extra-packages: any::rcmdcheck, arrow=?ignore-before-r=4.0.0, github::r-lib/[email protected]
extra-packages: any::rcmdcheck, arrow=?ignore-before-r=4.0.0
needs: check
working-directory: r

Expand Down
29 changes: 29 additions & 0 deletions extensions/nanoarrow_ipc/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,35 @@ else()

endif()

# Don't add extra warning flags when bundling, since we treat flatcc
# as a part of the nanoarrow_ipc target and we have no control over the
# warnings it produces.
if(CMAKE_BUILD_TYPE STREQUAL "Debug" AND NOT NANOARROW_IPC_BUNDLE)
if(CMAKE_C_COMPILER_ID STREQUAL "GNU")
target_compile_options(nanoarrow_ipc
PRIVATE -Wall
-Werror
-Wextra
-Wpedantic
-Wno-type-limits
-Wmaybe-uninitialized
-Wunused-result
-Wconversion
-Wno-sign-conversion
-Wno-misleading-indentation)
elseif(CMAKE_C_COMPILER_ID STREQUAL "AppleClang" OR CMAKE_C_COMPILER_ID STREQUAL
"Clang")
target_compile_options(nanoarrow_ipc
PRIVATE -Wall
-Werror
-Wextra
-Wpedantic
-Wdocumentation
-Wconversion
-Wno-sign-conversion)
endif()
endif()

if(NANOARROW_IPC_BUILD_TESTS)
set(MEMORYCHECK_COMMAND_OPTIONS
"--leak-check=full --suppressions=${CMAKE_CURRENT_LIST_DIR}/../../valgrind.supp --error-exitcode=1"
Expand Down
44 changes: 27 additions & 17 deletions extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_decoder.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,14 @@
#include "nanoarrow_ipc.h"
#include "nanoarrow_ipc_flatcc_generated.h"

// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA
#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA)
#define ENODATA 120
#endif

// A more readable expression way to refer to the fact that there are 8 bytes
// at the beginning of every message header.
const static int64_t kMessageHeaderPrefixSize = 8;
static const int32_t kMessageHeaderPrefixSize = 8;

// Internal representation of a parsed "Field" from flatbuffers. This
// represents a field in a depth-first walk of column arrays and their
Expand Down Expand Up @@ -155,6 +160,10 @@ int ArrowIpcSharedBufferIsThreadSafe(void) { return 0; }

static void ArrowIpcSharedBufferFree(struct ArrowBufferAllocator* allocator, uint8_t* ptr,
int64_t size) {
NANOARROW_UNUSED(allocator);
NANOARROW_UNUSED(ptr);
NANOARROW_UNUSED(size);

struct ArrowIpcSharedBufferPrivate* private_data =
(struct ArrowIpcSharedBufferPrivate*)allocator->private_data;

Expand Down Expand Up @@ -455,8 +464,10 @@ static int ArrowIpcDecoderSetTypeFixedSizeBinary(struct ArrowSchema* schema,
struct ArrowError* error) {
ns(FixedSizeBinary_table_t) type = (ns(FixedSizeBinary_table_t))type_generic;
int fixed_size = ns(FixedSizeBinary_byteWidth(type));
return ArrowSchemaSetTypeFixedSize(schema, NANOARROW_TYPE_FIXED_SIZE_BINARY,
fixed_size);
NANOARROW_RETURN_NOT_OK_WITH_ERROR(
ArrowSchemaSetTypeFixedSize(schema, NANOARROW_TYPE_FIXED_SIZE_BINARY, fixed_size),
error);
return NANOARROW_OK;
}

static int ArrowIpcDecoderSetTypeDate(struct ArrowSchema* schema,
Expand Down Expand Up @@ -488,7 +499,7 @@ static int ArrowIpcDecoderSetTypeTime(struct ArrowSchema* schema,
case ns(TimeUnit_MILLISECOND):
if (bitwidth != 32) {
ArrowErrorSet(error, "Expected bitwidth of 32 for Time TimeUnit %s but found %d",
ns(TimeUnit_name(time_unit)), bitwidth);
ns(TimeUnit_name(ns(Time_unit(type)))), bitwidth);
return EINVAL;
}

Expand All @@ -499,7 +510,7 @@ static int ArrowIpcDecoderSetTypeTime(struct ArrowSchema* schema,
case ns(TimeUnit_NANOSECOND):
if (bitwidth != 64) {
ArrowErrorSet(error, "Expected bitwidth of 64 for Time TimeUnit %s but found %d",
ns(TimeUnit_name(time_unit)), bitwidth);
ns(TimeUnit_name(ns(Time_unit(type)))), bitwidth);
return EINVAL;
}

Expand Down Expand Up @@ -644,7 +655,6 @@ static int ArrowIpcDecoderSetTypeUnion(struct ArrowSchema* schema,
int format_out_size = sizeof(union_types_str);
int n_chars = 0;

const char* format_prefix;
switch (union_mode) {
case ns(UnionMode_Sparse):
n_chars = snprintf(format_cursor, format_out_size, "+us:");
Expand Down Expand Up @@ -826,9 +836,6 @@ static int ArrowIpcDecoderSetChildren(struct ArrowSchema* schema, ns(Field_vec_t
static int ArrowIpcDecoderDecodeSchemaHeader(struct ArrowIpcDecoder* decoder,
flatbuffers_generic_t message_header,
struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;

ns(Schema_table_t) schema = (ns(Schema_table_t))message_header;
int endianness = ns(Schema_endianness(schema));
switch (endianness) {
Expand Down Expand Up @@ -977,9 +984,6 @@ static inline int ArrowIpcDecoderReadHeaderPrefix(struct ArrowIpcDecoder* decode
ArrowErrorCode ArrowIpcDecoderPeekHeader(struct ArrowIpcDecoder* decoder,
struct ArrowBufferView data,
struct ArrowError* error) {
struct ArrowIpcDecoderPrivate* private_data =
(struct ArrowIpcDecoderPrivate*)decoder->private_data;

ArrowIpcDecoderResetHeaderInfo(decoder);
NANOARROW_RETURN_NOT_OK(ArrowIpcDecoderReadHeaderPrefix(
decoder, &data, &decoder->header_size_bytes, error));
Expand Down Expand Up @@ -1051,7 +1055,7 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
}

// Read some basic information from the message
int32_t metadata_version = ns(Message_version(message));
decoder->metadata_version = ns(Message_version(message));
decoder->message_type = ns(Message_header_type(message));
decoder->body_size_bytes = ns(Message_bodyLength(message));

Expand All @@ -1063,7 +1067,7 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
case ns(MetadataVersion_V3):
case ns(MetadataVersion_V4):
ArrowErrorSet(error, "Expected metadata version V5 but found %s",
ns(MetadataVersion_name(decoder->metadata_version)));
ns(MetadataVersion_name(ns(Message_version(message)))));
break;
default:
ArrowErrorSet(error, "Unexpected value for Message metadata version (%d)",
Expand All @@ -1085,7 +1089,7 @@ ArrowErrorCode ArrowIpcDecoderDecodeHeader(struct ArrowIpcDecoder* decoder,
case ns(MessageHeader_Tensor):
case ns(MessageHeader_SparseTensor):
ArrowErrorSet(error, "Unsupported message type: '%s'",
ns(MessageHeader_type_name(decoder->message_type)));
ns(MessageHeader_type_name(ns(Message_header_type(message)))));
return ENOTSUP;
default:
ArrowErrorSet(error, "Unknown message type: %d", (int)(decoder->message_type));
Expand Down Expand Up @@ -1245,7 +1249,7 @@ struct ArrowIpcBufferSource {
int64_t buffer_length_bytes;
enum ArrowIpcCompressionType codec;
enum ArrowType data_type;
int32_t element_size_bits;
int64_t element_size_bits;
int swap_endian;
};

Expand Down Expand Up @@ -1284,6 +1288,10 @@ static ArrowErrorCode ArrowIpcMakeBufferFromView(struct ArrowIpcBufferFactory* f
struct ArrowBufferView* dst_view,
struct ArrowBuffer* dst,
struct ArrowError* error) {
NANOARROW_UNUSED(factory);
NANOARROW_UNUSED(dst);
NANOARROW_UNUSED(error);

struct ArrowBufferView* body = (struct ArrowBufferView*)factory->private_data;
dst_view->data.as_uint8 = body->data.as_uint8 + src->body_offset_bytes;
dst_view->size_bytes = src->buffer_length_bytes;
Expand All @@ -1303,6 +1311,8 @@ static ArrowErrorCode ArrowIpcMakeBufferFromShared(struct ArrowIpcBufferFactory*
struct ArrowBufferView* dst_view,
struct ArrowBuffer* dst,
struct ArrowError* error) {
NANOARROW_UNUSED(error);

struct ArrowIpcSharedBuffer* shared =
(struct ArrowIpcSharedBuffer*)factory->private_data;
ArrowBufferReset(dst);
Expand Down Expand Up @@ -1364,7 +1374,7 @@ static int ArrowIpcDecoderSwapEndian(struct ArrowIpcBufferSource* src,
const uint64_t* ptr_src = out_view->data.as_uint64;
uint64_t* ptr_dst = (uint64_t*)dst->data;
uint64_t words[4];
int n_words = src->element_size_bits / 64;
int n_words = (int)(src->element_size_bits / 64);

for (int64_t i = 0; i < (dst->size_bytes / n_words / 8); i++) {
for (int j = 0; j < n_words; j++) {
Expand Down
7 changes: 7 additions & 0 deletions extensions/nanoarrow_ipc/src/nanoarrow/nanoarrow_ipc_reader.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
#include "nanoarrow.h"
#include "nanoarrow_ipc.h"

// R 3.6 / Windows builds on a very old toolchain that does not define ENODATA
#if defined(_WIN32) && !defined(_MSC_VER) && !defined(ENODATA)
#define ENODATA 120
#endif

void ArrowIpcInputStreamMove(struct ArrowIpcInputStream* src,
struct ArrowIpcInputStream* dst) {
memcpy(dst, src, sizeof(struct ArrowIpcInputStream));
Expand All @@ -37,6 +42,8 @@ static ArrowErrorCode ArrowIpcInputStreamBufferRead(struct ArrowIpcInputStream*
uint8_t* buf, int64_t buf_size_bytes,
int64_t* size_read_out,
struct ArrowError* error) {
NANOARROW_UNUSED(error);

if (buf_size_bytes == 0) {
*size_read_out = 0;
return NANOARROW_OK;
Expand Down
3 changes: 3 additions & 0 deletions r/.covrignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,6 @@

src/nanoarrow.c
src/nanoarrow.h
src/nanoarrow_ipc.h
src/nanoarrow_ipc.c
src/flatcc*
5 changes: 5 additions & 0 deletions r/NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ S3method(print,nanoarrow_array)
S3method(print,nanoarrow_array_stream)
S3method(print,nanoarrow_buffer)
S3method(print,nanoarrow_schema)
S3method(read_nanoarrow,character)
S3method(read_nanoarrow,connection)
S3method(read_nanoarrow,raw)
S3method(str,nanoarrow_array)
S3method(str,nanoarrow_array_stream)
S3method(str,nanoarrow_buffer)
Expand All @@ -124,6 +127,7 @@ export(convert_array)
export(convert_array_extension)
export(convert_array_stream)
export(convert_buffer)
export(example_ipc_stream)
export(infer_nanoarrow_ptype)
export(infer_nanoarrow_ptype_extension)
export(infer_nanoarrow_schema)
Expand Down Expand Up @@ -188,6 +192,7 @@ export(nanoarrow_pointer_set_protected)
export(nanoarrow_schema_modify)
export(nanoarrow_schema_parse)
export(nanoarrow_version)
export(read_nanoarrow)
export(register_nanoarrow_extension)
export(resolve_nanoarrow_extension)
export(unregister_nanoarrow_extension)
Expand Down
Loading

0 comments on commit e4f0754

Please sign in to comment.