Skip to content

Commit f329141

Browse files
committed
#245 OnDemand -> Lazy everywhere since 'lazy' is a known concept. We could use either everywhere and chose 'lazy'.
1 parent 17bd1ab commit f329141

File tree

15 files changed

+89
-91
lines changed

15 files changed

+89
-91
lines changed

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2294,7 +2294,8 @@ The cache policy can be:
22942294
- `persist` - the table is persisted in the temporary directory for the duration of the pipeline run.
22952295

22962296
Transient tables are calculated on-demand by executing the operation that outputs to that table. This occurs when a
2297-
transformer or a sink invokes metastore.getTable() or metastore.getLatest(). Pramen ensures that if the same data is
2297+
transformer or a sink invokes metastore.getTable() or metastore.getLatest(). This is also known as 'lazy evaluation'
2298+
therefore jobs that output to transient tables are also known as 'lazy jobs'. Pramen ensures that if the same data is
22982299
required by multiple transformers (for the same job for the same date), the job will run only once.
22992300

23002301
### File-based sourcing

pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,9 @@ class MetastoreImpl(appConfig: Config,
4949

5050
override def isDataAvailable(tableName: String, infoDateFromOpt: Option[LocalDate], infoDateToOpt: Option[LocalDate]): Boolean = {
5151
val mt = getTableDef(tableName)
52-
val isOnDemand = mt.format.isLazy
52+
val isLazy = mt.format.isLazy
5353

54-
if (isOnDemand) {
54+
if (isLazy) {
5555
(infoDateFromOpt, infoDateToOpt) match {
5656
case (Some(infoDateFrom), Some(infoDateTo)) =>
5757
TransientJobManager.selectInfoDatesToExecute(tableName, infoDateFrom, infoDateTo).nonEmpty
@@ -76,8 +76,8 @@ class MetastoreImpl(appConfig: Config,
7676

7777
override def getLatest(tableName: String, until: Option[LocalDate]): DataFrame = {
7878
val mt = getTableDef(tableName)
79-
val isOnDemand = mt.format.isLazy
80-
if (isOnDemand) {
79+
val isLazy = mt.format.isLazy
80+
if (isLazy) {
8181
MetastorePersistence.fromMetaTable(mt, appConfig).loadTable(None, until)
8282
} else {
8383
bookkeeper.getLatestProcessedDate(tableName, until) match {

pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceTransient.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -35,13 +35,13 @@ class MetastorePersistenceTransient(tempPath: Option[String],
3535
override def loadTable(infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
3636
(infoDateFrom, infoDateTo) match {
3737
case (Some(from), Some(to)) if from == to =>
38-
runOnDemandTask(tableName, from)
38+
runLazyTask(tableName, from)
3939
case (Some(from), Some(to)) =>
40-
runOnDemandTasks(tableName, selectInfoDatesToExecute(tableName, from, to))
40+
runLazyTasks(tableName, selectInfoDatesToExecute(tableName, from, to))
4141
case (None, Some(until)) =>
42-
runOnDemandTask(tableName, selectLatestOnDemandSnapshot(tableName, until))
42+
runLazyTask(tableName, selectLatestLazySnapshot(tableName, until))
4343
case _ =>
44-
throw new IllegalArgumentException("Metastore 'on_demand' format requires info date for querying its contents.")
44+
throw new IllegalArgumentException("Metastore 'transient' format requires info date for querying its contents.")
4545
}
4646
}
4747

@@ -50,14 +50,14 @@ class MetastorePersistenceTransient(tempPath: Option[String],
5050
}
5151

5252
override def getStats(infoDate: LocalDate): MetaTableStats = {
53-
throw new UnsupportedOperationException("On demand format does not support getting record count and size statistics.")
53+
throw new UnsupportedOperationException("The 'transient' format does not support getting record count and size statistics.")
5454
}
5555

5656
override def createOrUpdateHiveTable(infoDate: LocalDate, hiveTableName: String, queryExecutor: QueryExecutor, hiveConfig: HiveConfig): Unit = {
57-
throw new UnsupportedOperationException("On demand format does not support Hive tables.")
57+
throw new UnsupportedOperationException("The 'transient' format does not support Hive tables.")
5858
}
5959

6060
override def repairHiveTable(hiveTableName: String, queryExecutor: QueryExecutor, hiveConfig: HiveConfig): Unit = {
61-
throw new UnsupportedOperationException("On demand format does not support Hive tables.")
61+
throw new UnsupportedOperationException("The 'transient' format does not support Hive tables.")
6262
}
6363
}

pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientJobManager.scala

Lines changed: 25 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ object TransientJobManager {
3434
val MAXIMUM_UNIONS = 50
3535

3636
private val log = LoggerFactory.getLogger(this.getClass)
37-
private val onDemandJobs = new mutable.HashMap[String, Job]()
37+
private val lazyJobs = new mutable.HashMap[String, Job]()
3838
private val runningJobs = new mutable.HashMap[MetastorePartition, Future[DataFrame]]()
3939
private var taskRunnerOpt: Option[TaskRunner] = None
4040

@@ -46,8 +46,8 @@ object TransientJobManager {
4646
taskRunnerOpt.isDefined
4747
}
4848

49-
private[core] def addOnDemandJob(job: Job): Unit = synchronized {
50-
onDemandJobs += job.outputTable.name.toLowerCase -> job
49+
private[core] def addLazyJob(job: Job): Unit = synchronized {
50+
lazyJobs += job.outputTable.name.toLowerCase -> job
5151
}
5252

5353
private[core] def selectInfoDatesToExecute(outputTableName: String,
@@ -62,8 +62,8 @@ object TransientJobManager {
6262
job.operation.schedule)
6363
}
6464

65-
private[core] def selectLatestOnDemandSnapshot(outputTableName: String,
66-
infoDateUntil: LocalDate): LocalDate = {
65+
private[core] def selectLatestLazySnapshot(outputTableName: String,
66+
infoDateUntil: LocalDate): LocalDate = {
6767
val job = getJob(outputTableName)
6868

6969
ScheduleStrategyUtils.getLatestActiveInfoDate(outputTableName,
@@ -72,10 +72,10 @@ object TransientJobManager {
7272
job.operation.schedule)
7373
}
7474

75-
private[core] def runOnDemandTasks(outputTableName: String,
76-
infoDates: Seq[LocalDate])
77-
(implicit spark: SparkSession): DataFrame = {
78-
val dfs = infoDates.map(infoDate => runOnDemandTask(outputTableName, infoDate))
75+
private[core] def runLazyTasks(outputTableName: String,
76+
infoDates: Seq[LocalDate])
77+
(implicit spark: SparkSession): DataFrame = {
78+
val dfs = infoDates.map(infoDate => runLazyTask(outputTableName, infoDate))
7979

8080
if (dfs.isEmpty) {
8181
TransientTableManager.getEmptyDfForTable(outputTableName).getOrElse(spark.emptyDataFrame)
@@ -85,13 +85,13 @@ object TransientJobManager {
8585
s"since the number of dataframe unions is too big (${infoDates.length} > $WARN_UNIONS)")
8686
}
8787
if (infoDates.length > MAXIMUM_UNIONS) {
88-
throw new IllegalArgumentException(s"The number of subtasks requested for the on-demand job contains too many " +
88+
throw new IllegalArgumentException(s"The number of subtasks requested for the lazy job contains too many " +
8989
s"dataframe unions (${infoDates.length} > $MAXIMUM_UNIONS)")
9090
}
9191

92-
infoDates.tail.foldLeft(runOnDemandTask(outputTableName, infoDates.head))(
92+
infoDates.tail.foldLeft(runLazyTask(outputTableName, infoDates.head))(
9393
(acc, infoDate) => {
94-
val df = runOnDemandTask(outputTableName, infoDate)
94+
val df = runLazyTask(outputTableName, infoDate)
9595
safeUnion(acc, df)
9696
}
9797
)
@@ -113,11 +113,11 @@ object TransientJobManager {
113113
}
114114
}
115115

116-
private[core] def runOnDemandTask(outputTableName: String,
117-
infoDate: LocalDate)
118-
(implicit spark: SparkSession): DataFrame = {
116+
private[core] def runLazyTask(outputTableName: String,
117+
infoDate: LocalDate)
118+
(implicit spark: SparkSession): DataFrame = {
119119
val start = Instant.now()
120-
val fut = getOnDemandTaskFuture(outputTableName, infoDate)
120+
val fut = getLazyTaskFuture(outputTableName, infoDate)
121121

122122
if (!fut.isCompleted) {
123123
log.info(s"Waiting for the dependent task to finish ($outputTableName for $infoDate)...")
@@ -130,9 +130,9 @@ object TransientJobManager {
130130
df
131131
}
132132

133-
private[core] def getOnDemandTaskFuture(outputTableName: String,
134-
infoDate: LocalDate)
135-
(implicit spark: SparkSession): Future[DataFrame] = {
133+
private[core] def getLazyTaskFuture(outputTableName: String,
134+
infoDate: LocalDate)
135+
(implicit spark: SparkSession): Future[DataFrame] = {
136136

137137
val promise = Promise[DataFrame]()
138138

@@ -186,7 +186,7 @@ object TransientJobManager {
186186
log.info(s"The task ($outputTableName for $infoDate) is already running. Waiting for results...")
187187
Some(runningJobs(metastorePartition))
188188
} else {
189-
log.info(s"Running the on-demand task ($outputTableName for $infoDate)...")
189+
log.info(s"Running (materializing) the lazy task ($outputTableName for $infoDate)...")
190190
runningJobs += metastorePartition -> newJonFuture
191191
None
192192
}
@@ -201,27 +201,27 @@ object TransientJobManager {
201201
}
202202

203203
private[core] def getJob(outputTableName: String): Job = {
204-
val jobOpt = onDemandJobs.get(outputTableName.toLowerCase)
204+
val jobOpt = lazyJobs.get(outputTableName.toLowerCase)
205205

206206
jobOpt match {
207207
case Some(job) => job
208-
case None => throw new IllegalArgumentException(s"On-demand job with output table name '$outputTableName' not found or haven't registered yet.")
208+
case None => throw new IllegalArgumentException(s"Lazy job with output table name '$outputTableName' not found or haven't registered yet.")
209209
}
210210
}
211211

212212
private[core] def reset(): Unit = synchronized {
213-
onDemandJobs.clear()
213+
lazyJobs.clear()
214214
runningJobs.clear()
215215
taskRunnerOpt = None
216216
}
217217

218218
private[core] def runJob(job: Job,
219219
infoDate: LocalDate)
220220
(implicit spark: SparkSession): DataFrame = {
221-
val jobStr = s"On-demand job outputting to '${job.outputTable.name}' for '$infoDate'"
221+
val jobStr = s"Lazy job outputting to '${job.outputTable.name}' for '$infoDate'"
222222
taskRunnerOpt match {
223223
case Some(taskRunner) =>
224-
taskRunner.runOnDemand(job, infoDate) match {
224+
taskRunner.runLazyTask(job, infoDate) match {
225225
case _: RunStatus.Succeeded => TransientTableManager.getDataForTheDate(job.outputTable.name, infoDate)
226226
case _: RunStatus.Skipped => spark.emptyDataFrame
227227
case RunStatus.ValidationFailed(ex) => throw new IllegalStateException(s"$jobStr validation failed.", ex)

pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/TaskRunReason.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ object TaskRunReason {
3939
override def toString: String = "Skip"
4040
}
4141

42-
case object OnDemand extends TaskRunReason {
43-
override def toString: String = "On Demand"
42+
case object OnRequest extends TaskRunReason {
43+
override def toString: String = "On Request"
4444
}
4545
}

pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig,
146146
}
147147

148148
private[core] def runLazyJob(job: Job): Boolean = {
149-
TransientJobManager.addOnDemandJob(job)
149+
TransientJobManager.addLazyJob(job)
150150
true
151151
}
152152

pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ import com.github.yruslan.channel.Channel
2020
import com.typesafe.config.Config
2121
import org.apache.spark.sql.SparkSession
2222
import org.slf4j.LoggerFactory
23-
import za.co.absa.pramen.api.DataFormat
2423
import za.co.absa.pramen.core.app.AppContext
2524
import za.co.absa.pramen.core.exceptions.{FatalErrorWrapper, ValidationException}
2625
import za.co.absa.pramen.core.pipeline.{Job, JobDependency, OperationType}
@@ -92,10 +91,10 @@ class OrchestratorImpl extends Orchestrator {
9291
log.info(s"There is another job outputting to ${finishedJob.outputTable.name}. Waiting for it to finish before marking the table as finished.")
9392
}
9493

95-
val isOnDemand = finishedJob.outputTable.format.isInstanceOf[DataFormat.Transient]
94+
val isLazy = finishedJob.outputTable.format.isLazy
9695

9796
if (!hasAnotherUnfinishedJob || !isSucceeded) {
98-
updateDependencyResolver(dependencyResolver, finishedJob, isSucceeded, isOnDemand)
97+
updateDependencyResolver(dependencyResolver, finishedJob, isSucceeded, isLazy)
9998
}
10099

101100
state.addTaskCompletion(taskResults)
@@ -188,12 +187,12 @@ class OrchestratorImpl extends Orchestrator {
188187
private def updateDependencyResolver(dependencyResolver: DependencyResolver,
189188
job: Job,
190189
isSucceeded: Boolean,
191-
isOnDemand: Boolean): Unit = {
190+
isLazy: Boolean): Unit = {
192191
val outputTable = job.outputTable
193192

194193
if (isSucceeded) {
195-
if (isOnDemand) {
196-
log.info(s"On-demand job '${job.name}' outputting to '${outputTable.name}' has been registered for the future use.")
194+
if (isLazy) {
195+
log.info(s"Lazy job '${job.name}' outputting to '${outputTable.name}' has been registered for the future use.")
197196
} else {
198197
log.info(s"$SUCCESS Job '${job.name}' outputting to '${outputTable.name}' has SUCCEEDED.")
199198
}

pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunner.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,6 @@ trait TaskRunner extends AutoCloseable {
2525
/** Run a job for specified information dates as a part of pipeline execution. */
2626
def runJobTasks(job: Job, infoDates: Seq[TaskPreDef]): Future[Seq[RunStatus]]
2727

28-
/** Run a job for specified information date on-demand from another job. */
29-
def runOnDemand(job: Job, infoDate: LocalDate): RunStatus
28+
/** Run a job for specified information date when requested from another job. */
29+
def runLazyTask(job: Job, infoDate: LocalDate): RunStatus
3030
}

pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,15 +83,15 @@ abstract class TaskRunnerBase(conf: Config,
8383
}
8484
}
8585

86-
override def runOnDemand(job: Job, infoDate: LocalDate): RunStatus = {
86+
override def runLazyTask(job: Job, infoDate: LocalDate): RunStatus = {
8787
val started = Instant.now()
88-
val task = Task(job, infoDate, TaskRunReason.OnDemand)
88+
val task = Task(job, infoDate, TaskRunReason.OnRequest)
8989
val result: TaskResult = validate(task, started) match {
9090
case Left(failedResult) => failedResult
9191
case Right(validationResult) => run(task, started, validationResult)
9292
}
9393

94-
onTaskCompletion(task, result, isOnDemand = true)
94+
onTaskCompletion(task, result, isLazy = true)
9595
}
9696

9797
/** Runs multiple tasks in the single thread in the order of info dates. If one task fails, the rest will be skipped. */
@@ -125,7 +125,7 @@ abstract class TaskRunnerBase(conf: Config,
125125
case Left(failedResult) => failedResult
126126
case Right(validationResult) => run(task, started, validationResult)
127127
}
128-
onTaskCompletion(task, result, isOnDemand = false)
128+
onTaskCompletion(task, result, isLazy = false)
129129
}
130130
}
131131

@@ -137,7 +137,7 @@ abstract class TaskRunnerBase(conf: Config,
137137
val isTransient = task.job.outputTable.format.isTransient
138138
val taskResult = TaskResult(task.job, runStatus, Some(runInfo), applicationId, isTransient, Nil, Nil, Nil)
139139

140-
onTaskCompletion(task, taskResult, isOnDemand = false)
140+
onTaskCompletion(task, taskResult, isLazy = false)
141141
}
142142

143143
/**
@@ -362,11 +362,11 @@ abstract class TaskRunnerBase(conf: Config,
362362
}
363363

364364
/** Logs task completion and sends corresponding notifications. */
365-
private def onTaskCompletion(task: Task, taskResult: TaskResult, isOnDemand: Boolean): RunStatus = {
365+
private def onTaskCompletion(task: Task, taskResult: TaskResult, isLazy: Boolean): RunStatus = {
366366
val notificationTargetErrors = sendNotifications(task, taskResult)
367367
val updatedResult = taskResult.copy(notificationTargetErrors = notificationTargetErrors)
368368

369-
logTaskResult(updatedResult, isOnDemand)
369+
logTaskResult(updatedResult, isLazy)
370370
pipelineState.addTaskCompletion(Seq(updatedResult))
371371
addJournalEntry(task, updatedResult)
372372

@@ -447,14 +447,14 @@ abstract class TaskRunnerBase(conf: Config,
447447
Some(RunInfo(infoDate, started, Instant.now()))
448448
}
449449

450-
private def logTaskResult(result: TaskResult, isOnDemand: Boolean): Unit = synchronized {
450+
private def logTaskResult(result: TaskResult, isLazy: Boolean): Unit = synchronized {
451451
val infoDateMsg = result.runInfo match {
452452
case Some(date) => s" for ${date.infoDate}"
453453
case None => ""
454454
}
455455

456-
val taskStr = if (isOnDemand) {
457-
"On demand task"
456+
val taskStr = if (isLazy) {
457+
"Requested lazy task"
458458
} else {
459459
"Task"
460460
}

pramen/core/src/test/resources/test/config/integration_transient_transformer.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ pramen.operations = [
6464
info.date.expr = "@runDate - 1"
6565
},
6666
{
67-
name = "Identity transformer (on-demand)"
67+
name = "Identity transformer (lazy)"
6868
type = "transformation"
6969

7070
class = "za.co.absa.pramen.core.transformers.IdentityTransformer"

0 commit comments

Comments
 (0)