From cf6526be6039cc6c81185169c052352d6d65dfde Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 3 May 2024 14:26:33 +0200 Subject: [PATCH] #372 Add integration tests for notification targets. --- README.md | 7 +- .../integration_notification_targets.conf | 85 +++++++++++++++++ .../integration/NotificationTargetSuite.scala | 93 +++++++++++++++++++ .../mocks/notify/NotificationTargetMock.scala | 46 +++++++++ .../PipelineNotificationTargetMock.scala | 37 ++++++++ 5 files changed, 266 insertions(+), 2 deletions(-) create mode 100644 pramen/core/src/test/resources/test/config/integration_notification_targets.conf create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/integration/NotificationTargetSuite.scala create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/NotificationTargetMock.scala create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationTargetMock.scala diff --git a/README.md b/README.md index 2c02d4fc0..d22ae4f37 100644 --- a/README.md +++ b/README.md @@ -2631,12 +2631,15 @@ used to send custom notifications to external systems. A pipeline notification t ```scala package com.example +import com.typesafe.config.Config import za.co.absa.pramen.api.PipelineNotificationTarget -class MyPipelineNotificationTarget extends PipelineNotificationTarget { - def sendNotification(pipelineStarted: Instant, +class MyPipelineNotificationTarget(conf: Config) extends PipelineNotificationTarget { + override def sendNotification(pipelineStarted: Instant, appException: Option[Throwable], tasksCompleted: Seq[TaskNotification]): Unit = ??? + + override def config: Config = conf } ``` diff --git a/pramen/core/src/test/resources/test/config/integration_notification_targets.conf b/pramen/core/src/test/resources/test/config/integration_notification_targets.conf new file mode 100644 index 000000000..d03b8ed46 --- /dev/null +++ b/pramen/core/src/test/resources/test/config/integration_notification_targets.conf @@ -0,0 +1,85 @@ +# Copyright 2022 ABSA Group Limited +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This variable is expected to be set up by the test suite +#base.path = "/tmp" + +pramen { + pipeline.name = "Integration test with notificaiton targets" + + temporary.directory = ${base.path}/temp + + bookkeeping.enabled = false + stop.spark.session = false +} + +pramen.notification.targets = [ + { + name = "dummy_notification_target" + factory.class = "za.co.absa.pramen.core.mocks.notify.NotificationTargetMock" + + test.fail.notification = ${test.fail.notification} + } +] + +pramen.pipeline.notification.targets = [ "za.co.absa.pramen.core.mocks.notify.PipelineNotificationTargetMock" ] + +pramen.metastore { + tables = [ + { + name = "table1" + format = "parquet" + path = ${base.path}/table1 + }, + { + name = "table2" + format = "parquet" + path = ${base.path}/table2 + } + ] +} + +pramen.operations = [ + { + name = "Generating dataframe" + type = "transformation" + + class = "za.co.absa.pramen.core.mocks.transformer.GeneratingTransformer" + schedule.type = "daily" + output.table = "table1" + + notification.targets = [ "dummy_notification_target" ] + }, + { + name = "Identity transformer" + type = "transformation" + class = "za.co.absa.pramen.core.transformers.IdentityTransformer" + schedule.type = "daily" + + output.table = "table2" + + dependencies = [ + { + tables = [ table1 ] + date.from = "@infoDate" + optional = true # Since no bookkeeping available the table will be seen as empty for the dependency manager + } + ] + + notification.targets = [ "dummy_notification_target" ] + + option { + table = "table1" + } + } +] diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/NotificationTargetSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/NotificationTargetSuite.scala new file mode 100644 index 000000000..b296f2d1b --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/integration/NotificationTargetSuite.scala @@ -0,0 +1,93 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.integration + +import com.typesafe.config.{Config, ConfigFactory} +import org.apache.hadoop.fs.Path +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.core.base.SparkTestBase +import za.co.absa.pramen.core.fixtures.{TempDirFixture, TextComparisonFixture} +import za.co.absa.pramen.core.runner.AppRunner +import za.co.absa.pramen.core.utils.{FsUtils, ResourceUtils} + +import java.time.LocalDate + +class NotificationTargetSuite extends AnyWordSpec with SparkTestBase with TempDirFixture with TextComparisonFixture { + private val infoDate = LocalDate.of(2021, 2, 18) + + "Pipeline with notification targets" should { + val expectedSingle = + """{"a":"D","b":4} + |{"a":"E","b":5} + |{"a":"F","b":6} + |""".stripMargin + + "work end to end for non-failing pipelines" in { + withTempDirectory("notification_targets") { tempDir => + val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, tempDir) + + + val conf = getConfig(tempDir) + val exitCode = AppRunner.runPipeline(conf) + + assert(exitCode == 0) + + val table2Path = new Path(new Path(tempDir, "table2"), s"pramen_info_date=$infoDate") + + assert(fsUtils.exists(table2Path)) + + val df2 = spark.read.parquet(table2Path.toString) + val actual2 = df2.orderBy("a").toJSON.collect().mkString("\n") + + compareText(actual2, expectedSingle) + + assert(System.getProperty("pramen.test.notification.tasks.completed").toInt == 2) + assert(System.getProperty("pramen.test.notification.table") == "table2") + } + } + + "still return zero exit code on notification failures" in { + withTempDirectory("notification_targets") { tempDir => + val conf = getConfig(tempDir, failNotifications = true) + val exitCode = AppRunner.runPipeline(conf) + + assert(exitCode == 0) + + assert(System.getProperty("pramen.test.notification.pipeline.failure").toBoolean) + assert(System.getProperty("pramen.test.notification.target.failure").toBoolean) + } + } + } + + def getConfig(basePath: String, failNotifications: Boolean = false): Config = { + val configContents = ResourceUtils.getResourceString("/test/config/integration_notification_targets.conf") + val basePathEscaped = basePath.replace("\\", "\\\\") + + val conf = ConfigFactory.parseString( + s"""base.path = "$basePathEscaped" + |pramen.runtime.is.rerun = true + |pramen.current.date = "$infoDate" + |test.fail.notification = $failNotifications + |$configContents + |""".stripMargin + ).withFallback(ConfigFactory.load()) + .resolve() + + conf + } + +} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/NotificationTargetMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/NotificationTargetMock.scala new file mode 100644 index 000000000..b7e355e61 --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/NotificationTargetMock.scala @@ -0,0 +1,46 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.mocks.notify + +import com.typesafe.config.Config +import org.apache.spark.sql.SparkSession +import za.co.absa.pramen.api.{ExternalChannelFactory, NotificationTarget, TaskNotification} +import za.co.absa.pramen.core.mocks.notify.NotificationTargetMock.TEST_NOTIFICATION_FAIL_KEY + +import scala.collection.mutable.ListBuffer + +class NotificationTargetMock(conf: Config) extends NotificationTarget { + val notificationsSent: ListBuffer[TaskNotification] = new ListBuffer[TaskNotification]() + + override def config: Config = conf + + override def sendNotification(notification: TaskNotification): Unit = { + if (conf.hasPath(TEST_NOTIFICATION_FAIL_KEY) && conf.getBoolean(TEST_NOTIFICATION_FAIL_KEY)) { + System.setProperty("pramen.test.notification.target.failure", "true") + throw new RuntimeException("Notification target test exception") + } + System.setProperty("pramen.test.notification.table", notification.tableName) + } +} + +object NotificationTargetMock extends ExternalChannelFactory[NotificationTargetMock] { + val TEST_NOTIFICATION_FAIL_KEY = "test.fail.notification" + + override def apply(conf: Config, parentPath: String, spark: SparkSession): NotificationTargetMock = { + new NotificationTargetMock(conf) + } +} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationTargetMock.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationTargetMock.scala new file mode 100644 index 000000000..dd53d10e1 --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/notify/PipelineNotificationTargetMock.scala @@ -0,0 +1,37 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.mocks.notify + +import com.typesafe.config.Config +import za.co.absa.pramen.api.{PipelineNotificationTarget, TaskNotification} +import za.co.absa.pramen.core.mocks.notify.NotificationTargetMock.TEST_NOTIFICATION_FAIL_KEY + +import java.time.Instant + +class PipelineNotificationTargetMock(conf: Config) extends PipelineNotificationTarget { + + override def sendNotification(pipelineStarted: Instant, appException: Option[Throwable], tasksCompleted: Seq[TaskNotification]): Unit = { + if (conf.hasPath(TEST_NOTIFICATION_FAIL_KEY) && conf.getBoolean(TEST_NOTIFICATION_FAIL_KEY)) { + System.setProperty("pramen.test.notification.pipeline.failure", "true") + throw new RuntimeException("Pipeline notification target test exception") + } + + System.setProperty("pramen.test.notification.tasks.completed", tasksCompleted.length.toString) + } + + override def config: Config = conf +}