Skip to content

Commit

Permalink
Merge pull request #553 from AbsaOSS/#549-SparkSource-datetime-expressions-parsing
Browse files Browse the repository at this point in the history

 #549 SparkSource datetime expressions parsing
  • Loading branch information
yruslan authored Feb 17, 2025
2 parents 89f1bc3 + e1f746d commit 241c096
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,13 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
logConfiguration()

override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
val transformedQuery = TableReaderJdbcNative.applyInfoDateExpressionToQuery(query, infoDateBegin, infoDateEnd)
val start = Instant.now
val count = query match {
val count = transformedQuery match {
case Query.Table(tableName) =>
getCountForTable(tableName, infoDateBegin, infoDateEnd)
case Query.Sql(sql) =>
getCountForSql(sql, infoDateBegin, infoDateEnd)
getCountForSql(sql)
case other =>
throw new IllegalArgumentException(s"'${other.name}' is not supported by the JDBC reader. Use 'table' or 'sql' instead.")
}
Expand All @@ -55,11 +56,12 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
}

override def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame = {
query match {
val transformedQuery = TableReaderJdbcNative.applyInfoDateExpressionToQuery(query, infoDateBegin, infoDateEnd)
transformedQuery match {
case Query.Table(tableName) =>
getDataForTable(tableName, infoDateBegin, infoDateEnd, columns)
case Query.Sql(sql) =>
getDataForSql(sql, infoDateBegin, infoDateEnd, columns)
getDataForSql(sql, columns)
case other =>
throw new IllegalArgumentException(s"'${other.name}' is not supported by the JDBC reader. Use 'table' or 'sql' instead.")
}
Expand Down Expand Up @@ -114,14 +116,12 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
)
}

