Skip to content

Commit

Permalink
#362 Add unit tests for the new feature.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Feb 23, 2024
1 parent f9a9b97 commit 4a708c1
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ trait RdbExampleTable {

object RdbExampleTable {
object Company extends RdbExampleTable {
val tableName: String = "company"
val tableName: String = "COMPANY"
val schemaName: String = "PUBLIC"
val databaseName: String = "PUBLIC"

val ddl: String =
s"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@ import za.co.absa.pramen.core.fixtures.{RelationalDbFixture, TextComparisonFixtu
import za.co.absa.pramen.core.reader.model.JdbcConfig
import za.co.absa.pramen.core.samples.RdbExampleTable
import za.co.absa.pramen.core.utils.JdbcSparkUtils
import za.co.absa.pramen.core.utils.JdbcSparkUtils.getJdbcOptions
import za.co.absa.pramen.core.utils.impl.JdbcFieldMetadata

class JdbcSparkUtilsSuite extends AnyWordSpec with BeforeAndAfterAll with SparkTestBase with RelationalDbFixture with TextComparisonFixture {

import spark.implicits._

val jdbcConfig: JdbcConfig = JdbcConfig(driver, Some(url), Nil, None, Some(user), Some(password))
Expand Down Expand Up @@ -76,6 +78,62 @@ class JdbcSparkUtilsSuite extends AnyWordSpec with BeforeAndAfterAll with SparkT
}
}

"addColumnDescriptionsFromJdbc" should {
"add comments and keep custom metadata intact" in {
val df = spark.read
.format("jdbc")
.options(getJdbcOptions(url, jdbcConfig, RdbExampleTable.Company.tableName, Map.empty))
.load()

var newSchema: StructType = null

JdbcSparkUtils.withJdbcMetadata(jdbcConfig, s"SELECT * FROM ${RdbExampleTable.Company.tableName}") { (connection, metadataRs) =>
newSchema = JdbcSparkUtils.addColumnDescriptionsFromJdbc(
JdbcSparkUtils.addMetadataFromJdbc(df.schema, metadataRs),
RdbExampleTable.Company.tableName,
connection)
}

assert(newSchema.fields(0).name == "ID")
assert(newSchema.fields(0).metadata.getString("comment") == "This is the record id")
assert(newSchema.fields(1).name == "NAME")
assert(newSchema.fields(1).metadata.getString("comment") == "This is company name")
assert(newSchema.fields(1).metadata.getLong("maxLength") == 50)
}
}

"getColumnMetadata" should {
"work when just table name is specified" in {
val metadataRs = JdbcSparkUtils.getColumnMetadata(RdbExampleTable.Company.tableName, getConnection)

assert(metadataRs.next())
metadataRs.close()
}

"work when just table name is specified in wrong case" in {
val metadataRs = JdbcSparkUtils.getColumnMetadata("company", getConnection)

assert(metadataRs.next())
metadataRs.close()
}

"work when schema and table name are specified" in {
val tableName = s"${RdbExampleTable.Company.schemaName}.${RdbExampleTable.Company.tableName}"
val metadataRs = JdbcSparkUtils.getColumnMetadata(tableName, getConnection)

assert(metadataRs.next())
metadataRs.close()
}

"work when database, schema, and table name are specified" in {
val tableName = s"${RdbExampleTable.Company.databaseName}.${RdbExampleTable.Company.schemaName}.${RdbExampleTable.Company.tableName}"
val metadataRs = JdbcSparkUtils.getColumnMetadata(tableName, getConnection)

assert(metadataRs.next())
metadataRs.close()
}
}

"withJdbcMetadata" should {
"provide the metadata object for the query" in {
JdbcSparkUtils.withJdbcMetadata(jdbcConfig, s"SELECT * FROM ${RdbExampleTable.Company.tableName}") { (connection, metadata) =>
Expand Down

0 comments on commit 4a708c1

Please sign in to comment.