
Commit

#353 Fix cache path uniqueness and add integration tests, and description in README.
yruslan committed Feb 15, 2024
1 parent 3f4eba5 commit 3160d50
Showing 8 changed files with 187 additions and 10 deletions.
7 changes: 7 additions & 0 deletions README.md
@@ -553,6 +553,13 @@ is determined by the pipeline configuration.
# Fix the input precision interpretation (fixes errors like "Decimal precision 14 exceeds max precision 13")
correct.decimals.fix.precision = true
# This is an experimental feature, please use with caution.
# When set to true, Pramen won't issue a separate query to the source to get the record count. Instead, it will
# always fetch the data and cache it in a temporary directory first. This is useful for very large tables on
# sources that require a full table scan for count queries (for example, Hive 1.0 on MapReduce).
# By default, count queries are enabled.
#disable.count.query = true
# Specifies the maximum number of records to fetch. Good for testing purposes.
#limit.records = 100
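
For context, the integration test added in this commit exercises the new option in a source definition along these lines (a condensed sketch of the test config included later in this commit; the source name and format are illustrative):

pramen.sources.1 = [
  {
    name = "spark_source"
    factory.class = "za.co.absa.pramen.core.source.SparkSource"

    format = "csv"

    # Fetch and cache the data instead of issuing a separate count query
    disable.count.query = true
  }
]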
@@ -25,8 +25,9 @@ import za.co.absa.pramen.api.CachePolicy
import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY
import za.co.absa.pramen.core.utils.FsUtils

import java.time.LocalDate
import java.time.{Instant, LocalDate}
import scala.collection.mutable
import scala.util.Random

object TransientTableManager {
private val log = LoggerFactory.getLogger(this.getClass)
@@ -71,10 +72,11 @@ object TransientTableManager {
spark = df.sparkSession
}

val partitionFolder = s"temp_partition_date=$infoDate"
val outputPath = new Path(tempDir, partitionFolder).toString
val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir)

val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, outputPath)
val partitionFolder = s"temp_partition_date=$infoDate"
val uniquePath = createTimedTempDir(new Path(tempDir), fsUtils)
val outputPath = new Path(uniquePath, partitionFolder).toString

fsUtils.createDirectoryRecursive(new Path(outputPath))

@@ -92,6 +94,21 @@
(spark.read.parquet(outputPath), Option(sizeBytes))
}

/**
  * Creates a unique temporary subdirectory under the given parent path. The directory name is built from
  * the current epoch millisecond and a random integer; if that path already exists, the method waits 1 ms
  * and retries with a new name until an unused path is found.
  */
private[core] def createTimedTempDir(parentDir: Path, fsUtils: FsUtils): Path = {
fsUtils.createDirectoryRecursive(parentDir)

var tempDir = new Path(parentDir, s"${Instant.now().toEpochMilli}_${Random.nextInt()}")

while (fsUtils.exists(tempDir)) {
Thread.sleep(1)
tempDir = new Path(parentDir, s"${Instant.now().toEpochMilli}_${Random.nextInt()}")
}

fsUtils.fs.mkdirs(tempDir)

tempDir
}

private[core] def hasDataForTheDate(tableName: String, infoDate: LocalDate): Boolean = synchronized {
val partition = getMetastorePartition(tableName, infoDate)

@@ -172,7 +189,9 @@ object TransientTableManager {
private[core] def getTempDirectory(cachePolicy: CachePolicy, conf: Config): Option[String] = {
if (cachePolicy == CachePolicy.Persist) {
if (conf.hasPath(TEMPORARY_DIRECTORY_KEY) && conf.getString(TEMPORARY_DIRECTORY_KEY).nonEmpty) {
Option(conf.getString(TEMPORARY_DIRECTORY_KEY))
Option(conf.getString(TEMPORARY_DIRECTORY_KEY)).map { basePath =>
new Path(basePath, "cache").toString
}
} else {
throw new IllegalArgumentException(s"Transient metastore tables with persist cache policy require temporary directory to be defined at: $TEMPORARY_DIRECTORY_KEY")
}
@@ -17,6 +17,7 @@
package za.co.absa.pramen.core.pipeline

import com.typesafe.config.Config
import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SparkSession}
import za.co.absa.pramen.api.{Query, Reason, Source, SourceResult}
import za.co.absa.pramen.core.app.config.GeneralConfig.TEMPORARY_DIRECTORY_KEY
@@ -198,8 +199,9 @@ class IngestionJob(operationDef: OperationDef,
if (TransientTableManager.hasDataForTheDate(cacheTableName, from)) {
TransientTableManager.getDataForTheDate(cacheTableName, from)
} else {
val cacheDir = new Path(tempDirectory.get, "cache").toString
val sourceDf = getData(source, query, from, to).data
val (cachedDf, _) = TransientTableManager.addPersistedDataFrame(cacheTableName, from, sourceDf, tempDirectory.get)
val (cachedDf, _) = TransientTableManager.addPersistedDataFrame(cacheTableName, from, sourceDf, cacheDir)
cachedDf
}
}
@@ -209,7 +211,7 @@
* infoDateFrom is used as a second key to the transient table manager, so it is not used to form the name.
*/
private def getVirtualTableName(query: Query, infoDateTo: LocalDate): String = {
s"jdbc://$sourceName|${query.query}|$infoDateTo"
s"source_cache://$sourceName|${query.query}|$infoDateTo"
}

private def processInsufficientDataCase(infoDate: LocalDate,
@@ -0,0 +1,70 @@
# Copyright 2022 ABSA Group Limited
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# This variable is expected to be set up by the test suite
#base.path = "/tmp"

pramen {
pipeline.name = "Integration test with count query disabled for the source"

temporary.directory = ${base.path}/temp

bookkeeping.enabled = false
stop.spark.session = false
}

pramen.metastore {
tables = [
{
name = "table1"
description = "Table 1"
format = "parquet"
path = ${base.path}/table1
}
]
}

pramen.sources.1 = [
{
name = "spark_source"
factory.class = "za.co.absa.pramen.core.source.SparkSource"

array.list = ["a", "b", "c"]

format = "csv"
disable.count.query = true

option {
header = true
}
}
]

pramen.operations = [
{
name = "Sourcing from a CSV file"
type = "ingestion"
schedule.type = "daily"

source = "spark_source"

info.date.expr = "@runDate"

tables = [
{
input.path = ${base.path}
output.metastore.table = table1
}
]
}
]
@@ -0,0 +1,79 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.integration

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.hadoop.fs.Path
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.core.base.SparkTestBase
import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture}
import za.co.absa.pramen.core.runner.AppRunner
import za.co.absa.pramen.core.utils.{FsUtils, ResourceUtils}

import java.time.LocalDate

class DisableCountQueryLongSuite extends AnyWordSpec with SparkTestBase with TempDirFixture with TextComparisonFixture {
private val infoDate = LocalDate.of(2021, 2, 18)

"Inner transformer should" should {
val expected =
"""{"id":"1","name":"John"}
|{"id":"2","name":"Jack"}
|{"id":"3","name":"Jill"}
|""".stripMargin

"be able to access inner source configuration" in {
withTempDirectory("integration_disable_count_query") { tempDir =>
val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir)

fsUtils.writeFile(new Path(tempDir, "landing_file1.csv"), "id,name\n1,John\n2,Jack\n3,Jill\n")

val conf = getConfig(tempDir)

val exitCode = AppRunner.runPipeline(conf)

assert(exitCode == 0)

val table1Path = new Path(new Path(tempDir, "table1"), s"pramen_info_date=$infoDate")

assert(fsUtils.exists(table1Path))

val df = spark.read.parquet(table1Path.toString)
val actual = df.orderBy("id").toJSON.collect().mkString("\n")

compareText(actual, expected)
}
}
}

def getConfig(basePath: String): Config = {
val configContents = ResourceUtils.getResourceString("/test/config/disable_count_query.conf")
val basePathEscaped = basePath.replace("\\", "\\\\")

val conf = ConfigFactory.parseString(
s"""base.path = "$basePathEscaped"
|pramen.runtime.is.rerun = true
|pramen.current.date = "$infoDate"
|$configContents
|""".stripMargin
).withFallback(ConfigFactory.load())
.resolve()

conf
}

}
@@ -168,7 +168,7 @@ class TransientTableManagerSuite extends AnyWordSpec with BeforeAndAfterAll with
}

"return the directory for the persist policy" in {
assert(TransientTableManager.getTempDirectory(CachePolicy.Persist, conf).contains("/a/b/c"))
assert(TransientTableManager.getTempDirectory(CachePolicy.Persist, conf).contains("/a/b/c/cache"))
}
}
}
@@ -286,7 +286,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis
val preRunCheck = job.preRunCheckJob(infoDate, conf, Seq.empty)
assert(preRunCheck.status == JobPreRunStatus.Ready)
assert(preRunCheck.inputRecordsCount.contains(4))
assert(TransientTableManager.hasDataForTheDate("jdbc://jdbc|company|2022-02-18", infoDate))
assert(TransientTableManager.hasDataForTheDate("source_cache://jdbc|company|2022-02-18", infoDate))

val runResult = job.run(infoDate, conf)

@@ -139,7 +139,7 @@ class TransferJobSuite extends AnyWordSpec with SparkTestBase with TextCompariso
val preRunCheck = job.preRunCheckJob(infoDate, conf, Seq.empty)
assert(preRunCheck.status == JobPreRunStatus.Ready)
assert(preRunCheck.inputRecordsCount.contains(5))
assert(TransientTableManager.hasDataForTheDate("jdbc://testsource|table1|2022-01-18", infoDate))
assert(TransientTableManager.hasDataForTheDate("source_cache://testsource|table1|2022-01-18", infoDate))

val runResult = job.run(infoDate, conf)

