#552 Fix Hive DDL for tables with long descriptions with '\n' characters. #554

Merged: 1 commit, Feb 17, 2025
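For context, here is a minimal sketch of the failure mode this PR addresses (the table and column names are hypothetical, not taken from the PR): a raw newline in a column description is emitted verbatim inside the COMMENT string literal of the generated DDL, which Hive 1.x and clients such as DBeaver mishandle; overly long descriptions additionally exceed catalog limits.

// Hypothetical illustration only: the newline inside the description
// ends up verbatim inside the quoted COMMENT literal.
val description = "First line\nSecond line"
val ddl = s"CREATE TABLE my_table (`a` STRING COMMENT '$description')"

// The statement now breaks across two lines in the middle of a string literal:
// CREATE TABLE my_table (`a` STRING COMMENT 'First line
// Second line')
println(ddl)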
SparkUtils.scala
@@ -41,6 +41,9 @@ object SparkUtils {
val MAX_LENGTH_METADATA_KEY = "maxLength"
val COMMENT_METADATA_KEY = "comment"

// This seems to be a limitation shared by multiple catalogs, such as Glue and Hive.
val MAX_COMMENT_LENGTH = 255
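// Context (an assumption, not stated in the PR): the 255-character cap appears
// to match AWS Glue's documented limit for column comments, and the Hive
// metastore likewise stores comments in a bounded VARCHAR column, which is
// presumably why this is described as a multi-catalog limitation.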

/** Get Spark StructType from a case class. */
def getStructType[T: TypeTag]: StructType = ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]

@@ -223,10 +226,21 @@ object SparkUtils {
*/
def transformSchemaForCatalog(schema: StructType): StructType = {
def transformField(field: StructField): StructField = {
val metadata = if (field.metadata.contains("comment")) {
val comment = field.metadata.getString("comment")
val fixedComment = sanitizeCommentForHiveDDL(comment)
new MetadataBuilder()
.withMetadata(field.metadata)
.putString("comment", fixedComment)
.build()
} else {
field.metadata
}

field.dataType match {
-      case struct: StructType => StructField(field.name, transformStruct(struct), nullable = true, field.metadata)
-      case arr: ArrayType => StructField(field.name, transformArray(arr, field), nullable = true, field.metadata)
-      case dataType: DataType => StructField(field.name, transformPrimitive(dataType, field), nullable = true, field.metadata)
+      case struct: StructType => StructField(field.name, transformStruct(struct), nullable = true, metadata)
+      case arr: ArrayType => StructField(field.name, transformArray(arr, field), nullable = true, metadata)
+      case dataType: DataType => StructField(field.name, transformPrimitive(dataType, field), nullable = true, metadata)
}
}

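To illustrate the effect of the change above, a minimal sketch (the field name and comment are hypothetical; the SparkUtils import is omitted): a comment containing a newline comes out of transformSchemaForCatalog with the line break collapsed to a space.

import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("a", StringType, nullable = true,
    new MetadataBuilder().putString("comment", "line1\nline2").build())
))

val transformed = SparkUtils.transformSchemaForCatalog(schema)

// The sanitized comment no longer contains a line break:
assert(transformed("a").metadata.getString("comment") == "line1 line2")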
@@ -277,6 +291,27 @@ object SparkUtils {
}
}

/**
* Sanitizes a comment for Hive DDL. Ideally this should be done by Spark, but because there are many versions
* of Hive and other catalogs, it is sometimes hard to have a general solution.
*
* These transformations are tested for Hive 1.0.
*
* @param comment The comment (description) of a column or a table.
* @return The sanitized comment, safe to embed in Hive DDL.
*/
def sanitizeCommentForHiveDDL(comment: String): String = {
val escapedComment = comment
.replace("\n", " ") // This breaks DBeaver (shows no columns)
.replace("\\n", " ") // This breaks DBeaver (shows no columns)

if (escapedComment.length > MAX_COMMENT_LENGTH) {
s"${escapedComment.take(MAX_COMMENT_LENGTH - 3)}..."
} else {
escapedComment
}
}
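// To summarize, the sanitizer guarantees two properties, mirrored by the unit
// tests added in this PR: no line-break characters survive, and the result
// never exceeds MAX_COMMENT_LENGTH. Expected behavior:
//   sanitizeCommentForHiveDDL("line1\nline2\\nline3")   // "line1 line2 line3"
//   sanitizeCommentForHiveDDL("a" * 500).length         // 255, ending in "..."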

/**
* Removes metadata of nested fields to make DDL compatible with some Hive-like catalogs.
* In addition, removes the nullability flag for all fields.
HiveHelperSql.scala
@@ -114,7 +114,7 @@ class HiveHelperSql(val queryExecutor: QueryExecutor,
queryExecutor.execute(sqlHiveRepair)
}

-  private def getTableDDL(schema: StructType, partitionBy: Seq[String]): String = {
+  private[core] def getTableDDL(schema: StructType, partitionBy: Seq[String]): String = {
val partitionColsLower = partitionBy.map(_.toLowerCase())

val nonPartitionFields = SparkUtils.transformSchemaForCatalog(schema)
@@ -128,7 +128,7 @@ }
}
}

-  private def getPartitionDDL(schema: StructType, partitionBy: Seq[String]): String = {
+  private[core] def getPartitionDDL(schema: StructType, partitionBy: Seq[String]): String = {
if (partitionBy.isEmpty) {
""
} else {
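The visibility widening from private to private[core] in the two methods above appears intended to let the new regression test in HiveHelperSqlSuite call getTableDDL directly while keeping the methods out of the public API.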
SparkUtilsSuite.scala
@@ -442,6 +442,25 @@ class SparkUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture
}
}

"sanitizeCommentForHiveDDL" should {
"replace line ending characters" in {
val comment = "line1\nline2\\nline3"

val actual = sanitizeCommentForHiveDDL(comment)

assert(actual == "line1 line2 line3")
}

"truncate long comments" in {
val comment = "a" * 500

val actual = sanitizeCommentForHiveDDL(comment)

assert(actual.length == 255)
assert(actual.endsWith("a..."))
}
}

"getLengthFromMetadata" should {
"return length for long type" in {
val metadata = new MetadataBuilder
HiveHelperSqlSuite.scala
@@ -20,6 +20,7 @@ import com.typesafe.config.ConfigFactory
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructField, StructType}
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.core.base.SparkTestBase
import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture}
@@ -156,6 +157,24 @@ class HiveHelperSqlSuite extends AnyWordSpec with SparkTestBase with TempDirFixture
}
}

"getTableDDL()" should {
"work with columns with descriptions with line ending characters (regression)" in {
val schema = StructType(Seq(
StructField("a", StringType, nullable = true, new MetadataBuilder().putString("comment", "line1'?'\nline2?").build()),
StructField("b", IntegerType, nullable = true, new MetadataBuilder().putString("comment", "line1'?'\nline2?").build())
))

val expected = "`a` STRING COMMENT 'line1\\'?\\' line2?'"

val qe = new QueryExecutorMock(tableExists = true)
val hiveHelper = new HiveHelperSql(qe, defaultHiveConfig, true)

val actual = hiveHelper.getTableDDL(schema, Seq("b"))

compareText(actual, expected)
}
}
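// Note: the expected line above exercises both concerns at once: the sanitizer
// collapses the raw newline in the description to a space, while the embedded
// single quotes are escaped as \' elsewhere in the DDL generation, keeping the
// COMMENT literal valid.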

private def getParquetPath(tempBaseDir: String, partitionBy: Seq[String] = Nil): String = {
val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, "file:///")
