Skip to content

Commit

Permalink
#399 Add ability to format dates in SQL expressions.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed May 6, 2024
1 parent 892a31a commit 72f4bce
Show file tree
Hide file tree
Showing 5 changed files with 150 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
}

private[core] def getCountForSql(sql: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
val filteredSql = TableReaderJdbcNative.getFilteredSql(sql, infoDateBegin, infoDateEnd, infoDateFormatter)
val filteredSql = TableReaderJdbcNative.getFilteredSql(sql, infoDateBegin, infoDateEnd)
getWithRetry[Long](filteredSql, isDataQuery = false, jdbcRetries, None)(df => df.count())
}

Expand All @@ -129,7 +129,7 @@ class TableReaderJdbc(jdbcReaderConfig: TableReaderJdbcConfig,
}

private[core] def getDataForSql(sql: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame = {
val filteredSql = TableReaderJdbcNative.getFilteredSql(sql, infoDateBegin, infoDateEnd, infoDateFormatter)
val filteredSql = TableReaderJdbcNative.getFilteredSql(sql, infoDateBegin, infoDateEnd)
getWithRetry[DataFrame](filteredSql, isDataQuery = true, jdbcRetries, None)(df => filterDfColumns(df, columns))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, SparkSession}
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api.Query
import za.co.absa.pramen.core.reader.model.TableReaderJdbcConfig
import za.co.absa.pramen.core.utils.{JdbcNativeUtils, TimeUtils}
import za.co.absa.pramen.core.utils.{JdbcNativeUtils, StringUtils, TimeUtils}

import java.time.format.DateTimeFormatter
import java.time.{Instant, LocalDate}
Expand All @@ -45,7 +45,7 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,

override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = {
val start = Instant.now()
val sql = getFilteredSql(getSqlExpression(query), infoDateBegin, infoDateEnd, infoDateFormatter)
val sql = getFilteredSql(getSqlExpression(query), infoDateBegin, infoDateEnd)
log.info(s"JDBC Native count of: $sql")
val count = JdbcNativeUtils.getJdbcNativeRecordCount(jdbcConfig, url, sql)
val finish = Instant.now()
Expand All @@ -57,7 +57,7 @@ class TableReaderJdbcNative(jdbcReaderConfig: TableReaderJdbcConfig,
override def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): DataFrame = {
log.info(s"JDBC Native data of: $query")
query match {
case Query.Sql(sql) => getDataFrame(getFilteredSql(sql, infoDateBegin, infoDateEnd, infoDateFormatter))
case Query.Sql(sql) => getDataFrame(getFilteredSql(sql, infoDateBegin, infoDateEnd))
case Query.Table(table) => getDataFrame(getSqlDataQuery(table, infoDateBegin, infoDateEnd, columns))
case other => throw new IllegalArgumentException(s"'${other.name}' is not supported by the JDBC Native reader. Use 'sql' or 'table' instead.")
}
Expand Down Expand Up @@ -101,13 +101,12 @@ object TableReaderJdbcNative {
new TableReaderJdbcNative(tableReaderJdbc, urlSelector, conf)
}

def getFilteredSql(sqlExpression: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate, formatter: DateTimeFormatter): String = {
sqlExpression
.replaceAll("@dateFrom", formatter.format(infoDateBegin))
.replaceAll("@dateTo", formatter.format(infoDateEnd))
.replaceAll("@date", formatter.format(infoDateEnd))
.replaceAll("@infoDateBegin", formatter.format(infoDateBegin))
.replaceAll("@infoDateEnd", formatter.format(infoDateEnd))
.replaceAll("@infoDate", formatter.format(infoDateEnd))
def getFilteredSql(sqlExpression: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): String = {
val f1 = StringUtils.replaceFormattedDate(sqlExpression, "@dateFrom", infoDateBegin)
val f2 = StringUtils.replaceFormattedDate(f1, "@dateTo", infoDateEnd)
val f3 = StringUtils.replaceFormattedDate(f2, "@date", infoDateEnd)
val f4 = StringUtils.replaceFormattedDate(f3, "@infoDateBegin", infoDateBegin)
val f5 = StringUtils.replaceFormattedDate(f4, "@infoDateEnd", infoDateEnd)
StringUtils.replaceFormattedDate(f5, "@infoDate", infoDateEnd)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,11 +194,11 @@ object SparkUtils {

def applyFilters(df: DataFrame, filters: Seq[String], infoDate: LocalDate, dateFrom: LocalDate, dateTo: LocalDate): DataFrame = {
filters.foldLeft(df)((df, filter) => {
val actualFilter = filter
.replaceAll("@dateFrom", s"${dateFrom.toString}")
.replaceAll("@dateTo", s"${dateTo.toString}")
.replaceAll("@date", s"${infoDate.toString}")
.replaceAll("@infoDate", s"date'${infoDate.toString}'")
val f1 = StringUtils.replaceFormattedDate(filter, "@dateFrom", dateFrom)
val f2 = StringUtils.replaceFormattedDate(f1, "@dateTo", dateTo)
val f3 = StringUtils.replaceFormattedDate(f2, "@date", infoDate)
val actualFilter = f3.replaceAll("@infoDate", s"date'${infoDate.toString}'")

log.info(s"Applying filter: $actualFilter")
df.filter(expr(actualFilter))
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@

package za.co.absa.pramen.core.utils

import za.co.absa.pramen.core.exceptions.{OsSignalException, ThreadStackTrace}
import za.co.absa.pramen.core.exceptions.ThreadStackTrace
import za.co.absa.pramen.core.expr.DateExprEvaluator

import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.{Base64, StringTokenizer}
import scala.compat.Platform.EOL
import scala.util.control.NonFatal
Expand Down Expand Up @@ -193,4 +195,66 @@ object StringUtils {
base + details
}

def replaceFormattedDate(template: String, dateVar: String, date: LocalDate): String = {
val output = new StringBuilder()
val outputPartial = new StringBuilder()
var state = 0
var i = 0
var j = 0

val STATE_TEMPLATE_AS_IS = 0
val CATCH_VARIABLE = 1
val END_OF_VARIABLE = 2
val END_OF_FORMAT = 3

while (i < template.length) {
val c = template(i)
state match {
case STATE_TEMPLATE_AS_IS =>
if (c == dateVar(0)) {
state = CATCH_VARIABLE
j = 1
outputPartial.clear()
outputPartial.append(s"$c")
} else {
output.append(s"$c")
}
case CATCH_VARIABLE =>
outputPartial.append(s"$c")
if (c == dateVar(j)) {
j += 1
if (j == dateVar.length) {
state = END_OF_VARIABLE
if (i == template.length - 1) {
output.append(s"$date")
}
}
} else {
output.append(s"${outputPartial.toString()}")
outputPartial.clear()
state = STATE_TEMPLATE_AS_IS
}
case END_OF_VARIABLE =>
if (c == '%') {
state = END_OF_FORMAT
outputPartial.clear()
} else {
output.append(s"$date$c")
state = STATE_TEMPLATE_AS_IS
}
case END_OF_FORMAT =>
if (c == '%') {
state = STATE_TEMPLATE_AS_IS
val formatter = DateTimeFormatter.ofPattern(outputPartial.toString())
output.append(s"${formatter.format(date)}")
} else {
outputPartial.append(s"$c")
}
}
i += 1
}
output.toString()
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -227,4 +227,72 @@ class StringUtilsSuite extends AnyWordSpec {
}
}

"replaceFormattedDate" should {
val infoDate = LocalDate.of(2022, 2, 18)

"work with normal variables" in {
val template = "SELECT @dat FROM my_table_@date + 1"

val replaced = replaceFormattedDate(template, "@date", infoDate)

assert(replaced == "SELECT @dat FROM my_table_2022-02-18 + 1")
}

"work with variables at the end" in {
val template = "SELECT @dat FROM my_table_@date"

val replaced = replaceFormattedDate(template, "@date", infoDate)

assert(replaced == "SELECT @dat FROM my_table_2022-02-18")
}

"work with just variables" in {
val template = "@date"

val replaced = replaceFormattedDate(template, "@date", infoDate)

assert(replaced == "2022-02-18")
}

"work with 2 variables" in {
val template = "@date @date"

val replaced = replaceFormattedDate(template, "@date", infoDate)

assert(replaced == "2022-02-18 2022-02-18")
}

"work with formatted variables" in {
val template = "SELECT * FROM my_table_@date%yyyyMMdd% WHERE a = b"

val replaced = replaceFormattedDate(template, "@date", infoDate)

assert(replaced == "SELECT * FROM my_table_20220218 WHERE a = b")
}

"work with just formatted variables" in {
val template = "@date%yyyyMMdd%"

val replaced = replaceFormattedDate(template, "@date", infoDate)

assert(replaced == "20220218")
}

"work with 2 formatted variables" in {
val template = "@date%yyyyMMdd%@date%ddMMyyy%"

val replaced = replaceFormattedDate(template, "@date", infoDate)

assert(replaced == "2022021818022022")
}

"work with partial formatter" in {
val template = "@date%yyyyMM%"

val replaced = replaceFormattedDate(template, "@date", infoDate)

assert(replaced == "202202")
}
}

}

0 comments on commit 72f4bce

Please sign in to comment.