From fd79310581695d37d902fb7fc608ebfa13e96b47 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 11 Jan 2024 15:16:38 +0100 Subject: [PATCH] Do not include records per seconds if there are not many records. --- .../notify/pipeline/PipelineNotificationBuilderHtml.scala | 5 +++-- .../test/notify/test_pipeline_email_body_complex.txt | 6 +++--- .../scala/za/co/absa/pramen/core/mocks/TestPrototypes.scala | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index f8502e87a..764171fb1 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -22,7 +22,7 @@ import za.co.absa.pramen.api.notification._ import za.co.absa.pramen.core.config.Keys.TIMEZONE import za.co.absa.pramen.core.exceptions.{CmdFailedException, ProcessFailedException} import za.co.absa.pramen.core.notify.message._ -import za.co.absa.pramen.core.notify.pipeline.PipelineNotificationBuilderHtml.MIN_RPS_JOB_DURATION_SECONDS +import za.co.absa.pramen.core.notify.pipeline.PipelineNotificationBuilderHtml.{MIN_RPS_JOB_DURATION_SECONDS, MIN_RPS_RECORDS} import za.co.absa.pramen.core.pipeline.TaskRunReason import za.co.absa.pramen.core.runner.task.RunStatus._ import za.co.absa.pramen.core.runner.task.{NotificationFailure, RunStatus, TaskResult} @@ -36,6 +36,7 @@ import scala.collection.mutable.ListBuffer object PipelineNotificationBuilderHtml { val MIN_RPS_JOB_DURATION_SECONDS = 60 + val MIN_RPS_RECORDS = 1000 } class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNotificationBuilder { @@ -450,7 +451,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot task.runInfo match { case Some(runInfo) => val jobDuration = Duration.between(runInfo.started, runInfo.finished).getSeconds - if (jobDuration > MIN_RPS_JOB_DURATION_SECONDS && recordCount > 0L) { + if (jobDuration > MIN_RPS_JOB_DURATION_SECONDS && recordCount >= MIN_RPS_RECORDS) { val throughput = recordCount / jobDuration throughput match { diff --git a/pramen/core/src/test/resources/test/notify/test_pipeline_email_body_complex.txt b/pramen/core/src/test/resources/test/notify/test_pipeline_email_body_complex.txt index 62dbd45a6..84aad3cfe 100644 --- a/pramen/core/src/test/resources/test/notify/test_pipeline_email_body_complex.txt +++ b/pramen/core/src/test/resources/test/notify/test_pipeline_email_body_complex.txt @@ -122,10 +122,10 @@ table_out db.table1 2022-02-18 -200 (+100) +20000 (+10000) 01:14:04 -1000 B -0 +97 KiB +4 1970-01-01 03:34 +0200 Warning Test warning diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TestPrototypes.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TestPrototypes.scala index 50743a9ed..f2b2ed94b 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TestPrototypes.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TestPrototypes.scala @@ -24,7 +24,7 @@ object TestPrototypes { val runStatusSuccess: RunStatus = RunStatus.Succeeded(Some(100), 200, Some(1000), TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty) val runStatusWarning: RunStatus = RunStatus.Succeeded( - Some(100), 200, Some(1000), TaskRunReason.New, Seq("file1.txt", "file1.ctl"), + Some(10000), 20000, Some(100000), TaskRunReason.New, Seq("file1.txt", "file1.ctl"), Seq("file1.csv", "file2.csv"), Seq("`db`.`table1`"), Seq("Test warning") )