#498 Allow dynamically setting job descriptions #500

Merged 1 commit on Oct 21, 2024
README.md (21 additions, 0 deletions)
@@ -2454,6 +2454,27 @@ You can use any source/sink combination in transfer jobs.

We describe more complicated use cases here.

### Dynamically changing Spark Application description
You can set up a template for the Spark application description, and it will be applied dynamically each time a new job starts executing.

Example configuration:
```hocon
pramen.job.description.template = "Pramen - running @pipeline, job @jobName for @infoDate"
```

The following variables are available:

| Variable     | Description                                                                         |
|--------------|-------------------------------------------------------------------------------------|
| @pipeline    | The name of the pipeline (if defined at `pramen.pipeline.name`).                    |
| @tenant      | The name of the tenant (if defined at `pramen.tenant`).                             |
| @environment | The environment (if defined at `pramen.environment.name`).                          |
| @jobName     | The name of the job as defined in the operation definition.                         |
| @infoDate    | The information date the job is running for.                                        |
| @outputTable | The output metastore table of the job (`@metastoreTable` and `@table` are aliases). |
| @dryRun      | Adds `(DRY RUN)` when running in dry run mode, an empty string otherwise.           |

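For example, if the pipeline is named `My pipeline` and the job `My job` runs for the information date `2024-10-21` (hypothetical values), the template above resolves to:

```
Pramen - running My pipeline, job My job for 2024-10-21
```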

### Startup and shutdown hooks

Startup and shutdown hooks allow running custom code before and after the pipeline runs.
@@ -42,7 +42,8 @@ case class RuntimeConfig(
parallelTasks: Int,
stopSparkSession: Boolean,
allowEmptyPipeline: Boolean,
historicalRunMode: RunMode
historicalRunMode: RunMode,
sparkAppDescriptionTemplate: Option[String]
)

object RuntimeConfig {
@@ -66,6 +67,7 @@
val STOP_SPARK_SESSION = "pramen.stop.spark.session"
val VERBOSE = "pramen.verbose"
val ALLOW_EMPTY_PIPELINE = "pramen.allow.empty.pipeline"
val SPARK_APP_DESCRIPTION_TEMPLATE = "pramen.job.description.template"

def fromConfig(conf: Config): RuntimeConfig = {
val infoDateFormat = conf.getString(INFORMATION_DATE_FORMAT_APP)
@@ -128,6 +130,7 @@ }
}

val allowEmptyPipeline = ConfigUtils.getOptionBoolean(conf, ALLOW_EMPTY_PIPELINE).getOrElse(false)
val sparkAppDescriptionTemplate = ConfigUtils.getOptionString(conf, SPARK_APP_DESCRIPTION_TEMPLATE)

RuntimeConfig(
isDryRun = isDryRun,
@@ -144,7 +147,8 @@
parallelTasks = parallelTasks,
stopSparkSession = conf.getBoolean(STOP_SPARK_SESSION),
allowEmptyPipeline,
runMode
runMode,
sparkAppDescriptionTemplate
)
}
}
@@ -99,6 +99,10 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig,
completedJobsChannel.close()
}

private[core] def setSparkAppDescription(): Unit = synchronized {
// Left unimplemented in this commit: ??? throws scala.NotImplementedError if called.
// The description is actually applied per task in TaskRunnerBase (see below).
???
}

private[core] def onFatalException(ex: Throwable, job: Job, isTransient: Boolean): Unit = {
log.error(s"${Emoji.FAILURE} A FATAL error has been encountered.", ex)
val fatalEx = new FatalErrorWrapper(s"FATAL exception encountered, stopping the pipeline.", ex)
@@ -17,7 +17,7 @@
package za.co.absa.pramen.core.runner.task

import com.typesafe.config.Config
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.lit
import org.slf4j.LoggerFactory
import za.co.absa.pramen.api._
@@ -31,11 +31,12 @@ import za.co.absa.pramen.core.lock.TokenLockFactory
import za.co.absa.pramen.core.metastore.MetaTableStats
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.pipeline.JobPreRunStatus._
import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY}
import za.co.absa.pramen.core.pipeline._
import za.co.absa.pramen.core.state.PipelineState
import za.co.absa.pramen.core.utils.Emoji._
import za.co.absa.pramen.core.utils.SparkUtils._
import za.co.absa.pramen.core.utils.{ThreadUtils, TimeUtils}
import za.co.absa.pramen.core.utils.{ConfigUtils, ThreadUtils, TimeUtils}
import za.co.absa.pramen.core.utils.hive.HiveHelper

import java.sql.Date
@@ -53,6 +54,8 @@ abstract class TaskRunnerBase(conf: Config,
runtimeConfig: RuntimeConfig,
pipelineState: PipelineState,
applicationId: String) extends TaskRunner {
import TaskRunnerBase._

implicit private val ecDefault: ExecutionContext = ExecutionContext.global
implicit val localDateOrdering: Ordering[LocalDate] = Ordering.by(_.toEpochDay)

@@ -123,6 +126,12 @@
@volatile var runStatus: RunStatus = null

try {
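// If a description template is configured, render it for this task and set it
// as the Spark job description so it is visible in the Spark UI. Note that
// setJobDescription sets a thread-local Spark property, so it applies to jobs
// launched from the current thread.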
runtimeConfig.sparkAppDescriptionTemplate.foreach { template =>
val description = applyAppDescriptionTemplate(template, task, runtimeConfig, conf)
val spark = SparkSession.builder().getOrCreate()
spark.sparkContext.setJobDescription(description)
}

ThreadUtils.runWithTimeout(Duration(timeout, TimeUnit.SECONDS)) {
log.info(s"Running ${task.job.name} with the hard timeout = $timeout seconds.")
runStatus = doValidateAndRunTask(task)
@@ -604,3 +613,23 @@
}
}
}

object TaskRunnerBase {
def applyAppDescriptionTemplate(template: String, task: Task, runtimeConfig: RuntimeConfig, conf: Config): String = {
val job = task.job
val pipelineName = conf.getString(PIPELINE_NAME_KEY)
val environmentName = ConfigUtils.getOptionString(conf, ENVIRONMENT_NAME).getOrElse("UNKNOWN")
val tenant = ConfigUtils.getOptionString(conf, TENANT_KEY).getOrElse("UNKNOWN")
val dryRun = if (runtimeConfig.isDryRun) "(DRY RUN)" else ""

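// Note: @metastoreTable, @outputTable, and @table all resolve to the job's output table name.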
template.replaceAll("@jobName", job.name)
.replaceAll("@infoDate", task.infoDate.toString)
.replaceAll("@metastoreTable", job.outputTable.name)
.replaceAll("@outputTable", job.outputTable.name)
.replaceAll("@table", job.outputTable.name)
.replaceAll("@pipeline", pipelineName)
.replaceAll("@tenant", tenant)
.replaceAll("@environment", environmentName)
.replaceAll("@dryRun", dryRun)
}
}
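As a standalone illustration of the substitution this helper performs, here is a minimal sketch using plain `String.replaceAll`; the pipeline, job, and date values are hypothetical, not from the PR:

```scala
object DescriptionTemplateExample extends App {
  val template = "Pramen - running @pipeline, job @jobName for @infoDate"

  // In Pramen these values come from the task being run and the pipeline config.
  val description = template
    .replaceAll("@pipeline", "Nightly loads")
    .replaceAll("@jobName", "Load users")
    .replaceAll("@infoDate", "2024-10-21")

  println(description) // Pramen - running Nightly loads, job Load users for 2024-10-21
}
```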
@@ -24,9 +24,10 @@ import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.exceptions.FatalErrorWrapper
import za.co.absa.pramen.core.journal.Journal
import za.co.absa.pramen.core.lock.TokenLockFactory
import za.co.absa.pramen.core.pipeline.Task
import za.co.absa.pramen.core.pipeline.PipelineDef.{ENVIRONMENT_NAME, PIPELINE_NAME_KEY, TENANT_KEY}
import za.co.absa.pramen.core.pipeline.{Job, Task}
import za.co.absa.pramen.core.state.PipelineState
import za.co.absa.pramen.core.utils.Emoji
import za.co.absa.pramen.core.utils.{ConfigUtils, Emoji}

import java.util.concurrent.Executors.newFixedThreadPool
import java.util.concurrent.{ExecutorService, Semaphore}
@@ -37,7 +37,8 @@ object RuntimeConfigFactory {
parallelTasks: Int = 1,
stopSparkSession: Boolean = false,
allowEmptyPipeline: Boolean = false,
historicalRunMode: RunMode = RunMode.CheckUpdates): RuntimeConfig = {
historicalRunMode: RunMode = RunMode.CheckUpdates,
sparkAppDescriptionTemplate: Option[String] = None): RuntimeConfig = {
RuntimeConfig(isDryRun,
isRerun,
runTables,
@@ -52,7 +53,8 @@
parallelTasks,
stopSparkSession,
allowEmptyPipeline,
historicalRunMode)
historicalRunMode,
sparkAppDescriptionTemplate)
}

}
@@ -43,6 +43,7 @@ class RuntimeConfigSuite extends AnyWordSpec {
| load.date.to = 2021-01-10
| parallel.tasks = 4
| stop.spark.session = true
| job.description.template = "Test template"
|}
|""".stripMargin

@@ -64,6 +65,7 @@
assert(runtimeConfig.runDateTo.get.toString == "2021-01-10")
assert(runtimeConfig.parallelTasks == 4)
assert(runtimeConfig.stopSparkSession)
assert(runtimeConfig.sparkAppDescriptionTemplate.contains("Test template"))
}

"have default values" in {
@@ -84,6 +86,7 @@
assert(runtimeConfig.runDateTo.isEmpty)
assert(runtimeConfig.parallelTasks == 1)
assert(!runtimeConfig.stopSparkSession)
assert(runtimeConfig.sparkAppDescriptionTemplate.isEmpty)
}
}
