#666 Implement record splitting based on record length field mapping.
yruslan committed Apr 22, 2024
1 parent 896c885 commit 0d5ee08
Showing 10 changed files with 189 additions and 26 deletions.
9 changes: 8 additions & 1 deletion README.md
@@ -478,6 +478,13 @@ or
.option("record_length_field", "FIELD1 * 10 + 200")
```

If the record length field contains a string value that can be mapped to a record size, you can provide the mapping as JSON:
```
.option("record_format", "F")
.option("record_length_field", "FIELD_STR")
.option("record_length_map", """{"SEG1":100,"SEG2":200}""")
```
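
For example, a complete read using a string-valued record length field might look like this (a sketch; the copybook path and data path are placeholders):

```
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")
  .option("record_format", "F")
  .option("record_length_field", "FIELD_STR")
  .option("record_length_map", """{"SEG1":100,"SEG2":200}""")
  .load("/path/to/data")
```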

### Use cases for various variable length formats

In order to understand the file format, it is often sufficient to look at the first 4 bytes of the file (in the case of RDW-only files),
@@ -1547,7 +1554,7 @@ The output looks like this:
| .option("bdw_adjustment", 0) | If there is a mismatch between BDW and record length this option can be used to adjust the difference. |
| .option("re_additional_info", "") | Passes a string as an additional info parameter passed to a custom record extractor to its constructor. |
| .option("record_length_field", "RECORD-LEN") | Specifies a record length field or expression to use instead of RDW. Use `rdw_adjustment` option if the record length field differs from the actual length by a fixed amount of bytes. The `record_format` should be set to `F`. This option is incompatible with `is_record_sequence`. |
| .option("record_length_map", """{"A":100}""") | Specifies a mapping between record length field values and actual record lengths. |
| .option("record_length_map", """{"A":100,"B":50}""") | Specifies a mapping between record length field values and actual record lengths. |
| .option("record_extractor", "com.example.record.extractor") | Specifies a class for parsing record in a custom way. The class must inherit `RawRecordExtractor` and `Serializable` traits. See the chapter on record extractors above. |
| .option("minimum_record_length", 1) | Specifies the minimum length a record is considered valid, will be skipped otherwise. |
| .option("maximum_record_length", 1000) | Specifies the maximum length a record is considered valid, will be skipped otherwise. |
@@ -0,0 +1,25 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.reader.iterator

import za.co.absa.cobrix.cobol.parser.ast.Primitive
import za.co.absa.cobrix.cobol.parser.expression.NumberExprEvaluator

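/**
  * Associates the record length field's AST node with an optional mapping
  * from raw field values to actual record lengths.
  */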
case class RecordLengthField(
field: Primitive,
valueMap: Map[String, Int]
)
@@ -51,7 +51,9 @@ class VRLRecordReader(cobolSchema: Copybook,
private var recordIndex = startRecordId - 1

private val copyBookRecordSize = cobolSchema.getRecordSize
private val (lengthField, lengthFieldExpr) = ReaderParametersValidator.getEitherFieldAndExpression(readerProperties.lengthFieldExpression, cobolSchema)
private val (recordLengthField, lengthFieldExpr) = ReaderParametersValidator.getEitherFieldAndExpression(readerProperties.lengthFieldExpression, readerProperties.lengthFieldMap, cobolSchema)
private val lengthField = recordLengthField.map(_.field)
private val lengthMap = recordLengthField.map(_.valueMap).getOrElse(Map.empty)
private val segmentIdField = ReaderParametersValidator.getSegmentIdField(readerProperties.multisegment, cobolSchema)
private val recordLengthAdjustment = readerProperties.rdwAdjustment
private val useRdw = lengthField.isEmpty && lengthFieldExpr.isEmpty
@@ -130,12 +132,22 @@

val recordLength = lengthField match {
case Some(lengthAST) =>
cobolSchema.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
case i: Int => i + recordLengthAdjustment
case l: Long => l.toInt + recordLengthAdjustment
case s: String => s.toInt + recordLengthAdjustment
val length = cobolSchema.extractPrimitiveField(lengthAST, binaryDataStart, readerProperties.startOffset) match {
case i: Int => i
case l: Long => l.toInt
case s: String =>
if (lengthMap.isEmpty) {
s.toInt
} else {
lengthMap.get(s) match {
case Some(len) => len
case None => throw new IllegalStateException(s"Record length value '$s' is not mapped to a record length.")
}
}

case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.")
}
length + recordLengthAdjustment
case None => copyBookRecordSize
}
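
A minimal standalone sketch of the lookup semantics above: when a mapping is supplied, the raw field value must resolve through it, otherwise the string value itself is parsed as an integer (the function name is illustrative, not part of the codebase):

```
def resolveRecordLength(raw: String, lengthMap: Map[String, Int]): Int =
  if (lengthMap.isEmpty) raw.toInt // no mapping: the value must be numeric
  else lengthMap.getOrElse(raw, throw new IllegalStateException(
    s"Record length value '$raw' is not mapped to a record length."))

// resolveRecordLength("SEG1", Map("SEG1" -> 100)) == 100
```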

@@ -19,22 +19,22 @@ package za.co.absa.cobrix.cobol.reader.validator
import za.co.absa.cobrix.cobol.parser.Copybook
import za.co.absa.cobrix.cobol.parser.ast.Primitive
import za.co.absa.cobrix.cobol.parser.expression.NumberExprEvaluator
import za.co.absa.cobrix.cobol.reader.iterator.RecordLengthExpression
import za.co.absa.cobrix.cobol.reader.iterator.{RecordLengthExpression, RecordLengthField}
import za.co.absa.cobrix.cobol.reader.parameters.MultisegmentParameters

import scala.util.Try

object ReaderParametersValidator {

def getEitherFieldAndExpression(fieldOrExpressionOpt: Option[String], cobolSchema: Copybook): (Option[Primitive], Option[RecordLengthExpression]) = {
def getEitherFieldAndExpression(fieldOrExpressionOpt: Option[String], recordLengthMap: Map[String, Int], cobolSchema: Copybook): (Option[RecordLengthField], Option[RecordLengthExpression]) = {
fieldOrExpressionOpt match {
case Some(fieldOrExpression) =>
val canBeExpression = fieldOrExpression.exists(c => "+-*/".contains(c))

if (canBeExpression && Try(cobolSchema.getFieldByName(fieldOrExpression)).isSuccess) {
(getLengthField(fieldOrExpression, cobolSchema), None)
(getLengthField(fieldOrExpression, recordLengthMap, cobolSchema), None)
} else {
(None, getLengthFieldExpr(fieldOrExpression, cobolSchema))
(None, getLengthFieldExpr(fieldOrExpression, recordLengthMap, cobolSchema))
}
case None =>
(None, None)
@@ -43,13 +43,13 @@ object ReaderParametersValidator {
}

@throws(classOf[IllegalStateException])
def getLengthField(recordLengthFieldName: String, cobolSchema: Copybook): Option[Primitive] = {
def getLengthField(recordLengthFieldName: String, recordLengthMap: Map[String, Int], cobolSchema: Copybook): Option[RecordLengthField] = {
val field = cobolSchema.getFieldByName(recordLengthFieldName)

val astNode = field match {
case s: Primitive =>
if (!s.dataType.isInstanceOf[za.co.absa.cobrix.cobol.parser.ast.datatype.Integral]) {
throw new IllegalStateException(s"The record length field $recordLengthFieldName must be an integral type.")
if (!s.dataType.isInstanceOf[za.co.absa.cobrix.cobol.parser.ast.datatype.Integral] && recordLengthMap.isEmpty) {
throw new IllegalStateException(s"The record length field $recordLengthFieldName must be an integral type or a value mapping must be specified.")
}
if (s.occurs.isDefined && s.occurs.get > 1) {
throw new IllegalStateException(s"The record length field '$recordLengthFieldName' cannot be an array.")
@@ -58,17 +58,17 @@
case _ =>
throw new IllegalStateException(s"The record length field $recordLengthFieldName must have an primitive integral type.")
}
Some(astNode)
Some(RecordLengthField(astNode, recordLengthMap))
}

@throws(classOf[IllegalStateException])
def getLengthFieldExpr(recordLengthFieldExpr: String, cobolSchema: Copybook): Option[RecordLengthExpression] = {
def getLengthFieldExpr(recordLengthFieldExpr: String, recordLengthMap: Map[String, Int], cobolSchema: Copybook): Option[RecordLengthExpression] = {
val evaluator = new NumberExprEvaluator(recordLengthFieldExpr)
val vars = evaluator.getVariables
val fields = vars.map { field =>
val primitive = getLengthField(field, cobolSchema)
val primitive = getLengthField(field, recordLengthMap, cobolSchema)
.getOrElse(throw new IllegalArgumentException(s"The record length expression '$recordLengthFieldExpr' contains an unknown field '$field'."))
(field, primitive)
(field, primitive.field)
}
val requiredBytesToRead = if (fields.nonEmpty) {
fields.map { case (_, field) =>
@@ -203,7 +203,7 @@ class VRLRecordReaderSpec extends AnyWordSpec {
lengthFieldExpression = Some("LEN"))
}

assert(ex.getMessage == "The record length field LEN must be an integral type.")
assert(ex.getMessage == "The record length field LEN must be an integral type or a value mapping must be specified.")
}
}

@@ -23,7 +23,7 @@ import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat
import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPointFormat
import za.co.absa.cobrix.cobol.parser.policies.DebugFieldsPolicy.DebugFieldsPolicy
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy
import za.co.absa.cobrix.cobol.parser.policies.{CommentPolicy, DebugFieldsPolicy, FillerNamingPolicy, MetadataPolicy, StringTrimmingPolicy}
import za.co.absa.cobrix.cobol.parser.policies._
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat._
import za.co.absa.cobrix.cobol.reader.parameters._
@@ -926,8 +926,13 @@ object CobolParametersParser extends Logging {
@throws(classOf[IllegalArgumentException])
def getRecordLengthMappings(recordLengthMapJson: String): Map[String, Int] = {
val parser = new ParserJson()
parser.parseMap(recordLengthMapJson)
.toSeq // Converting to a non-lazy sequence first. If .mapValues() is used the map stays lazy and errors pop up later
val json = try {
parser.parseMap(recordLengthMapJson)
} catch {
case NonFatal(ex) => throw new IllegalArgumentException(s"Unable to parse record length mapping JSON.", ex)
}

json.toSeq // Converting to a non-lazy sequence first. If .mapValues() is used the map stays lazy and errors pop up later
.map { case (k, v) =>
val vInt = v match {
case num: Int => num
@@ -936,9 +941,9 @@
try {
str.toInt
} catch {
case NonFatal(ex) => throw new IllegalArgumentException(s"Unsupported record length value: '$str'. Please, use numeric values only", ex)
case NonFatal(ex) => throw new IllegalArgumentException(s"Unsupported record length value: '$str'. Please, use numeric values only.", ex)
}
case any => throw new IllegalArgumentException(s"Unsupported record length value: '$any'. Please, use numeric values only")
case any => throw new IllegalArgumentException(s"Unsupported record length value: '$any'. Please, use numeric values only.")
}
(k, vInt)
}.toMap[String, Int]
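
A quick usage sketch of the mapping parser above (both integer values and numeric strings are accepted; anything else is rejected):

```
val map = CobolParametersParser.getRecordLengthMappings("""{"A":100,"B":"200"}""")
// map == Map("A" -> 100, "B" -> 200)
```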
1 change: 1 addition & 0 deletions spark-cobol/src/test/resources/log4j.properties
@@ -32,3 +32,4 @@ log4j.logger.za.co.absa.cobrix.spark.cobol.source.index.IndexBuilder$=ERROR
log4j.logger.za.co.absa.cobrix.spark.cobol.source.streaming.FileStreamer=ERROR
log4j.logger.za.co.absa.cobrix.spark.cobol.utils.FileUtils$=OFF
log4j.logger.za.co.absa.cobrix.spark.cobol.utils.FileUtils=OFF
log4j.logger.za.co.absa.cobrix.cobol.parser.antlr.ParserJson=OFF
3 changes: 3 additions & 0 deletions spark-cobol/src/test/resources/log4j2.properties
@@ -39,3 +39,6 @@ logger.cobrix_file_utils1.level = OFF

logger.cobrix_file_utils2.name = za.co.absa.cobrix.cobol.utils.FileUtils
logger.cobrix_file_utils2.level = OFF

logger.parserjson.name = za.co.absa.cobrix.cobol.parser.antlr.ParserJson
logger.parserjson.level = OFF
@@ -117,16 +117,22 @@ class ParametersParsingSpec extends AnyFunSuite {
val ex = intercept[IllegalArgumentException] {
CobolParametersParser.getRecordLengthMappings("""{"A": "ABC"}""")
}
assert(ex.getMessage == "Unsupported record length value: 'ABC'. Please, use numeric values only")
assert(ex.getMessage == "Unsupported record length value: 'ABC'. Please, use numeric values only.")

val ex2 = intercept[IllegalArgumentException] {
CobolParametersParser.getRecordLengthMappings("""{"A": {"B": 12}}""")
}
assert(ex2.getMessage == "Unsupported record length value: 'Map(B -> 12)'. Please, use numeric values only")
assert(ex2.getMessage == "Unsupported record length value: 'Map(B -> 12)'. Please, use numeric values only.")

assertThrows[IllegalArgumentException] {
val ex3 = intercept[IllegalArgumentException] {
CobolParametersParser.getRecordLengthMappings("""{"A": {"B": 5000000000}}""")
}
assert(ex3.getMessage == "Unsupported record length value: 'Map(B -> 5.0E9)'. Please, use numeric values only.")

val ex4 = intercept[IllegalArgumentException] {
CobolParametersParser.getRecordLengthMappings("""Hmm...""")
}
assert(ex4.getMessage == "Unable to parse record length mapping JSON.")
}

}
@@ -0,0 +1,104 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.spark.cobol.source.integration

import org.apache.spark.SparkException
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture

class Test37RecordLengthMappingSpec extends AnyWordSpec with SparkTestBase with BinaryFileFixture {
private val copybook =
""" 01 R.
03 SEG-ID PIC X(1).
03 TEXT PIC X(7).
"""

val dataSimple: Array[Byte] = Array(
0xC1, 0xF1, 0xF2, 0xF3, // record 0 'A123'
0xC2, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, // record 1 'B123456'
0xC3, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7).map(_.toByte) // record 2 'C1234567'

val dataWithFileOffsets: Array[Byte] = Array(
0x00, // header
0xC1, 0xF1, 0xF2, 0xF3, // record 0 'A123'
0xC2, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, // record 1 'B123456'
0xC3, 0xF1, 0xF2, 0xF3, 0xF4, 0xF5, 0xF6, 0xF7, // record 2 'C1234567'
0x00, 0x00).map(_.toByte) // footer

"data with record length mapping" should {
"work for simple mappings" in {
withTempBinFile("rdw_test", ".tmp", dataSimple) { tempFile =>
val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}"""

val df = spark.read
.format("cobol")
.option("copybook_contents", copybook)
.option("record_format", "F")
.option("record_length_field", "SEG-ID")
.option("input_split_records", "2")
.option("pedantic", "true")
.option("record_length_map", """{"A":4,"B":7,"C":8}""")
.load(tempFile)

val actual = df.orderBy("SEG_ID").toJSON.collect().mkString(",")

assert(actual == expected)
}
}

"work for data with offsets" in {
withTempBinFile("rdw_test", ".tmp", dataSimple) { tempFile =>
val expected = """{"SEG_ID":"A","TEXT":"123"},{"SEG_ID":"B","TEXT":"123456"},{"SEG_ID":"C","TEXT":"1234567"}"""

val df = spark.read
.format("cobol")
.option("copybook_contents", copybook)
.option("record_format", "F")
.option("record_length_field", "SEG-ID")
.option("file_start_offset", 1)
.option("file_end_offset", 2)
.option("pedantic", "true")
.option("record_length_map", """{"A":4,"B":7,"C":8}""")
.load(tempFile)

val actual = df.orderBy("SEG_ID").toJSON.collect().mkString(",")

assert(actual == expected)
}
}

"throw an exception for unknown mapping" in {
withTempBinFile("rdw_test", ".tmp", dataSimple) { tempFile =>
val df = spark.read
.format("cobol")
.option("copybook_contents", copybook)
.option("record_format", "F")
.option("record_length_field", "SEG-ID")
.option("record_length_map", """{"A":4,"B":7}""")
.option("pedantic", "true")
.load(tempFile)

val ex = intercept[SparkException] {
df.count()
}

assert(ex.getMessage.contains("Record length value 'C' is not mapped to a record length"))
}
}
}
}
