|
1 | 1 | package org.jetbrains.kotlinx.dataframe.impl.api
|
2 | 2 |
|
3 |
| -import kotlinx.coroutines.async |
4 |
| -import kotlinx.coroutines.awaitAll |
5 |
| -import kotlinx.coroutines.coroutineScope |
6 |
| -import kotlinx.coroutines.runBlocking |
7 | 3 | import kotlinx.datetime.Instant
|
8 | 4 | import kotlinx.datetime.LocalDate
|
9 | 5 | import kotlinx.datetime.LocalDateTime
|
@@ -527,44 +523,32 @@ internal fun <T> DataColumn<String?>.parse(parser: StringParser<T>, options: Par
|
527 | 523 | return DataColumn.createValueColumn(name(), parsedValues, parser.type.withNullability(hasNulls)) as DataColumn<T?>
|
528 | 524 | }
|
529 | 525 |
|
530 |
| -internal fun <T> DataFrame<T>.parseImpl(options: ParserOptions?, columns: ColumnsSelector<T, Any?>): DataFrame<T> = |
531 |
| - runBlocking { parseParallel(options, columns) } |
532 |
| - |
533 |
| -private suspend fun <T> DataFrame<T>.parseParallel( |
534 |
| - options: ParserOptions?, |
535 |
| - columns: ColumnsSelector<T, Any?>, |
536 |
| -): DataFrame<T> = |
537 |
| - coroutineScope { |
538 |
| - val convertedCols = getColumnsWithPaths(columns).map { col -> |
539 |
| - async { |
540 |
| - when { |
541 |
| - // when a frame column is requested to be parsed, |
542 |
| - // parse each value/frame column at any depth inside each DataFrame in the frame column |
543 |
| - col.isFrameColumn() -> |
544 |
| - col.values.map { |
545 |
| - async { |
546 |
| - it.parseParallel(options) { |
547 |
| - colsAtAnyDepth { !it.isColumnGroup() } |
548 |
| - } |
549 |
| - } |
550 |
| - }.awaitAll() |
551 |
| - .toColumn(col.name) |
552 |
| - |
553 |
| - // when a column group is requested to be parsed, |
554 |
| - // parse each column in the group |
555 |
| - col.isColumnGroup() -> |
556 |
| - col.parseParallel(options) { all() } |
557 |
| - .asColumnGroup(col.name()) |
558 |
| - .asDataColumn() |
559 |
| - |
560 |
| - // Base case, parse the column if it's a `String?` column |
561 |
| - col.isSubtypeOf<String?>() -> |
562 |
| - col.cast<String?>().tryParse(options) |
563 |
| - |
564 |
| - else -> col |
565 |
| - }.let { ColumnToInsert(col.path, it) } |
566 |
| - } |
567 |
| - }.awaitAll() |
568 |
| - |
569 |
| - emptyDataFrame<T>().insertImpl(convertedCols) |
| 526 | +internal fun <T> DataFrame<T>.parseImpl(options: ParserOptions?, columns: ColumnsSelector<T, Any?>): DataFrame<T> { |
| 527 | + val convertedCols = getColumnsWithPaths(columns).map { col -> |
| 528 | + when { |
| 529 | + // when a frame column is requested to be parsed, |
| 530 | + // parse each value/frame column at any depth inside each DataFrame in the frame column |
| 531 | + col.isFrameColumn() -> |
| 532 | + col.values.map { |
| 533 | + it.parseImpl(options) { |
| 534 | + colsAtAnyDepth { !it.isColumnGroup() } |
| 535 | + } |
| 536 | + }.toColumn(col.name) |
| 537 | + |
| 538 | + // when a column group is requested to be parsed, |
| 539 | + // parse each column in the group |
| 540 | + col.isColumnGroup() -> |
| 541 | + col.parseImpl(options) { all() } |
| 542 | + .asColumnGroup(col.name()) |
| 543 | + .asDataColumn() |
| 544 | + |
| 545 | + // Base case, parse the column if it's a `String?` column |
| 546 | + col.isSubtypeOf<String?>() -> |
| 547 | + col.cast<String?>().tryParse(options) |
| 548 | + |
| 549 | + else -> col |
| 550 | + }.let { ColumnToInsert(col.path, it) } |
570 | 551 | }
|
| 552 | + |
| 553 | + return emptyDataFrame<T>().insertImpl(convertedCols) |
| 554 | +} |
0 commit comments