From 7df79cd79125c628816073220e371541e9b1d1c5 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 24 Oct 2024 14:48:16 +0200 Subject: [PATCH] Add unit tests for the incremental scheduling strategy --- .github/workflows/scala.yml | 18 +- .../pipeline/IncrementalIngestionJob.scala | 2 +- .../ScheduleStrategyIncremental.scala | 80 ++++--- .../ScheduleStrategyIncrementalSuite.scala | 209 ++++++++++++++++++ 4 files changed, 278 insertions(+), 31 deletions(-) create mode 100644 pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategyIncrementalSuite.scala diff --git a/.github/workflows/scala.yml b/.github/workflows/scala.yml index 9f3dcbba2..6e50ea496 100644 --- a/.github/workflows/scala.yml +++ b/.github/workflows/scala.yml @@ -2,12 +2,16 @@ name: ScalaCI on: push: - branches: [ main ] + branches: + - "main" + - "support/*" paths: - "pramen/**" - ".github/workflows/scala.yml" pull_request: - branches: [ main ] + branches: + - "main" + - "support/*" paths: - "pramen/**" - ".github/workflows/scala.yml" @@ -42,6 +46,16 @@ jobs: distribution: temurin java-version: 8 cache: sbt + - name: Install sbt + run: | + sudo apt-get update + sudo apt-get install apt-transport-https curl gnupg -yqq + echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list + echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list + curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import + sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg + sudo apt-get update + sudo apt-get install sbt - name: Build and run unit tests working-directory: ./pramen run: sbt ++${{matrix.scala}} unit:test -DSPARK_VERSION=${{matrix.spark}} diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index 3510de6dc..b69d5845b 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -50,7 +50,7 @@ class IncrementalIngestionJob(operationDef: OperationDef, private var latestOffset = latestOffsetIn - override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategyIncremental(latestOffset, source.hasInfoDateColumn(sourceTable.query)) + override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategyIncremental(latestOffset.map(_.maximumInfoDate), source.hasInfoDateColumn(sourceTable.query)) override def trackDays: Int = 0 diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala index b809514a7..97ac5c7eb 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/splitter/ScheduleStrategyIncremental.scala @@ -18,15 +18,14 @@ package za.co.absa.pramen.core.runner.splitter import za.co.absa.pramen.api.status.{MetastoreDependency, TaskRunReason} import za.co.absa.pramen.core.bookkeeper.Bookkeeper -import za.co.absa.pramen.core.bookkeeper.model.DataOffsetAggregated import za.co.absa.pramen.core.pipeline import za.co.absa.pramen.core.pipeline.TaskPreDef -import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils.{log, _} +import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils._ import za.co.absa.pramen.core.schedule.Schedule import java.time.LocalDate -class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated], hasInfoDateColumn: Boolean) extends ScheduleStrategy { +class ScheduleStrategyIncremental(lastInfoDateProcessedOpt: Option[LocalDate], hasInfoDateColumn: Boolean) extends ScheduleStrategy { private val log = org.slf4j.LoggerFactory.getLogger(this.getClass) override def getDaysToRun( @@ -40,39 +39,32 @@ class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated], has minimumDate: LocalDate ): Seq[TaskPreDef] = { val dates = params match { - case ScheduleParams.Normal(runDate, trackDays, _, _, _) => + case ScheduleParams.Normal(runDate, trackDays, _, newOnly, lateOnly) => val infoDate = evaluateRunDate(runDate, infoDateExpression) log.info(s"Normal run strategy: runDate=$runDate, infoDate=$infoDate") val runInfoDays = if (hasInfoDateColumn) { - lastOffsets match { - case Some(lastOffset) => - if (lastOffset.maximumInfoDate.isBefore(infoDate)) { - val startDate = if (trackDays > 1) { - val trackDate = infoDate.minusDays(trackDays - 1) - val date = if (trackDate.isAfter(lastOffset.maximumInfoDate)) - trackDate - else - lastOffset.maximumInfoDate - log.warn(s"Last ran day: ${lastOffset.maximumInfoDate}. Tracking days = '$trackDate'. Catching up from '$date' until '$infoDate'.") - date - } else { - log.warn(s"Last ran day: ${lastOffset.maximumInfoDate}. Catching up data until '$infoDate'.") - lastOffset.maximumInfoDate - } - - val potentialDates = getInfoDateRange(startDate, infoDate, "@runDate", schedule) - potentialDates.map(date => { - TaskPreDef(date, TaskRunReason.New) - }) - } else { + lastInfoDateProcessedOpt match { + case Some(lastInfoDate) => + val newDays = if (lastInfoDate.isBefore(infoDate)) + Seq(TaskPreDef(infoDate.minusDays(1), TaskRunReason.New), TaskPreDef(infoDate, TaskRunReason.New)) + else Seq(TaskPreDef(infoDate, TaskRunReason.New)) + + val lateDays = getLateDays(infoDate, lastInfoDate, trackDays) + + if (newOnly) { + newDays + } else if (lateOnly) { + lateDays + } else { + lateDays ++ newDays } - case None => Seq(TaskPreDef(infoDate.minusDays(1), TaskRunReason.New), TaskPreDef(infoDate, TaskRunReason.New)) + case None => Seq(TaskPreDef(infoDate, TaskRunReason.New)) } } else { - lastOffsets match { - case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) => Seq.empty + lastInfoDateProcessedOpt match { + case Some(lastInfoDate) if lastInfoDate.isAfter(infoDate) => Seq.empty case _ => Seq(TaskPreDef(infoDate, TaskRunReason.New)) } } @@ -97,4 +89,36 @@ class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated], has filterOutPastMinimumDates(dates, minimumDate) } + + private[core] def getLateDays(infoDate: LocalDate, lastInfoDate: LocalDate, trackDays: Int): Seq[TaskPreDef] = { + val lastNewDate = infoDate.minusDays(1) + + if (lastInfoDate.isBefore(lastNewDate)) { + val startDate = if (trackDays > 1) { + val trackDate = lastNewDate.minusDays(trackDays - 1) + val date = if (trackDate.isAfter(lastInfoDate)) + trackDate + else + lastInfoDate + log.warn(s"Last ran day: $lastInfoDate. Tracking days = '$trackDate'. Catching up from '$date' until '$infoDate'.") + date + } else { + if (trackDays < 0) { + log.warn(s"Last ran day: $lastInfoDate. Catching up data until '$infoDate'.") + lastInfoDate + } else { + log.warn(s"Last ran day: $lastInfoDate. Not catching up since 'track.days=$trackDays'. Set it to the number of days or '-1' for infinite catching up.") + lastNewDate + } + } + + val potentialDates = getInfoDateRange(startDate, lastNewDate.minusDays(1), "@runDate", Schedule.Incremental) + potentialDates.map(date => { + TaskPreDef(date, TaskRunReason.Late) + }) + } else { + Seq.empty + } + } } + diff --git a/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategyIncrementalSuite.scala b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategyIncrementalSuite.scala new file mode 100644 index 000000000..808f07f29 --- /dev/null +++ b/pramen/core/src/test/scala/za/co/absa/pramen/core/tests/runner/splitter/ScheduleStrategyIncrementalSuite.scala @@ -0,0 +1,209 @@ +/* + * Copyright 2022 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.pramen.core.tests.runner.splitter + +import org.scalatest.wordspec.AnyWordSpec +import za.co.absa.pramen.api.status.TaskRunReason +import za.co.absa.pramen.core.bookkeeper.BookkeeperNull +import za.co.absa.pramen.core.runner.splitter.{RunMode, ScheduleParams, ScheduleStrategyIncremental} +import za.co.absa.pramen.core.schedule.Schedule + +import java.time.LocalDate + +class ScheduleStrategyIncrementalSuite extends AnyWordSpec { + private val minimumDate = LocalDate.of(2021, 1, 1) + private val infoDate = LocalDate.of(2021, 2, 18) + + "normal run" when { + "info date is defined" should { + "run for the current info date when the job have never ran" in { + val strategy = new ScheduleStrategyIncremental(None, true) + val params = ScheduleParams.Normal(infoDate, 0, 0, newOnly = false, lateOnly = false) + + val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate) + + assert(dates.length == 1) + assert(dates.head.infoDate == infoDate) + assert(dates.head.reason == TaskRunReason.New) + } + + "run for the current info date when the job ran today before" in { + val strategy = new ScheduleStrategyIncremental(Some(infoDate), true) + val params = ScheduleParams.Normal(infoDate, 0, 0, newOnly = false, lateOnly = false) + + val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate) + + assert(dates.length == 1) + assert(dates.head.infoDate == infoDate) + assert(dates.head.reason == TaskRunReason.New) + } + + "run for the current info date when the job ran several before track days = 2" in { + val strategy = new ScheduleStrategyIncremental(Some(infoDate.minusDays(10)), true) + val params = ScheduleParams.Normal(infoDate, 2, 0, newOnly = false, lateOnly = false) + + val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate) + + assert(dates.length == 3) + assert(dates.head.infoDate == infoDate.minusDays(2)) + assert(dates.head.reason == TaskRunReason.Late) + assert(dates(1).infoDate == infoDate.minusDays(1)) + assert(dates(1).reason == TaskRunReason.New) + assert(dates(2).infoDate == infoDate) + assert(dates(2).reason == TaskRunReason.New) + } + + "run for the current info date when the job ran several before track days = -1" in { + val strategy = new ScheduleStrategyIncremental(Some(infoDate.minusDays(3)), true) + val params = ScheduleParams.Normal(infoDate, -1, 0, newOnly = false, lateOnly = false) + + val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate) + + assert(dates.length == 4) + assert(dates.head.infoDate == infoDate.minusDays(3)) + assert(dates.head.reason == TaskRunReason.Late) + assert(dates(1).infoDate == infoDate.minusDays(2)) + assert(dates(1).reason == TaskRunReason.Late) + assert(dates(2).infoDate == infoDate.minusDays(1)) + assert(dates(2).reason == TaskRunReason.New) + assert(dates(3).infoDate == infoDate) + assert(dates(3).reason == TaskRunReason.New) + } + + "run for the current info date new only" in { + val strategy = new ScheduleStrategyIncremental(Some(infoDate.minusDays(3)), true) + val params = ScheduleParams.Normal(infoDate, -1, 0, newOnly = true, lateOnly = false) + + val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate) + + assert(dates.length == 2) + assert(dates.head.infoDate == infoDate.minusDays(1)) + assert(dates.head.reason == TaskRunReason.New) + assert(dates(1).infoDate == infoDate) + assert(dates(1).reason == TaskRunReason.New) + } + + "run for the current info date late only" in { + val strategy = new ScheduleStrategyIncremental(Some(infoDate.minusDays(3)), true) + val params = ScheduleParams.Normal(infoDate, -1, 0, newOnly = false, lateOnly = true) + + val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate) + + assert(dates.length == 2) + assert(dates.head.infoDate == infoDate.minusDays(3)) + assert(dates.head.reason == TaskRunReason.Late) + assert(dates(1).infoDate == infoDate.minusDays(2)) + assert(dates(1).reason == TaskRunReason.Late) + } + } + + "info date is not defined" should { + "run for the current info date when the job have never ran" in { + val strategy = new ScheduleStrategyIncremental(None, false) + val params = ScheduleParams.Normal(infoDate, 0, 0, newOnly = false, lateOnly = false) + + val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate) + + assert(dates.length == 1) + assert(dates.head.infoDate == infoDate) + assert(dates.head.reason == TaskRunReason.New) + } + + "run for the current info date when the job ran today before" in { + val strategy = new ScheduleStrategyIncremental(Some(infoDate), false) + val params = ScheduleParams.Normal(infoDate, 0, 0, newOnly = false, lateOnly = false) + + val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate) + + assert(dates.length == 1) + assert(dates.head.infoDate == infoDate) + assert(dates.head.reason == TaskRunReason.New) + } + + "run for the current info date when the job ran some time ago" in { + val strategy = new ScheduleStrategyIncremental(Some(infoDate.minusDays(5)), false) + val params = ScheduleParams.Normal(infoDate, 0, 0, newOnly = false, lateOnly = false) + + val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate) + + assert(dates.length == 1) + assert(dates.head.infoDate == infoDate) + assert(dates.head.reason == TaskRunReason.New) + } + } + } + + "re-run" when { + "info date is defined" in { + val strategy = new ScheduleStrategyIncremental(Some(infoDate.plusDays(5)), true) + val params = ScheduleParams.Rerun(infoDate) + + val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate) + + assert(dates.length == 1) + assert(dates.head.infoDate == infoDate) + assert(dates.head.reason == TaskRunReason.Rerun) + } + + "info date is not defined" in { + val strategy = new ScheduleStrategyIncremental(Some(infoDate.plusDays(5)), false) + val params = ScheduleParams.Rerun(infoDate) + + val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate) + + assert(dates.length == 1) + assert(dates.head.infoDate == infoDate) + assert(dates.head.reason == TaskRunReason.Rerun) + } + } + + "historical run" when { + val bk = new BookkeeperNull + + "info date is defined" in { + val strategy = new ScheduleStrategyIncremental(Some(infoDate.plusDays(5)), true) + val params = ScheduleParams.Historical(infoDate.minusDays(2), infoDate, inverseDateOrder = false, RunMode.ForceRun) + + val dates = strategy.getDaysToRun("table1", Seq.empty, bk, "@runDate", Schedule.Incremental, params, null, minimumDate) + + dates.foreach(println) + + assert(dates.length == 3) + assert(dates.head.infoDate == infoDate.minusDays(2)) + assert(dates.head.reason == TaskRunReason.New) + assert(dates(1).infoDate == infoDate.minusDays(1)) + assert(dates(1).reason == TaskRunReason.New) + assert(dates(2).infoDate == infoDate) + assert(dates(2).reason == TaskRunReason.New) + } + + "info date is not defined" in { + val strategy = new ScheduleStrategyIncremental(Some(infoDate.plusDays(5)), false) + val params = ScheduleParams.Historical(infoDate.minusDays(2), infoDate, inverseDateOrder = true, RunMode.ForceRun) + + val dates = strategy.getDaysToRun("table1", Seq.empty, bk, "@runDate", Schedule.Incremental, params, null, minimumDate) + + assert(dates.length == 3) + assert(dates.head.infoDate == infoDate) + assert(dates.head.reason == TaskRunReason.New) + assert(dates(1).infoDate == infoDate.minusDays(1)) + assert(dates(1).reason == TaskRunReason.New) + assert(dates(2).infoDate == infoDate.minusDays(2)) + assert(dates(2).reason == TaskRunReason.New) + } + } +}