diff --git a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RunStatus.scala b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RunStatus.scala index da39e6d61..970817a4a 100644 --- a/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RunStatus.scala +++ b/pramen/api/src/main/scala/za/co/absa/pramen/api/status/RunStatus.scala @@ -24,7 +24,7 @@ sealed trait RunStatus { object RunStatus { case class Succeeded(recordCountOld: Option[Long], - recordCount: Long, + recordCount: Option[Long], recordsAppended: Option[Long], sizeBytes: Option[Long], reason: TaskRunReason, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala index 447b472b1..f0b4cc872 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalHadoopCsv.scala @@ -96,6 +96,8 @@ class JournalHadoopCsv(journalPath: String) val periodEnd = t.periodEnd.format(dateFormatter) val infoDate = t.informationDate.format(dateFormatter) + val inputRecordCount = t.inputRecordCount.map(_.toString).getOrElse("") + val inputRecordCountOld = t.inputRecordCountOld.map(_.toString).getOrElse("") val outputRecordCount = t.outputRecordCount.map(_.toString).getOrElse("") val outputRecordCountOld = t.outputRecordCountOld.map(_.toString).getOrElse("") val appendedRecordCount = t.appendedRecordCount.map(_.toString).getOrElse("") @@ -106,8 +108,8 @@ class JournalHadoopCsv(journalPath: String) periodBegin :: periodEnd :: infoDate :: - t.inputRecordCount :: - t.inputRecordCountOld :: + inputRecordCount :: + inputRecordCountOld :: outputRecordCount :: outputRecordCountOld :: appendedRecordCount :: diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala index 9f55017c9..37834d8f9 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/JournalJdbc.scala @@ -41,8 +41,8 @@ class JournalJdbc(db: Database) extends Journal { periodBegin, periodEnd, infoDate, - entry.inputRecordCount, - entry.inputRecordCountOld, + entry.inputRecordCount.getOrElse(-1L), + entry.inputRecordCountOld.getOrElse(-1L), entry.outputRecordCount, entry.outputRecordCountOld, entry.appendedRecordCount, @@ -73,14 +73,16 @@ class JournalJdbc(db: Database) extends Journal { val entries = SlickUtils.executeQuery(db, JournalTasks.journalTasks.filter(d => d.finishedAt >= fromSec && d.finishedAt <= toSec )) entries.map(v => { + val recordCountOpt = if (v.inputRecordCount < 0) None else Option(v.inputRecordCount) + val recordCountOldOpt = if (v.inputRecordCountOld < 0) None else Option(v.inputRecordCountOld) model.TaskCompleted( jobName = v.jobName, tableName = v.pramenTableName, periodBegin = LocalDate.parse(v.periodBegin, dateFormatter), periodEnd = LocalDate.parse(v.periodEnd, dateFormatter), informationDate = LocalDate.parse(v.informationDate, dateFormatter), - inputRecordCount = v.inputRecordCount, - inputRecordCountOld = v.inputRecordCountOld, + inputRecordCount = recordCountOpt, + inputRecordCountOld = recordCountOldOpt, outputRecordCount = v.outputRecordCount, outputRecordCountOld = v.outputRecordCountOld, appendedRecordCount = v.appendedRecordCount, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala index 4b47d895e..47857281e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompleted.scala @@ -29,8 +29,8 @@ case class TaskCompleted( periodBegin: LocalDate, periodEnd: LocalDate, informationDate: LocalDate, - inputRecordCount: Long, - inputRecordCountOld: Long, + inputRecordCount: Option[Long], + inputRecordCountOld: Option[Long], outputRecordCount: Option[Long], outputRecordCountOld: Option[Long], appendedRecordCount: Option[Long], @@ -60,8 +60,8 @@ object TaskCompleted { val sparkApplicationId = Option(taskResult.applicationId) val (recordCountOld, inputRecordCount, outputRecordCount, sizeBytes, appendedRecords) = taskResult.runStatus match { - case s: Succeeded => (s.recordCountOld, s.recordCount, Some(s.recordCount), s.sizeBytes, s.recordsAppended) - case _ => (None, 0L, None, None, None) + case s: Succeeded => (s.recordCountOld, s.recordCount, s.recordCount, s.sizeBytes, s.recordsAppended) + case _ => (None, None, None, None, None) } TaskCompleted( @@ -71,7 +71,7 @@ object TaskCompleted { task.infoDate, task.infoDate, inputRecordCount, - recordCountOld.getOrElse(0L), + recordCountOld, outputRecordCount, recordCountOld, appendedRecords, diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala index 6bbdbe313..8ac82f9a2 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/journal/model/TaskCompletedCsv.scala @@ -22,8 +22,8 @@ case class TaskCompletedCsv( periodBegin: String, periodEnd: String, informationDate: String, - inputRecordCount: Long, - inputRecordCountOld: Long, + inputRecordCount: Option[Long], + inputRecordCountOld: Option[Long], outputRecordCount: Option[Long], outputRecordCountOld: Option[Long], outputSize: Option[Long], diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetaTableStats.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetaTableStats.scala index ea98ed2ba..a18f9359b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetaTableStats.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetaTableStats.scala @@ -17,7 +17,7 @@ package za.co.absa.pramen.core.metastore case class MetaTableStats( - recordCount: Long, + recordCount: Option[Long], recordCountAppended: Option[Long], dataSizeBytes: Option[Long] ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala index 2696f6034..c53170a29 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala @@ -112,7 +112,7 @@ class MetastoreImpl(appConfig: Config, val isTransient = mt.format.isTransient val start = Instant.now.getEpochSecond - var stats = MetaTableStats(0, None, None) + var stats = MetaTableStats(Some(0), None, None) withSparkConfig(mt.sparkConfig) { stats = MetastorePersistence.fromMetaTable(mt, appConfig, saveModeOverride, batchId).saveTable(infoDate, df, inputRecordCount) @@ -122,8 +122,10 @@ class MetastoreImpl(appConfig: Config, val nothingAppended = stats.recordCountAppended.contains(0) - if (!skipBookKeepingUpdates && !nothingAppended) { - bookkeeper.setRecordCount(tableName, infoDate, infoDate, infoDate, inputRecordCount.getOrElse(stats.recordCount), stats.recordCount, start, finish, isTransient) + stats.recordCount.foreach{recordCount => + if (!skipBookKeepingUpdates && !nothingAppended) { + bookkeeper.setRecordCount(tableName, infoDate, infoDate, infoDate, inputRecordCount.getOrElse(recordCount), recordCount, start, finish, isTransient) + } } stats diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala index 672b6ee9a..273b18b9c 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceDelta.scala @@ -120,18 +120,18 @@ class MetastorePersistenceDelta(query: Query, case Some(size) => stats.recordCountAppended match { case Some(recordsAppended) => - log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount}, " + + log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount.get}, " + s"new size: ${StringUtils.prettySize(size)}) to ${query.query}") case None => - log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records " + + log.info(s"$SUCCESS Successfully saved ${stats.recordCount.get} records " + s"(${StringUtils.prettySize(size)}) to ${query.query}") } case None => stats.recordCountAppended match { case Some(recordsAppended) => - log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount} to ${query.query}") + log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount.get} to ${query.query}") case None => - log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records to ${query.query}") + log.info(s"$SUCCESS Successfully saved ${stats.recordCount.get} records to ${query.query}") } } @@ -160,11 +160,11 @@ class MetastorePersistenceDelta(query: Query, val batchCount = df.filter(col(batchIdColumn) === batchId).count() val countAll = df.count() - MetaTableStats(countAll, Option(batchCount), sizeOpt) + MetaTableStats(Option(countAll), Option(batchCount), sizeOpt) } else { val countAll = df.count() - MetaTableStats(countAll, None, sizeOpt) + MetaTableStats(Option(countAll), None, sizeOpt) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala index b26c302ef..e5e5fd40f 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceParquet.scala @@ -100,10 +100,10 @@ class MetastorePersistenceParquet(path: String, stats.recordCountAppended match { case Some(recordsAppended) => - log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount}, " + + log.info(s"$SUCCESS Successfully saved $recordsAppended records (new count: ${stats.recordCount.get}, " + s"new size: ${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir") case None => - log.info(s"$SUCCESS Successfully saved ${stats.recordCount} records " + + log.info(s"$SUCCESS Successfully saved ${stats.recordCount.get} records " + s"(${StringUtils.prettySize(stats.dataSizeBytes.get)}) to $outputDir") } @@ -123,11 +123,11 @@ class MetastorePersistenceParquet(path: String, val batchCount = df.filter(col(batchIdColumn) === batchId).count() val countAll = df.count() - MetaTableStats(countAll, Option(batchCount), Option(size)) + MetaTableStats(Option(countAll), Option(batchCount), Option(size)) } else { val countAll = df.count() - MetaTableStats(countAll, None, Option(size)) + MetaTableStats(Option(countAll), None, Option(size)) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala index ce716ee24..ed95b41a8 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala @@ -88,7 +88,7 @@ class MetastorePersistenceRaw(path: String, } MetaTableStats( - totalSize, + Option(totalSize), None, Some(totalSize) ) @@ -108,7 +108,7 @@ class MetastorePersistenceRaw(path: String, }) MetaTableStats( - files.length, + Option(files.length), None, Some(totalSize) ) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceTransientEager.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceTransientEager.scala index 88c73a0ed..961b1133d 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceTransientEager.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceTransientEager.scala @@ -54,8 +54,13 @@ class MetastorePersistenceTransientEager(tempPathOpt: Option[String], } val recordCount = numberOfRecordsEstimate match { - case Some(n) => n - case None => dfOut.count() + case Some(n) => Option(n) + case None => + cachePolicy match { + case CachePolicy.Cache => Option(dfOut.count()) + case CachePolicy.Persist => Option(dfOut.count()) + case _ => None + } } MetaTableStats( diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala index 19b68c74c..09222d920 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala @@ -498,14 +498,14 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot builder.withTable(tableBuilder) } - private[core] def getThroughputRps(task: TaskResult): TextElement = { + def getThroughputRps(task: TaskResult): TextElement = { val recordCount = task.runStatus match { case s: Succeeded => s.recordsAppended match { case Some(appended) => appended - case None => s.recordCount + case None => s.recordCount.getOrElse(0L) } - case _ => 0 + case _ => 0L } task.runInfo match { @@ -547,7 +547,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot } } - private[core] def getRecordCountText(task: TaskResult): String = { + def getRecordCountText(task: TaskResult): String = { def renderDifference(numRecords: Long, numRecordsOld: Option[Long], numRecordsAppended: Option[Long]): String = { numRecordsOld match { case Some(old) if old > 0 => @@ -571,14 +571,18 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot "-" } else { task.runStatus match { - case s: Succeeded => renderDifference(s.recordCount, s.recordCountOld, s.recordsAppended) + case s: Succeeded => + s.recordCount match { + case Some(recordCount) => renderDifference(recordCount, s.recordCountOld, s.recordsAppended) + case None => "-" + } case d: InsufficientData => renderDifference(d.actual, d.recordCountOld, None) - case _ => "" + case _ => "-" } } } - private[core] def getSizeText(task: TaskResult): String = { + def getSizeText(task: TaskResult): String = { def renderDifferenceSize(numBytes: Long, numBytesOld: Option[Long]): String = { numBytesOld match { case Some(old) if old > 0 => @@ -595,7 +599,11 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot } task.runStatus match { - case s: Succeeded => renderDifferenceSize(s.recordCount, s.recordCountOld) + case s: Succeeded => + s.recordCount match { + case Some(recordCount) => renderDifferenceSize(recordCount, s.recordCountOld) + case None => "" + } case d: InsufficientData => renderDifferenceSize(d.actual, d.recordCountOld) case _ => "" } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala index 7e383e39d..33d6ebb2b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJob.scala @@ -120,25 +120,29 @@ class PythonTransformationJob(operationDef: OperationDef, case ex: AnalysisException => throw new RuntimeException(s"Output data not found in the metastore for $infoDate", ex) } - if (stats.recordCount == 0 && minimumRecords > 0) { + val recordCount = stats.recordCount.getOrElse(0L) + + if (recordCount == 0 && minimumRecords > 0) { throw new RuntimeException(s"Output table is empty in the metastore for $infoDate") } - if (stats.recordCount < minimumRecords) { - throw new RuntimeException(s"The transformation returned too few records (${stats.recordCount} < $minimumRecords).") + if (recordCount < minimumRecords) { + throw new RuntimeException(s"The transformation returned too few records ($recordCount < $minimumRecords).") } val jobFinished = Instant.now() - bookkeeper.setRecordCount(outputTable.name, - infoDate, - infoDate, - infoDate, - stats.recordCount, - stats.recordCount, - jobStarted.getEpochSecond, - jobFinished.getEpochSecond, - isTableTransient = false) + stats.recordCount.foreach{ recordCount => + bookkeeper.setRecordCount(outputTable.name, + infoDate, + infoDate, + infoDate, + recordCount, + recordCount, + jobStarted.getEpochSecond, + jobFinished.getEpochSecond, + isTableTransient = false) + } SaveResult(stats) } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala index 12618d70a..7d3913474 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/SinkJob.scala @@ -142,7 +142,7 @@ class SinkJob(operationDef: OperationDef, isTransient ) - val stats = MetaTableStats(sinkResult.recordsSent, None, None) + val stats = MetaTableStats(Option(sinkResult.recordsSent), None, None) SaveResult(stats, sinkResult.filesSent, sinkResult.hiveTables, sinkResult.warnings ++ tooLongWarnings) } catch { case NonFatal(ex) => throw new IllegalStateException("Unable to write to the sink.", ex) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala index 848fff34e..e3c747c4e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/task/TaskRunnerBase.scala @@ -369,7 +369,7 @@ abstract class TaskRunnerBase(conf: Config, val saveResult = if (runtimeConfig.isDryRun) { log.warn(s"$WARNING DRY RUN mode, no actual writes to ${task.job.outputTable.name} for ${task.infoDate} will be performed.") - SaveResult(MetaTableStats(dfTransformed.count(), None, None)) + SaveResult(MetaTableStats(Option(dfTransformed.count()), None, None)) } else { task.job.save(dfTransformed, task.infoDate, task.reason, conf, started, validationResult.inputRecordsCount) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala index 3174dc0d7..1bc2fe436 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/TaskNotificationFactory.scala @@ -29,7 +29,7 @@ object TaskNotificationFactory { Instant.ofEpochMilli(1613600000000L), Instant.ofEpochMilli(1672759508000L) )), - status: RunStatus = RunStatus.Succeeded(None, 100, None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty), + status: RunStatus = RunStatus.Succeeded(None, Some(100), None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty), applicationId: String = "app_12345", isTransient: Boolean = false, isRawFilesJob: Boolean = false, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala index bef060d6d..488bab536 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/MetastoreSuite.scala @@ -377,7 +377,7 @@ class MetastoreSuite extends AnyWordSpec with SparkTestBase with TextComparisonF val stats = m.getStats("table1", infoDate) - assert(stats.recordCount == 3) + assert(stats.recordCount.contains(3)) assert(stats.dataSizeBytes.exists(_ > 0)) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceRawSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceRawSuite.scala index f3c0c0bbd..c01c05bba 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceRawSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceRawSuite.scala @@ -232,7 +232,7 @@ class MetastorePersistenceRawSuite extends AnyWordSpec with SparkTestBase with T val stats = persistence.getStats(infoDate, onlyForCurrentBatchId = false) - assert(stats.recordCount == 2) + assert(stats.recordCount.contains(2)) assert(stats.dataSizeBytes.contains(7L)) } } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceSuite.scala index 2616c3484..007d9f566 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceSuite.scala @@ -217,7 +217,7 @@ class MetastorePersistenceSuite extends AnyWordSpec with SparkTestBase with Temp val stats = mtp.getStats(infoDate, onlyForCurrentBatchId = false) - assert(stats.recordCount == 3) + assert(stats.recordCount.contains(3)) assert(stats.dataSizeBytes.exists(_ > 0)) } @@ -226,7 +226,7 @@ class MetastorePersistenceSuite extends AnyWordSpec with SparkTestBase with Temp val stats = mtp.getStats(infoDate, onlyForCurrentBatchId = false) - assert(stats.recordCount == 0) + assert(stats.recordCount.contains(0)) assert(stats.dataSizeBytes.contains(0L)) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceTransientEagerSuiteEager.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceTransientEagerSuiteEager.scala index 329dba5b7..9a12edecc 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceTransientEagerSuiteEager.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/MetastorePersistenceTransientEagerSuiteEager.scala @@ -117,7 +117,7 @@ class MetastorePersistenceTransientEagerSuiteEager extends AnyWordSpec with Befo val saveResult = persistor.saveTable(infoDate, exampleDf, Some(10)) - assert(saveResult.recordCount == 10) + assert(saveResult.recordCount.contains(10)) assert(saveResult.dataSizeBytes.isEmpty) TransientTableManager.reset() @@ -128,7 +128,7 @@ class MetastorePersistenceTransientEagerSuiteEager extends AnyWordSpec with Befo val saveResult = persistor.saveTable(infoDate, exampleDf, None) - assert(saveResult.recordCount == 3) + assert(saveResult.recordCount.contains(3)) assert(saveResult.dataSizeBytes.isEmpty) TransientTableManager.reset() @@ -139,7 +139,7 @@ class MetastorePersistenceTransientEagerSuiteEager extends AnyWordSpec with Befo val saveResult = persistor.saveTable(infoDate, exampleDf, None) - assert(saveResult.recordCount == 3) + assert(saveResult.recordCount.contains(3)) assert(saveResult.dataSizeBytes.isDefined) assert(saveResult.dataSizeBytes.exists(_ > 100)) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/TransientJobManagerSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/TransientJobManagerSuite.scala index 044673bf8..e1ebcbf65 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/TransientJobManagerSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/metastore/persistence/TransientJobManagerSuite.scala @@ -166,7 +166,7 @@ class TransientJobManagerSuite extends AnyWordSpec with BeforeAndAfterAll with S "runs a job via teh task runner and return the dataframe" in { val taskRunner = mock(classOf[TaskRunner]) val job = mock(classOf[Job]) - val successStatus = RunStatus.Succeeded(None, 1, None, None, TaskRunReason.OnRequest, Nil, Nil, Nil, Nil) + val successStatus = RunStatus.Succeeded(None, Some(1), None, None, TaskRunReason.OnRequest, Nil, Nil, Nil, Nil) whenMock(taskRunner.runLazyTask(job, infoDate)).thenReturn(successStatus) whenMock(job.outputTable).thenReturn(MetaTableFactory.getDummyMetaTable("table1", format = DataFormat.Transient(CachePolicy.NoCache))) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/RunStatusFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/RunStatusFactory.scala index 9d127a354..a89fce7ad 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/RunStatusFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/RunStatusFactory.scala @@ -20,7 +20,7 @@ import za.co.absa.pramen.api.status.{RunStatus, TaskRunReason} object RunStatusFactory { def getDummySuccess(recordCountOld: Option[Long] = None, - recordCount: Long = 1000, + recordCount: Option[Long] = Some(1000), recordsAppended: Option[Long] = None, sizeBytes: Option[Long] = None, reason: TaskRunReason = TaskRunReason.New, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskCompletedFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskCompletedFactory.scala index 75227404f..ef9f39fe2 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskCompletedFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskCompletedFactory.scala @@ -28,8 +28,8 @@ object TaskCompletedFactory { periodBegin: LocalDate = LocalDate.of(2020, 12, 9), periodEnd: LocalDate = LocalDate.of(2020, 12, 9), informationDate: LocalDate = LocalDate.of(2020, 12, 9), - inputRecordCount: Long = 1000L, - inputRecordCountOld: Long = 1000L, + inputRecordCount: Option[Long] = Some(1000L), + inputRecordCountOld: Option[Long] = Some(1000L), outputRecordCount: Option[Long] = Some(1000L), outputRecordCountOld: Option[Long] = Some(1000L), appendedRecordCount: Option[Long] = None, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala index 5d5d2875c..3b334963b 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TaskResultFactory.scala @@ -25,7 +25,7 @@ import java.time.{Instant, LocalDate} object TaskResultFactory { def getDummyTaskResult(jobName: String = "DummyJob", outputTable: MetaTableDef = MetaTable.getMetaTableDef(MetaTableFactory.getDummyMetaTable(name = "table_out")), - runStatus: RunStatus = RunStatus.Succeeded(Some(100), 200, None, Some(1000), TaskRunReason.New, Nil, Nil, Nil, Nil), + runStatus: RunStatus = RunStatus.Succeeded(Some(100), Some(200), None, Some(1000), TaskRunReason.New, Nil, Nil, Nil, Nil), runInfo: Option[RunInfo] = Some(RunInfo(LocalDate.of(2022, 2, 18), Instant.ofEpochSecond(1234), Instant.ofEpochSecond(5678))), applicationId: String = "app_123", isTransient: Boolean = false, diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TestPrototypes.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TestPrototypes.scala index 50633de6f..6df2a2041 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TestPrototypes.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/TestPrototypes.scala @@ -20,10 +20,10 @@ import za.co.absa.pramen.api.status.{RunStatus, TaskRunReason} object TestPrototypes { - val runStatusSuccess: RunStatus = RunStatus.Succeeded(Some(100), 200, None, Some(1000), TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty) + val runStatusSuccess: RunStatus = RunStatus.Succeeded(Some(100), Some(200), None, Some(1000), TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty) val runStatusWarning: RunStatus = RunStatus.Succeeded( - Some(10000), 20000, None, Some(100000), TaskRunReason.New, Seq("file1.txt", "file1.ctl"), + Some(10000), Some(20000), None, Some(100000), TaskRunReason.New, Seq("file1.txt", "file1.ctl"), Seq("file1.csv", "file2.csv"), Seq("`db`.`table1`"), Seq("Test warning") ) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala index 8b46dab66..192800ec6 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/job/JobSpy.scala @@ -40,7 +40,7 @@ class JobSpy(jobName: String = "DummyJob", runFunction: () => RunResult = () => null, scheduleStrategyIn: ScheduleStrategy = new ScheduleStrategySourcing, allowParallel: Boolean = true, - saveStats: MetaTableStats = MetaTableStats(0, None, None), + saveStats: MetaTableStats = MetaTableStats(Some(0), None, None), jobNotificationTargets: Seq[JobNotificationTarget] = Seq.empty, jobTrackDays: Int = 0 ) extends Job { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala index 1c807bb63..1b6a362e2 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/metastore/MetastoreSpy.scala @@ -35,7 +35,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), availableDates: Seq[LocalDate] = Seq(LocalDate.of(2022, 2, 17)), tableDf: DataFrame = null, tableException: Throwable = null, - stats: MetaTableStats = MetaTableStats(0, None, None), + stats: MetaTableStats = MetaTableStats(Some(0), None, None), statsException: Throwable = null, isTableAvailable: Boolean = true, isTableEmpty: Boolean = false, @@ -79,7 +79,7 @@ class MetastoreSpy(registeredTables: Seq[String] = Seq("table1", "table2"), override def saveTable(tableName: String, infoDate: LocalDate, df: DataFrame, inputRecordCount: Option[Long], saveModeOverride: Option[SaveMode]): MetaTableStats = { saveTableInvocations.append((tableName, infoDate, df)) - MetaTableStats(df.count(), None, None) + MetaTableStats(Option(df.count()), None, None) } def getHiveHelper(tableName: String): HiveHelper = { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala index 649e1ef97..d84172d3c 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/mocks/runner/ConcurrentJobRunnerSpy.scala @@ -68,7 +68,7 @@ class ConcurrentJobRunnerSpy(includeFails: Boolean = false, var idx = 0 incomingJobs.foreach(job => { val status = if (!includeFails || idx % 3 == 0) { - RunStatus.Succeeded(Some(1000), 500, None, Some(10000), TaskRunReason.New, Nil, Nil, Nil, Nil) + RunStatus.Succeeded(Some(1000), Some(500), None, Some(10000), TaskRunReason.New, Nil, Nil, Nil, Nil) } else if (idx % 3 == 1) { RunStatus.Failed(new RuntimeException("Dummy exception")) } else { diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala index 2e5f48249..026ef5a42 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/IngestionJobSuite.scala @@ -369,7 +369,7 @@ class IngestionJobSuite extends AnyWordSpec with SparkTestBase with TextComparis val stats = saveResult.stats - assert(stats.recordCount == 3) + assert(stats.recordCount.contains(3)) assert(mt.saveTableInvocations.length == 1) assert(mt.saveTableInvocations.head._1 == "table1") assert(mt.saveTableInvocations.head._2 == infoDate) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala index 5d0acb6dc..763299783 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/PythonTransformationJobSuite.scala @@ -220,7 +220,7 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi "save" should { "update the bookkeeper" in { - val statsIn = MetaTableStats(100, None, None) + val statsIn = MetaTableStats(Some(100), None, None) val (job, _, _, _) = getUseCase(stats = statsIn) @@ -244,7 +244,7 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi } "allow no records in the output table" in { - val statsIn = MetaTableStats(0, None, None) + val statsIn = MetaTableStats(Some(0), None, None) val (job, _, _, _) = getUseCase(stats = statsIn) @@ -252,11 +252,11 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi val statsOut = job.save(exampleDf, infoDate, runReason, conf, started, None).stats - assert(statsOut.recordCount == 0) + assert(statsOut.recordCount.contains(0)) } "throw an exception if no records in the output table" in { - val statsIn = MetaTableStats(0, None, None) + val statsIn = MetaTableStats(Some(0), None, None) val (job, _, _, _) = getUseCase(stats = statsIn, extraOptions = Map("minimum.records" -> "1")) @@ -270,7 +270,7 @@ class PythonTransformationJobSuite extends AnyWordSpec with BeforeAndAfterAll wi } "throw an exception if the number of records is less then expected" in { - val statsIn = MetaTableStats(9, None, None) + val statsIn = MetaTableStats(Some(9), None, None) val (job, _, _, _) = getUseCase(stats = statsIn, extraOptions = Map("minimum.records" -> "10")) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala index f646188a0..0e53a3af7 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TaskCompletedSuite.scala @@ -50,7 +50,7 @@ class TaskCompletedSuite extends AnyWordSpec { val taskResult = TaskResult( job.name, MetaTable.getMetaTableDef(job.outputTable), - RunStatus.Succeeded(Some(1000), 2000, None, Some(3000), runReason, Nil, Nil, Nil, Nil), + RunStatus.Succeeded(Some(1000), Some(2000), None, Some(3000), runReason, Nil, Nil, Nil, Nil), Some(RunInfo(infoDate, now.minusSeconds(10), now)), "app_123", isTransient = false, @@ -66,8 +66,8 @@ class TaskCompletedSuite extends AnyWordSpec { assert(taskCompleted.periodBegin == infoDate) assert(taskCompleted.periodEnd == infoDate) assert(taskCompleted.informationDate == infoDate) - assert(taskCompleted.inputRecordCount == 2000) - assert(taskCompleted.inputRecordCountOld == 1000) + assert(taskCompleted.inputRecordCount.contains(2000)) + assert(taskCompleted.inputRecordCountOld.contains(1000)) assert(taskCompleted.outputRecordCount.contains(2000)) assert(taskCompleted.outputRecordCountOld.contains(1000)) assert(taskCompleted.outputSize.contains(3000)) @@ -106,8 +106,8 @@ class TaskCompletedSuite extends AnyWordSpec { assert(taskCompleted.periodBegin == infoDate) assert(taskCompleted.periodEnd == infoDate) assert(taskCompleted.informationDate == infoDate) - assert(taskCompleted.inputRecordCount == 0) - assert(taskCompleted.inputRecordCountOld == 0) + assert(taskCompleted.inputRecordCount.isEmpty) + assert(taskCompleted.inputRecordCountOld.isEmpty) assert(taskCompleted.outputRecordCount.isEmpty) assert(taskCompleted.outputRecordCountOld.isEmpty) assert(taskCompleted.outputSize.isEmpty) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TestCases.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TestCases.scala index a952ccfb4..306bc15c8 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TestCases.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/journal/TestCases.scala @@ -30,7 +30,7 @@ object TestCases { val instant2: Instant = Instant.ofEpochSecond(1597318835) val instant3: Instant = Instant.ofEpochSecond(1597318839) - val task1: TaskCompleted = model.TaskCompleted("job1", "table1", infoDate1, infoDate1, infoDate1, 100, 0, Some(100), None, None, None, 597318830, 1597318830, "New", Some("Test1"), Some("abc123"), Some("p_id_1"), Some("p_1"), Some("TEST"), Some("T1")) - val task2: TaskCompleted = model.TaskCompleted("job1", "table1", infoDate2, infoDate2, infoDate2, 100, 0, Some(100), None, None, None, 1597318835, 1597318835, "Late", Some("Test2"), Some("abc123"), Some("p_id_2"), Some("p_2"), Some("TEST"), Some("T2")) - val task3: TaskCompleted = model.TaskCompleted("job2", "table2", infoDate3, infoDate3, infoDate3, 100, 0, Some(100), None, None, None, 1597318839, 1597318839, "Fail", Some("Test3"), Some("abc123"), Some("p_id_3"), Some("p_2"), Some("TEST"), Some("T2")) + val task1: TaskCompleted = model.TaskCompleted("job1", "table1", infoDate1, infoDate1, infoDate1, Some(100), Some(0), Some(100), None, None, None, 597318830, 1597318830, "New", Some("Test1"), Some("abc123"), Some("p_id_1"), Some("p_1"), Some("TEST"), Some("T1")) + val task2: TaskCompleted = model.TaskCompleted("job1", "table1", infoDate2, infoDate2, infoDate2, Some(100), Some(0), Some(100), None, None, None, 1597318835, 1597318835, "Late", Some("Test2"), Some("abc123"), Some("p_id_2"), Some("p_2"), Some("TEST"), Some("T2")) + val task3: TaskCompleted = model.TaskCompleted("job2", "table2", infoDate3, infoDate3, infoDate3, Some(100), Some(0), Some(100), None, None, None, 1597318839, 1597318839, "Fail", Some("Test3"), Some("abc123"), Some("p_id_3"), Some("p_2"), Some("TEST"), Some("T2")) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala index 98a2444bf..0cc1ddec6 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/notify/pipeline/PipelineNotificationBuilderHtmlSuite.scala @@ -402,7 +402,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis "work for a task without a run info" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(None, 1000000, reason = TaskRunReason.New) + val runStatus = RunStatusFactory.getDummySuccess(None, Some(1000000), reason = TaskRunReason.New) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, runInfo = None) val actual = builder.getThroughputRps(task) @@ -413,7 +413,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis "work for a normal successful task" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(None, 1000000, reason = TaskRunReason.New) + val runStatus = RunStatusFactory.getDummySuccess(None, Some(1000000), reason = TaskRunReason.New) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) val actual = builder.getThroughputRps(task) @@ -424,7 +424,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis "work for an incremental successful task" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(None, 1000000, recordsAppended = Some(500000), reason = TaskRunReason.New) + val runStatus = RunStatusFactory.getDummySuccess(None, Some(1000000), recordsAppended = Some(500000), reason = TaskRunReason.New) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) val actual = builder.getThroughputRps(task) @@ -435,7 +435,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis "work for a raw file task" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(None, 1000 * megabyte, reason = TaskRunReason.New) + val runStatus = RunStatusFactory.getDummySuccess(None, Some(1000 * megabyte), reason = TaskRunReason.New) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, isRawFilesJob = true) val actual = builder.getThroughputRps(task) @@ -471,13 +471,13 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis val actual = builder.getRecordCountText(task) - assert(actual.isEmpty) + assert(actual == "-") } "work for success file based job" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(Some(100), 100, reason = TaskRunReason.Update) + val runStatus = RunStatusFactory.getDummySuccess(Some(100), Some(100), reason = TaskRunReason.Update) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, isRawFilesJob = true) val actual = builder.getRecordCountText(task) @@ -488,7 +488,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis "work for success new" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(None, 100, reason = TaskRunReason.New) + val runStatus = RunStatusFactory.getDummySuccess(None, Some(100), reason = TaskRunReason.New) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) val actual = builder.getRecordCountText(task) @@ -499,7 +499,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis "work for success unchanged" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(Some(100), 100, reason = TaskRunReason.Update) + val runStatus = RunStatusFactory.getDummySuccess(Some(100), Some(100), reason = TaskRunReason.Update) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) val actual = builder.getRecordCountText(task) @@ -510,7 +510,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis "work for success increased" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(Some(100), 110, reason = TaskRunReason.Update) + val runStatus = RunStatusFactory.getDummySuccess(Some(100), Some(110), reason = TaskRunReason.Update) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) val actual = builder.getRecordCountText(task) @@ -521,7 +521,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis "work for success decreased" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(Some(100), 90, reason = TaskRunReason.Update) + val runStatus = RunStatusFactory.getDummySuccess(Some(100), Some(90), reason = TaskRunReason.Update) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) val actual = builder.getRecordCountText(task) @@ -532,7 +532,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis "work for successful appends" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(None, 110, Some(10), reason = TaskRunReason.Update) + val runStatus = RunStatusFactory.getDummySuccess(None, Some(110), Some(10), reason = TaskRunReason.Update) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus) val actual = builder.getRecordCountText(task) @@ -566,7 +566,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis "work for success new" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(None, 100 * megabyte, reason = TaskRunReason.New) + val runStatus = RunStatusFactory.getDummySuccess(None, Some(100 * megabyte), reason = TaskRunReason.New) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, isRawFilesJob = true) val actual = builder.getSizeText(task) @@ -577,7 +577,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis "work for success unchanged" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(Some(100 * megabyte), 100 * megabyte, reason = TaskRunReason.Update) + val runStatus = RunStatusFactory.getDummySuccess(Some(100 * megabyte), Some(100 * megabyte), reason = TaskRunReason.Update) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, isRawFilesJob = true) val actual = builder.getSizeText(task) @@ -588,7 +588,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis "work for success increased" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(Some(100 * megabyte), 110 * megabyte, reason = TaskRunReason.Update) + val runStatus = RunStatusFactory.getDummySuccess(Some(100 * megabyte), Some(110 * megabyte), reason = TaskRunReason.Update) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, isRawFilesJob = true) val actual = builder.getSizeText(task) @@ -599,7 +599,7 @@ class PipelineNotificationBuilderHtmlSuite extends AnyWordSpec with TextComparis "work for success decreased" in { val builder = getBuilder() - val runStatus = RunStatusFactory.getDummySuccess(Some(100 * megabyte), 90 * megabyte, reason = TaskRunReason.Update) + val runStatus = RunStatusFactory.getDummySuccess(Some(100 * megabyte), Some(90 * megabyte), reason = TaskRunReason.Update) val task = TaskResultFactory.getDummyTaskResult(runStatus = runStatus, isRawFilesJob = true) val actual = builder.getSizeText(task) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala index c4b7fe523..a736f079b 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/jobrunner/TaskRunnerMultithreadedSuite.scala @@ -190,7 +190,7 @@ class TaskRunnerMultithreadedSuite extends AnyWordSpec with SparkTestBase { bookkeeper.setRecordCount("table_out", runDate.minusDays(1), runDate.minusDays(1), runDate.minusDays(1), 1, 1, 0, 0, isTableTransient = false) - val stats = MetaTableStats(2, None, Some(100)) + val stats = MetaTableStats(Some(2), None, Some(100)) val operationDef = OperationDefFactory.getDummyOperationDef(consumeThreads = consumeThreads) val job = new JobSpy(runFunction = runFunction, saveStats = stats, operationDef = operationDef, allowParallel = allowParallel) diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/RunStatusSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/RunStatusSuite.scala index 1135cf968..0e22094b1 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/RunStatusSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/RunStatusSuite.scala @@ -23,12 +23,12 @@ class RunStatusSuite extends AnyWordSpec { "toString" should { "Succeeded" when { "New" in { - val status = RunStatus.Succeeded(None, 0, None, None, TaskRunReason.New, Nil, Nil, Nil, Nil) + val status = RunStatus.Succeeded(None, Some(0), None, None, TaskRunReason.New, Nil, Nil, Nil, Nil) assert(status.toString == "New") } "Update" in { - val status = RunStatus.Succeeded(None, 0, None, None, TaskRunReason.Update, Nil, Nil, Nil, Nil) + val status = RunStatus.Succeeded(None, Some(0), None, None, TaskRunReason.Update, Nil, Nil, Nil, Nil) assert(status.toString == "Update") } @@ -85,7 +85,7 @@ class RunStatusSuite extends AnyWordSpec { "isFailure" should { "Succeeded" in { - val status = RunStatus.Succeeded(None, 0, None, None, TaskRunReason.New, Nil, Nil, Nil, Nil) + val status = RunStatus.Succeeded(None, Some(0), None, None, TaskRunReason.New, Nil, Nil, Nil, Nil) assert(!status.isFailure) } @@ -165,7 +165,7 @@ class RunStatusSuite extends AnyWordSpec { "getReason" should { "Succeeded" in { - val status = RunStatus.Succeeded(None, 0, None, None, TaskRunReason.New, Nil, Nil, Nil, Nil) + val status = RunStatus.Succeeded(None, Some(0), None, None, TaskRunReason.New, Nil, Nil, Nil, Nil) assert(status.getReason().isEmpty) } diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala index b921ebc2d..ad897f1ca 100644 --- a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/task/TaskRunnerBaseSuite.scala @@ -477,7 +477,7 @@ class TaskRunnerBaseSuite extends AnyWordSpec with SparkTestBase with TextCompar val success = result.runStatus.asInstanceOf[Succeeded] assert(success.recordCountOld.isEmpty) - assert(success.recordCount == 2) + assert(success.recordCount.contains(2)) assert(success.sizeBytes.contains(100)) val actualData = SparkUtils.convertDataFrameToPrettyJSON(job.saveDf) @@ -511,7 +511,7 @@ class TaskRunnerBaseSuite extends AnyWordSpec with SparkTestBase with TextCompar val success = result.runStatus.asInstanceOf[Succeeded] - assert(success.recordCount == 2) + assert(success.recordCount.contains(2)) assert(success.sizeBytes.isEmpty) assert(bk.getDataChunks("table_out", infoDate, infoDate).isEmpty) } @@ -638,7 +638,7 @@ class TaskRunnerBaseSuite extends AnyWordSpec with SparkTestBase with TextCompar killMaxExecutionTimeSeconds = killTimer ) - val stats = MetaTableStats(2, None, Some(100)) + val stats = MetaTableStats(Some(2), None, Some(100)) val job = new JobSpy(preRunCheckFunction = preRunCheckFunction, validationFunction = validationFunction, diff --git a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/TestPrototypes.scala b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/TestPrototypes.scala index 17a332af6..b9b65fffe 100644 --- a/pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/TestPrototypes.scala +++ b/pramen/extras/src/test/scala/za/co/absa/pramen/extras/mocks/TestPrototypes.scala @@ -40,7 +40,7 @@ object TestPrototypes { Map.empty, Map.empty) - val taskStatus: RunStatus = RunStatus.Succeeded(None, 100, None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty) + val taskStatus: RunStatus = RunStatus.Succeeded(None, Some(100), None, None, TaskRunReason.New, Seq.empty, Seq.empty, Seq.empty, Seq.empty) val taskNotification: TaskResult = status.TaskResult( "Dummy Job",