Skip to content

Commit e66906a

Browse files
committed
Add support for reading parquet file thanks to arrow-dataset #576
1 parent 117e958 commit e66906a

File tree

7 files changed

+80
-11
lines changed

7 files changed

+80
-11
lines changed

dataframe-arrow/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ dependencies {
1919
implementation(libs.arrow.vector)
2020
implementation(libs.arrow.format)
2121
implementation(libs.arrow.memory)
22+
implementation(libs.arrow.dataset)
2223
implementation(libs.commonsCompress)
2324
implementation(libs.kotlin.reflect)
2425
implementation(libs.kotlin.datetimeJvm)

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt

+9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.jetbrains.kotlinx.dataframe.io
22

3+
import org.apache.arrow.dataset.file.FileFormat
34
import org.apache.arrow.memory.RootAllocator
45
import org.apache.arrow.vector.ipc.ArrowReader
56
import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
@@ -184,3 +185,11 @@ public fun DataFrame.Companion.readArrow(
184185
*/
185186
public fun ArrowReader.toDataFrame(nullability: NullabilityOptions = NullabilityOptions.Infer): AnyFrame =
186187
DataFrame.Companion.readArrowImpl(this, nullability)
188+
189+
/**
190+
* Read [Parquet](https://parquet.apache.org/) data from existing [url] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
191+
*/
192+
public fun DataFrame.Companion.readParquet(
193+
url: URL,
194+
nullability: NullabilityOptions = NullabilityOptions.Infer,
195+
): AnyFrame = readArrowDataset(url.toString(), FileFormat.PARQUET, nullability)

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt

+29
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,11 @@ import kotlinx.datetime.LocalTime
66
import kotlinx.datetime.toKotlinLocalDate
77
import kotlinx.datetime.toKotlinLocalDateTime
88
import kotlinx.datetime.toKotlinLocalTime
9+
import org.apache.arrow.dataset.file.FileFormat
10+
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
11+
import org.apache.arrow.dataset.jni.DirectReservationListener
12+
import org.apache.arrow.dataset.jni.NativeMemoryPool
13+
import org.apache.arrow.dataset.scanner.ScanOptions
914
import org.apache.arrow.memory.RootAllocator
1015
import org.apache.arrow.vector.BigIntVector
1116
import org.apache.arrow.vector.BitVector
@@ -414,3 +419,27 @@ internal fun DataFrame.Companion.readArrowImpl(
414419
return flattened.concatKeepingSchema()
415420
}
416421
}
422+
423+
internal fun DataFrame.Companion.readArrowDataset(
424+
fileUri: String,
425+
fileFormat: FileFormat,
426+
nullability: NullabilityOptions = NullabilityOptions.Infer,
427+
): AnyFrame {
428+
val scanOptions = ScanOptions(32768)
429+
RootAllocator().use { allocator ->
430+
FileSystemDatasetFactory(
431+
allocator,
432+
NativeMemoryPool.createListenable(DirectReservationListener.instance()),
433+
fileFormat,
434+
fileUri,
435+
).use { datasetFactory ->
436+
datasetFactory.finish().use { dataset ->
437+
dataset.newScan(scanOptions).use { scanner ->
438+
scanner.scanBatches().use { reader ->
439+
return readArrow(reader, nullability)
440+
}
441+
}
442+
}
443+
}
444+
}
445+
}

dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt

+13
Original file line numberDiff line numberDiff line change
@@ -653,4 +653,17 @@ internal class ArrowKtTest {
653653
DataFrame.readArrow(dbArrowReader) shouldBe expected
654654
}
655655
}
656+
657+
@Test
658+
fun testReadParquet() {
659+
val path = testResource("test.arrow.parquet").path
660+
val dataFrame = DataFrame.readParquet(URL("file:$path"))
661+
dataFrame.rowsCount() shouldBe 300
662+
assertEstimations(
663+
exampleFrame = dataFrame,
664+
expectedNullable = false,
665+
hasNulls = false,
666+
fromParquet = true,
667+
)
668+
}
656669
}

dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt

+27-11
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,12 @@ import java.time.LocalTime as JavaLocalTime
2424
* Assert that we have got the same data that was originally saved on example creation.
2525
* Example generation project is currently located at https://github.com/Kopilov/arrow_example
2626
*/
27-
internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) {
27+
internal fun assertEstimations(
28+
exampleFrame: AnyFrame,
29+
expectedNullable: Boolean,
30+
hasNulls: Boolean,
31+
fromParquet: Boolean = false,
32+
) {
2833
/**
2934
* In [exampleFrame] we get two concatenated batches. To assert the estimations, we should transform frame row number to batch row number
3035
*/
@@ -142,16 +147,27 @@ internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean
142147
assertValueOrNull(iBatch(i), element, JavaLocalDate.ofEpochDay(iBatch(i).toLong() * 30).toKotlinLocalDate())
143148
}
144149

145-
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
146-
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
147-
datetimeCol.forEachIndexed { i, element ->
148-
assertValueOrNull(
149-
rowNumber = iBatch(i),
150-
actual = element,
151-
expected = JavaLocalDateTime
152-
.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC)
153-
.toKotlinLocalDateTime(),
154-
)
150+
if (fromParquet) {
151+
// parquet format have only one type of date: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date without time
152+
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDate?>
153+
datetimeCol.type() shouldBe typeOf<LocalDate>().withNullability(expectedNullable)
154+
datetimeCol.forEachIndexed { i, element ->
155+
assertValueOrNull(iBatch(i), element, JavaLocalDate.ofEpochDay(iBatch(i).toLong() * 30).toKotlinLocalDate())
156+
}
157+
} else {
158+
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
159+
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
160+
datetimeCol.forEachIndexed { i, element ->
161+
assertValueOrNull(
162+
rowNumber = iBatch(i),
163+
actual = element,
164+
expected = JavaLocalDateTime.ofEpochSecond(
165+
iBatch(i).toLong() * 60 * 60 * 24 * 30,
166+
0,
167+
ZoneOffset.UTC,
168+
).toKotlinLocalDateTime(),
169+
)
170+
}
155171
}
156172

157173
val timeSecCol = exampleFrame["time32_seconds"] as DataColumn<LocalTime?>
Binary file not shown.

gradle/libs.versions.toml

+1
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ jsoup = { group = "org.jsoup", name = "jsoup", version.ref = "jsoup" }
101101
arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref = "arrow" }
102102
arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" }
103103
arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" }
104+
arrow-dataset = { group = "org.apache.arrow", name = "arrow-dataset", version.ref = "arrow" }
104105
arrow-c-data = { group = "org.apache.arrow", name = "arrow-c-data", version.ref = "arrow" }
105106

106107

0 commit comments

Comments
 (0)