Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/support/1.9' into feature/merge-…
Browse files Browse the repository at this point in the history
…support-19-2

@ Conflicts:
@	pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/JobBase.scala
@	pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala
@	pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TransformationJob.scala
@	pramen/version.sbt
  • Loading branch information
yruslan committed Nov 1, 2024
2 parents a8e7a64 + e1ab9b0 commit 67390a3
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,
}

if (jdbcReaderConfig.enableSchemaMetadata) {
JdbcSparkUtils.withJdbcMetadata(jdbcReaderConfig.jdbcConfig, sql) { (connection, _) =>
JdbcSparkUtils.withJdbcMetadata(jdbcConfig, sql) { (connection, _) =>
val schemaWithColumnDescriptions = tableOpt match {
case Some(table) =>
log.info(s"Reading JDBC metadata descriptions the table: $table")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,30 +143,34 @@ object JdbcSparkUtils {
/** Returns JDBC metadata columns for a table identified by a possibly-qualified name.
  *
  * The name may be `database.schema.table`, `schema.table` (or `database.table`), or a bare
  * table name. For two-part names the first part is tried as a schema first, then as a
  * database/catalog. For bare names, if the exact casing yields no columns, upper- and
  * lower-cased variants are tried, since some JDBC drivers (e.g. HSQLDB) store object names
  * upper- or lower-cased in database metadata.
  *
  * @param fullTableName the table name, optionally qualified with schema and/or database.
  * @param connection    an open JDBC connection (caller owns and closes it).
  * @return a ResultSet of column metadata as returned by DatabaseMetaData.getColumns().
  */
def getColumnMetadata(fullTableName: String, connection: Connection): ResultSet = {
  val dbMetadata: DatabaseMetaData = connection.getMetaData

  val parts = fullTableName.split('.')
  if (parts.length == 3) {
    // database, schema, and table are all present
    dbMetadata.getColumns(parts(0), parts(1), parts(2), null)
  } else if (parts.length == 2) {
    // Try interpreting the first part as a schema; isBeforeFirst is used instead of next()
    // so the returned ResultSet cursor is not advanced past the first row.
    val rs = dbMetadata.getColumns(null, parts(0), parts(1), null)
    if (rs.isBeforeFirst) {
      // schema and table only
      rs
    } else {
      // database and table only
      dbMetadata.getColumns(parts(0), null, parts(1), null)
    }
  } else {
    // Table only.
    val rs = dbMetadata.getColumns(null, null, fullTableName, null)

    if (rs.isBeforeFirst) {
      rs
    } else {
      // The exact casing was already checked. Checking upper and lower casing in case
      // the JDBC driver is case-sensitive, but objects in db metadata are automatically upper- or lower-cased (HSQLDB).
      val rsUpper = dbMetadata.getColumns(null, null, fullTableName.toUpperCase, null)
      if (rsUpper.isBeforeFirst)
        rsUpper
      else
        dbMetadata.getColumns(null, null, fullTableName.toLowerCase, null)
    }
  }
}

Expand All @@ -185,10 +189,19 @@ object JdbcSparkUtils {

connection.setAutoCommit(false)

/** If not filtered out, some JDBC drivers will try to receive all data before closing the result set.
* ToDo Fix this properly using SQL generators by adding a generator for schema query. */
val q = if (nativeQuery.toLowerCase.contains(" where ")) {
nativeQuery + " AND 0=1"
} else {
nativeQuery + " WHERE 0=1"
}

log.info(s"Successfully connected to JDBC URL: $url")
log.info(s"Getting metadata for: $q")

try {
withMetadataResultSet(connection, nativeQuery) { rs =>
withMetadataResultSet(connection, q) { rs =>
action(connection, rs.getMetaData)
}
} finally {
Expand Down

0 comments on commit 67390a3

Please sign in to comment.