1
1
package org.jetbrains.kotlinx.dataframe.impl.api
2
2
3
+ import io.github.oshai.kotlinlogging.KotlinLogging
3
4
import org.jetbrains.kotlinx.dataframe.AnyFrame
4
5
import org.jetbrains.kotlinx.dataframe.AnyRow
5
6
import org.jetbrains.kotlinx.dataframe.ColumnsSelector
@@ -11,13 +12,13 @@ import org.jetbrains.kotlinx.dataframe.api.ConvertSchemaDsl
11
12
import org.jetbrains.kotlinx.dataframe.api.ConverterScope
12
13
import org.jetbrains.kotlinx.dataframe.api.ExcessiveColumns
13
14
import org.jetbrains.kotlinx.dataframe.api.Infer
15
+ import org.jetbrains.kotlinx.dataframe.api.add
14
16
import org.jetbrains.kotlinx.dataframe.api.all
15
17
import org.jetbrains.kotlinx.dataframe.api.allNulls
16
18
import org.jetbrains.kotlinx.dataframe.api.asColumnGroup
17
19
import org.jetbrains.kotlinx.dataframe.api.concat
18
20
import org.jetbrains.kotlinx.dataframe.api.convertTo
19
21
import org.jetbrains.kotlinx.dataframe.api.emptyDataFrame
20
- import org.jetbrains.kotlinx.dataframe.api.getColumnPaths
21
22
import org.jetbrains.kotlinx.dataframe.api.isEmpty
22
23
import org.jetbrains.kotlinx.dataframe.api.map
23
24
import org.jetbrains.kotlinx.dataframe.api.name
@@ -29,12 +30,14 @@ import org.jetbrains.kotlinx.dataframe.columns.ColumnGroup
29
30
import org.jetbrains.kotlinx.dataframe.columns.ColumnKind
30
31
import org.jetbrains.kotlinx.dataframe.columns.ColumnPath
31
32
import org.jetbrains.kotlinx.dataframe.columns.FrameColumn
33
+ import org.jetbrains.kotlinx.dataframe.columns.UnresolvedColumnsPolicy
32
34
import org.jetbrains.kotlinx.dataframe.columns.toColumnSet
33
35
import org.jetbrains.kotlinx.dataframe.exceptions.ExcessiveColumnsException
34
36
import org.jetbrains.kotlinx.dataframe.exceptions.TypeConversionException
35
37
import org.jetbrains.kotlinx.dataframe.impl.emptyPath
36
- import org.jetbrains.kotlinx.dataframe.impl.schema.createEmptyColumn
38
+ import org.jetbrains.kotlinx.dataframe.impl.getColumnPaths
37
39
import org.jetbrains.kotlinx.dataframe.impl.schema.createEmptyDataFrame
40
+ import org.jetbrains.kotlinx.dataframe.impl.schema.createNullFilledColumn
38
41
import org.jetbrains.kotlinx.dataframe.impl.schema.extractSchema
39
42
import org.jetbrains.kotlinx.dataframe.impl.schema.render
40
43
import org.jetbrains.kotlinx.dataframe.kind
@@ -45,6 +48,8 @@ import kotlin.reflect.KType
45
48
import kotlin.reflect.full.withNullability
46
49
import kotlin.reflect.jvm.jvmErasure
47
50
51
+ private val logger = KotlinLogging .logger {}
52
+
48
53
/**
 * Holds a single user-supplied value conversion.
 *
 * @property transform the conversion function; it is invoked with a [ConverterScope] receiver and
 *   the value to convert, and returns the converted value.
 * @property skipNulls flag stored alongside [transform]; presumably tells the caller not to invoke
 *   [transform] for `null` values (the call site is not in this section, so confirm before relying on it).
 */
