From e4a71f476c3a1e8828f74cfe644eaabd41e3e0bf Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 12 Mar 2024 08:41:35 +0100 Subject: [PATCH 1/2] #366 Implement tracking of raw files in the metastore by size, not by file count. --- .../peristence/MetastorePersistenceRaw.scala | 20 ++--- .../PipelineNotificationBuilderHtml.scala | 78 ++++++++++++++++--- .../jobrunner/ConcurrentJobRunnerImpl.scala | 4 +- .../orchestrator/OrchestratorImpl.scala | 4 +- .../pramen/core/runner/task/TaskResult.scala | 1 + .../core/runner/task/TaskRunnerBase.scala | 30 +++---- .../pramen/core/source/RawFileSource.scala | 39 ++++++---- .../absa/pramen/core/source/SparkSource.scala | 2 +- .../co/absa/pramen/core/utils/FsUtils.scala | 8 +- .../test_pipeline_email_body_complex.txt | 4 +- .../pramen/core/mocks/TaskResultFactory.scala | 2 + .../mocks/runner/ConcurrentJobRunnerSpy.scala | 2 +- .../core/source/RawFileSourceSuite.scala | 42 ++++++---- .../tests/journal/TaskCompletedSuite.scala | 2 + .../core/tests/utils/FsUtilsSuite.scala | 15 ++-- 15 files changed, 169 insertions(+), 84 deletions(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala index ffcc617b2..7a84a89ce 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala @@ -16,7 +16,7 @@ package za.co.absa.pramen.core.metastore.peristence -import org.apache.hadoop.fs.{FileUtil, Path} +import org.apache.hadoop.fs.{FileStatus, Path} import org.apache.spark.sql.{DataFrame, SparkSession} import org.slf4j.LoggerFactory import za.co.absa.pramen.core.metastore.MetaTableStats @@ -39,9 +39,9 @@ class MetastorePersistenceRaw(path: String, (infoDateFrom, infoDateTo) match { case (Some(from), Some(to)) if from.isEqual(to) => - getListOfFiles(from).toDF("path") + getListOfFiles(from).map(_.getPath.toString).toDF("path") case (Some(from), Some(to)) => - getListOfFilesRange(from, to).toDF("path") + getListOfFilesRange(from, to).map(_.getPath.toString).toDF("path") case _ => throw new IllegalArgumentException("Metastore 'raw' format requires info date for querying its contents.") } @@ -82,7 +82,7 @@ class MetastorePersistenceRaw(path: String, } MetaTableStats( - files.length, + totalSize, Some(totalSize) ) } @@ -97,7 +97,7 @@ class MetastorePersistenceRaw(path: String, var totalSize = 0L files.foreach(file => { - totalSize += fsUtils.fs.getContentSummary(new Path(file)).getLength + totalSize += file.getLen }) MetaTableStats( @@ -119,13 +119,13 @@ class MetastorePersistenceRaw(path: String, throw new UnsupportedOperationException("Raw format does not support Hive tables.") } - private def getListOfFilesRange(infoDateFrom: LocalDate, infoDateTo: LocalDate): Seq[String] = { + private def getListOfFilesRange(infoDateFrom: LocalDate, infoDateTo: LocalDate): Seq[FileStatus] = { if (infoDateFrom.isAfter(infoDateTo)) - Seq.empty[String] + Seq.empty[FileStatus] else { val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, path) var d = infoDateFrom - val files = mutable.ArrayBuffer.empty[String] + val files = mutable.ArrayBuffer.empty[FileStatus] while (d.isBefore(infoDateTo) || d.isEqual(infoDateTo)) { val subPath = SparkUtils.getPartitionPath(d, infoDateColumn, infoDateFormat, path) @@ -138,7 +138,7 @@ class MetastorePersistenceRaw(path: 
String, } } - private def getListOfFiles(infoDate: LocalDate): Seq[String] = { + private def getListOfFiles(infoDate: LocalDate): Seq[FileStatus] = { val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, path) val subPath = SparkUtils.getPartitionPath(infoDate, infoDateColumn, infoDateFormat, path) @@ -146,7 +146,7 @@ class MetastorePersistenceRaw(path: String, if (fsUtils.exists(new Path(path)) && !fsUtils.exists(subPath)) { // The absence of the partition folder means no data is there, which is okay quite often. // But fsUtils.getHadoopFiles() throws an exception that fails the job and dependent jobs in this case - Seq.empty[String] + Seq.empty[FileStatus] } else { fsUtils.getHadoopFiles(subPath).toSeq } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index fd69fd85e..f7fbe3c9e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -22,7 +22,7 @@ import za.co.absa.pramen.api.notification._ import za.co.absa.pramen.core.config.Keys.TIMEZONE import za.co.absa.pramen.core.exceptions.{CmdFailedException, ProcessFailedException} import za.co.absa.pramen.core.notify.message._ -import za.co.absa.pramen.core.notify.pipeline.PipelineNotificationBuilderHtml.{MIN_RPS_JOB_DURATION_SECONDS, MIN_RPS_RECORDS} +import za.co.absa.pramen.core.notify.pipeline.PipelineNotificationBuilderHtml.{MIN_MEGABYTES, MIN_RPS_JOB_DURATION_SECONDS, MIN_RPS_RECORDS} import za.co.absa.pramen.core.pipeline.TaskRunReason import za.co.absa.pramen.core.runner.task.RunStatus._ import za.co.absa.pramen.core.runner.task.{NotificationFailure, RunStatus, TaskResult} @@ -37,6 +37,7 @@ import scala.collection.mutable.ListBuffer object PipelineNotificationBuilderHtml { val MIN_RPS_JOB_DURATION_SECONDS = 60 val MIN_RPS_RECORDS = 1000 + val MIN_MEGABYTES = 10 } class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNotificationBuilder { @@ -343,7 +344,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot tableHeaders.append(TableHeader(TextElement("Elapsed Time"), Align.Center)) if (outputSizeKnown) tableHeaders.append(TableHeader(TextElement("Size"), Align.Right)) - tableHeaders.append(TableHeader(TextElement("RPS"), Align.Right)) + tableHeaders.append(TableHeader(TextElement("Throughput"), Align.Right)) tableHeaders.append(TableHeader(TextElement("Saved at"), Align.Center)) tableHeaders.append(TableHeader(TextElement("Status"), Align.Center)) if (haveReasonColumn) @@ -375,8 +376,12 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot row.append(TextElement(getElapsedTime(task))) - if (outputSizeKnown) - row.append(TextElement(getOutputSize(task))) + if (task.isRawFilesJob) { + row.append(TextElement(getSizeText(task))) + } else { + if (outputSizeKnown) + row.append(TextElement(getOutputSize(task))) + } row.append(getThroughputRps(task)) row.append(TextElement(getFinishTime(task))) @@ -452,12 +457,10 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot case Some(runInfo) => val jobDuration = Duration.between(runInfo.started, runInfo.finished).getSeconds if (jobDuration > MIN_RPS_JOB_DURATION_SECONDS && recordCount >= MIN_RPS_RECORDS) { - val throughput = 
recordCount / jobDuration - - throughput match { - case n if n < minRps => TextElement(throughput.toString, Style.Warning) - case n if n >= goodRps => TextElement(throughput.toString, Style.Success) - case _ => TextElement(throughput.toString) + if (task.isRawFilesJob) { + getBytesPerSecondsText(recordCount, jobDuration) + } else { + getRpsText(recordCount, jobDuration) } } else TextElement("") @@ -465,6 +468,30 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot } } + private[core] def getRpsText(recordOrByteCount: Long, jobDurationSeconds: Long): TextElement = { + val throughput = recordOrByteCount / jobDurationSeconds + val rps = s"${throughput.toString} r/s" + + throughput match { + case n if n < minRps => TextElement(rps, Style.Warning) + case n if n >= goodRps => TextElement(rps, Style.Success) + case _ => TextElement(rps) + } + } + + private[core] def getBytesPerSecondsText(totalBytesCount: Long, jobDurationSeconds: Long): TextElement = { + val MEGABYTE = 1024L * 1024L + + val sizeMb = totalBytesCount / MEGABYTE + + if (sizeMb < MIN_MEGABYTES) { + TextElement("") + } else { + val throughput = totalBytesCount / jobDurationSeconds + TextElement(s"${StringUtils.prettySize(throughput)}/s") + } + } + private[core] def getRecordCountText(task: TaskResult): String = { def renderDifference(numRecords: Long, numRecordsOld: Option[Long]): String = { numRecordsOld match { @@ -481,9 +508,36 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot } } + if (task.isRawFilesJob) { + "-" + } else { + task.runStatus match { + case s: Succeeded => renderDifference(s.recordCount, s.recordCountOld) + case d: InsufficientData => renderDifference(d.actual, d.recordCountOld) + case _ => "" + } + } + } + + private[core] def getSizeText(task: TaskResult): String = { + def renderDifferenceSize(numBytes: Long, numBytesOld: Option[Long]): String = { + numBytesOld match { + case Some(old) if old > 0 => + val diff = numBytes - old + if (diff > 0) + s"${StringUtils.prettySize(numBytes)} (+${StringUtils.prettySize(diff)})" + else if (diff < 0) + s"${StringUtils.prettySize(numBytes)} (-${StringUtils.prettySize(Math.abs(diff))})" + else { + StringUtils.prettySize(numBytes) + } + case _ => StringUtils.prettySize(numBytes) + } + } + task.runStatus match { - case s: Succeeded => renderDifference(s.recordCount, s.recordCountOld) - case d: InsufficientData => renderDifference(d.actual, d.recordCountOld) + case s: Succeeded => renderDifferenceSize(s.recordCount, s.recordCountOld) + case d: InsufficientData => renderDifferenceSize(d.actual, d.recordCountOld) case _ => "" } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala index c39668ab9..2823ad794 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/jobrunner/ConcurrentJobRunnerImpl.scala @@ -18,6 +18,7 @@ package za.co.absa.pramen.core.runner.jobrunner import com.github.yruslan.channel.{Channel, ReadChannel} import org.slf4j.LoggerFactory +import za.co.absa.pramen.api.DataFormat import za.co.absa.pramen.core.app.config.RuntimeConfig import za.co.absa.pramen.core.bookkeeper.Bookkeeper import za.co.absa.pramen.core.exceptions.FatalErrorWrapper @@ -103,7 +104,8 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig, } private[core] 
def sendFailure(ex: Throwable, job: Job, isTransient: Boolean): Unit = { - completedJobsChannel.send((job, TaskResult(job, RunStatus.Failed(ex), None, applicationId, isTransient, Nil, Nil, Nil) :: Nil, false)) + completedJobsChannel.send((job, TaskResult(job, RunStatus.Failed(ex), None, applicationId, isTransient, + job.outputTable.format.isInstanceOf[DataFormat.Raw], Nil, Nil, Nil) :: Nil, false)) } private[core] def runJob(job: Job): Boolean = { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala index 37b15be30..b7f65ba4c 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/orchestrator/OrchestratorImpl.scala @@ -20,6 +20,7 @@ import com.github.yruslan.channel.Channel import com.typesafe.config.Config import org.apache.spark.sql.SparkSession import org.slf4j.LoggerFactory +import za.co.absa.pramen.api.DataFormat import za.co.absa.pramen.core.app.AppContext import za.co.absa.pramen.core.exceptions.{FatalErrorWrapper, ValidationException} import za.co.absa.pramen.core.pipeline.{Job, JobDependency, OperationType} @@ -124,7 +125,8 @@ class OrchestratorImpl extends Orchestrator { val isTransient = job.outputTable.format.isTransient val isFailure = hasNonPassiveNonOptionalDeps(job, missingTables) - val taskResult = TaskResult(job, RunStatus.MissingDependencies(isFailure, missingTables), None, applicationId, isTransient, Nil, Nil, Nil) + val taskResult = TaskResult(job, RunStatus.MissingDependencies(isFailure, missingTables), None, applicationId, + isTransient, job.outputTable.format.isInstanceOf[DataFormat.Raw], Nil, Nil, Nil) state.addTaskCompletion(taskResult :: Nil) }) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskResult.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskResult.scala index 9c7bffb20..9d6454c38 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskResult.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskResult.scala @@ -25,6 +25,7 @@ case class TaskResult( runInfo: Option[RunInfo], applicationId: String, isTransient: Boolean, + isRawFilesJob: Boolean, schemaChanges: Seq[SchemaDifference], dependencyWarnings: Seq[DependencyWarning], notificationTargetErrors: Seq[NotificationFailure] diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index a4a169c01..89679b6a2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -135,7 +135,7 @@ abstract class TaskRunnerBase(conf: Config, val runStatus = RunStatus.Skipped(reason, isWarning) val runInfo = RunInfo(task.infoDate, now, now) val isTransient = task.job.outputTable.format.isTransient - val taskResult = TaskResult(task.job, runStatus, Some(runInfo), applicationId, isTransient, Nil, Nil, Nil) + val taskResult = TaskResult(task.job, runStatus, Some(runInfo), applicationId, isTransient, isRawFilesJob = false, Nil, Nil, Nil) onTaskCompletion(task, taskResult, isLazy = false) } @@ -153,6 +153,7 @@ abstract class TaskRunnerBase(conf: Config, private[core] def preRunCheck(task: Task, started: Instant): 
Either[TaskResult, JobPreRunResult] = { val outputTable = task.job.outputTable.name val isTransient = task.job.outputTable.format.isTransient + val isRawFileBased = task.job.outputTable.format.isInstanceOf[DataFormat.Raw] Try { task.job.preRunCheck(task.infoDate, conf) @@ -167,23 +168,23 @@ abstract class TaskRunnerBase(conf: Config, Right(validationResult) case NoData(isFailure) => log.info(s"NO DATA available for the task: $outputTable for date: ${task.infoDate}.") - Left(TaskResult(task.job, RunStatus.NoData(isFailure), getRunInfo(task.infoDate, started), applicationId, isTransient, Nil, validationResult.dependencyWarnings, Nil)) + Left(TaskResult(task.job, RunStatus.NoData(isFailure), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, validationResult.dependencyWarnings, Nil)) case InsufficientData(actual, expected, oldRecordCount) => log.info(s"INSUFFICIENT DATA available for the task: $outputTable for date: ${task.infoDate}. Expected = $expected, actual = $actual") - Left(TaskResult(task.job, RunStatus.InsufficientData(actual, expected, oldRecordCount), getRunInfo(task.infoDate, started), applicationId, isTransient, Nil, validationResult.dependencyWarnings, Nil)) + Left(TaskResult(task.job, RunStatus.InsufficientData(actual, expected, oldRecordCount), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, validationResult.dependencyWarnings, Nil)) case AlreadyRan => if (runtimeConfig.isRerun) { log.info(s"RE-RUNNING the task: $outputTable for date: ${task.infoDate}.") Right(validationResult) } else { log.info(s"SKIPPING already ran job: $outputTable for date: ${task.infoDate}.") - Left(TaskResult(task.job, RunStatus.NotRan, getRunInfo(task.infoDate, started), applicationId, isTransient, Nil, validationResult.dependencyWarnings, Nil)) + Left(TaskResult(task.job, RunStatus.NotRan, getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, validationResult.dependencyWarnings, Nil)) } case Skip(msg) => log.info(s"SKIPPING job: $outputTable for date: ${task.infoDate}. 
Reason: msg") - Left(TaskResult(task.job, RunStatus.Skipped(msg), getRunInfo(task.infoDate, started), applicationId, isTransient, Nil, validationResult.dependencyWarnings, Nil)) + Left(TaskResult(task.job, RunStatus.Skipped(msg), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, validationResult.dependencyWarnings, Nil)) case FailedDependencies(isFailure, failures) => - Left(TaskResult(task.job, RunStatus.FailedDependencies(isFailure, failures), getRunInfo(task.infoDate, started), applicationId, isTransient, Nil, Nil, Nil)) + Left(TaskResult(task.job, RunStatus.FailedDependencies(isFailure, failures), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, Nil, Nil)) } if (validationResult.dependencyWarnings.nonEmpty) { log.warn(s"$WARNING Validation of the task: $outputTable for date: ${task.infoDate} has " + @@ -191,7 +192,7 @@ abstract class TaskRunnerBase(conf: Config, } resultToReturn case Failure(ex) => - Left(TaskResult(task.job, RunStatus.ValidationFailed(ex), getRunInfo(task.infoDate, started), applicationId, isTransient, Nil, Nil, Nil)) + Left(TaskResult(task.job, RunStatus.ValidationFailed(ex), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, Nil, Nil)) } } @@ -208,6 +209,7 @@ abstract class TaskRunnerBase(conf: Config, private[core] def validate(task: Task, started: Instant): Either[TaskResult, JobPreRunResult] = { val outputTable = task.job.outputTable.name val isTransient = task.job.outputTable.format.isTransient + val isRawFileBased = task.job.outputTable.format.isInstanceOf[DataFormat.Raw] preRunCheck(task, started) match { case Left(result) => @@ -226,20 +228,20 @@ abstract class TaskRunnerBase(conf: Config, Right(status.copy(warnings = reason.warnings)) case Reason.NotReady(msg) => log.info(s"NOT READY validation failure for the task: $outputTable for date: ${task.infoDate}. Reason: $msg") - Left(TaskResult(task.job, RunStatus.ValidationFailed(new ReasonException(Reason.NotReady(msg), msg)), getRunInfo(task.infoDate, started), applicationId, isTransient, Nil, status.dependencyWarnings, Nil)) + Left(TaskResult(task.job, RunStatus.ValidationFailed(new ReasonException(Reason.NotReady(msg), msg)), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, status.dependencyWarnings, Nil)) case Reason.Skip(msg) => log.info(s"SKIP validation failure for the task: $outputTable for date: ${task.infoDate}. Reason: $msg") if (bookkeeper.getLatestDataChunk(outputTable, task.infoDate, task.infoDate).isEmpty) { val isTransient = task.job.outputTable.format.isTransient bookkeeper.setRecordCount(outputTable, task.infoDate, task.infoDate, task.infoDate, status.inputRecordsCount.getOrElse(0L), 0, started.getEpochSecond, Instant.now().getEpochSecond, isTransient) } - Left(TaskResult(task.job, RunStatus.Skipped(msg), getRunInfo(task.infoDate, started), applicationId, isTransient, Nil, status.dependencyWarnings, Nil)) + Left(TaskResult(task.job, RunStatus.Skipped(msg), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, status.dependencyWarnings, Nil)) case Reason.SkipOnce(msg) => log.info(s"SKIP today validation failure for the task: $outputTable for date: ${task.infoDate}. 
Reason: $msg") - Left(TaskResult(task.job, RunStatus.Skipped(msg), getRunInfo(task.infoDate, started), applicationId, isTransient, Nil, status.dependencyWarnings, Nil)) + Left(TaskResult(task.job, RunStatus.Skipped(msg), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, status.dependencyWarnings, Nil)) } case Failure(ex) => - Left(TaskResult(task.job, RunStatus.ValidationFailed(ex), getRunInfo(task.infoDate, started), applicationId, isTransient, Nil, status.dependencyWarnings, Nil)) + Left(TaskResult(task.job, RunStatus.ValidationFailed(ex), getRunInfo(task.infoDate, started), applicationId, isTransient, isRawFileBased, Nil, status.dependencyWarnings, Nil)) } } } @@ -255,6 +257,7 @@ abstract class TaskRunnerBase(conf: Config, */ private[core] def run(task: Task, started: Instant, validationResult: JobPreRunResult): TaskResult = { val isTransient = task.job.outputTable.format.isTransient + val isRawFileBased = task.job.outputTable.format.isInstanceOf[DataFormat.Raw] val lock = lockFactory.getLock(getTokenName(task)) val attempt = try { @@ -336,6 +339,7 @@ abstract class TaskRunnerBase(conf: Config, Some(RunInfo(task.infoDate, started, finished)), applicationId, isTransient, + isRawFileBased, schemaChangesBeforeTransform ::: schemaChangesAfterTransform, validationResult.dependencyWarnings, Seq.empty) @@ -352,8 +356,8 @@ abstract class TaskRunnerBase(conf: Config, case Success(result) => result case Failure(ex) => - TaskResult(task.job, RunStatus.Failed(ex), getRunInfo(task.infoDate, started), applicationId, isTransient, - Nil,validationResult.dependencyWarnings, Nil) + TaskResult(task.job, RunStatus.Failed(ex), getRunInfo(task.infoDate, started), applicationId, + isTransient, isRawFileBased, Nil,validationResult.dependencyWarnings, Nil) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/source/RawFileSource.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/source/RawFileSource.scala index 9e3bcf907..77943376f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/source/RawFileSource.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/source/RawFileSource.scala @@ -17,7 +17,7 @@ package za.co.absa.pramen.core.source import com.typesafe.config.Config -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileStatus, FileSystem, Path} import org.apache.spark.sql.SparkSession import org.slf4j.LoggerFactory import za.co.absa.pramen.api._ @@ -98,19 +98,21 @@ class RawFileSource(val sourceConfig: Config, } override def getRecordCount(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Long = { - getPaths(query, infoDateBegin, infoDateEnd).length + getPaths(query, infoDateBegin, infoDateEnd) + .map(_.getLen) + .sum } override def getData(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate, columns: Seq[String]): SourceResult = { val files = getPaths(query, infoDateBegin, infoDateEnd) - val df = files.toDF(PATH_FIELD) - val fileNames = files.map(fullPath => new Path(fullPath).getName) + val df = files.map(_.getPath.toString).toDF(PATH_FIELD) + val fileNames = files.map(_.getPath.getName).sorted SourceResult(df, fileNames) } @throws[FileNotFoundException] - private[source] def getPaths(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Seq[String] = { + private[source] def getPaths(query: Query, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Seq[FileStatus] = { query match { case Query.Path(pathPattern) => getPatternBasedFilesForRange(pathPattern, infoDateBegin, infoDateEnd) 
case Query.Custom(options) => getMultiList(options) @@ -119,14 +121,14 @@ class RawFileSource(val sourceConfig: Config, } @throws[FileNotFoundException] - private[source] def getPatternBasedFilesForRange(pathPattern: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Seq[String] = { + private[source] def getPatternBasedFilesForRange(pathPattern: String, infoDateBegin: LocalDate, infoDateEnd: LocalDate): Seq[FileStatus] = { if (!pathPattern.contains("{{") || infoDateBegin.isEqual(infoDateEnd)) { getListOfFilesForPathPattern(pathPattern, infoDateBegin, caseSensitivePattern) } else { if (infoDateBegin.isAfter(infoDateEnd)) { throw new IllegalArgumentException(s"Begin date is more recent than the end date: $infoDateBegin > $infoDateEnd.") } - val files = new ListBuffer[String] + val files = new ListBuffer[FileStatus] var date = infoDateBegin while (date.isBefore(infoDateEnd) || date.isEqual(infoDateEnd)) { files ++= getListOfFilesForPathPattern(pathPattern, date, caseSensitivePattern) @@ -152,14 +154,19 @@ object RawFileSource extends ExternalChannelFactory[RawFileSource] { new RawFileSource(conf, options)(spark) } - private[core] def getMultiList(options: Map[String, String]): Seq[String] = { + private[core] def getMultiList(options: Map[String, String])(implicit spark: SparkSession): Seq[FileStatus] = { var i = 1 - val files = new ListBuffer[String] + val files = new ListBuffer[FileStatus] + var fs: FileSystem = null while (options.contains(s"$FILE_PREFIX.$i")) { - val filePath = options(s"$FILE_PREFIX.$i") + val filePath = new Path(options(s"$FILE_PREFIX.$i")) - files += filePath + if (fs == null) { + fs = filePath.getFileSystem(spark.sparkContext.hadoopConfiguration) + } + + files += fs.getFileStatus(filePath) i += 1 } @@ -170,7 +177,7 @@ object RawFileSource extends ExternalChannelFactory[RawFileSource] { private[core] def getListOfFilesForPathPattern(pathPattern: String, infoDate: LocalDate, caseSensitive: Boolean) - (implicit spark: SparkSession): Seq[String] = { + (implicit spark: SparkSession): Seq[FileStatus] = { val globPattern = getGlobPattern(pathPattern, infoDate) log.info(s"Using the following pattern for '$infoDate': $globPattern") @@ -179,7 +186,7 @@ object RawFileSource extends ExternalChannelFactory[RawFileSource] { @throws[FileNotFoundException] - private[core] def getListOfFiles(pathPattern: String, caseSensitive: Boolean)(implicit spark: SparkSession): Seq[String] = { + private[core] def getListOfFiles(pathPattern: String, caseSensitive: Boolean)(implicit spark: SparkSession): Seq[FileStatus] = { val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, pathPattern) val hadoopPath = new Path(pathPattern) @@ -192,13 +199,13 @@ object RawFileSource extends ExternalChannelFactory[RawFileSource] { try { if (caseSensitive) { log.info(s"Using case-sensitive Hadoop file search.") - fsUtils.getHadoopFiles(hadoopPath, includeHiddenFiles = true).sorted + fsUtils.getHadoopFiles(hadoopPath, includeHiddenFiles = true) } else { log.info(s"Using case-insensitive Hadoop file search.") - fsUtils.getHadoopFilesCaseInsensitive(hadoopPath, includeHiddenFiles = true).sorted + fsUtils.getHadoopFilesCaseInsensitive(hadoopPath, includeHiddenFiles = true) } } catch { - case ex: IllegalArgumentException if ex.getMessage.contains("Input path does not exist") => Seq.empty[String] + case ex: IllegalArgumentException if ex.getMessage.contains("Input path does not exist") => Seq.empty[FileStatus] } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/source/SparkSource.scala 
b/pramen/core/src/main/scala/za/co/absa/pramen/core/source/SparkSource.scala index c87eae720..84ed4ceb8 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/source/SparkSource.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/source/SparkSource.scala @@ -57,7 +57,7 @@ class SparkSource(val format: Option[String], case _: Query.Sql => Array.empty[String] case Query.Path(path) => val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, path) - fsUtils.getHadoopFiles(new Path(path)) + fsUtils.getHadoopFiles(new Path(path)).map(_.getPath.toString).sorted case other => throw new IllegalArgumentException(s"'${other.name}' is not supported by the Spark source. Use 'path', 'table' or 'sql' instead.") } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/FsUtils.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/FsUtils.scala index f9d7ec801..dd788df37 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/FsUtils.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/utils/FsUtils.scala @@ -676,7 +676,7 @@ class FsUtils(conf: Configuration, pathBase: String) { * * The glob pattern is supported. Maximum depth of recursivity is 1. */ - def getHadoopFiles(path: Path, includeHiddenFiles: Boolean = false): Array[String] = { + def getHadoopFiles(path: Path, includeHiddenFiles: Boolean = false): Array[FileStatus] = { val fileFilter = if (includeHiddenFiles) anyFileFilter else hiddenFileFilter val stats: Array[FileStatus] = fs.globStatus(path, fileFilter) @@ -694,7 +694,7 @@ class FsUtils(conf: Configuration, pathBase: String) { } }) - allFiles.map(_.getPath.toString).toArray[String] + allFiles.toArray[FileStatus] } /** @@ -704,7 +704,7 @@ class FsUtils(conf: Configuration, pathBase: String) { * * The glob pattern is supported. Maximum depth of recursivity is 1. 
*/ - def getHadoopFilesCaseInsensitive(path: Path, includeHiddenFiles: Boolean = false): Array[String] = { + def getHadoopFilesCaseInsensitive(path: Path, includeHiddenFiles: Boolean = false): Array[FileStatus] = { def containsWildcard(input: String): Boolean = { val wildcardPattern = "[*?{}!]".r wildcardPattern.findFirstIn(input).isDefined @@ -735,7 +735,7 @@ class FsUtils(conf: Configuration, pathBase: String) { } }) - allFiles.map(_.getPath.toString).toArray[String] + allFiles.toArray[FileStatus] } } diff --git a/pramen/core/src/test/resources/test/notify/test_pipeline_email_body_complex.txt b/pramen/core/src/test/resources/test/notify/test_pipeline_email_body_complex.txt index 84aad3cfe..d9eadd373 100644 --- a/pramen/core/src/test/resources/test/notify/test_pipeline_email_body_complex.txt +++ b/pramen/core/src/test/resources/test/notify/test_pipeline_email_body_complex.txt @@ -114,7 +114,7 @@ Record Count Elapsed Time Size -RPS +Throughput Saved at Status Reason @@ -125,7 +125,7 @@ 20000 (+10000) 01:14:04 97 KiB -4 +4 r/s 1970-01-01 03:34 +0200 Warning Test warning diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala index 3da0371a8..f72b129c0 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala @@ -29,6 +29,7 @@ object TaskResultFactory { runInfo: Option[RunInfo] = Some(RunInfo(LocalDate.of(2022, 2, 18), Instant.ofEpochSecond(1234), Instant.ofEpochSecond(5678))), applicationId: String = "app_123", isTransient: Boolean = false, + isRawFilesJob: Boolean = false, schemaDifferences: Seq[SchemaDifference] = Nil, dependencyWarnings: Seq[DependencyWarning] = Nil, notificationTargetErrors: Seq[NotificationFailure] = Nil): TaskResult = { @@ -37,6 +38,7 @@ object TaskResultFactory { runInfo, applicationId, isTransient, + isRawFilesJob, schemaDifferences, dependencyWarnings, notificationTargetErrors) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala index a7a2626a7..6c46ed5f2 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala @@ -74,7 +74,7 @@ class ConcurrentJobRunnerSpy(includeFails: Boolean = false, RunStatus.NoData(isNoDataFailure) } - val taskResult = TaskResult(job, status, Some(RunInfo(infoDate, started, finished)), "app_123", isTransient = false, Nil, Nil, Nil) + val taskResult = TaskResult(job, status, Some(RunInfo(infoDate, started, finished)), "app_123", isTransient = false, isRawFilesJob = false, Nil, Nil, Nil) completedJobsChannel.send((job, taskResult :: Nil, taskResult.runStatus.isInstanceOf[RunStatus.Succeeded] || taskResult.runStatus == RunStatus.NotRan)) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/source/RawFileSourceSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/source/RawFileSourceSuite.scala index 9526cdcf1..482b340ca 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/source/RawFileSourceSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/source/RawFileSourceSuite.scala @@ -150,7 +150,7 @@ class RawFileSourceSuite extends AnyWordSpec with BeforeAndAfterAll with TempDir val 
count = source.getRecordCount(Query.Path(filesPath.toString), null, null) - assert(count == 3) + assert(count == 9) } "return the number of files in the directory for a path pattern" in { @@ -158,7 +158,7 @@ class RawFileSourceSuite extends AnyWordSpec with BeforeAndAfterAll with TempDir val count = source.getRecordCount(Query.Path(filesPattern.toString), infoDate, infoDate) - assert(count == 2) + assert(count == 6) } "return the number of files in the directory for a path pattern and range query" in { @@ -166,17 +166,20 @@ class RawFileSourceSuite extends AnyWordSpec with BeforeAndAfterAll with TempDir val count = source.getRecordCount(Query.Path(filesPattern.toString), infoDate, infoDate.plusDays(1)) - assert(count == 3) + assert(count == 9) } "return the number of files in the directory for a list of files" in { val source = new RawFileSource(emptyConfig, null)(spark) - val options = Map("file.1" -> "f1.dat", "file.2" -> "f2.dat") + val options = Map( + "file.1" -> new Path(filesPath, "1.dat").toString, + "file.2" -> new Path(filesPath, "2.dat").toString + ) val count = source.getRecordCount(Query.Custom(options), null, null) - assert(count == 2) + assert(count == 7) } "thrown an exception when parent path does not exist" in { @@ -271,18 +274,21 @@ class RawFileSourceSuite extends AnyWordSpec with BeforeAndAfterAll with TempDir "return the list of files for a list of files" in { val source = new RawFileSource(emptyConfig, null)(spark) - val options = Map("file.1" -> "f1.dat", "file.2" -> "f2.dat") + val options = Map( + "file.1" -> new Path(filesPath, "1.dat").toString, + "file.2" -> new Path(filesPath, "2.dat").toString + ) val result = source.getData(Query.Custom(options), null, null, null) val filesInDf = result.data.collect().map(_.toString()) val filesRead = result.filesRead - assert(filesInDf.exists(_.contains("f1.dat"))) - assert(filesInDf.exists(_.contains("f2.dat"))) + assert(filesInDf.exists(_.contains("1.dat"))) + assert(filesInDf.exists(_.contains("2.dat"))) - assert(filesRead.exists(_.contains("f1.dat"))) - assert(filesRead.exists(_.contains("f2.dat"))) + assert(filesRead.exists(_.contains("1.dat"))) + assert(filesRead.exists(_.contains("2.dat"))) } } @@ -291,7 +297,7 @@ class RawFileSourceSuite extends AnyWordSpec with BeforeAndAfterAll with TempDir val source = new RawFileSource(emptyConfig, null)(spark) val files = source.getPaths(Query.Path(filesPath.toString), infoDate, infoDate) - .map(path => new Path(path).getName) + .map(_.getPath.getName) assert(files.length == 3) @@ -303,9 +309,13 @@ class RawFileSourceSuite extends AnyWordSpec with BeforeAndAfterAll with TempDir "work for a list of files" in { val source = new RawFileSource(emptyConfig, null)(spark) - val options = Map("file.1" -> "1.dat", "file.2" -> "_3.dat") + val options = Map( + "file.1" -> new Path(filesPath, "1.dat").toString, + "file.2" -> new Path(filesPath, "_3.dat").toString + ) + val files = source.getPaths(Query.Custom(options), infoDate, infoDate) - .map(path => new Path(path).getName) + .map(_.getPath.getName) assert(files.length == 2) @@ -327,7 +337,7 @@ class RawFileSourceSuite extends AnyWordSpec with BeforeAndAfterAll with TempDir val source = new RawFileSource(emptyConfig, null)(spark) val files = source.getPatternBasedFilesForRange(filesPatternPath.toString, infoDate, infoDate.plusDays(1)) - .map(path => new Path(path).getName) + .map(_.getPath.getName) assert(files.length == 5) @@ -352,7 +362,7 @@ class RawFileSourceSuite extends AnyWordSpec with BeforeAndAfterAll with TempDir "return the 
list of files for the pattern" in { val files = RawFileSource.getListOfFiles(specificPattern.toString, caseSensitive = true) - .map(path => new Path(path).getName) + .map(_.getPath.getName) assert(files.length == 2) @@ -363,7 +373,7 @@ class RawFileSourceSuite extends AnyWordSpec with BeforeAndAfterAll with TempDir "return the list of files for the case insensitive pattern" in { val caseInsensitivePattern = new Path(filesPatternPath, "FILE_test_2022-02-18*.Dat") val files = RawFileSource.getListOfFiles(caseInsensitivePattern.toString, caseSensitive = false) - .map(path => new Path(path).getName) + .map(_.getPath.getName) assert(files.length == 3) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala index 47a9f18fd..f34e89677 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala @@ -39,6 +39,7 @@ class TaskCompletedSuite extends AnyWordSpec { Some(RunInfo(infoDate, now.minusSeconds(10), now)), "app_123", isTransient = false, + isRawFilesJob = false, Nil, Nil, Nil) @@ -72,6 +73,7 @@ class TaskCompletedSuite extends AnyWordSpec { None, "app_123", isTransient = false, + isRawFilesJob = false, Nil, Nil, Nil) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/FsUtilsSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/FsUtilsSuite.scala index 86a78bdd6..7249c81d2 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/FsUtilsSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/utils/FsUtilsSuite.scala @@ -908,7 +908,7 @@ class FsUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture { val files = fsUtils.getHadoopFiles(innerPath) assert(files.length == 1) - assert(files.head.endsWith("data1.bin")) + assert(files.head.getPath.toString.endsWith("data1.bin")) } } @@ -944,7 +944,7 @@ class FsUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture { assert(filesByPath.isEmpty) assert(filesWithMask.length == 1) - assert(filesWithMask.head.endsWith("data1.bin")) + assert(filesWithMask.head.getPath.toString.endsWith("data1.bin")) } } @@ -960,7 +960,7 @@ class FsUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture { val files = fsUtils.getHadoopFiles(innerPath) assert(files.length == 1) - assert(files.head.endsWith("data1.bin")) + assert(files.head.getPath.toString.endsWith("data1.bin")) } } @@ -977,7 +977,7 @@ class FsUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture { val files = fsUtils.getHadoopFiles(innerFile) assert(files.length == 1) - assert(files.head.endsWith("data1.bin")) + assert(files.head.getPath.toString.endsWith("data1.bin")) } } } @@ -1006,6 +1006,7 @@ class FsUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture { createBinFile(innerPath.toString, "_data3.BiN", 100) val files = fsUtils.getHadoopFilesCaseInsensitive(filePattern, includeHiddenFiles = true) + .map(_.getPath.toString) assert(files.length == 3) assert(files.exists(_.endsWith("Adata1.BIN"))) @@ -1029,7 +1030,7 @@ class FsUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture { val files = fsUtils.getHadoopFilesCaseInsensitive(filePattern) assert(files.length == 1) - assert(files.head.endsWith("Adata1.BiN")) + assert(files.head.getPath.toString.endsWith("Adata1.BiN")) 
} } @@ -1045,7 +1046,7 @@ class FsUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture { val files = fsUtils.getHadoopFilesCaseInsensitive(innerPath) assert(files.length == 1) - assert(files.head.endsWith("data1.bin")) + assert(files.head.getPath.toString.endsWith("data1.bin")) } } @@ -1062,7 +1063,7 @@ class FsUtilsSuite extends AnyWordSpec with SparkTestBase with TempDirFixture { val files = fsUtils.getHadoopFilesCaseInsensitive(innerFile) assert(files.length == 1) - assert(files.head.endsWith("data1.bin")) + assert(files.head.getPath.toString.endsWith("data1.bin")) } } } From 90c2a7d17de83225aeb035af8060f22fa3d92c7c Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 13 Mar 2024 11:39:59 +0100 Subject: [PATCH 2/2] #366 Improve unit test coverage of the new feature. --- .../PipelineNotificationBuilderHtml.scala | 22 +- .../pramen/core/mocks/RunStatusFactory.scala | 44 ++++ ...PipelineNotificationBuilderHtmlSuite.scala | 236 +++++++++++++++++- 3 files changed, 281 insertions(+), 21 deletions(-) create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/RunStatusFactory.scala diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index f7fbe3c9e..4f494d6ac 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -322,10 +322,13 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot case _ => false }) - val outputSizeKnown = tasks.exists(t => t.runStatus match { - case s: Succeeded => s.sizeBytes.isDefined - case _ => false - }) + val outputSizeKnown = tasks.exists { t => + val hasExplicitSize = t.runStatus match { + case s: Succeeded => s.sizeBytes.isDefined + case _ => false + } + t.isRawFilesJob || hasExplicitSize + } val haveReasonColumn = tasks.exists(t => t.runStatus.getReason().nonEmpty || t.dependencyWarnings.nonEmpty) val haveHiveColumn = tasks.exists(t => t.runStatus.isInstanceOf[Succeeded] && t.runStatus.asInstanceOf[Succeeded].hiveTablesUpdated.nonEmpty) @@ -376,11 +379,12 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot row.append(TextElement(getElapsedTime(task))) - if (task.isRawFilesJob) { - row.append(TextElement(getSizeText(task))) - } else { - if (outputSizeKnown) + if (outputSizeKnown) { + if (task.isRawFilesJob) { + row.append(TextElement(getSizeText(task))) + } else { row.append(TextElement(getOutputSize(task))) + } } row.append(getThroughputRps(task)) @@ -500,7 +504,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot if (diff > 0) s"$numRecords (+$diff)" else if (diff < 0) - s"$numRecords (-$diff)" + s"$numRecords ($diff)" else { numRecords.toString } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/RunStatusFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/RunStatusFactory.scala new file mode 100644 index 000000000..610901625 --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/RunStatusFactory.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.mocks + +import za.co.absa.pramen.core.pipeline.TaskRunReason +import za.co.absa.pramen.core.runner.task.RunStatus + +object RunStatusFactory { + def getDummySuccess(recordCountOld: Option[Long] = None, + recordCount: Long = 1000, + sizeBytes: Option[Long] = None, + reason: TaskRunReason = TaskRunReason.New, + filesRead: Seq[String] = Nil, + filesWritten: Seq[String] = Nil, + hiveTablesUpdated: Seq[String] = Nil, + warnings: Seq[String] = Nil): RunStatus.Succeeded = { + RunStatus.Succeeded(recordCountOld, + recordCount, + sizeBytes, + reason, + filesRead, + filesWritten, + hiveTablesUpdated, + warnings) + } + + def getDummyFailure(ex: Throwable = new RuntimeException("Dummy failure")): RunStatus.Failed = { + RunStatus.Failed(ex) + } +} diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala index f5e518caf..5d059d822 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala @@ -22,15 +22,18 @@ import za.co.absa.pramen.api.notification.NotificationEntry.Paragraph import za.co.absa.pramen.api.notification._ import za.co.absa.pramen.core.exceptions.{CmdFailedException, ProcessFailedException} import za.co.absa.pramen.core.fixtures.TextComparisonFixture -import za.co.absa.pramen.core.mocks.{SchemaDifferenceFactory, TaskResultFactory, TestPrototypes} +import za.co.absa.pramen.core.mocks.{RunStatusFactory, SchemaDifferenceFactory, TaskResultFactory, TestPrototypes} import za.co.absa.pramen.core.notify.message.{MessageBuilderHtml, ParagraphBuilder} import za.co.absa.pramen.core.notify.pipeline.PipelineNotificationBuilderHtml +import za.co.absa.pramen.core.pipeline.TaskRunReason import za.co.absa.pramen.core.runner.task.{NotificationFailure, RunStatus} import za.co.absa.pramen.core.utils.ResourceUtils import java.time.{Instant, LocalDate} class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparisonFixture { + private val megabyte = 1024L * 1024L + "constructor" should { "be able to initialize the builder with the default timezone" in { implicit val conf: Config = ConfigFactory.parseString( @@ -85,8 +88,8 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis val builder = getBuilder builder.addAppName("MyNewApp") - builder.addCompletedTask(TaskResultFactory.getDummyTaskResult( runStatus = TestPrototypes.runStatusWarning)) - builder.addCompletedTask(TaskResultFactory.getDummyTaskResult( runStatus = TestPrototypes.runStatusFailure)) + builder.addCompletedTask(TaskResultFactory.getDummyTaskResult(runStatus = TestPrototypes.runStatusWarning)) + builder.addCompletedTask(TaskResultFactory.getDummyTaskResult(runStatus = TestPrototypes.runStatusFailure)) assert(builder.renderSubject().startsWith("Notification of PARTIAL SUCCESS 
for MyNewApp at")) } @@ -241,14 +244,14 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis .renderBody assert(actual.contains( - """
Command line failed
-        |Last stdout lines:
-        |stdout line 1
-        |stdout line 2
-        |
-        |Last stderr lines:
-        |stderr line 1
-        |
""".stripMargin.replaceAll("\\r\\n", "\\n") + """
Command line failed
+          |Last stdout lines:
+          |stdout line 1
+          |stdout line 2
+          |
+          |Last stderr lines:
+          |stderr line 1
+          |
""".stripMargin.replaceAll("\\r\\n", "\\n") )) } } @@ -323,7 +326,216 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis val actual = builder.getStatus(TaskResultFactory.getDummyTaskResult(runStatus = RunStatus.Failed(new RuntimeException("dummy")))) - assert(actual ==TextElement("Failed", Style.Exception)) + assert(actual == TextElement("Failed", Style.Exception)) + } + } + + "getThroughputRps" should { + "work for a failed tasks" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummyFailure() + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) + + val actual = builder.getThroughputRps(task) + + assert(actual.text.isEmpty) + } + + "work for a task without a run info" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummySuccess(None, 1000000, reason = TaskRunReason.New) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, runInfo = None) + + val actual = builder.getThroughputRps(task) + + assert(actual.text.isEmpty) + } + + "work for a normal successful task" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummySuccess(None, 1000000, reason = TaskRunReason.New) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) + + val actual = builder.getThroughputRps(task) + + assert(actual.text == "225 r/s") + } + + "work for a raw file task" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummySuccess(None, 1000 * megabyte, reason = TaskRunReason.New) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, isRawFilesJob = true) + + val actual = builder.getThroughputRps(task) + + assert(actual.text == "230 KiB/s") + } + } + + "getBytesPerSecondsText" should { + "return an empty string for files smaller than the minimum size" in { + val builder = getBuilder + + val actual = builder.getBytesPerSecondsText(1000, 10) + + assert(actual.text.isEmpty) + } + + "return the throughput for usual inputs" in { + val builder = getBuilder + + val actual = builder.getBytesPerSecondsText(100L * 1024L * 1024L, 10) + + assert(actual.text == "10 MiB/s") + } + } + + "getRecordCountText" should { + "work for a failure" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummyFailure() + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) + + val actual = builder.getRecordCountText(task) + + assert(actual.isEmpty) + } + + "work for success file based job" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummySuccess(Some(100), 100, reason = TaskRunReason.Update) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, isRawFilesJob = true) + + val actual = builder.getRecordCountText(task) + + assert(actual == "-") + } + + "work for success new" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummySuccess(None, 100, reason = TaskRunReason.New) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) + + val actual = builder.getRecordCountText(task) + + assert(actual == "100") + } + + "work for success unchanged" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummySuccess(Some(100), 100, reason = TaskRunReason.Update) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) + + val actual = builder.getRecordCountText(task) + + assert(actual == "100") + } + + "work for success increased" in { + val builder = getBuilder + + val runStatus = 
RunStatusFactory.getDummySuccess(Some(100), 110, reason = TaskRunReason.Update) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) + + val actual = builder.getRecordCountText(task) + + assert(actual == "110 (+10)") + } + + "work for success decreased" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummySuccess(Some(100), 90, reason = TaskRunReason.Update) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) + + val actual = builder.getRecordCountText(task) + + assert(actual == "90 (-10)") + } + + "work for insufficient data" in { + val builder = getBuilder + + val task = TaskResultFactory.getDummyTaskResult(runStatus = RunStatus.InsufficientData(90, 96, Some(100))) + + val actual = builder.getRecordCountText(task) + + assert(actual == "90 (-10)") + } + } + + "getSizeText" should { + "work for a failure" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummyFailure() + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) + + val actual = builder.getSizeText(task) + + assert(actual.isEmpty) + } + + "work for success new" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummySuccess(None, 100 * megabyte, reason = TaskRunReason.New) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, isRawFilesJob = true) + + val actual = builder.getSizeText(task) + + assert(actual == "100 MiB") + } + + "work for success unchanged" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummySuccess(Some(100 * megabyte), 100 * megabyte, reason = TaskRunReason.Update) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, isRawFilesJob = true) + + val actual = builder.getSizeText(task) + + assert(actual == "100 MiB") + } + + "work for success increased" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummySuccess(Some(100 * megabyte), 110 * megabyte, reason = TaskRunReason.Update) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, isRawFilesJob = true) + + val actual = builder.getSizeText(task) + + assert(actual == "110 MiB (+10 MiB)") + } + + "work for success decreased" in { + val builder = getBuilder + + val runStatus = RunStatusFactory.getDummySuccess(Some(100 * megabyte), 90 * megabyte, reason = TaskRunReason.Update) + val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, isRawFilesJob = true) + + val actual = builder.getSizeText(task) + + assert(actual == "90 MiB (-10 MiB)") + } + + "work for insufficient data" in { + val builder = getBuilder + + val task = TaskResultFactory.getDummyTaskResult(runStatus = RunStatus.InsufficientData(90 * megabyte, 96 * megabyte, Some(100 * megabyte)), isRawFilesJob = true) + + val actual = builder.getSizeText(task) + + assert(actual == "90 MiB (-10 MiB)") } }
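
Note on the semantics above, with a sketch that is illustrative rather than part of the patch: after these two commits, MetaTableStats for 'raw'-format tables carries the total byte size of the ingested files where the file count used to be, and the notification builder branches on TaskResult.isRawFilesJob to render "Throughput" as a size per second instead of records per second. The standalone Scala sketch below condenses that rendering logic under stated assumptions: prettySize here is a simplified, hypothetical stand-in for za.co.absa.pramen.core.utils.StringUtils.prettySize (its exact rounding may differ), and the thresholds mirror MIN_RPS_JOB_DURATION_SECONDS, MIN_RPS_RECORDS and MIN_MEGABYTES from PipelineNotificationBuilderHtml.

// Minimal sketch of the size-aware throughput rendering introduced by this patch.
object RawThroughputSketch {
  val MinRpsJobDurationSeconds = 60 // mirrors MIN_RPS_JOB_DURATION_SECONDS
  val MinRpsRecords = 1000L         // mirrors MIN_RPS_RECORDS
  val MinMegabytes = 10L            // mirrors MIN_MEGABYTES

  private val Kilobyte = 1024L
  private val Megabyte = 1024L * 1024L
  private val Gigabyte = 1024L * 1024L * 1024L

  // Hypothetical stand-in for StringUtils.prettySize (integer KiB/MiB/GiB buckets).
  def prettySize(bytes: Long): String =
    if (bytes >= Gigabyte) s"${bytes / Gigabyte} GiB"
    else if (bytes >= Megabyte) s"${bytes / Megabyte} MiB"
    else if (bytes >= Kilobyte) s"${bytes / Kilobyte} KiB"
    else s"$bytes B"

  // For raw-file jobs `count` holds bytes (the new getStats semantics), so the
  // throughput is rendered as a pretty size per second; otherwise as records per second.
  def throughputText(count: Long, durationSeconds: Long, isRawFilesJob: Boolean): String =
    if (durationSeconds <= MinRpsJobDurationSeconds || count < MinRpsRecords) ""
    else if (isRawFilesJob) {
      if (count / Megabyte < MinMegabytes) ""            // too little data for a meaningful rate
      else s"${prettySize(count / durationSeconds)}/s"   // e.g. "230 KiB/s"
    }
    else s"${count / durationSeconds} r/s"               // e.g. "225 r/s"

  def main(args: Array[String]): Unit = {
    val durationSeconds = 5678L - 1234L // the RunInfo duration used by TaskResultFactory
    println(throughputText(1000000L, durationSeconds, isRawFilesJob = false))        // 225 r/s
    println(throughputText(1000L * Megabyte, durationSeconds, isRawFilesJob = true)) // 230 KiB/s
  }
}

Run as-is, the sketch prints exactly the two values the new unit tests pin down ("225 r/s" and "230 KiB/s"), which is a quick way to sanity-check the integer-division rounding those tests rely on.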