private[core] def getCountSqlQuery(sql: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): String = {
val filteredSql = TableReaderJdbcNative.getFilteredSql(sql, infoDateBegin, infoDateEnd)

sqlGen.getCountQueryForSql(filteredSql)
private[core] def getCountSqlQuery(sql: String): String = {
sqlGen.getCountQueryForSql(sql)
}

private[core] def getCountForSql(sql: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
val countSql = getCountSqlQuery(sql, infoDateBegin, infoDateEnd)
private[core] def getCountForSql(sql: String): Long = {
val countSql = getCountSqlQuery(sql)
var count = 0L

log.info(s"Executing: $countSql")
Expand Down Expand Up @@ -171,9 +171,8 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
df
}

private[core] def getDataForSql(sql: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame = {
val filteredSql = TableReaderJdbcNative.getFilteredSql(sql, infoDateBegin, infoDateEnd)
getWithRetry[DataFrame](filteredSql, isDataQuery = true, jdbcRetries, None)(df => filterDfColumns(df, columns))
private[core] def getDataForSql(sql: String, columns: Seq[String]): DataFrame = {
getWithRetry[DataFrame](sql, isDataQuery = true, jdbcRetries, None)(df => filterDfColumns(df, columns))
}

private[core] def getDataFrame(sql: String, isDataQuery: Boolean, tableOpt: Option[String]): DataFrame = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,15 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,
}

override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
val transformedQuery = TableReaderJdbcNative.applyInfoDateExpressionToQuery(query, infoDateBegin, infoDateEnd)

val start = Instant.now()
val sql = getFilteredSql(getSqlExpression(query), infoDateBegin, infoDateEnd)
val sql = transformedQuery match {
case Query.Sql(sql) => sql
case Query.Table(table) => getSqlDataQuery(table, infoDateBegin, infoDateEnd, Seq.empty)
case other => throw new IllegalArgumentException(s"'${other.name}' is not supported by the JDBC Native reader. Use 'sql' or 'table' instead.")
}

log.info(s"JDBC Native count of: $sql")
val count = JdbcNativeUtils.getJdbcNativeRecordCount(jdbcConfig, url, sql)
val finish = Instant.now()
Expand All @@ -57,9 +64,11 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,
}

override def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame = {
log.info(s"JDBC Native data of: $query")
query match {
case Query.Sql(sql) => getDataFrame(getFilteredSql(sql, infoDateBegin, infoDateEnd), None)
val transformedQuery = TableReaderJdbcNative.applyInfoDateExpressionToQuery(query, infoDateBegin, infoDateEnd)

log.info(s"JDBC Native data of: $transformedQuery")
transformedQuery match {
case Query.Sql(sql) => getDataFrame(sql, None)
case Query.Table(table) => getDataFrame(getSqlDataQuery(table, infoDateBegin, infoDateEnd, columns), Option(table))
case other => throw new IllegalArgumentException(s"'${other.name}' is not supported by the JDBC Native reader. Use 'sql' or 'table' instead.")
}
Expand Down Expand Up @@ -170,7 +179,16 @@ object TableReaderJdbcNative {
}
}

def getFilteredSql(sqlExpression: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): String = {
def applyInfoDateExpressionToQuery(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Query = {
query match {
case Query.Path(path) => Query.Path(applyInfoDateExpressionToString(path, infoDateBegin, infoDateEnd))
case Query.Table(table) => Query.Table(applyInfoDateExpressionToString(table, infoDateBegin, infoDateEnd))
case Query.Sql(sql) => Query.Sql(applyInfoDateExpressionToString(sql, infoDateBegin, infoDateEnd))
case other => other
}
}

def applyInfoDateExpressionToString(queryStr: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): String = {
val expr = new DateExprEvaluator()

expr.setValue("dateFrom", infoDateBegin)
Expand All @@ -180,6 +198,6 @@ object TableReaderJdbcNative {
expr.setValue("infoDateEnd", infoDateEnd)
expr.setValue("infoDate", infoDateEnd)

StringUtils.replaceFormattedDateExpression(sqlExpression, expr)
StringUtils.replaceFormattedDateExpression(queryStr, expr)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@ import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetValue}
import za.co.absa.pramen.api.sql.SqlColumnType
import za.co.absa.pramen.api.{Query, TableReader}
import za.co.absa.pramen.core.expr.DateExprEvaluator
import za.co.absa.pramen.core.utils.DateUtils.fromIsoStrToDate
import za.co.absa.pramen.core.utils.StringUtils

import java.sql.Date
import java.time.LocalDate
Expand All @@ -43,30 +46,32 @@ class TableReaderSpark(formatOpt: Option[String],
private val dateFormatter = DateTimeFormatter.ofPattern(infoDateFormat)

override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
val transformedQuery = TableReaderJdbcNative.applyInfoDateExpressionToQuery(query, infoDateBegin, infoDateEnd)
if (hasInfoDateColumn) {
if (infoDateBegin.equals(infoDateEnd)) {
log.info(s"Reading COUNT(*) FROM ${query.query} WHERE $infoDateColumn='${dateFormatter.format(infoDateBegin)}'")
getDailyDataFrame(query, infoDateBegin).count()
log.info(s"Reading COUNT(*) FROM ${transformedQuery.query} WHERE $infoDateColumn='${dateFormatter.format(infoDateBegin)}'")
getDailyDataFrame(transformedQuery, infoDateBegin).count()
} else {
log.info(s"Reading COUNT(*) FROM ${query.query} WHERE $infoDateColumn BETWEEN '${dateFormatter.format(infoDateBegin)}' AND '${dateFormatter.format(infoDateEnd)}'")
getFilteredDataFrame(query, infoDateBegin, infoDateEnd).count()
log.info(s"Reading COUNT(*) FROM ${transformedQuery.query} WHERE $infoDateColumn BETWEEN '${dateFormatter.format(infoDateBegin)}' AND '${dateFormatter.format(infoDateEnd)}'")
getFilteredDataFrame(transformedQuery, infoDateBegin, infoDateEnd).count()
}
} else {
getBaseDataFrame(query).count()
getBaseDataFrame(transformedQuery).count()
}
}

override def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame = {
val transformedQuery = TableReaderJdbcNative.applyInfoDateExpressionToQuery(query, infoDateBegin, infoDateEnd)
if (hasInfoDateColumn) {
if (infoDateBegin.equals(infoDateEnd)) {
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn='${dateFormatter.format(infoDateEnd)}'")
getDailyDataFrame(query, infoDateEnd)
log.info(s"Reading * FROM ${transformedQuery.query} WHERE $infoDateColumn='${dateFormatter.format(infoDateEnd)}'")
getDailyDataFrame(transformedQuery, infoDateEnd)
} else {
log.info(s"Reading * FROM ${query.query} WHERE $infoDateColumn BETWEEN '${dateFormatter.format(infoDateBegin)}' AND '${dateFormatter.format(infoDateEnd)}'")
getFilteredDataFrame(query, infoDateBegin, infoDateEnd)
log.info(s"Reading * FROM ${transformedQuery.query} WHERE $infoDateColumn BETWEEN '${dateFormatter.format(infoDateBegin)}' AND '${dateFormatter.format(infoDateEnd)}'")
getFilteredDataFrame(transformedQuery, infoDateBegin, infoDateEnd)
}
} else {
getBaseDataFrame(query)
getBaseDataFrame(transformedQuery)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import za.co.absa.pramen.core.base.SparkTestBase
import za.co.absa.pramen.core.fixtures.RelationalDbFixture
import za.co.absa.pramen.core.mocks.SqlGeneratorDummy
import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig
import za.co.absa.pramen.core.reader.{JdbcUrlSelector, TableReaderJdbc}
import za.co.absa.pramen.core.reader.{JdbcUrlSelector, TableReaderJdbc, TableReaderJdbcNative}
import za.co.absa.pramen.core.samples.RdbExampleTable
import za.co.absa.pramen.core.sql.SqlGeneratorHsqlDb
import za.co.absa.pramen.core.utils.SparkUtils.{COMMENT_METADATA_KEY, MAX_LENGTH_METADATA_KEY}
Expand Down Expand Up @@ -368,7 +368,7 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark
val testConfig = conf
val reader = TableReaderJdbc(testConfig.getConfig("reader"), testConfig.getConfig("reader"), "reader")

val sql = reader.getCountSqlQuery("SELECT * FROM COMPANY", infoDate, infoDate)
val sql = reader.getCountSqlQuery("SELECT * FROM COMPANY")

assert(sql == "SELECT COUNT(*) FROM (SELECT * FROM COMPANY)")
}
Expand All @@ -382,7 +382,9 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark

val reader = TableReaderJdbc(testConfig, testConfig, "reader")

val sql = reader.getCountSqlQuery("SELECT * FROM COMPANY WHERE info_date BETWEEN '@dateFrom' AND '@dateTo'", infoDate, infoDate)
val sqlTemplate = "SELECT * FROM COMPANY WHERE info_date BETWEEN '@dateFrom' AND '@dateTo'"
val sqlIn = TableReaderJdbcNative.applyInfoDateExpressionToString(sqlTemplate, infoDate, infoDate)
val sql = reader.getCountSqlQuery(sqlIn)

assert(sql == "SELECT COUNT(*) FROM (SELECT * FROM COMPANY WHERE info_date BETWEEN '2022-02-18' AND '2022-02-18')")
}
Expand All @@ -397,7 +399,9 @@ class TableReaderJdbcSuite extends AnyWordSpec with BeforeAndAfterAll with Spark

val reader = TableReaderJdbc(testConfig, testConfig, "reader")

val sql = reader.getCountSqlQuery("SELECT * FROM my_db.my_table WHERE info_date = CAST(REPLACE(CAST(CAST('@infoDate' AS DATE) AS VARCHAR(10)), '-', '') AS INTEGER)", infoDate, infoDate)
val sqlTemplate = "SELECT * FROM my_db.my_table WHERE info_date = CAST(REPLACE(CAST(CAST('@infoDate' AS DATE) AS VARCHAR(10)), '-', '') AS INTEGER)"
val sqlIn = TableReaderJdbcNative.applyInfoDateExpressionToString(sqlTemplate, infoDate, infoDate)
val sql = reader.getCountSqlQuery(sqlIn)

assert(sql == "SELECT COUNT(*) FROM (SELECT * FROM my_db.my_table WHERE info_date = CAST(REPLACE(CAST(CAST('2022-02-18' AS DATE) AS VARCHAR(10)), '-', '') AS INTEGER)) AS query")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import za.co.absa.pramen.api.{Query, TableReader}
import za.co.absa.pramen.core.base.SparkTestBase
import za.co.absa.pramen.core.fixtures.TempDirFixture
import za.co.absa.pramen.core.mocks.reader.TableReaderSparkFactory
import za.co.absa.pramen.core.reader.TableReaderSpark
import za.co.absa.pramen.core.reader.{TableReaderJdbcNative, TableReaderSpark}
import za.co.absa.pramen.core.utils.FsUtils

import java.time.LocalDate
Expand Down Expand Up @@ -307,6 +307,26 @@ class TableReaderSparkSuite extends AnyWordSpec with SparkTestBase with TempDirF
}
}

"getFilteredQuery()" should {
"return pure query expretion with parsed date expration" in {
case class TestCase(queryExpression: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate, expected: String)

val testCases = Seq(
TestCase("SELECT * FROM table_@infoDate%yyyyMMdd%", infoDate1, infoDate1, "SELECT * FROM table_20220805"),
TestCase("table1_@infoDate%yyyyMMdd%", infoDate1, infoDate1, "table1_20220805"),
TestCase("/some/path-@infoDate%yyyy-MM-dd%/", infoDate1, infoDate1, "/some/path-2022-08-05/"),
TestCase("/some/path-@{plusMonths(@infoDate, 1)}%yyyy-MM-dd%/", infoDate1, infoDate1, "/some/path-2022-09-05/")
)

testCases.foreach { testCase =>
val filteredQuery = TableReaderJdbcNative.applyInfoDateExpressionToString(
testCase.queryExpression, testCase.infoDateBegin, testCase.infoDateEnd
)
assert(filteredQuery == testCase.expected)
}
}
}

private def getUseCase(tempDir: String,
formatOpt: Option[String] = Some("csv"),
createData: Boolean = true,
Expand Down

0 comments on commit 241c096

Please sign in to comment.