
Commit 158ed95

Add support for reading Parquet files thanks to arrow-dataset (#576)
Parent: d3b278f

7 files changed (+78 −7 lines)


dataframe-arrow/build.gradle.kts (+1)

@@ -13,6 +13,7 @@ dependencies {
    implementation(libs.arrow.vector)
    implementation(libs.arrow.format)
    implementation(libs.arrow.memory)
+   implementation(libs.arrow.dataset)
    implementation(libs.commonsCompress)
    implementation(libs.kotlin.reflect)
    implementation(libs.kotlin.datetimeJvm)
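For projects that do not use the Gradle version catalog, the same module could be pulled in with an explicit coordinate; the version shown matches the arrow = "15.0.0" entry introduced in gradle/libs.versions.toml below. A sketch, not part of this commit:

dependencies {
    // Equivalent of implementation(libs.arrow.dataset) without a version catalog
    implementation("org.apache.arrow:arrow-dataset:15.0.0")
}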

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReading.kt (+9)

@@ -1,5 +1,6 @@
package org.jetbrains.kotlinx.dataframe.io

+import org.apache.arrow.dataset.file.FileFormat
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.ipc.ArrowReader
import org.apache.commons.compress.utils.SeekableInMemoryByteChannel

@@ -186,3 +187,11 @@ public fun DataFrame.Companion.readArrow(
public fun ArrowReader.toDataFrame(
    nullability: NullabilityOptions = NullabilityOptions.Infer
): AnyFrame = DataFrame.Companion.readArrowImpl(this, nullability)
+
+/**
+ * Read [Parquet](https://parquet.apache.org/) data from existing [url] by using [Arrow Dataset](https://arrow.apache.org/docs/java/dataset.html)
+ */
+public fun DataFrame.Companion.readParquet(
+    url: URL,
+    nullability: NullabilityOptions = NullabilityOptions.Infer
+): AnyFrame = readArrowDataset(url.toString(), FileFormat.PARQUET, nullability)
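The new readParquet entry point mirrors the existing readArrow overloads. A minimal usage sketch, assuming a local file URL (the path here is illustrative, not part of the commit):

import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.io.readParquet
import java.net.URL

fun main() {
    // Reads a Parquet file through the Arrow Dataset backend added in this commit
    val df = DataFrame.readParquet(URL("file:/tmp/example.parquet"))
    println(df.rowsCount()) // number of rows in the resulting DataFrame
}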

dataframe-arrow/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/arrowReadingImpl.kt (+30 −1)

@@ -1,5 +1,10 @@
package org.jetbrains.kotlinx.dataframe.io

+import org.apache.arrow.dataset.file.FileFormat
+import org.apache.arrow.dataset.file.FileSystemDatasetFactory
+import org.apache.arrow.dataset.jni.DirectReservationListener
+import org.apache.arrow.dataset.jni.NativeMemoryPool
+import org.apache.arrow.dataset.scanner.ScanOptions
import org.apache.arrow.memory.RootAllocator
import org.apache.arrow.vector.BigIntVector
import org.apache.arrow.vector.BitVector

@@ -296,7 +301,7 @@ internal fun DataFrame.Companion.readArrowImpl(
                        add(df)
                    }
                }
-                is ArrowStreamReader -> {
+                else -> {
                    val root = reader.vectorSchemaRoot
                    val schema = root.schema
                    while (reader.loadNextBatch()) {

@@ -309,3 +314,27 @@ internal fun DataFrame.Companion.readArrowImpl(
        return flattened.concatKeepingSchema()
    }
}
+
+internal fun DataFrame.Companion.readArrowDataset(
+    fileUri: String,
+    fileFormat: FileFormat,
+    nullability: NullabilityOptions = NullabilityOptions.Infer,
+): AnyFrame {
+    val scanOptions = ScanOptions(32768)
+    RootAllocator().use { allocator ->
+        FileSystemDatasetFactory(
+            allocator,
+            NativeMemoryPool.createListenable(DirectReservationListener.instance()),
+            fileFormat,
+            fileUri
+        ).use { datasetFactory ->
+            datasetFactory.finish().use { dataset ->
+                dataset.newScan(scanOptions).use { scanner ->
+                    scanner.scanBatches().use { reader ->
+                        return readArrow(reader, nullability)
+                    }
+                }
+            }
+        }
+    }
+}
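readArrowDataset chains use blocks so the allocator, dataset factory, dataset, scanner, and reader are all closed even if reading fails, then hands the resulting ArrowReader to the existing readArrow path (which is why the ArrowStreamReader match above becomes an else branch). Below is a standalone sketch of the same Arrow Dataset pipeline that only counts rows, to show what the wrapper encapsulates; the function name and URI are illustrative, not part of the commit:

import org.apache.arrow.dataset.file.FileFormat
import org.apache.arrow.dataset.file.FileSystemDatasetFactory
import org.apache.arrow.dataset.jni.DirectReservationListener
import org.apache.arrow.dataset.jni.NativeMemoryPool
import org.apache.arrow.dataset.scanner.ScanOptions
import org.apache.arrow.memory.RootAllocator

fun countParquetRows(fileUri: String): Long {
    val scanOptions = ScanOptions(32768) // batch size, same value used by readArrowDataset
    RootAllocator().use { allocator ->
        FileSystemDatasetFactory(
            allocator,
            NativeMemoryPool.createListenable(DirectReservationListener.instance()),
            FileFormat.PARQUET,
            fileUri
        ).use { factory ->
            factory.finish().use { dataset ->
                dataset.newScan(scanOptions).use { scanner ->
                    scanner.scanBatches().use { reader ->
                        // Iterate over record batches and sum their row counts
                        var rows = 0L
                        while (reader.loadNextBatch()) {
                            rows += reader.vectorSchemaRoot.rowCount
                        }
                        return rows
                    }
                }
            }
        }
    }
}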

dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/ArrowKtTest.kt (+13)

@@ -583,4 +583,17 @@ internal class ArrowKtTest {
        val arrowStreamReader = ArrowStreamReader(ipcInputStream, RootAllocator())
        arrowStreamReader.toDataFrame() shouldBe expected
    }
+
+    @Test
+    fun testReadParquet() {
+        val path = testResource("test.arrow.parquet").path
+        val dataFrame = DataFrame.readParquet(URL("file:$path"))
+        dataFrame.rowsCount() shouldBe 300
+        assertEstimations(
+            exampleFrame = dataFrame,
+            expectedNullable = false,
+            hasNulls = false,
+            fromParquet = true
+        )
+    }
}

dataframe-arrow/src/test/kotlin/org/jetbrains/kotlinx/dataframe/io/exampleEstimatesAssertions.kt (+23 −5)

@@ -18,7 +18,12 @@ import kotlin.reflect.typeOf
 * Assert that we have got the same data that was originally saved on example creation.
 * Example generation project is currently located at https://github.com/Kopilov/arrow_example
 */
-internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean, hasNulls: Boolean) {
+internal fun assertEstimations(
+    exampleFrame: AnyFrame,
+    expectedNullable: Boolean,
+    hasNulls: Boolean,
+    fromParquet: Boolean = false
+) {
    /**
     * In [exampleFrame] we get two concatenated batches. To assert the estimations, we should transform frame row number to batch row number
     */

@@ -129,10 +134,23 @@ internal fun assertEstimations(exampleFrame: AnyFrame, expectedNullable: Boolean
        assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30))
    }

-    val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
-    datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
-    datetimeCol.forEachIndexed { i, element ->
-        assertValueOrNull(iBatch(i), element, LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC))
+    /**
+     *
+     *
+     */
+    if (fromParquet) {
+        // The Parquet format has only one date type (https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#date), with no time component
+        val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDate?>
+        datetimeCol.type() shouldBe typeOf<LocalDate>().withNullability(expectedNullable)
+        datetimeCol.forEachIndexed { i, element ->
+            assertValueOrNull(iBatch(i), element, LocalDate.ofEpochDay(iBatch(i).toLong() * 30))
+        }
+    } else {
+        val datetimeCol = exampleFrame["date64"] as DataColumn<LocalDateTime?>
+        datetimeCol.type() shouldBe typeOf<LocalDateTime>().withNullability(expectedNullable)
+        datetimeCol.forEachIndexed { i, element ->
+            assertValueOrNull(iBatch(i), element, LocalDateTime.ofEpochSecond(iBatch(i).toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC))
+        }
    }

    val timeSecCol = exampleFrame["time32_seconds"] as DataColumn<LocalTime?>
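The two branches expect the same underlying instants: epoch day n and epoch second n * 86400 (UTC) denote the same calendar day, so the LocalDate asserted in the Parquet branch is exactly the date part of the LocalDateTime asserted in the IPC branch. A small self-contained check of that equivalence, using the same 30-day stride as the assertions above (illustrative only, not part of the commit):

import java.time.LocalDate
import java.time.LocalDateTime
import java.time.ZoneOffset

fun main() {
    for (i in 0 until 10) {
        val asDate = LocalDate.ofEpochDay(i.toLong() * 30)
        val asDateTime = LocalDateTime.ofEpochSecond(i.toLong() * 60 * 60 * 24 * 30, 0, ZoneOffset.UTC)
        // Both values describe UTC midnight of the same day
        check(asDateTime.toLocalDate() == asDate)
    }
}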
Binary file not shown (presumably the test.arrow.parquet resource used by testReadParquet).

gradle/libs.versions.toml (+2 −1)

@@ -30,7 +30,7 @@ sl4j = "2.0.7"
junit = "4.13.2"
kotestAsserions = "4.6.3"
jsoup = "1.14.3"
-arrow = "11.0.0"
+arrow = "15.0.0"
docProcessor = "0.2.3"
simpleGit = "2.0.3"

@@ -65,6 +65,7 @@ jsoup = { module = "org.jsoup:jsoup", version.ref = "jsoup" }
arrow-format = { group = "org.apache.arrow", name = "arrow-format", version.ref = "arrow" }
arrow-vector = { group = "org.apache.arrow", name = "arrow-vector", version.ref = "arrow" }
arrow-memory = { group = "org.apache.arrow", name = "arrow-memory-unsafe", version.ref = "arrow" }
+arrow-dataset = { group = "org.apache.arrow", name = "arrow-dataset", version.ref = "arrow" }

kotlinpoet = { group = "com.squareup", name = "kotlinpoet", version.ref = "kotlinpoet" }
swagger = { group = "io.swagger.parser.v3", name = "swagger-parser", version.ref = "openapi" }
