Skip to content

Commit 5ce70b9

Browse files
committed
Add support for reading parquet file thanks to arrow-dataset #576
1 parent 485f3ba commit 5ce70b9

File tree

7 files changed

+77
-10
lines changed

7 files changed

+77
-10
lines changed

dataframe-arrow/build.gradle.kts

+1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ dependencies {
1919
implementation(libs.arrow.vector)
2020
implementation(libs.arrow.format)
2121
implementation(libs.arrow.memory)
22+
implementation(libs.arrow.dataset)
2223
implementation(libs.commonsCompress)
2324
implementation(libs.kotlin.reflect)
2425
implementation(libs.kotlin.datetimeJvm)

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt

+9
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package org.jetbrains.kotlinx.dataframe.io
22

3+
import org.apache.arrow.dataset.file.FileFormat
34
import org.apache.arrow.memory.RootAllocator
45
import org.apache.arrow.vector.ipc.ArrowReader
56
import org.apache.commons.compress.utils.SeekableInMemoryByteChannel
@@ -184,3 +185,11 @@ public fun DataFrame.Companion.readArrow(
184185
*/
185186
public fun ArrowReader.toDataFrame(nullability: NullabilityOptions = NullabilityOptions.Infer): AnyFrame =
186187
DataFrame.Companion.readArrowImpl(this, nullability)
188+
189+
/**
190+
* Reads [Parquet](https://parquet.apache.org/) data from the given [url] using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html).
191+
*/
192+
public fun DataFrame.Companion.readParquet(
193+
url: URL,
194+
nullability: NullabilityOptions = NullabilityOptions.Infer,
195+
): AnyFrame = readArrowDataset(url.toString(), FileFormat.PARQUET, nullability)

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt

+29
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package org.jetbrains.kotlinx.dataframe.io
22

3+
import org.apache.arrow.dataset.file.FileFormat
4+
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
5+
import org.apache.arrow.dataset.jni.DirectReservationListener
6+
import org.apache.arrow.dataset.jni.NativeMemoryPool
7+
import org.apache.arrow.dataset.scanner.ScanOptions
38
import org.apache.arrow.memory.RootAllocator
49
import org.apache.arrow.vector.BigIntVector
510
import org.apache.arrow.vector.BitVector
@@ -383,3 +388,27 @@ internal fun DataFrame.Companion.readArrowImpl(
383388
return flattened.concatKeepingSchema()
384389
}
385390
}
391+
392+
internal fun DataFrame.Companion.readArrowDataset(
393+
fileUri: String,
394+
fileFormat: FileFormat,
395+
nullability: NullabilityOptions = NullabilityOptions.Infer,
396+
): AnyFrame {
397+
val scanOptions = ScanOptions(32768)
398+
RootAllocator().use { allocator ->
399+
FileSystemDatasetFactory(
400+
allocator,
401+
NativeMemoryPool.createListenable(DirectReservationListener.instance()),
402+
fileFormat,
403+
fileUri,
404+
).use { datasetFactory ->
405+
datasetFactory.finish().use { dataset ->
406+
dataset.newScan(scanOptions).use { scanner ->
407+
scanner.scanBatches().use { reader ->
408+
return readArrow(reader, nullability)
409+
}
410+
}
411+
}
412+
}
413+
}
414+
}

dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt

+13
Original file line numberDiff line numberDiff line change
@@ -627,4 +627,17 @@ internal class ArrowKtTest {
627627
DataFrame.readArrow(dbArrowReader) shouldBe expected
628628
}
629629
}
630+
631+
@Test
632+
fun testReadParquet() {
633+
val path = testResource("test.arrow.parquet").path
634+
val dataFrame = DataFrame.readParquet(URL("file:$path"))
635+
dataFrame.rowsCount() shouldBe 300
636+
assertEstimations(
637+
exampleFrame = dataFrame,
638+
expectedNullable = false,
639+
hasNulls = false,
640+
fromParquet = true,
641+
)
642+
}
630643
}

dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt

+23-9
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@ import kotlin.reflect.typeOf
1818
* Assert that we have got the same data that was originally saved on example creation.
1919
* Example generation project is currently located at https://github.com/Kopilov/arrow_example
2020
*/
21-
internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) {
21+
internal fun assertEstimations(
22+
exampleFrame: AnyFrame,
23+
expectedNullable: Boolean,
24+
hasNulls: Boolean,
25+
fromParquet: Boolean = false,
26+
) {
2227
/**
2328
* In [exampleFrame] we get two concatenated batches. To assert the estimations, we should transform frame row number to batch row number
2429
*/
@@ -136,14 +141,23 @@ internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean
136141
assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30))
137142
}
138143

139-
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
140-
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
141-
datetimeCol.forEachIndexed { i, element ->
142-
assertValueOrNull(
143-
rowNumber = iBatch(i),
144-
actual = element,
145-
expected = LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC),
146-
)
144+
if (fromParquet) {
145+
// The Parquet format has only one date type, which carries no time component: https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date
146+
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDate?>
147+
datetimeCol.type() shouldBe typeOf<LocalDate>().withNullability(expectedNullable)
148+
datetimeCol.forEachIndexed { i, element ->
149+
assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30))
150+
}
151+
} else {
152+
val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
153+
datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
154+
datetimeCol.forEachIndexed { i, element ->
155+
assertValueOrNull(
156+
rowNumber = iBatch(i),
157+
actual = element,
158+
expected = LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC),
159+
)
160+
}
147161
}
148162

149163
val timeSecCol = exampleFrame["time32_seconds"] as DataColumn<LocalTime?>
Binary file not shown.

gradle/libs.versions.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ junit-platform = "1.10.2"
4747
kotestAsserions = "5.5.4"
4848

4949
jsoup = "1.17.2"
50-
arrow = "15.0.0"
50+
arrow = "17.0.0"
5151
docProcessor = "0.3.10"
5252
simpleGit = "2.0.3"
5353
dependencyVersions = "0.51.0"
@@ -104,6 +104,7 @@ jsoup = { group = "org.jsoup", name = "jsoup", version.ref = "jsoup" }
104104
arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref = "arrow" }
105105
arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" }
106106
arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" }
107+
arrow-dataset = { group = "org.apache.arrow", name = "arrow-dataset", version.ref = "arrow" }
107108
arrow-c-data = { group = "org.apache.arrow", name = "arrow-c-data", version.ref = "arrow" }
108109

109110

0 commit comments

Comments
 (0)