
Commit 8738e3a
WIP
1 parent: e545cf9

6 files changed: +132 -10 lines

README.md

Lines changed: 80 additions & 2 deletions
@@ -32,7 +32,7 @@ Among the motivations for this project, it is possible to highlight:
 
 - Supports REDEFINES, OCCURS and DEPENDING ON fields (e.g. unchecked unions and variable-size arrays)
 
-- Supports nested structures and arrays (including "flattened" nested names)
+- Supports nested structures and arrays
 
 - Supports HDFS as well as local file systems
 
@@ -350,8 +350,18 @@ Currently, specifying multiple paths in `load()` is not supported. Use the follo
 ### Spark SQL schema extraction
 This library also provides convenient methods to extract Spark SQL schemas and Cobol layouts from copybooks.
 
-If you want to extract a Spark SQL schema from a copybook:
+If you want to extract a Spark SQL schema from a copybook by providing the same options you provide to Spark:
+```scala
+// The same options that you use for spark.read.format("cobol").option()
+val options = Map("schema_retention_policy" -> "keep_original")
+
+val cobolSchema = CobolSchema.fromSparkOptions(Seq(copybook), options)
+val sparkSchema = cobolSchema.getSparkSchema.toString()
+
+println(sparkSchema)
+```
 
+If you want to extract a Spark SQL schema from a copybook using the Cobol parser directly:
 ```scala
 import za.co.absa.cobrix.cobol.parser.CopybookParser
 import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
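The `fromSparkOptions` example added above references a `copybook` value without defining it. A minimal, self-contained sketch, assuming the `CobolSchema` companion object from the `spark-cobol` module (the file touched later in this commit) and an illustrative two-field copybook:

```scala
import za.co.absa.cobrix.spark.cobol.schema.CobolSchema

// Illustrative copybook contents (not taken from the repository)
val copybook: String =
  """      01 RECORD.
    |         05 ID    PIC 9(4).
    |         05 NAME  PIC X(10).
    |""".stripMargin

// The same options that would be passed to spark.read.format("cobol").option(...)
val options = Map("schema_retention_policy" -> "keep_original")

val cobolSchema = CobolSchema.fromSparkOptions(Seq(copybook), options)

// getSparkSchema returns a Spark StructType, so the usual helpers apply
println(cobolSchema.getSparkSchema.treeString)
```

With `schema_retention_policy` set to `keep_original` the root `RECORD` group is kept as a struct; `collapse_root` would promote its children to top-level columns.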
@@ -1397,6 +1407,74 @@ When using `9` 8 refers to the number of digits the number has. Here, the size o
 ```
 You can have decimals when using COMP-3 as well.
 
+### Flattening schema with GROUPs and OCCURS
+Flattening can be helpful when migrating data from mainframe files with fields that have OCCURS (arrays) to relational
+databases that do not support nested arrays.
+
+Cobrix has a method that can flatten the schema automatically given a DataFrame produced by `spark-cobol`.
+
+Spark Scala example:
+```scala
+val dfFlat = SparkUtils.flattenSchema(df, useShortFieldNames = false)
+```
+
+PySpark example:
+```python
+from pyspark.sql import SparkSession, DataFrame, SQLContext
+from pyspark.sql.types import StructType, StructField, StringType, IntegerType, ArrayType
+from py4j.java_gateway import java_import
+
+schema = StructType([
+    StructField("id", IntegerType(), True),
+    StructField("name", StringType(), True),
+    StructField("subjects", ArrayType(StringType()), True)
+])
+
+# Sample data
+data = [
+    (1, "Alice", ["Math", "Science"]),
+    (2, "Bob", ["History", "Geography"]),
+    (3, "Charlie", ["English", "Math", "Physics"])
+]
+
+# Create a test DataFrame
+df = spark.createDataFrame(data, schema)
+
+# Show the DataFrame before flattening
+df.show()
+
+# Flatten the schema using the Cobrix Scala 'SparkUtils.flattenSchema' method
+sc = spark.sparkContext
+java_import(sc._gateway.jvm, "za.co.absa.cobrix.spark.cobol.utils.SparkUtils")
+dfFlatJvm = spark._jvm.SparkUtils.flattenSchema(df._jdf, False)
+dfFlat = DataFrame(dfFlatJvm, SQLContext(sc))
+
+# Show the DataFrame after flattening
+dfFlat.show(truncate=False)
+dfFlat.printSchema()
+```
+
+The output looks like this:
+```
+# Before flattening
++---+-------+------------------------+
+|id |name   |subjects                |
++---+-------+------------------------+
+|1  |Alice  |[Math, Science]         |
+|2  |Bob    |[History, Geography]    |
+|3  |Charlie|[English, Math, Physics]|
++---+-------+------------------------+
+
+# After flattening
++---+-------+----------+----------+----------+
+|id |name   |subjects_0|subjects_1|subjects_2|
++---+-------+----------+----------+----------+
+|1  |Alice  |Math      |Science   |null      |
+|2  |Bob    |History   |Geography |null      |
+|3  |Charlie|English   |Math      |Physics   |
++---+-------+----------+----------+----------+
+```
+
 ## Summary of all available options
 
 ##### File reading options
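The Scala flattening example above is a one-liner and assumes `SparkUtils` is already imported. A slightly fuller sketch, where the import path is the same class the PySpark example loads via `java_import` and `df` is assumed to be an existing DataFrame:

```scala
import za.co.absa.cobrix.spark.cobol.utils.SparkUtils

// 'df' can be any DataFrame with nested structs or arrays, for example one
// produced by spark.read.format("cobol"). With useShortFieldNames = false the
// flattened columns keep fully qualified names (parent group names prefixed).
val dfFlat = SparkUtils.flattenSchema(df, useShortFieldNames = false)

dfFlat.printSchema()
dfFlat.show(truncate = false)
```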

cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala

Lines changed: 1 addition & 1 deletion
@@ -137,4 +137,4 @@ object CobolSchema {
      case None => CodePage.getCodePageByName(codePageName)
    }
  }
-}
+}

cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/decoders/StringDecodersSpec.scala

Lines changed: 0 additions & 2 deletions
@@ -123,8 +123,6 @@ class StringDecodersSpec extends AnyWordSpec {
 
      val actual = decodeEbcdicString(bytes, KeepAll, new CodePage500, improvedNullDetection = false)
 
-      println(actual)
-
      assert(actual == expected)
    }
 

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala

Lines changed: 1 addition & 1 deletion
@@ -341,4 +341,4 @@ object CobolSchema {
 
    CobolSchema.fromBaseReader(CobolReaderSchema.fromReaderParameters(copyBookContents, readerParameters))
  }
-}
+}

spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala

Lines changed: 1 addition & 1 deletion
@@ -112,4 +112,4 @@ class DefaultSource
      copybookContent, getReaderProperties(parameters, defaultHdfsBlockSize)
    )
  }
-}
+}

spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala

Lines changed: 49 additions & 3 deletions
@@ -16,7 +16,7 @@
 
 package za.co.absa.cobrix.spark.cobol
 
-import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType}
+import org.apache.spark.sql.types.{ArrayType, IntegerType, LongType, StringType, StructType}
 import org.scalatest.wordspec.AnyWordSpec
 import org.slf4j.{Logger, LoggerFactory}
 import za.co.absa.cobrix.cobol.parser.CopybookParser
@@ -483,8 +483,6 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
 
      val sparkSchema = cobolSchema.getSparkSchema
 
-      sparkSchema.printTreeString()
-
      assert(sparkSchema.fields.length == 3)
      assert(sparkSchema.fields.head.name == "HEADER")
      assert(sparkSchema.fields.head.dataType == StringType)
@@ -502,6 +500,54 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase {
      assert(seg1.fields(2).name == "SEG3")
      assert(seg1.fields(2).dataType.isInstanceOf[ArrayType])
    }
+
+    "return a schema for a multi-segment copybook" in {
+      val copybook: String =
+        """      01 RECORD.
+          |        05 HEADER       PIC X(5).
+          |        05 SEGMENT-ID   PIC X(2).
+          |        05 SEG1.
+          |          10 FIELD1     PIC 9(7).
+          |        05 SEG2 REDEFINES SEG1.
+          |          10 FIELD3     PIC X(7).
+          |        05 SEG3 REDEFINES SEG1.
+          |          10 FIELD4     PIC S9(7).
+          |""".stripMargin
+
+      val cobolSchema = CobolSchema.fromSparkOptions(Seq(copybook),
+        Map(
+          "segment_field" -> "SEGMENT-ID",
+          "redefine-segment-id-map:0" -> "SEG1 => 01",
+          "redefine-segment-id-map:1" -> "SEG2 => 02",
+          "redefine-segment-id-map:2" -> "SEG3 => 03",
+          "segment_field" -> "SEGMENT-ID",
+          "segment_id_level0" -> "TEST",
+          "generate_record_id" -> "true"
+        )
+      )
+
+      val sparkSchema = cobolSchema.getSparkSchema
+
+      assert(sparkSchema.fields.length == 9)
+      assert(sparkSchema.fields.head.name == "File_Id")
+      assert(sparkSchema.fields.head.dataType == IntegerType)
+      assert(sparkSchema.fields(1).name == "Record_Id")
+      assert(sparkSchema.fields(1).dataType == LongType)
+      assert(sparkSchema.fields(2).name == "Record_Byte_Length")
+      assert(sparkSchema.fields(2).dataType == IntegerType)
+      assert(sparkSchema.fields(3).name == "Seg_Id0")
+      assert(sparkSchema.fields(3).dataType == StringType)
+      assert(sparkSchema.fields(4).name == "HEADER")
+      assert(sparkSchema.fields(4).dataType == StringType)
+      assert(sparkSchema.fields(5).name == "SEGMENT_ID")
+      assert(sparkSchema.fields(5).dataType == StringType)
+      assert(sparkSchema.fields(6).name == "SEG1")
+      assert(sparkSchema.fields(6).dataType.isInstanceOf[StructType])
+      assert(sparkSchema.fields(7).name == "SEG2")
+      assert(sparkSchema.fields(7).dataType.isInstanceOf[StructType])
+      assert(sparkSchema.fields(8).name == "SEG3")
+      assert(sparkSchema.fields(8).dataType.isInstanceOf[StructType])
+    }
  }
 
 }
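The options exercised by this new test mirror what a user passes to the DataFrame reader. A hedged sketch of the equivalent read, assuming the `copybook_contents` option for passing the copybook text, an active `spark` session, and a hypothetical input path:

```scala
// A hypothetical read using the same options the new test passes to
// CobolSchema.fromSparkOptions; the resulting DataFrame should carry the
// same columns the test asserts on (File_Id, Record_Id, Record_Byte_Length,
// Seg_Id0, HEADER, SEGMENT_ID, SEG1, SEG2, SEG3).
val df = spark.read
  .format("cobol")
  .option("copybook_contents", copybook)
  .option("segment_field", "SEGMENT-ID")
  .option("redefine-segment-id-map:0", "SEG1 => 01")
  .option("redefine-segment-id-map:1", "SEG2 => 02")
  .option("redefine-segment-id-map:2", "SEG3 => 03")
  .option("segment_id_level0", "TEST")
  .option("generate_record_id", "true")
  .load("/path/to/ebcdic/data")

df.printSchema()
```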