private open class Converter(
    val transform: ConverterScope.(Any?) -> Any?,
    val skipNulls: Boolean,
)
49
54
50
55
/**
 * Holds one `fill { ... }.with { ... }` registration of the conversion DSL.
 *
 * @property columns selector of the columns that should be filled (the `fill { ... }` part).
 * @property expr row expression that produces the value for each of those columns
 *   (the `with { ... }` part).
 */
private class Filler(
    val columns: ColumnsSelector<*, *>,
    val expr: RowExpression<*, *>,
)
@@ -252,22 +257,16 @@ internal fun AnyFrame.convertToImpl(
252
257
}
253
258
}.toMutableList()
254
259
255
- // when the target is nullable but the source does not contain a column, fill it in with nulls / empty dataframes
260
+ // when the target is nullable but the source does not contain a column,
261
+ // fill it in with nulls / empty dataframes
256
262
val size = this .size.nrow
257
263
schema.columns.forEach { (name, targetColumn) ->
258
- val isNullable =
259
- // like value column of type Int?
260
- targetColumn.nullable ||
261
- // like value column of type Int? (backup check)
262
- targetColumn.type.isMarkedNullable ||
263
- // like DataRow<Something?> for a group column (all columns in the group will be nullable)
264
- targetColumn.contentType?.isMarkedNullable == true ||
265
- // frame column can be filled with empty dataframes
266
- targetColumn.kind == ColumnKind .Frame
267
-
268
264
if (name !in visited) {
269
- newColumns + = targetColumn.createEmptyColumn(name, size)
270
- if (! isNullable) {
265
+ try {
266
+ newColumns + = targetColumn.createNullFilledColumn(name, size)
267
+ } catch (e: IllegalStateException ) {
268
+ logger.debug(e) { " " }
269
+ // if the column could not be created automatically, it needs to be filled manually
271
270
missingPaths.add(path + name)
272
271
}
273
272
}
@@ -279,14 +278,39 @@ internal fun AnyFrame.convertToImpl(
279
278
val marker = MarkersExtractor .get(clazz)
280
279
var result = convertToSchema(marker.schema, emptyPath())
281
280
281
+ /*
282
+ * Here we handle all registered fillers of the user.
283
+ * Fillers are registered in the DSL like:
284
+ * ```kt
285
+ * df.convertTo<Target> {
286
+ * fill { col1 and col2 }.with { something }
287
+ * fill { col3 }.with { somethingElse }
288
+ * }
289
+ * ```
290
+ * Users can use this to fill up any column that was missing during the conversion.
291
+ * They can also fill up and thus overwrite any existing column here.
292
+ */
282
293
dsl.fillers.forEach { filler ->
283
- val paths = result.getColumnPaths(filler.columns)
284
- missingPaths.removeAll(paths.toSet())
285
- result = result.update { paths.toColumnSet() }.with {
286
- filler.expr(this , this )
294
+ // get all paths from the `fill { col1 and col2 }` part
295
+ val paths = result.getColumnPaths(UnresolvedColumnsPolicy .Create , filler.columns).toSet()
296
+
297
+ // split the paths into those that are already in the df and those that are missing
298
+ val (newPaths, existingPaths) = paths.partition { it in missingPaths }
299
+
300
+ // first fill cols that are already in the df using the `with {}` part of the dsl
301
+ result = result.update { existingPaths.toColumnSet() }.with { filler.expr(this , this ) }
302
+
303
+ // then create any missing ones by filling using the `with {}` part of the dsl
304
+ result = newPaths.fold(result) { df, newPath ->
305
+ df.add(newPath, Infer .Type ) { filler.expr(this , this ) }
287
306
}
307
+
308
+ // remove the paths that are now filled
309
+ missingPaths - = paths
288
310
}
289
311
312
+ // Inform the user which target columns could not be created in the conversion
313
+ // The user will need to supply extra information for these, like `fill {}` them.
290
314
if (missingPaths.isNotEmpty()) {
291
315
throw IllegalArgumentException (
292
316
" The following columns were not found in DataFrame: ${
0 commit comments