Skip to content

Commit 96f686b

Browse files
apacheGH-40061: [C++][Python] Basic conversion of RecordBatch to Arrow Tensor - add option to cast NULL to NaN (apache#40803)
### Rationale for this change The conversion from `RecordBatch` to `Tensor` class exists but it doesn't support record batches with validity bitmaps. This PR adds support for an option to convert null values to NaN. ### What changes are included in this PR? This PR adds a `null_to_nan` option in `RecordBatch::ToTensor` so that null values are converted to NaN in the resulting `Tensor`. This for example works: ```python >>> import pyarrow as pa >>> batch = pa.record_batch( ... [ ... pa.array([1, 2, 3, 4, None], type=pa.int32()), ... pa.array([10, 20, 30, 40, None], type=pa.float32()), ... ], names = ["a", "b"] ... ) >>> batch pyarrow.RecordBatch a: int32 b: float ---- a: [1,2,3,4,null] b: [10,20,30,40,null] >>> batch.to_tensor(null_to_nan=True) <pyarrow.Tensor> type: double shape: (5, 2) strides: (8, 40) >>> batch.to_tensor(null_to_nan=True).to_numpy() array([[ 1., 10.], [ 2., 20.], [ 3., 30.], [ 4., 40.], [nan, nan]]) ``` but default would raise: ```python >>> batch.to_tensor() Traceback (most recent call last): File "<stdin>", line 1, in <module> File "pyarrow/table.pxi", line 3421, in pyarrow.lib.RecordBatch.to_tensor a: int32 File "pyarrow/error.pxi", line 154, in pyarrow.lib.pyarrow_internal_check_status return check_status(status) File "pyarrow/error.pxi", line 91, in pyarrow.lib.check_status raise convert_status(status) pyarrow.lib.ArrowTypeError: Can only convert a RecordBatch with no nulls. Set null_to_nan to true to convert nulls to NaN ``` ### Are these changes tested? Yes. ### Are there any user-facing changes? No. * GitHub Issue: apache#40061 Lead-authored-by: AlenkaF <[email protected]> Co-authored-by: Alenka Frim <[email protected]> Co-authored-by: Joris Van den Bossche <[email protected]> Signed-off-by: Joris Van den Bossche <[email protected]>
1 parent ed8c363 commit 96f686b

File tree

6 files changed

+208
-20
lines changed

6 files changed

+208
-20
lines changed

cpp/src/arrow/record_batch.cc

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "arrow/record_batch.h"
1919

2020
#include <algorithm>
21+
#include <cmath>
2122
#include <cstdlib>
2223
#include <memory>
2324
#include <sstream>
@@ -261,12 +262,19 @@ struct ConvertColumnsToTensorVisitor {
261262
using In = typename T::c_type;
262263
auto in_values = ArraySpan(in_data).GetSpan<In>(1, in_data.length);
263264

264-
if constexpr (std::is_same_v<In, Out>) {
265-
memcpy(out_values, in_values.data(), in_values.size_bytes());
266-
out_values += in_values.size();
265+
if (in_data.null_count == 0) {
266+
if constexpr (std::is_same_v<In, Out>) {
267+
memcpy(out_values, in_values.data(), in_values.size_bytes());
268+
out_values += in_values.size();
269+
} else {
270+
for (In in_value : in_values) {
271+
*out_values++ = static_cast<Out>(in_value);
272+
}
273+
}
267274
} else {
268-
for (In in_value : in_values) {
269-
*out_values++ = static_cast<Out>(in_value);
275+
for (int64_t i = 0; i < in_data.length; ++i) {
276+
*out_values++ =
277+
in_data.IsNull(i) ? static_cast<Out>(NAN) : static_cast<Out>(in_values[i]);
270278
}
271279
}
272280
return Status::OK();
@@ -286,16 +294,20 @@ inline void ConvertColumnsToTensor(const RecordBatch& batch, uint8_t* out) {
286294
}
287295
}
288296

289-
Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
297+
Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(bool null_to_nan,
298+
MemoryPool* pool) const {
290299
if (num_columns() == 0) {
291300
return Status::TypeError(
292301
"Conversion to Tensor for RecordBatches without columns/schema is not "
293302
"supported.");
294303
}
295304
// Check for no validity bitmap of each field
305+
// if null_to_nan conversion is set to false
296306
for (int i = 0; i < num_columns(); ++i) {
297-
if (column(i)->null_count() > 0) {
298-
return Status::TypeError("Can only convert a RecordBatch with no nulls.");
307+
if (column(i)->null_count() > 0 && !null_to_nan) {
308+
return Status::TypeError(
309+
"Can only convert a RecordBatch with no nulls. Set null_to_nan to true to "
310+
"convert nulls to NaN");
299311
}
300312
}
301313

@@ -308,12 +320,12 @@ Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
308320
std::shared_ptr<Field> result_field = schema_->field(0);
309321
std::shared_ptr<DataType> result_type = result_field->type();
310322

311-
if (num_columns() > 1) {
312-
Field::MergeOptions options;
313-
options.promote_integer_to_float = true;
314-
options.promote_integer_sign = true;
315-
options.promote_numeric_width = true;
323+
Field::MergeOptions options;
324+
options.promote_integer_to_float = true;
325+
options.promote_integer_sign = true;
326+
options.promote_numeric_width = true;
316327

328+
if (num_columns() > 1) {
317329
for (int i = 1; i < num_columns(); ++i) {
318330
if (!is_numeric(column(i)->type()->id())) {
319331
return Status::TypeError("DataType is not supported: ",
@@ -334,6 +346,15 @@ Result<std::shared_ptr<Tensor>> RecordBatch::ToTensor(MemoryPool* pool) const {
334346
result_type = result_field->type();
335347
}
336348

349+
// Check if result_type is signed or unsigned integer and null_to_nan is set to true
350+
// Then all columns should be promoted to float type
351+
if (is_integer(result_type->id()) && null_to_nan) {
352+
ARROW_ASSIGN_OR_RAISE(
353+
result_field,
354+
result_field->MergeWith(field(result_field->name(), float32()), options));
355+
result_type = result_field->type();
356+
}
357+
337358
// Allocate memory
338359
ARROW_ASSIGN_OR_RAISE(
339360
std::shared_ptr<Buffer> result,

cpp/src/arrow/record_batch.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,12 @@ class ARROW_EXPORT RecordBatch {
8585
/// Create a Tensor object with shape (number of rows, number of columns) and
8686
/// strides (type size in bytes, type size in bytes * number of rows).
8787
/// Generated Tensor will have column-major layout.
88+
///
89+
/// \param[in] null_to_nan if true, convert nulls to NaN
90+
/// \param[in] pool the memory pool to allocate the tensor buffer
91+
/// \return the resulting Tensor
8892
Result<std::shared_ptr<Tensor>> ToTensor(
89-
MemoryPool* pool = default_memory_pool()) const;
93+
bool null_to_nan = false, MemoryPool* pool = default_memory_pool()) const;
9094

9195
/// \brief Construct record batch from struct array
9296
///

cpp/src/arrow/record_batch_test.cc

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,8 @@ TEST_F(TestRecordBatch, ToTensorUnsupportedMissing) {
667667
auto batch = RecordBatch::Make(schema, length, {a0, a1});
668668

669669
ASSERT_RAISES_WITH_MESSAGE(TypeError,
670-
"Type error: Can only convert a RecordBatch with no nulls.",
670+
"Type error: Can only convert a RecordBatch with no nulls. "
671+
"Set null_to_nan to true to convert nulls to NaN",
671672
batch->ToTensor());
672673
}
673674

@@ -740,6 +741,79 @@ TEST_F(TestRecordBatch, ToTensorSupportedNaN) {
740741
CheckTensor<FloatType>(tensor, 18, shape, f_strides);
741742
}
742743

744+
TEST_F(TestRecordBatch, ToTensorSupportedNullToNan) {
745+
const int length = 9;
746+
747+
// int32 + float32 = float64
748+
auto f0 = field("f0", int32());
749+
auto f1 = field("f1", float32());
750+
751+
std::vector<std::shared_ptr<Field>> fields = {f0, f1};
752+
auto schema = ::arrow::schema(fields);
753+
754+
auto a0 = ArrayFromJSON(int32(), "[null, 2, 3, 4, 5, 6, 7, 8, 9]");
755+
auto a1 = ArrayFromJSON(float32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
756+
757+
auto batch = RecordBatch::Make(schema, length, {a0, a1});
758+
759+
ASSERT_OK_AND_ASSIGN(auto tensor, batch->ToTensor(/*null_to_nan=*/true));
760+
ASSERT_OK(tensor->Validate());
761+
762+
std::vector<int64_t> shape = {9, 2};
763+
const int64_t f64_size = sizeof(double);
764+
std::vector<int64_t> f_strides = {f64_size, f64_size * shape[0]};
765+
std::shared_ptr<Tensor> tensor_expected = TensorFromJSON(
766+
float64(), "[NaN, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, NaN, 60, 70, 80, 90]",
767+
shape, f_strides);
768+
769+
EXPECT_FALSE(tensor_expected->Equals(*tensor));
770+
EXPECT_TRUE(tensor_expected->Equals(*tensor, EqualOptions().nans_equal(true)));
771+
772+
CheckTensor<DoubleType>(tensor, 18, shape, f_strides);
773+
774+
// int32 -> float64
775+
auto f2 = field("f2", int32());
776+
777+
std::vector<std::shared_ptr<Field>> fields1 = {f0, f2};
778+
auto schema1 = ::arrow::schema(fields1);
779+
780+
auto a2 = ArrayFromJSON(int32(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
781+
auto batch1 = RecordBatch::Make(schema1, length, {a0, a2});
782+
783+
ASSERT_OK_AND_ASSIGN(auto tensor1, batch1->ToTensor(/*null_to_nan=*/true));
784+
ASSERT_OK(tensor1->Validate());
785+
786+
EXPECT_FALSE(tensor_expected->Equals(*tensor1));
787+
EXPECT_TRUE(tensor_expected->Equals(*tensor1, EqualOptions().nans_equal(true)));
788+
789+
CheckTensor<DoubleType>(tensor1, 18, shape, f_strides);
790+
791+
// int8 -> float32
792+
auto f3 = field("f3", int8());
793+
auto f4 = field("f4", int8());
794+
795+
std::vector<std::shared_ptr<Field>> fields2 = {f3, f4};
796+
auto schema2 = ::arrow::schema(fields2);
797+
798+
auto a3 = ArrayFromJSON(int8(), "[null, 2, 3, 4, 5, 6, 7, 8, 9]");
799+
auto a4 = ArrayFromJSON(int8(), "[10, 20, 30, 40, null, 60, 70, 80, 90]");
800+
auto batch2 = RecordBatch::Make(schema2, length, {a3, a4});
801+
802+
ASSERT_OK_AND_ASSIGN(auto tensor2, batch2->ToTensor(/*null_to_nan=*/true));
803+
ASSERT_OK(tensor2->Validate());
804+
805+
const int64_t f32_size = sizeof(float);
806+
std::vector<int64_t> f_strides_2 = {f32_size, f32_size * shape[0]};
807+
std::shared_ptr<Tensor> tensor_expected_2 = TensorFromJSON(
808+
float32(), "[NaN, 2, 3, 4, 5, 6, 7, 8, 9, 10, 20, 30, 40, NaN, 60, 70, 80, 90]",
809+
shape, f_strides_2);
810+
811+
EXPECT_FALSE(tensor_expected_2->Equals(*tensor2));
812+
EXPECT_TRUE(tensor_expected_2->Equals(*tensor2, EqualOptions().nans_equal(true)));
813+
814+
CheckTensor<FloatType>(tensor2, 18, shape, f_strides_2);
815+
}
816+
743817
TEST_F(TestRecordBatch, ToTensorSupportedTypesMixed) {
744818
const int length = 9;
745819

python/pyarrow/includes/libarrow.pxd

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -984,7 +984,7 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
984984
shared_ptr[CRecordBatch] Slice(int64_t offset)
985985
shared_ptr[CRecordBatch] Slice(int64_t offset, int64_t length)
986986

987-
CResult[shared_ptr[CTensor]] ToTensor() const
987+
CResult[shared_ptr[CTensor]] ToTensor(c_bool null_to_nan, CMemoryPool* pool) const
988988

989989
cdef cppclass CRecordBatchWithMetadata" arrow::RecordBatchWithMetadata":
990990
shared_ptr[CRecordBatch] batch

python/pyarrow/table.pxi

Lines changed: 46 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3389,21 +3389,64 @@ cdef class RecordBatch(_Tabular):
33893389
<CResult[shared_ptr[CArray]]>deref(c_record_batch).ToStructArray())
33903390
return pyarrow_wrap_array(c_array)
33913391

3392-
def to_tensor(self):
3392+
def to_tensor(self, c_bool null_to_nan=False, MemoryPool memory_pool=None):
33933393
"""
33943394
Convert to a :class:`~pyarrow.Tensor`.
33953395
33963396
RecordBatches that can be converted have fields of type signed or unsigned
3397-
integer or float, including all bit-widths, with no validity bitmask.
3397+
integer or float, including all bit-widths. RecordBatches with validity bitmask
3398+
for any of the arrays can be converted with ``null_to_nan`` turned to ``True``.
3399+
In this case null values are converted to NaN and signed or unsigned integer
3400+
type arrays are promoted to appropriate float type.
3401+
3402+
Parameters
3403+
----------
3404+
null_to_nan : bool, default False
3405+
Whether to write null values in the result as ``NaN``.
3406+
memory_pool : MemoryPool, default None
3407+
For memory allocations, if required, otherwise use default pool
3408+
3409+
Examples
3410+
--------
3411+
>>> import pyarrow as pa
3412+
>>> batch = pa.record_batch(
3413+
... [
3414+
... pa.array([1, 2, 3, 4, None], type=pa.int32()),
3415+
... pa.array([10, 20, 30, 40, None], type=pa.float32()),
3416+
... ], names = ["a", "b"]
3417+
... )
3418+
3419+
>>> batch
3420+
pyarrow.RecordBatch
3421+
a: int32
3422+
b: float
3423+
----
3424+
a: [1,2,3,4,null]
3425+
b: [10,20,30,40,null]
3426+
3427+
>>> batch.to_tensor(null_to_nan=True)
3428+
<pyarrow.Tensor>
3429+
type: double
3430+
shape: (5, 2)
3431+
strides: (8, 40)
3432+
3433+
>>> batch.to_tensor(null_to_nan=True).to_numpy()
3434+
array([[ 1., 10.],
3435+
[ 2., 20.],
3436+
[ 3., 30.],
3437+
[ 4., 40.],
3438+
[nan, nan]])
33983439
"""
33993440
cdef:
34003441
shared_ptr[CRecordBatch] c_record_batch
34013442
shared_ptr[CTensor] c_tensor
3443+
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool)
34023444

34033445
c_record_batch = pyarrow_unwrap_batch(self)
34043446
with nogil:
34053447
c_tensor = GetResultValue(
3406-
<CResult[shared_ptr[CTensor]]>deref(c_record_batch).ToTensor())
3448+
<CResult[shared_ptr[CTensor]]>deref(c_record_batch).ToTensor(null_to_nan,
3449+
pool))
34073450
return pyarrow_wrap_tensor(c_tensor)
34083451

34093452
def _export_to_c(self, out_ptr, out_schema_ptr=0):

python/pyarrow/tests/test_table.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1061,7 +1061,7 @@ def test_recordbatch_to_tensor_null():
10611061
arr2 = [10, 20, 30, 40, 50, 60, 70, None, 90]
10621062
batch = pa.RecordBatch.from_arrays(
10631063
[
1064-
pa.array(arr1, type=pa.float32()),
1064+
pa.array(arr1, type=pa.int32()),
10651065
pa.array(arr2, type=pa.float32()),
10661066
], ["a", "b"]
10671067
)
@@ -1071,6 +1071,52 @@ def test_recordbatch_to_tensor_null():
10711071
):
10721072
batch.to_tensor()
10731073

1074+
result = batch.to_tensor(null_to_nan=True)
1075+
1076+
x = np.array([arr1, arr2], np.float64).transpose()
1077+
expected = pa.Tensor.from_numpy(x)
1078+
1079+
np.testing.assert_equal(result.to_numpy(), x)
1080+
assert result.size == 18
1081+
assert result.type == pa.float64()
1082+
assert result.shape == expected.shape
1083+
assert result.strides == expected.strides
1084+
1085+
# int32 -> float64
1086+
batch = pa.RecordBatch.from_arrays(
1087+
[
1088+
pa.array(arr1, type=pa.int32()),
1089+
pa.array(arr2, type=pa.int32()),
1090+
], ["a", "b"]
1091+
)
1092+
1093+
result = batch.to_tensor(null_to_nan=True)
1094+
1095+
np.testing.assert_equal(result.to_numpy(), x)
1096+
assert result.size == 18
1097+
assert result.type == pa.float64()
1098+
assert result.shape == expected.shape
1099+
assert result.strides == expected.strides
1100+
1101+
# int8 -> float32
1102+
batch = pa.RecordBatch.from_arrays(
1103+
[
1104+
pa.array(arr1, type=pa.int8()),
1105+
pa.array(arr2, type=pa.int8()),
1106+
], ["a", "b"]
1107+
)
1108+
1109+
result = batch.to_tensor(null_to_nan=True)
1110+
1111+
x = np.array([arr1, arr2], np.float32).transpose()
1112+
expected = pa.Tensor.from_numpy(x)
1113+
1114+
np.testing.assert_equal(result.to_numpy(), x)
1115+
assert result.size == 18
1116+
assert result.type == pa.float32()
1117+
assert result.shape == expected.shape
1118+
assert result.strides == expected.strides
1119+
10741120

10751121
def test_recordbatch_to_tensor_empty():
10761122
batch = pa.RecordBatch.from_arrays(

0 commit comments

Comments
 (0)