Skip to content

Commit

Permalink
#666 Add record length value mapping option to Cobrix.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Apr 22, 2024
1 parent 6f78ed2 commit 3aa4cf3
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 0 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1547,6 +1547,7 @@ The output looks like this:
| .option("bdw_adjustment", 0) | If there is a mismatch between BDW and record length this option can be used to adjust the difference. |
| .option("re_additional_info", "") | Passes a string as an additional info parameter passed to a custom record extractor to its constructor. |
| .option("record_length_field", "RECORD-LEN") | Specifies a record length field or expression to use instead of RDW. Use `rdw_adjustment` option if the record length field differs from the actual length by a fixed amount of bytes. The `record_format` should be set to `F`. This option is incompatible with `is_record_sequence`. |
| .option("record_length_map", """{"A":100}""") | Specifies a mapping between record length field values and actual record lengths. |
| .option("record_extractor", "com.example.record.extractor") | Specifies a class for parsing record in a custom way. The class must inherit `RawRecordExtractor` and `Serializable` traits. See the chapter on record extractors above. |
| .option("minimum_record_length", 1) | Specifies the minimum length a record is considered valid, will be skipped otherwise. |
| .option("maximum_record_length", 1000) | Specifies the maximum length a record is considered valid, will be skipped otherwise. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param minimumRecordLength Minium record length for which the record is considered valid.
* @param maximumRecordLength Maximum record length for which the record is considered valid.
* @param lengthFieldExpression A name of a field that contains record length. Optional. If not set the copybook record length will be used.
* @param lengthFieldMap Mapping between record length field values to actual length. The field name should be specified in lengthFieldExpression.
* @param isRecordSequence Does input files have 4 byte record length headers
* @param bdw Block descriptor word (if specified), for FB and VB record formats
* @param isRdwPartRecLength Does RDW count itself as part of record length itself
Expand Down Expand Up @@ -88,6 +89,7 @@ case class ReaderParameters(
minimumRecordLength: Int = 1,
maximumRecordLength: Int = Int.MaxValue,
lengthFieldExpression: Option[String] = None,
lengthFieldMap: Map[String, Int] = Map.empty,
isRecordSequence: Boolean = false,
bdw: Option[Bdw] = None,
isRdwBigEndian: Boolean = false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ package za.co.absa.cobrix.cobol.reader.parameters
* @param rhpAdditionalInfo An optional additional option string passed to a custom record header parser
* @param reAdditionalInfo An optional additional option string passed to a custom record extractor
* @param recordLengthField A field that stores record length
* @param recordLengthMap A mapping between field value and record size.
* @param fileStartOffset A number of bytes to skip at the beginning of each file
* @param fileEndOffset A number of bytes to skip at the end of each file
* @param generateRecordId Generate a sequential record number for each record to be able to retain the order of the original data
Expand All @@ -50,6 +51,7 @@ case class VariableLengthParameters(
rhpAdditionalInfo: Option[String],
reAdditionalInfo: String,
recordLengthField: String,
recordLengthMap: Map[String, Int],
fileStartOffset: Int,
fileEndOffset: Int,
generateRecordId: Boolean,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten

import scala.collection.mutable
import scala.collection.mutable.ListBuffer
import scala.util.control.NonFatal

/**
* This class provides methods for parsing the parameters set as Spark options.
Expand All @@ -55,6 +56,7 @@ object CobolParametersParser extends Logging {
val PARAM_MAXIMUM_RECORD_LENGTH = "maximum_record_length"
val PARAM_IS_RECORD_SEQUENCE = "is_record_sequence"
val PARAM_RECORD_LENGTH_FIELD = "record_length_field"
val PARAM_RECORD_LENGTH_MAP = "record_length_map"
val PARAM_RECORD_START_OFFSET = "record_start_offset"
val PARAM_RECORD_END_OFFSET = "record_end_offset"
val PARAM_FILE_START_OFFSET = "file_start_offset"
Expand Down Expand Up @@ -348,6 +350,7 @@ object CobolParametersParser extends Logging {
rhpAdditionalInfo = None,
reAdditionalInfo = "",
recordLengthField = "",
Map.empty,
fileStartOffset = 0,
fileEndOffset = 0,
generateRecordId = false,
Expand Down Expand Up @@ -380,6 +383,7 @@ object CobolParametersParser extends Logging {
minimumRecordLength = parameters.minimumRecordLength.getOrElse(1),
maximumRecordLength = parameters.maximumRecordLength.getOrElse(Int.MaxValue),
lengthFieldExpression = recordLengthField,
lengthFieldMap = varLenParams.recordLengthMap,
isRecordSequence = varLenParams.isRecordSequence,
bdw = varLenParams.bdw,
isRdwBigEndian = varLenParams.isRdwBigEndian,
Expand Down Expand Up @@ -461,6 +465,7 @@ object CobolParametersParser extends Logging {
params.get(PARAM_RHP_ADDITIONAL_INFO),
params.get(PARAM_RE_ADDITIONAL_INFO).getOrElse(""),
recordLengthFieldOpt.getOrElse(""),
getRecordLengthMappings(params.getOrElse(PARAM_RECORD_LENGTH_MAP, "{}")),
fileStartOffset,
fileEndOffset,
isRecordIdGenerationEnabled,
Expand Down Expand Up @@ -912,6 +917,33 @@ object CobolParametersParser extends Logging {
}
}

/**
* Parses the options for the record length mappings.
*
* @param recordLengthMapJson Parameters provided by spark.read.option(...)
* @return Returns a mapping from the record length field values to the actual record length
*/
@throws(classOf[IllegalArgumentException])
def getRecordLengthMappings(recordLengthMapJson: String): Map[String, Int] = {
val parser = new ParserJson()
parser.parseMap(recordLengthMapJson)
.toSeq // Converting to a non-lazy sequence first. If .mapValues() is used the map stays lazy and errors pop up later
.map { case (k, v) =>
val vInt = v match {
case num: Int => num
case num: Long => num.toInt
case str: String =>
try {
str.toInt
} catch {
case NonFatal(ex) => throw new IllegalArgumentException(s"Unsupported record length value: '$str'. Please, use numeric values only", ex)
}
case any => throw new IllegalArgumentException(s"Unsupported record length value: '$any'. Please, use numeric values only")
}
(k, vInt)
}.toMap[String, Int]
}

/**
* Parses the options for the occurs mappings.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,32 @@ class ParametersParsingSpec extends AnyFunSuite {
assert(fieldCodePageMap("field_3") == "us-ascii")
}

test("Test getRecordLengthMappings() works as expected") {
val map1 = CobolParametersParser.getRecordLengthMappings("""{}""")
assert(map1.isEmpty)

val map2 = CobolParametersParser.getRecordLengthMappings("""{"A": 12}""")
assert(map2("A") == 12)

val map3 = CobolParametersParser.getRecordLengthMappings("""{"0A1": "1234", "B": 122}""")
assert(map3("0A1") == 1234)
assert(map3("B") == 122)
}

test("Test getRecordLengthMappings() exceptional situations") {
val ex = intercept[IllegalArgumentException] {
CobolParametersParser.getRecordLengthMappings("""{"A": "ABC"}""")
}
assert(ex.getMessage == "Unsupported record length value: 'ABC'. Please, use numeric values only")

val ex2 = intercept[IllegalArgumentException] {
CobolParametersParser.getRecordLengthMappings("""{"A": {"B": 12}}""")
}
assert(ex2.getMessage == "Unsupported record length value: 'Map(B -> 12)'. Please, use numeric values only")

assertThrows[IllegalArgumentException] {
CobolParametersParser.getRecordLengthMappings("""{"A": {"B": 5000000000}}""")
}
}

}

0 comments on commit 3aa4cf3

Please sign in to comment.