
Commit 1114cb0

processing review feedback
1 parent 0bc0c0b commit 1114cb0

10 files changed, +130 −100 lines changed

@@ -0,0 +1,84 @@
+package org.jetbrains.kotlinx.dataframe.impl.io
+
+import org.apache.commons.csv.CSVFormat
+import org.apache.commons.csv.CSVRecord
+import org.jetbrains.kotlinx.dataframe.AnyFrame
+import org.jetbrains.kotlinx.dataframe.DataColumn
+import org.jetbrains.kotlinx.dataframe.DataFrame
+import org.jetbrains.kotlinx.dataframe.api.ParserOptions
+import org.jetbrains.kotlinx.dataframe.api.toDataFrame
+import org.jetbrains.kotlinx.dataframe.api.tryParse
+import org.jetbrains.kotlinx.dataframe.impl.ColumnNameGenerator
+import org.jetbrains.kotlinx.dataframe.io.ColType
+import org.jetbrains.kotlinx.dataframe.io.toKType
+import java.io.BufferedReader
+import java.io.Reader
+import kotlin.reflect.full.withNullability
+import kotlin.reflect.typeOf
+
+internal fun DataFrame.Companion.readDelimImpl(
+    reader: Reader,
+    format: CSVFormat,
+    colTypes: Map<String, ColType>,
+    skipLines: Int,
+    readLines: Int?,
+    parserOptions: ParserOptions?,
+): AnyFrame {
+    var reader = reader
+    if (skipLines > 0) {
+        reader = BufferedReader(reader)
+        repeat(skipLines) { reader.readLine() }
+    }
+
+    val csvParser = format.parse(reader)
+    val records = if (readLines == null) {
+        csvParser.records
+    } else {
+        require(readLines >= 0) { "`readLines` must not be negative" }
+        val records = ArrayList<CSVRecord>(readLines)
+        val iter = csvParser.iterator()
+        var count = readLines ?: 0
+        while (iter.hasNext() && 0 < count--) {
+            records.add(iter.next())
+        }
+        records
+    }
+
+    val columnNames = csvParser.headerNames.takeIf { it.isNotEmpty() }
+        ?: (1..(records.firstOrNull()?.count() ?: 0)).map { index -> "X$index" }
+
+    val generator = ColumnNameGenerator()
+    val uniqueNames = columnNames.map { generator.addUnique(it) }
+
+    val cols = uniqueNames.mapIndexed { colIndex, colName ->
+        val defaultColType = colTypes[".default"]
+        val colType = colTypes[colName] ?: defaultColType
+        var hasNulls = false
+        val values = records.map {
+            if (it.isSet(colIndex)) {
+                it[colIndex].ifEmpty {
+                    hasNulls = true
+                    null
+                }
+            } else {
+                hasNulls = true
+                null
+            }
+        }
+        val column = DataColumn.createValueColumn(colName, values, typeOf<String>().withNullability(hasNulls))
+        val skipTypes = when {
+            colType != null ->
+                // skip all types except the desired type
+                ParserOptions.allTypesExcept(colType.toKType())

+            else ->
+                // respect the provided parser options
+                parserOptions?.skipTypes ?: emptySet()
+        }
+        val adjustsedParserOptions = (parserOptions ?: ParserOptions())
+            .copy(skipTypes = skipTypes)
+
+        return@mapIndexed column.tryParse(adjustsedParserOptions)
+    }
+    return cols.toDataFrame()
+}
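
For context, a minimal usage sketch (not part of this commit) of the public `DataFrame.readDelim` overload that now delegates to `readDelimImpl`. The sample data, the header-reading `CSVFormat`, and the specific column names are illustrative; the `reader`/`format`/`colTypes` parameter names match the delegation shown in this commit, and the special `".default"` key is the one checked by the code above to set a type for every column not listed explicitly.

import org.apache.commons.csv.CSVFormat
import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.io.ColType
import org.jetbrains.kotlinx.dataframe.io.readDelim
import java.io.StringReader

fun main() {
    // Illustrative data; every column is read as String first, then tryParse narrows it,
    // with colTypes honored via the skipTypes mechanism shown above.
    val csv = "name,age,height\nAlice,23,1.70\nBob,31,1.82"
    val df = DataFrame.readDelim(
        reader = StringReader(csv),
        format = CSVFormat.DEFAULT.builder().setHeader().setSkipHeaderRecord(true).build(),
        colTypes = mapOf("age" to ColType.Int, ".default" to ColType.String),
    )
    println(df)
}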

core/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/csv.kt

+11-66
@@ -5,24 +5,20 @@ import kotlinx.datetime.LocalDate
 import kotlinx.datetime.LocalDateTime
 import kotlinx.datetime.LocalTime
 import org.apache.commons.csv.CSVFormat
-import org.apache.commons.csv.CSVRecord
 import org.apache.commons.io.input.BOMInputStream
 import org.jetbrains.kotlinx.dataframe.AnyFrame
 import org.jetbrains.kotlinx.dataframe.AnyRow
-import org.jetbrains.kotlinx.dataframe.DataColumn
 import org.jetbrains.kotlinx.dataframe.DataFrame
 import org.jetbrains.kotlinx.dataframe.DataRow
 import org.jetbrains.kotlinx.dataframe.annotations.Interpretable
 import org.jetbrains.kotlinx.dataframe.annotations.OptInRefine
 import org.jetbrains.kotlinx.dataframe.annotations.Refine
 import org.jetbrains.kotlinx.dataframe.api.ParserOptions
 import org.jetbrains.kotlinx.dataframe.api.forEach
-import org.jetbrains.kotlinx.dataframe.api.toDataFrame
-import org.jetbrains.kotlinx.dataframe.api.tryParse
 import org.jetbrains.kotlinx.dataframe.codeGen.DefaultReadCsvMethod
 import org.jetbrains.kotlinx.dataframe.codeGen.DefaultReadDfMethod
-import org.jetbrains.kotlinx.dataframe.impl.ColumnNameGenerator
 import org.jetbrains.kotlinx.dataframe.impl.api.parse
+import org.jetbrains.kotlinx.dataframe.impl.io.readDelimImpl
 import org.jetbrains.kotlinx.dataframe.util.AS_URL
 import org.jetbrains.kotlinx.dataframe.util.AS_URL_IMPORT
 import org.jetbrains.kotlinx.dataframe.util.AS_URL_REPLACE
@@ -46,7 +42,6 @@ import java.nio.charset.Charset
 import java.util.zip.GZIPInputStream
 import kotlin.reflect.KClass
 import kotlin.reflect.KType
-import kotlin.reflect.full.withNullability
 import kotlin.reflect.typeOf
 import kotlin.time.Duration

@@ -354,74 +349,24 @@ public fun DataFrame.Companion.readDelim(
     skipLines: Int = 0,
     readLines: Int? = null,
     parserOptions: ParserOptions? = null,
-): AnyFrame {
+): AnyFrame =
     try {
-        var reader = reader
-        if (skipLines > 0) {
-            reader = BufferedReader(reader)
-            repeat(skipLines) { reader.readLine() }
-        }
-
-        val csvParser = format.parse(reader)
-        val records = if (readLines == null) {
-            csvParser.records
-        } else {
-            require(readLines >= 0) { "`readLines` must not be negative" }
-            val records = ArrayList<CSVRecord>(readLines)
-            val iter = csvParser.iterator()
-            var count = readLines ?: 0
-            while (iter.hasNext() && 0 < count--) {
-                records.add(iter.next())
-            }
-            records
-        }
-
-        val columnNames = csvParser.headerNames.takeIf { it.isNotEmpty() }
-            ?: (1..(records.firstOrNull()?.count() ?: 0)).map { index -> "X$index" }
-
-        val generator = ColumnNameGenerator()
-        val uniqueNames = columnNames.map { generator.addUnique(it) }
-
-        val cols = uniqueNames.mapIndexed { colIndex, colName ->
-            val defaultColType = colTypes[".default"]
-            val colType = colTypes[colName] ?: defaultColType
-            var hasNulls = false
-            val values = records.map {
-                if (it.isSet(colIndex)) {
-                    it[colIndex].ifEmpty {
-                        hasNulls = true
-                        null
-                    }
-                } else {
-                    hasNulls = true
-                    null
-                }
-            }
-            val column = DataColumn.createValueColumn(colName, values, typeOf<String>().withNullability(hasNulls))
-            val skipTypes = when {
-                colType != null ->
-                    // skip all types except the desired type
-                    ParserOptions.allTypesExcept(colType.toKType())
-
-                else ->
-                    // respect the provided parser options
-                    parserOptions?.skipTypes ?: emptySet()
-            }
-            val adjustsedParserOptions = (parserOptions ?: ParserOptions())
-                .copy(skipTypes = skipTypes)
-
-            return@mapIndexed column.tryParse(adjustsedParserOptions)
-        }
-        return cols.toDataFrame()
-    } catch (e: OutOfMemoryError) {
+        readDelimImpl(
+            reader = reader,
+            format = format,
+            colTypes = colTypes,
+            skipLines = skipLines,
+            readLines = readLines,
+            parserOptions = parserOptions,
+        )
+    } catch (_: OutOfMemoryError) {
         throw OutOfMemoryError(
             "Ran out of memory reading this CSV-like file. " +
                 "You can try our new experimental CSV reader by adding the dependency " +
                 "\"org.jetbrains.kotlinx:dataframe-csv:{VERSION}\" and using `DataFrame.readCsv()` instead of " +
                 "`DataFrame.readCSV()`.",
         )
     }
-}

 public fun AnyFrame.writeCSV(file: File, format: CSVFormat = CSVFormat.DEFAULT): Unit =
     writeCSV(FileWriter(file), format)

dataframe-csv/build.gradle.kts

-4
@@ -163,8 +163,4 @@ kotlinPublications {

 kotlin {
     explicitApi()
-    sourceSets.all {
-        languageSettings {
-        }
-    }
 }

dataframe-csv/src/main/kotlin/org/jetbrains/kotlinx/dataframe/documentation/DelimParams.kt

+2-2
@@ -189,8 +189,8 @@ internal object DelimParams {
     /**
      * @param parseParallel Whether to parse the data in parallel. Default: `true`.
      *
-     * If `true`, the data will be parsed in parallel.
-     * This is usually faster, but can be turned off for debugging.
+     * If `true`, the data will be read and parsed in parallel by the Deephaven parser.
+     * This is usually faster but can be turned off for debugging.
      */
     const val PARSE_PARALLEL: Boolean = true

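A hedged illustration (not part of this commit), assuming `parseParallel` is surfaced as a parameter on the dataframe-csv readers as the KDoc above suggests: turning off parallel parsing for debugging might look like this, with a hypothetical file name.

import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.io.readCsv

fun main() {
    // Sequential read/parse: usually slower, but easier to step through while debugging.
    val df = DataFrame.readCsv("data.csv", parseParallel = false)
    println(df)
}
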
dataframe-csv/src/main/kotlin/org/jetbrains/kotlinx/dataframe/impl/io/DataFrameCustomDoubleParser.kt

+3-3
@@ -14,11 +14,11 @@ internal class DataFrameCustomDoubleParser(parserOptions: ParserOptions) : Custo
     override fun parse(bs: ByteSlice): Double =
         try {
             fastDoubleParser.parseOrNull(bs.data(), bs.begin(), bs.size())
-        } catch (e: Exception) {
+        } catch (_: Exception) {
             null
-        } ?: throw NumberFormatException("Failed to parse double")
+        } ?: throw NumberFormatException()

     override fun parse(cs: CharSequence): Double =
         fastDoubleParser.parseOrNull(cs.toString())
-            ?: throw NumberFormatException("Failed to parse double")
+            ?: throw NumberFormatException()
 }

dataframe-csv/src/main/kotlin/org/jetbrains/kotlinx/dataframe/impl/io/ListSink.kt

+2
@@ -194,6 +194,8 @@ internal class ListSink(val columnIndex: Int, val dataType: DataType) : SinkSour
                 }
             }

+            // Deephaven's fast path for numeric type inference supports only byte, short, int, and long
+            // so this should never be reached
             else -> error("unsupported sink state")
         }
     }

dataframe-csv/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/Compression.kt

+15-13
@@ -18,6 +18,21 @@ import java.util.zip.ZipInputStream
  */
 public sealed class Compression<I : InputStream>(public open val wrapStream: (InputStream) -> I) {

+    public companion object {
+        public fun of(fileOrUrl: String): Compression<*> =
+            when (fileOrUrl.split(".").last()) {
+                "gz" -> Gzip
+                "zip" -> Zip
+                else -> None
+            }
+
+        public fun of(file: File): Compression<*> = of(file.name)
+
+        public fun of(path: Path): Compression<*> = of(path.fileName?.toString() ?: "")
+
+        public fun of(url: URL): Compression<*> = of(url.path)
+    }
+
     /** Can be overridden to perform some actions before reading from the input stream. */
     public open fun doFirst(inputStream: I) {}

@@ -90,16 +105,3 @@ public inline fun <T, I : InputStream> InputStream.useDecompressed(
         compression.doFinally(wrappedStream)
     }
 }
-
-public fun compressionStateOf(fileOrUrl: String): Compression<*> =
-    when (fileOrUrl.split(".").last()) {
-        "gz" -> Compression.Gzip
-        "zip" -> Compression.Zip
-        else -> Compression.None
-    }
-
-public fun compressionStateOf(file: File): Compression<*> = compressionStateOf(file.name)
-
-public fun compressionStateOf(path: Path): Compression<*> = compressionStateOf(path.fileName?.toString() ?: "")
-
-public fun compressionStateOf(url: URL): Compression<*> = compressionStateOf(url.path)
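
A hedged migration sketch (not part of this commit): the top-level `compressionStateOf(...)` helpers removed above are replaced by the equivalent `Compression.of(...)` factories on the companion object. The file names below are purely illustrative.

import org.jetbrains.kotlinx.dataframe.io.Compression
import java.io.File

fun main() {
    // Before: compressionStateOf("data.csv.gz"), compressionStateOf(File("archive.zip"))
    // After:
    val fromName = Compression.of("data.csv.gz")        // Gzip, inferred from the "gz" extension
    val fromFile = Compression.of(File("archive.zip"))  // Zip, inferred from the file name
    val plain = Compression.of("data.csv")              // None

    println(listOf(fromName, fromFile, plain))
}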

dataframe-csv/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/readCsv.kt

+5-4
@@ -28,6 +28,7 @@ import org.jetbrains.kotlinx.dataframe.documentation.DelimParams.READ_LINES
 import org.jetbrains.kotlinx.dataframe.documentation.DelimParams.SKIP_LINES
 import org.jetbrains.kotlinx.dataframe.documentation.DelimParams.TRIM_INSIDE_QUOTED
 import org.jetbrains.kotlinx.dataframe.impl.io.readDelimImpl
+import org.jetbrains.kotlinx.dataframe.io.Compression.Companion
 import java.io.File
 import java.io.FileInputStream
 import java.io.InputStream
@@ -50,7 +51,7 @@ public fun DataFrame.Companion.readCsv(
     header: List<String> = HEADER,
     hasFixedWidthColumns: Boolean = HAS_FIXED_WIDTH_COLUMNS,
     fixedColumnWidths: List<Int> = FIXED_COLUMN_WIDTHS,
-    compression: Compression<*> = compressionStateOf(path),
+    compression: Compression<*> = Compression.of(path),
     colTypes: Map<String, ColType> = COL_TYPES,
     skipLines: Long = SKIP_LINES,
     readLines: Long? = READ_LINES,
@@ -101,7 +102,7 @@ public fun DataFrame.Companion.readCsv(
     header: List<String> = HEADER,
     hasFixedWidthColumns: Boolean = HAS_FIXED_WIDTH_COLUMNS,
     fixedColumnWidths: List<Int> = FIXED_COLUMN_WIDTHS,
-    compression: Compression<*> = compressionStateOf(file),
+    compression: Compression<*> = Compression.of(file),
     colTypes: Map<String, ColType> = COL_TYPES,
     skipLines: Long = SKIP_LINES,
     readLines: Long? = READ_LINES,
@@ -152,7 +153,7 @@ public fun DataFrame.Companion.readCsv(
     header: List<String> = HEADER,
     hasFixedWidthColumns: Boolean = HAS_FIXED_WIDTH_COLUMNS,
     fixedColumnWidths: List<Int> = FIXED_COLUMN_WIDTHS,
-    compression: Compression<*> = compressionStateOf(url),
+    compression: Compression<*> = Compression.of(url),
     colTypes: Map<String, ColType> = COL_TYPES,
     skipLines: Long = SKIP_LINES,
     readLines: Long? = READ_LINES,
@@ -203,7 +204,7 @@ public fun DataFrame.Companion.readCsv(
     header: List<String> = HEADER,
     hasFixedWidthColumns: Boolean = HAS_FIXED_WIDTH_COLUMNS,
     fixedColumnWidths: List<Int> = FIXED_COLUMN_WIDTHS,
-    compression: Compression<*> = compressionStateOf(fileOrUrl),
+    compression: Compression<*> = Compression.of(fileOrUrl),
     colTypes: Map<String, ColType> = COL_TYPES,
     skipLines: Long = SKIP_LINES,
     readLines: Long? = READ_LINES,
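
Another hedged sketch (not part of this commit) of what the new defaults mean for callers: compression is inferred from the file extension via `Compression.of(...)` and can still be overridden explicitly. The paths below are hypothetical.

import org.jetbrains.kotlinx.dataframe.DataFrame
import org.jetbrains.kotlinx.dataframe.io.Compression
import org.jetbrains.kotlinx.dataframe.io.readCsv

fun main() {
    // Default: Compression.of("data.csv.gz") resolves to Gzip, so the stream is decompressed automatically.
    val inferred = DataFrame.readCsv("data.csv.gz")

    // Explicit override for a source whose name does not reveal its compression.
    val explicit = DataFrame.readCsv("export.bin", compression = Compression.Gzip)

    println(inferred)
    println(explicit)
}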

dataframe-csv/src/main/kotlin/org/jetbrains/kotlinx/dataframe/io/readDelim.kt

+4-4
@@ -56,7 +56,7 @@ public fun DataFrame.Companion.readDelim(
     header: List<String> = HEADER,
     hasFixedWidthColumns: Boolean = HAS_FIXED_WIDTH_COLUMNS,
     fixedColumnWidths: List<Int> = FIXED_COLUMN_WIDTHS,
-    compression: Compression<*> = compressionStateOf(path),
+    compression: Compression<*> = Compression.of(path),
     colTypes: Map<String, ColType> = COL_TYPES,
     skipLines: Long = SKIP_LINES,
     readLines: Long? = READ_LINES,
@@ -107,7 +107,7 @@ public fun DataFrame.Companion.readDelim(
     header: List<String> = HEADER,
     hasFixedWidthColumns: Boolean = HAS_FIXED_WIDTH_COLUMNS,
     fixedColumnWidths: List<Int> = FIXED_COLUMN_WIDTHS,
-    compression: Compression<*> = compressionStateOf(file),
+    compression: Compression<*> = Compression.of(file),
     colTypes: Map<String, ColType> = COL_TYPES,
     skipLines: Long = SKIP_LINES,
     readLines: Long? = READ_LINES,
@@ -158,7 +158,7 @@ public fun DataFrame.Companion.readDelim(
     header: List<String> = HEADER,
     hasFixedWidthColumns: Boolean = HAS_FIXED_WIDTH_COLUMNS,
     fixedColumnWidths: List<Int> = FIXED_COLUMN_WIDTHS,
-    compression: Compression<*> = compressionStateOf(url),
+    compression: Compression<*> = Compression.of(url),
     colTypes: Map<String, ColType> = COL_TYPES,
     skipLines: Long = SKIP_LINES,
     readLines: Long? = READ_LINES,
@@ -209,7 +209,7 @@ public fun DataFrame.Companion.readDelim(
     header: List<String> = HEADER,
     hasFixedWidthColumns: Boolean = HAS_FIXED_WIDTH_COLUMNS,
     fixedColumnWidths: List<Int> = FIXED_COLUMN_WIDTHS,
-    compression: Compression<*> = compressionStateOf(fileOrUrl),
+    compression: Compression<*> = Compression.of(fileOrUrl),
     colTypes: Map<String, ColType> = COL_TYPES,
     skipLines: Long = SKIP_LINES,
     readLines: Long? = READ_LINES,
