Skip to content

Commit 5ddef63

Browse files
authored
apacheGH-40038: [Java] Export non empty offset buffer for variable-size layout through C Data Interface (apache#40043)
### Rationale for this change

We encountered an error when exchanging a string array from Java to Rust through the Arrow C Data Interface. On the Rust side, it complains that the buffer at position 1 (the offset buffer) is null. After tracing and some debugging, it looks like the issue is that the Java Arrow `BaseVariableWidthVector` class assigns an empty offset buffer if the array is empty (value count 0).

According to the Arrow [spec](https://arrow.apache.org/docs/format/Columnar.html#variable-size-binary-layout) for the variable-size binary layout:

> The offsets buffer contains length + 1 signed integers ...

So for an empty string array, its offset buffer should be a buffer with one element (generally `0`).

### What changes are included in this PR?

This patch replaces the current empty offset buffer in the variable-size layout vector classes with a one-element offset buffer when exporting arrays through the C Data Interface.

### Are these changes tested?

Added test cases.

### Are there any user-facing changes?

No

* Closes: apache#40038

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: David Li <[email protected]>
1 parent 1552293 commit 5ddef63

File tree

9 files changed

+174
-25
lines changed

9 files changed

+174
-25
lines changed

java/c/src/main/java/org/apache/arrow/c/ArrayExporter.java

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -98,15 +98,7 @@ void export(ArrowArray array, FieldVector vector, DictionaryProvider dictionaryP
9898
if (buffers != null) {
9999
data.buffers = new ArrayList<>(buffers.size());
100100
data.buffers_ptrs = allocator.buffer((long) buffers.size() * Long.BYTES);
101-
for (ArrowBuf arrowBuf : buffers) {
102-
if (arrowBuf != null) {
103-
arrowBuf.getReferenceManager().retain();
104-
data.buffers_ptrs.writeLong(arrowBuf.memoryAddress());
105-
} else {
106-
data.buffers_ptrs.writeLong(NULL);
107-
}
108-
data.buffers.add(arrowBuf);
109-
}
101+
vector.exportCDataBuffers(data.buffers, data.buffers_ptrs, NULL);
110102
}
111103

112104
if (dictionaryEncoding != null) {

java/c/src/test/java/org/apache/arrow/c/RoundtripTest.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.util.Map;
3434
import java.util.UUID;
3535
import java.util.stream.Collectors;
36+
import java.util.stream.IntStream;
3637
import java.util.stream.Stream;
3738

3839
import org.apache.arrow.memory.ArrowBuf;
@@ -165,10 +166,25 @@ VectorSchemaRoot vectorSchemaRootRoundtrip(VectorSchemaRoot root) {
165166
}
166167

167168
boolean roundtrip(FieldVector vector, Class<?> clazz) {
169+
List<ArrowBuf> fieldBuffers = vector.getFieldBuffers();
170+
List<Integer> orgRefCnts = fieldBuffers.stream().map(buf -> buf.refCnt()).collect(Collectors.toList());
171+
long orgMemorySize = allocator.getAllocatedMemory();
172+
173+
boolean result = false;
168174
try (ValueVector imported = vectorRoundtrip(vector)) {
169175
assertTrue(clazz.isInstance(imported), String.format("expected %s but was %s", clazz, imported.getClass()));
170-
return VectorEqualsVisitor.vectorEquals(vector, imported);
176+
result = VectorEqualsVisitor.vectorEquals(vector, imported);
171177
}
178+
179+
// Check that the ref counts of the buffers are the same after the roundtrip
180+
IntStream.range(0, orgRefCnts.size()).forEach(i -> {
181+
ArrowBuf buf = fieldBuffers.get(i);
182+
assertEquals(buf.refCnt(), orgRefCnts.get(i));
183+
});
184+
185+
assertEquals(orgMemorySize, allocator.getAllocatedMemory());
186+
187+
return result;
172188
}
173189

174190
@Test

java/vector/src/main/java/org/apache/arrow/vector/BaseLargeVariableWidthVector.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,34 @@ public List<ArrowBuf> getFieldBuffers() {
336336
return result;
337337
}
338338

339+
/**
340+
* Exports this vector's validity, offset and value buffers, in that order, for the C Data Interface. Each buffer is
341+
* handed to {@code exportBuffer} together with {@code buffers}, {@code buffersPtr} and {@code nullValue}.
342+
*/
343+
@Override
344+
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
345+
// before exporting (as before flight/IPC), we must bring the vector to a consistent state.
346+
// this is because it is possible that the offset buffers of some trailing values
347+
// are not updated, which may cause some data in the data buffer to be lost.
348+
// for details, please see TestValueVector#testUnloadVariableWidthVector.
349+
fillHoles(valueCount);
350+
351+
exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);
352+
353+
if (offsetBuffer.capacity() == 0) {
354+
// An empty offset buffer is allowed for historical reasons. To export it through the
355+
// C Data Interface, we allocate a replacement buffer sized for a single offset (OFFSET_WIDTH bytes).
356+
// We pass `retain = false` so that the ref count of the exported buffer is not increased:
357+
// the ref count of the newly created buffer (i.e., 1) already represents the usage at the imported side.
358+
// NOTE(review): allocateOffsetBuffer creates its buffer as a local and then calls the no-arg initOffsetBuffer(); confirm the returned buffer's offset is really zeroed.
359+
exportBuffer(allocateOffsetBuffer(OFFSET_WIDTH), buffers, buffersPtr, nullValue, false);
360+
} else {
361+
exportBuffer(offsetBuffer, buffers, buffersPtr, nullValue, true);
362+
}
363+
364+
exportBuffer(valueBuffer, buffers, buffersPtr, nullValue, true);
365+
}
366+
339367
/**
340368
* Set the reader and writer indexes for the inner buffers.
341369
*/
@@ -456,10 +484,11 @@ private void allocateBytes(final long valueBufferSize, final int valueCount) {
456484
}
457485

458486
/* allocate offset buffer */
459-
private void allocateOffsetBuffer(final long size) {
460-
offsetBuffer = allocator.buffer(size);
487+
private ArrowBuf allocateOffsetBuffer(final long size) {
488+
ArrowBuf offsetBuffer = allocator.buffer(size);
461489
offsetBuffer.readerIndex(0);
462490
initOffsetBuffer();
491+
return offsetBuffer;
463492
}
464493

465494
/* allocate validity buffer */
@@ -760,7 +789,7 @@ private void splitAndTransferOffsetBuffer(int startIndex, int length, BaseLargeV
760789
final long start = getStartOffset(startIndex);
761790
final long end = getStartOffset(startIndex + length);
762791
final long dataLength = end - start;
763-
target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
792+
target.offsetBuffer = target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
764793
for (int i = 0; i < length + 1; i++) {
765794
final long relativeSourceOffset = getStartOffset(startIndex + i) - start;
766795
target.offsetBuffer.setLong((long) i * OFFSET_WIDTH, relativeSourceOffset);

java/vector/src/main/java/org/apache/arrow/vector/BaseVariableWidthVector.java

Lines changed: 32 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -355,6 +355,34 @@ public List<ArrowBuf> getFieldBuffers() {
355355
return result;
356356
}
357357

358+
/**
359+
* Exports this vector's validity, offset and value buffers, in that order, for the C Data Interface. Each buffer is
360+
* handed to {@code exportBuffer} together with {@code buffers}, {@code buffersPtr} and {@code nullValue}.
361+
*/
362+
@Override
363+
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
364+
// before exporting (as before flight/IPC), we must bring the vector to a consistent state.
365+
// this is because it is possible that the offset buffers of some trailing values
366+
// are not updated, which may cause some data in the data buffer to be lost.
367+
// for details, please see TestValueVector#testUnloadVariableWidthVector.
368+
fillHoles(valueCount);
369+
370+
exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);
371+
372+
if (offsetBuffer.capacity() == 0) {
373+
// An empty offset buffer is allowed for historical reasons. To export it through the
374+
// C Data Interface, we allocate a replacement buffer sized for a single offset (OFFSET_WIDTH bytes).
375+
// We pass `retain = false` so that the ref count of the exported buffer is not increased:
376+
// the ref count of the newly created buffer (i.e., 1) already represents the usage at the imported side.
377+
// NOTE(review): allocateOffsetBuffer creates its buffer as a local and then calls the no-arg initOffsetBuffer(); confirm the returned buffer's offset is really zeroed.
378+
exportBuffer(allocateOffsetBuffer(OFFSET_WIDTH), buffers, buffersPtr, nullValue, false);
379+
} else {
380+
exportBuffer(offsetBuffer, buffers, buffersPtr, nullValue, true);
381+
}
382+
383+
exportBuffer(valueBuffer, buffers, buffersPtr, nullValue, true);
384+
}
385+
358386
/**
359387
* Set the reader and writer indexes for the inner buffers.
360388
*/
@@ -476,11 +504,12 @@ private void allocateBytes(final long valueBufferSize, final int valueCount) {
476504
}
477505

478506
/* allocate offset buffer */
479-
private void allocateOffsetBuffer(final long size) {
507+
private ArrowBuf allocateOffsetBuffer(final long size) {
480508
final int curSize = (int) size;
481-
offsetBuffer = allocator.buffer(curSize);
509+
ArrowBuf offsetBuffer = allocator.buffer(curSize);
482510
offsetBuffer.readerIndex(0);
483511
initOffsetBuffer();
512+
return offsetBuffer;
484513
}
485514

486515
/* allocate validity buffer */
@@ -805,7 +834,7 @@ private void splitAndTransferOffsetBuffer(int startIndex, int length, BaseVariab
805834
(1 + length) * ((long) OFFSET_WIDTH));
806835
target.offsetBuffer = transferBuffer(slicedOffsetBuffer, target.allocator);
807836
} else {
808-
target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
837+
target.offsetBuffer = target.allocateOffsetBuffer((long) (length + 1) * OFFSET_WIDTH);
809838
for (int i = 0; i < length + 1; i++) {
810839
final int relativeSourceOffset = getStartOffset(startIndex + i) - start;
811840
target.offsetBuffer.setInt((long) i * OFFSET_WIDTH, relativeSourceOffset);

java/vector/src/main/java/org/apache/arrow/vector/FieldVector.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,47 @@ public interface FieldVector extends ValueVector {
6060
*/
6161
List<ArrowBuf> getFieldBuffers();
6262

63+
/**
64+
* Exports a single buffer: writes its memory address to {@code buffersPtr} and appends it to {@code buffers}.
65+
*
66+
* @param buffer the buffer to export; may be null, in which case {@code nullValue} is written and null is appended
67+
* @param buffers the list of exported buffers that {@code buffer} is appended to
68+
* @param buffersPtr the buffer that receives the exported memory address (one long per call)
69+
* @param nullValue the value written to {@code buffersPtr} in place of an address when {@code buffer} is null
70+
* @param retain whether to retain the buffer through its reference manager when exporting; ignored for a null buffer
71+
*/
72+
default void exportBuffer(
73+
ArrowBuf buffer,
74+
List<ArrowBuf> buffers,
75+
ArrowBuf buffersPtr,
76+
long nullValue,
77+
boolean retain) {
78+
if (buffer != null) {
79+
if (retain) {
80+
buffer.getReferenceManager().retain();
81+
}
82+
buffersPtr.writeLong(buffer.memoryAddress());
83+
} else {
84+
buffersPtr.writeLong(nullValue);
85+
}
86+
buffers.add(buffer);
87+
}
88+
89+
/**
90+
* Exports the buffers of this vector for the C Data Interface. This method traverses the buffers returned by
91+
* {@code getFieldBuffers()} and exports each buffer and its memory address into {@code buffers} and {@code buffersPtr}.
92+
*
93+
* By default, every buffer is exported with {@code retain = true}, i.e. its ref count is increased to account for
94+
* the usage at the imported side.
95+
*/
96+
default void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
97+
List<ArrowBuf> fieldBuffers = getFieldBuffers();
98+
99+
for (ArrowBuf arrowBuf : fieldBuffers) {
100+
exportBuffer(arrowBuf, buffers, buffersPtr, nullValue, true);
101+
}
102+
}
103+
63104
/**
64105
* Get the inner vectors.
65106
*

java/vector/src/main/java/org/apache/arrow/vector/complex/BaseRepeatedValueVector.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public String getName() {
8383
public boolean allocateNewSafe() {
8484
boolean dataAlloc = false;
8585
try {
86-
allocateOffsetBuffer(offsetAllocationSizeInBytes);
86+
offsetBuffer = allocateOffsetBuffer(offsetAllocationSizeInBytes);
8787
dataAlloc = vector.allocateNewSafe();
8888
} catch (Exception e) {
8989
e.printStackTrace();
@@ -97,12 +97,13 @@ public boolean allocateNewSafe() {
9797
return dataAlloc;
9898
}
9999

100-
protected void allocateOffsetBuffer(final long size) {
100+
protected ArrowBuf allocateOffsetBuffer(final long size) {
101101
final int curSize = (int) size;
102-
offsetBuffer = allocator.buffer(curSize);
102+
ArrowBuf offsetBuffer = allocator.buffer(curSize);
103103
offsetBuffer.readerIndex(0);
104104
offsetAllocationSizeInBytes = curSize;
105105
offsetBuffer.setZero(0, offsetBuffer.capacity());
106+
return offsetBuffer;
106107
}
107108

108109
@Override

java/vector/src/main/java/org/apache/arrow/vector/complex/LargeListVector.java

Lines changed: 25 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,26 @@ public List<ArrowBuf> getFieldBuffers() {
287287
return result;
288288
}
289289

290+
/**
291+
* Exports this vector's validity buffer followed by its offset buffer for the C Data Interface. Each buffer is
292+
* handed to {@code exportBuffer} together with {@code buffers}, {@code buffersPtr} and {@code nullValue}.
293+
*/
294+
@Override
295+
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
296+
exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);
297+
298+
if (offsetBuffer.capacity() == 0) {
299+
// An empty offset buffer is allowed for historical reasons. To export it through the
300+
// C Data Interface, we allocate a replacement buffer sized for a single offset (OFFSET_WIDTH bytes).
301+
// We pass `retain = false` so that the ref count of the exported buffer is not increased:
302+
// the ref count of the newly created buffer (i.e., 1) already represents the usage at the imported side.
303+
// NOTE(review): allocateOffsetBuffer also assigns offsetAllocationSizeInBytes = size, so this path sets it to OFFSET_WIDTH; confirm that is intended.
304+
exportBuffer(allocateOffsetBuffer(OFFSET_WIDTH), buffers, buffersPtr, nullValue, false);
305+
} else {
306+
exportBuffer(offsetBuffer, buffers, buffersPtr, nullValue, true);
307+
}
308+
}
309+
290310
/**
291311
* Set the reader and writer indexes for the inner buffers.
292312
*/
@@ -343,7 +363,7 @@ public boolean allocateNewSafe() {
343363
/* allocate offset and data buffer */
344364
boolean dataAlloc = false;
345365
try {
346-
allocateOffsetBuffer(offsetAllocationSizeInBytes);
366+
offsetBuffer = allocateOffsetBuffer(offsetAllocationSizeInBytes);
347367
dataAlloc = vector.allocateNewSafe();
348368
} catch (Exception e) {
349369
e.printStackTrace();
@@ -371,11 +391,12 @@ private void allocateValidityBuffer(final long size) {
371391
validityBuffer.setZero(0, validityBuffer.capacity());
372392
}
373393

374-
protected void allocateOffsetBuffer(final long size) {
375-
offsetBuffer = allocator.buffer(size);
394+
protected ArrowBuf allocateOffsetBuffer(final long size) {
395+
ArrowBuf offsetBuffer = allocator.buffer(size);
376396
offsetBuffer.readerIndex(0);
377397
offsetAllocationSizeInBytes = size;
378398
offsetBuffer.setZero(0, offsetBuffer.capacity());
399+
return offsetBuffer;
379400
}
380401

381402
/**
@@ -656,7 +677,7 @@ public void splitAndTransfer(int startIndex, int length) {
656677
final long startPoint = offsetBuffer.getLong((long) startIndex * OFFSET_WIDTH);
657678
final long sliceLength = offsetBuffer.getLong((long) (startIndex + length) * OFFSET_WIDTH) - startPoint;
658679
to.clear();
659-
to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
680+
to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
660681
/* splitAndTransfer offset buffer */
661682
for (int i = 0; i < length + 1; i++) {
662683
final long relativeOffset = offsetBuffer.getLong((long) (startIndex + i) * OFFSET_WIDTH) - startPoint;

java/vector/src/main/java/org/apache/arrow/vector/complex/ListVector.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,26 @@ public List<ArrowBuf> getFieldBuffers() {
242242
return result;
243243
}
244244

245+
/**
246+
* Exports this vector's validity buffer followed by its offset buffer for the C Data Interface. Each buffer is
247+
* handed to {@code exportBuffer} together with {@code buffers}, {@code buffersPtr} and {@code nullValue}.
248+
*/
249+
@Override
250+
public void exportCDataBuffers(List<ArrowBuf> buffers, ArrowBuf buffersPtr, long nullValue) {
251+
exportBuffer(validityBuffer, buffers, buffersPtr, nullValue, true);
252+
253+
if (offsetBuffer.capacity() == 0) {
254+
// An empty offset buffer is allowed for historical reasons. To export it through the
255+
// C Data Interface, we allocate a replacement buffer sized for a single offset (OFFSET_WIDTH bytes).
256+
// We pass `retain = false` so that the ref count of the exported buffer is not increased:
257+
// the ref count of the newly created buffer (i.e., 1) already represents the usage at the imported side.
258+
// NOTE(review): presumably this resolves to BaseRepeatedValueVector.allocateOffsetBuffer, which also overwrites offsetAllocationSizeInBytes; confirm that is intended.
259+
exportBuffer(allocateOffsetBuffer(OFFSET_WIDTH), buffers, buffersPtr, nullValue, false);
260+
} else {
261+
exportBuffer(offsetBuffer, buffers, buffersPtr, nullValue, true);
262+
}
263+
}
264+
245265
/**
246266
* Set the reader and writer indexes for the inner buffers.
247267
*/
@@ -535,7 +555,7 @@ public void splitAndTransfer(int startIndex, int length) {
535555
final int startPoint = offsetBuffer.getInt(startIndex * OFFSET_WIDTH);
536556
final int sliceLength = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH) - startPoint;
537557
to.clear();
538-
to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
558+
to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
539559
/* splitAndTransfer offset buffer */
540560
for (int i = 0; i < length + 1; i++) {
541561
final int relativeOffset = offsetBuffer.getInt((startIndex + i) * OFFSET_WIDTH) - startPoint;

java/vector/src/main/java/org/apache/arrow/vector/complex/MapVector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,7 +209,7 @@ public void splitAndTransfer(int startIndex, int length) {
209209
final int startPoint = offsetBuffer.getInt(startIndex * OFFSET_WIDTH);
210210
final int sliceLength = offsetBuffer.getInt((startIndex + length) * OFFSET_WIDTH) - startPoint;
211211
to.clear();
212-
to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
212+
to.offsetBuffer = to.allocateOffsetBuffer((length + 1) * OFFSET_WIDTH);
213213
/* splitAndTransfer offset buffer */
214214
for (int i = 0; i < length + 1; i++) {
215215
final int relativeOffset = offsetBuffer.getInt((startIndex + i) * OFFSET_WIDTH) - startPoint;

0 commit comments

Comments
 (0)