Skip to content

Commit 697a9d8

Browse files
committed
#389 Add Spark application id to jobs that fail before running a task.
1 parent 851daba commit 697a9d8

File tree

10 files changed

+36
-1
lines changed

10 files changed

+36
-1
lines changed

pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotification.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ case class PipelineNotification(
2525
exception: Option[Throwable],
2626
pipelineName: String,
2727
environmentName: String,
28+
sparkAppId: Option[String],
2829
started: Instant,
2930
finished: Instant,
3031
tasksCompleted: List[TaskResult],

pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilder.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ trait PipelineNotificationBuilder {
2626

2727
def addAppName(appName: String): Unit
2828

29+
def addSparkAppId(sparkAppId: String): Unit
30+
2931
def addEnvironmentName(env: String): Unit
3032

3133
def addAppDuration(appStarted: Instant, appFinished: Instant): Unit

pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
5454

5555
var appException: Option[Throwable] = None
5656
var appName: String = "Unspecified Job"
57+
var sparkAppId: Option[String] = None
5758
var envName: String = "Unspecified Environment"
5859
var appStarted: Instant = Instant.now()
5960
var appFinished: Instant = Instant.now()
@@ -72,6 +73,10 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
7273
this.appName = appName
7374
}
7475

76+
override def addSparkAppId(sparkAppId: String): Unit = {
77+
this.sparkAppId = Option(sparkAppId)
78+
}
79+
7580
override def addEnvironmentName(envName: String): Unit = {
7681
this.envName = envName
7782
}
@@ -191,7 +196,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
191196

192197
introParagraph.withText(".")
193198

194-
val applicationIds = completedTasks.map(_.applicationId.trim).filter(_.nonEmpty).distinct
199+
val applicationIds = getSparkApplicationIds
195200

196201
// This handles the case when all tasks are run under the same Spark Session.
197202
// When Pramen support runners that run tasks in different Spark Sessions (via Yarn, Glue etc APIs), this will need
@@ -227,6 +232,13 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
227232
builder
228233
}
229234

235+
private[core] def getSparkApplicationIds: Seq[String] = {
236+
sparkAppId match {
237+
case Some(appId) => (appId +: completedTasks.map(_.applicationId.trim).filter(_.nonEmpty)).distinct
238+
case None => completedTasks.map(_.applicationId.trim).filter(_.nonEmpty).distinct
239+
}
240+
}
241+
230242
private[core] def getSuccessFlags: (Boolean, Boolean) = {
231243
val hasNotificationFailures = completedTasks.exists(t => t.notificationTargetErrors.nonEmpty)
232244
val someTasksSucceeded = completedTasks.exists(_.runStatus.isInstanceOf[Succeeded]) && appException.isEmpty

pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationDirector.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ object PipelineNotificationDirector {
3636

3737
notificationBuilder.addAppName(notification.pipelineName)
3838
notificationBuilder.addEnvironmentName(notification.environmentName)
39+
notification.sparkAppId.foreach(id => notificationBuilder.addSparkAppId(id))
3940
notificationBuilder.addAppDuration(notification.started, notification.finished)
4041
notificationBuilder.addDryRun(dryRun)
4142
notificationBuilder.addUndercover(undercover)

pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ object AppRunner {
5555

5656
val exitCodeTry = for {
5757
spark <- getSparkSession(conf, state)
58+
_ <- Try { state.setSparkAppId(spark.sparkContext.applicationId) }
5859
_ <- logBanner(spark)
5960
_ <- logExecutorNodes(conf, state, spark)
6061
appContext <- createAppContext(conf, state, spark)

pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineState.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ trait PipelineState extends AutoCloseable {
2727

2828
def setFailure(stage: String, exception: Throwable): Unit
2929

30+
def setSparkAppId(sparkAppId: String): Unit
31+
3032
def addTaskCompletion(statuses: Seq[TaskResult]): Unit
3133

3234
def getExitCode: Int

pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
5454
@volatile private var exitedNormally = false
5555
@volatile private var isFinished = false
5656
@volatile private var customShutdownHookCanRun = false
57+
@volatile private var sparkAppId: Option[String] = None
5758

5859
init()
5960

@@ -100,6 +101,10 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
100101
}
101102
}
102103

104+
override def setSparkAppId(sparkAppId: String): Unit = synchronized {
105+
this.sparkAppId = Option(sparkAppId)
106+
}
107+
103108
override def addTaskCompletion(statuses: Seq[TaskResult]): Unit = synchronized {
104109
taskResults ++= statuses.filter(_.runStatus != NotRan)
105110
if (statuses.exists(_.runStatus.isFailure)) {
@@ -178,6 +183,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
178183
val notification = PipelineNotification(failureException,
179184
pipelineName,
180185
environmentName,
186+
sparkAppId,
181187
startedInstant,
182188
finishedInstant,
183189
realTaskResults.toList,

pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/PipelineNotificationFactory.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ object PipelineNotificationFactory {
2727
def getDummyNotification(exception: Option[Throwable] = None,
2828
pipelineName: String = "DummyPipeline",
2929
environmentName: String = "DummyEnvironment",
30+
sparkAppId: Option[String] = None,
3031
started: Instant = Instant.ofEpochSecond(1234567L),
3132
finished: Instant = Instant.ofEpochSecond(1234568L),
3233
tasksCompleted: List[TaskResult] = List(TaskResultFactory.getDummyTaskResult()),
@@ -37,6 +38,7 @@ object PipelineNotificationFactory {
3738
exception,
3839
pipelineName,
3940
environmentName,
41+
sparkAppId,
4042
started,
4143
finished,
4244
tasksCompleted,

pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationBuilderSpy.scala

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import java.time.Instant
2525
class PipelineNotificationBuilderSpy extends PipelineNotificationBuilder {
2626
var failureException: Option[Throwable] = None
2727
var appName = ""
28+
var sparkId = ""
2829
var environmentName = ""
2930
var appStarted: Instant = Instant.MIN
3031
var appFinished: Instant = Instant.MIN
@@ -41,6 +42,8 @@ class PipelineNotificationBuilderSpy extends PipelineNotificationBuilder {
4142

4243
override def addAppName(name: String): Unit = appName = name
4344

45+
override def addSparkAppId(sparkAppId: String): Unit = sparkId = sparkAppId
46+
4447
override def addEnvironmentName(env: String): Unit = environmentName = env
4548

4649
override def addAppDuration(started: Instant, finished: Instant): Unit = {

pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/state/PipelineStateSpy.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class PipelineStateSpy extends PipelineState {
2929
val failures = new ListBuffer[(String, Throwable)]
3030
val completedStatuses = new ListBuffer[TaskResult]
3131
var closeCalled = 0
32+
var sparkAppId: Option[String] = None
3233

3334
override def getState(): PipelineStateSnapshot = {
3435
PipelineStateSnapshot(
@@ -53,6 +54,10 @@ class PipelineStateSpy extends PipelineState {
5354
failures.append((stage, ex))
5455
}
5556

57+
override def setSparkAppId(sparkAppId: String): Unit = synchronized {
58+
this.sparkAppId = Option(sparkAppId)
59+
}
60+
5661
override def addTaskCompletion(statuses: Seq[TaskResult]): Unit = synchronized {
5762
completedStatuses ++= statuses
5863
}

0 commit comments

Comments
 (0)