diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala index b01b54c65..a044056d9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/SparkUtils.scala @@ -41,6 +41,9 @@ object SparkUtils { val MAX_LENGTH_METADATA_KEY = "maxLength" val COMMENT_METADATA_KEY = "comment" + // This seems to be a limitation for multiple catalogs, like Glue and Hive. + val MAX_COMMENT_LENGTH = 255 + /** Get Spark StructType from a case class. */ def getStructType[T: TypeTag]: StructType = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType] @@ -223,10 +226,21 @@ */ def transformSchemaForCatalog(schema: StructType): StructType = { def transformField(field: StructField): StructField = { + val metadata = if (field.metadata.contains("comment")) { + val comment = field.metadata.getString("comment") + val fixedComment = sanitizeCommentForHiveDDL(comment) + new MetadataBuilder() + .withMetadata(field.metadata) + .putString("comment", fixedComment) + .build() + } else { + field.metadata + } + field.dataType match { - case struct: StructType => StructField(field.name, transformStruct(struct), nullable = true, field.metadata) - case arr: ArrayType => StructField(field.name, transformArray(arr, field), nullable = true, field.metadata) - case dataType: DataType => StructField(field.name, transformPrimitive(dataType, field), nullable = true, field.metadata) + case struct: StructType => StructField(field.name, transformStruct(struct), nullable = true, metadata) + case arr: ArrayType => StructField(field.name, transformArray(arr, field), nullable = true, metadata) + case dataType: DataType => StructField(field.name, transformPrimitive(dataType, field), nullable = true, metadata) } } @@ -277,6 +291,27 @@ } } + /** + * Sanitizes a comment for Hive DDL. 
Ideally this should be done by Spark, but because there are many versions + * of Hive and other catalogs, it is sometimes hard to have a general solution. + * + * These transformations are tested for Hive 1.0. + * + * @param comment The comment (description) of a column or a table. + * @return + */ + def sanitizeCommentForHiveDDL(comment: String): String = { + val escapedComment = comment + .replace("\n", " ") // This breaks DBeaver (shows no columns) + .replace("\\n", " ") // This breaks DBeaver (shows no columns) + + if (escapedComment.length > MAX_COMMENT_LENGTH) { + s"${escapedComment.take(MAX_COMMENT_LENGTH - 3)}..." + } else { + escapedComment + } + } + /** * Removes metadata of nested fields to make DDL compatible with some Hive-like catalogs. * In addition, removes the nullability flag for all fields. diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSql.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSql.scala index 1b12ecbf3..0869ee7e1 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSql.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/hive/HiveHelperSql.scala @@ -114,7 +114,7 @@ class HiveHelperSql(val queryExecutor: QueryExecutor, queryExecutor.execute(sqlHiveRepair) } - private def getTableDDL(schema: StructType, partitionBy: Seq[String]): String = { + private[core] def getTableDDL(schema: StructType, partitionBy: Seq[String]): String = { val partitionColsLower = partitionBy.map(_.toLowerCase()) val nonPartitionFields = SparkUtils.transformSchemaForCatalog(schema) @@ -128,7 +128,7 @@ } } - private def getPartitionDDL(schema: StructType, partitionBy: Seq[String]): String = { + private[core] def getPartitionDDL(schema: StructType, partitionBy: Seq[String]): String = { if (partitionBy.isEmpty) { "" } else { diff --git 
a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala index efd5cbf15..2b201a927 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/SparkUtilsSuite.scala @@ -442,6 +442,25 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture } } + "sanitizeCommentForHiveDDL" should { + "replace line ending characters" in { + val comment = "line1\nline2\\nline3" + + val actual = sanitizeCommentForHiveDDL(comment) + + assert(actual == "line1 line2 line3") + } + + "truncate long comments" in { + val comment = "a" * 500 + + val actual = sanitizeCommentForHiveDDL(comment) + + assert(actual.length == 255) + assert(actual.endsWith("a...")) + } + } + "getLengthFromMetadata" should { "return length for long type" in { val metadata = new MetadataBuilder diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala index 5e1df1c62..a29e5f1af 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/hive/HiveHelperSqlSuite.scala @@ -20,6 +20,7 @@ import com.typesafe.config.ConfigFactory import org.apache.hadoop.fs.Path import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions.lit +import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType} import org.scalatest.wordspec.AnyWordSpec import za.co.absa.pramen.core.base.SparkTestBase import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture} @@ -156,6 +157,24 @@ class HiveHelperSqlSuite extends AnyWordSpec with SparkTestBase with TempDirFixt } } + "getTableDDL()" 
should { + "work with columns with descriptions with line ending characters (regression)" in { + val schema = StructType(Seq( + StructField("a", StringType, nullable = true, new MetadataBuilder().putString("comment", "line1'?'\nline2?").build()), + StructField("b", IntegerType, nullable = true, new MetadataBuilder().putString("comment", "line1'?'\nline2?").build()) + )) + + val expected = "`a` STRING COMMENT 'line1\\'?\\' line2?'" + + val qe = new QueryExecutorMock(tableExists = true) + val hiveHelper = new HiveHelperSql(qe, defaultHiveConfig, true) + + val actual = hiveHelper.getTableDDL(schema, Seq("b")) + + compareText(actual, expected) + } + } + private def getParquetPath(tempBaseDir: String, partitionBy: Seq[String] = Nil): String = { val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, "file:///")