Skip to content

Commit

Permalink
#423 Add pipeline failures to the pipeline info object.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Jun 20, 2024
1 parent 7732a4c commit 3bf4ec1
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.pramen.api

import za.co.absa.pramen.api.status.RuntimeInfo
import za.co.absa.pramen.api.status.{PipelineNotificationFailure, RuntimeInfo}

import java.time.Instant

Expand All @@ -27,5 +27,6 @@ case class PipelineInfo(
startedAt: Instant,
finishedAt: Option[Instant],
sparkApplicationId: Option[String],
failureException: Option[Throwable]
failureException: Option[Throwable],
pipelineNotificationFailures: Seq[PipelineNotificationFailure]
)
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
startedInstant,
finishedInstant,
sparkAppId,
appException
appException,
pipelineNotificationFailures.toSeq
),
isFinished,
exitedNormally,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package za.co.absa.pramen.core.mocks

import za.co.absa.pramen.api.PipelineInfo
import za.co.absa.pramen.api.status.RuntimeInfo
import za.co.absa.pramen.api.status.{PipelineNotificationFailure, RuntimeInfo}

import java.time.Instant

Expand All @@ -28,7 +28,8 @@ object PipelineInfoFactory {
startedAt: Instant = Instant.ofEpochSecond(1718609409),
finishedAt: Option[Instant] = None,
sparkApplicationId: Option[String] = Some("testid-12345"),
failureException: Option[Throwable] = None): PipelineInfo = {
PipelineInfo(pipelineName, environment, runtimeInfo, startedAt, finishedAt, sparkApplicationId, failureException)
failureException: Option[Throwable] = None,
pipelineNotificationFailures: Seq[PipelineNotificationFailure] = Seq.empty): PipelineInfo = {
PipelineInfo(pipelineName, environment, runtimeInfo, startedAt, finishedAt, sparkApplicationId, failureException, pipelineNotificationFailures)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class EcsPipelineNotificationTargetSuite extends AnyWordSpec {
val task3 = TestPrototypes.taskNotification.copy(jobName = "Job 3", outputTable = metaTableDef3)

notificationTarget.sendNotification(
PipelineInfo("Dummy", "DEV", RuntimeInfo(), Instant.now, None, None, None),
PipelineInfo("Dummy", "DEV", RuntimeInfo(), Instant.now, None, None, None, Seq.empty),
Seq(task1, task2, task3),
CustomNotification(Seq.empty, Seq.empty)
)
Expand Down

0 comments on commit 3bf4ec1

Please sign in to comment.