Skip to content

Commit

Permalink
Add unit tests for the incremental scheduling strategy
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Oct 24, 2024
1 parent b3f91b1 commit 7df79cd
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 31 deletions.
18 changes: 16 additions & 2 deletions .github/workflows/scala.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ name: ScalaCI

on:
push:
branches: [ main ]
branches:
- "main"
- "support/*"
paths:
- "pramen/**"
- ".github/workflows/scala.yml"
pull_request:
branches: [ main ]
branches:
- "main"
- "support/*"
paths:
- "pramen/**"
- ".github/workflows/scala.yml"
Expand Down Expand Up @@ -42,6 +46,16 @@ jobs:
distribution: temurin
java-version: 8
cache: sbt
- name: Install sbt
run: |
sudo apt-get update
sudo apt-get install apt-transport-https curl gnupg -yqq
echo "deb https://repo.scala-sbt.org/scalasbt/debian all main" | sudo tee /etc/apt/sources.list.d/sbt.list
echo "deb https://repo.scala-sbt.org/scalasbt/debian /" | sudo tee /etc/apt/sources.list.d/sbt_old.list
curl -sL "https://keyserver.ubuntu.com/pks/lookup?op=get&search=0x2EE0EA64E40A89B84B2DF73499E82A75642AC823" | sudo -H gpg --no-default-keyring --keyring gnupg-ring:/etc/apt/trusted.gpg.d/scalasbt-release.gpg --import
sudo chmod 644 /etc/apt/trusted.gpg.d/scalasbt-release.gpg
sudo apt-get update
sudo apt-get install sbt
- name: Build and run unit tests
working-directory: ./pramen
run: sbt ++${{matrix.scala}} unit:test -DSPARK_VERSION=${{matrix.spark}}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class IncrementalIngestionJob(operationDef: OperationDef,

private var latestOffset = latestOffsetIn

override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategyIncremental(latestOffset, source.hasInfoDateColumn(sourceTable.query))
override val scheduleStrategy: ScheduleStrategy = new ScheduleStrategyIncremental(latestOffset.map(_.maximumInfoDate), source.hasInfoDateColumn(sourceTable.query))

override def trackDays: Int = 0

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,14 @@ package za.co.absa.pramen.core.runner.splitter

import za.co.absa.pramen.api.status.{MetastoreDependency, TaskRunReason}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.bookkeeper.model.DataOffsetAggregated
import za.co.absa.pramen.core.pipeline
import za.co.absa.pramen.core.pipeline.TaskPreDef
import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils.{log, _}
import za.co.absa.pramen.core.runner.splitter.ScheduleStrategyUtils._
import za.co.absa.pramen.core.schedule.Schedule

import java.time.LocalDate

class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated], hasInfoDateColumn: Boolean) extends ScheduleStrategy {
class ScheduleStrategyIncremental(lastInfoDateProcessedOpt: Option[LocalDate], hasInfoDateColumn: Boolean) extends ScheduleStrategy {
private val log = org.slf4j.LoggerFactory.getLogger(this.getClass)

override def getDaysToRun(
Expand All @@ -40,39 +39,32 @@ class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated], has
minimumDate: LocalDate
): Seq[TaskPreDef] = {
val dates = params match {
case ScheduleParams.Normal(runDate, trackDays, _, _, _) =>
case ScheduleParams.Normal(runDate, trackDays, _, newOnly, lateOnly) =>
val infoDate = evaluateRunDate(runDate, infoDateExpression)
log.info(s"Normal run strategy: runDate=$runDate, infoDate=$infoDate")

val runInfoDays = if (hasInfoDateColumn) {
lastOffsets match {
case Some(lastOffset) =>
if (lastOffset.maximumInfoDate.isBefore(infoDate)) {
val startDate = if (trackDays > 1) {
val trackDate = infoDate.minusDays(trackDays - 1)
val date = if (trackDate.isAfter(lastOffset.maximumInfoDate))
trackDate
else
lastOffset.maximumInfoDate
log.warn(s"Last ran day: ${lastOffset.maximumInfoDate}. Tracking days = '$trackDate'. Catching up from '$date' until '$infoDate'.")
date
} else {
log.warn(s"Last ran day: ${lastOffset.maximumInfoDate}. Catching up data until '$infoDate'.")
lastOffset.maximumInfoDate
}

val potentialDates = getInfoDateRange(startDate, infoDate, "@runDate", schedule)
potentialDates.map(date => {
TaskPreDef(date, TaskRunReason.New)
})
} else {
lastInfoDateProcessedOpt match {
case Some(lastInfoDate) =>
val newDays = if (lastInfoDate.isBefore(infoDate))
Seq(TaskPreDef(infoDate.minusDays(1), TaskRunReason.New), TaskPreDef(infoDate, TaskRunReason.New))
else
Seq(TaskPreDef(infoDate, TaskRunReason.New))

val lateDays = getLateDays(infoDate, lastInfoDate, trackDays)

if (newOnly) {
newDays
} else if (lateOnly) {
lateDays
} else {
lateDays ++ newDays
}
case None => Seq(TaskPreDef(infoDate.minusDays(1), TaskRunReason.New), TaskPreDef(infoDate, TaskRunReason.New))
case None => Seq(TaskPreDef(infoDate, TaskRunReason.New))
}
} else {
lastOffsets match {
case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) => Seq.empty
lastInfoDateProcessedOpt match {
case Some(lastInfoDate) if lastInfoDate.isAfter(infoDate) => Seq.empty
case _ => Seq(TaskPreDef(infoDate, TaskRunReason.New))
}
}
Expand All @@ -97,4 +89,36 @@ class ScheduleStrategyIncremental(lastOffsets: Option[DataOffsetAggregated], has

filterOutPastMinimumDates(dates, minimumDate)
}

private[core] def getLateDays(infoDate: LocalDate, lastInfoDate: LocalDate, trackDays: Int): Seq[TaskPreDef] = {
val lastNewDate = infoDate.minusDays(1)

if (lastInfoDate.isBefore(lastNewDate)) {
val startDate = if (trackDays > 1) {
val trackDate = lastNewDate.minusDays(trackDays - 1)
val date = if (trackDate.isAfter(lastInfoDate))
trackDate
else
lastInfoDate
log.warn(s"Last ran day: $lastInfoDate. Tracking days = '$trackDate'. Catching up from '$date' until '$infoDate'.")
date
} else {
if (trackDays < 0) {
log.warn(s"Last ran day: $lastInfoDate. Catching up data until '$infoDate'.")
lastInfoDate
} else {
log.warn(s"Last ran day: $lastInfoDate. Not catching up since 'track.days=$trackDays'. Set it to the number of days or '-1' for infinite catching up.")
lastNewDate
}
}

val potentialDates = getInfoDateRange(startDate, lastNewDate.minusDays(1), "@runDate", Schedule.Incremental)
potentialDates.map(date => {
TaskPreDef(date, TaskRunReason.Late)
})
} else {
Seq.empty
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Copyright 2022 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.pramen.core.tests.runner.splitter

import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.pramen.api.status.TaskRunReason
import za.co.absa.pramen.core.bookkeeper.BookkeeperNull
import za.co.absa.pramen.core.runner.splitter.{RunMode, ScheduleParams, ScheduleStrategyIncremental}
import za.co.absa.pramen.core.schedule.Schedule

import java.time.LocalDate

class ScheduleStrategyIncrementalSuite extends AnyWordSpec {
private val minimumDate = LocalDate.of(2021, 1, 1)
private val infoDate = LocalDate.of(2021, 2, 18)

"normal run" when {
"info date is defined" should {
"run for the current info date when the job have never ran" in {
val strategy = new ScheduleStrategyIncremental(None, true)
val params = ScheduleParams.Normal(infoDate, 0, 0, newOnly = false, lateOnly = false)

val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate)

assert(dates.length == 1)
assert(dates.head.infoDate == infoDate)
assert(dates.head.reason == TaskRunReason.New)
}

"run for the current info date when the job ran today before" in {
val strategy = new ScheduleStrategyIncremental(Some(infoDate), true)
val params = ScheduleParams.Normal(infoDate, 0, 0, newOnly = false, lateOnly = false)

val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate)

assert(dates.length == 1)
assert(dates.head.infoDate == infoDate)
assert(dates.head.reason == TaskRunReason.New)
}

"run for the current info date when the job ran several before track days = 2" in {
val strategy = new ScheduleStrategyIncremental(Some(infoDate.minusDays(10)), true)
val params = ScheduleParams.Normal(infoDate, 2, 0, newOnly = false, lateOnly = false)

val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate)

assert(dates.length == 3)
assert(dates.head.infoDate == infoDate.minusDays(2))
assert(dates.head.reason == TaskRunReason.Late)
assert(dates(1).infoDate == infoDate.minusDays(1))
assert(dates(1).reason == TaskRunReason.New)
assert(dates(2).infoDate == infoDate)
assert(dates(2).reason == TaskRunReason.New)
}

"run for the current info date when the job ran several before track days = -1" in {
val strategy = new ScheduleStrategyIncremental(Some(infoDate.minusDays(3)), true)
val params = ScheduleParams.Normal(infoDate, -1, 0, newOnly = false, lateOnly = false)

val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate)

assert(dates.length == 4)
assert(dates.head.infoDate == infoDate.minusDays(3))
assert(dates.head.reason == TaskRunReason.Late)
assert(dates(1).infoDate == infoDate.minusDays(2))
assert(dates(1).reason == TaskRunReason.Late)
assert(dates(2).infoDate == infoDate.minusDays(1))
assert(dates(2).reason == TaskRunReason.New)
assert(dates(3).infoDate == infoDate)
assert(dates(3).reason == TaskRunReason.New)
}

"run for the current info date new only" in {
val strategy = new ScheduleStrategyIncremental(Some(infoDate.minusDays(3)), true)
val params = ScheduleParams.Normal(infoDate, -1, 0, newOnly = true, lateOnly = false)

val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate)

assert(dates.length == 2)
assert(dates.head.infoDate == infoDate.minusDays(1))
assert(dates.head.reason == TaskRunReason.New)
assert(dates(1).infoDate == infoDate)
assert(dates(1).reason == TaskRunReason.New)
}

"run for the current info date late only" in {
val strategy = new ScheduleStrategyIncremental(Some(infoDate.minusDays(3)), true)
val params = ScheduleParams.Normal(infoDate, -1, 0, newOnly = false, lateOnly = true)

val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate)

assert(dates.length == 2)
assert(dates.head.infoDate == infoDate.minusDays(3))
assert(dates.head.reason == TaskRunReason.Late)
assert(dates(1).infoDate == infoDate.minusDays(2))
assert(dates(1).reason == TaskRunReason.Late)
}
}

"info date is not defined" should {
"run for the current info date when the job have never ran" in {
val strategy = new ScheduleStrategyIncremental(None, false)
val params = ScheduleParams.Normal(infoDate, 0, 0, newOnly = false, lateOnly = false)

val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate)

assert(dates.length == 1)
assert(dates.head.infoDate == infoDate)
assert(dates.head.reason == TaskRunReason.New)
}

"run for the current info date when the job ran today before" in {
val strategy = new ScheduleStrategyIncremental(Some(infoDate), false)
val params = ScheduleParams.Normal(infoDate, 0, 0, newOnly = false, lateOnly = false)

val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate)

assert(dates.length == 1)
assert(dates.head.infoDate == infoDate)
assert(dates.head.reason == TaskRunReason.New)
}

"run for the current info date when the job ran some time ago" in {
val strategy = new ScheduleStrategyIncremental(Some(infoDate.minusDays(5)), false)
val params = ScheduleParams.Normal(infoDate, 0, 0, newOnly = false, lateOnly = false)

val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate)

assert(dates.length == 1)
assert(dates.head.infoDate == infoDate)
assert(dates.head.reason == TaskRunReason.New)
}
}
}

"re-run" when {
"info date is defined" in {
val strategy = new ScheduleStrategyIncremental(Some(infoDate.plusDays(5)), true)
val params = ScheduleParams.Rerun(infoDate)

val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate)

assert(dates.length == 1)
assert(dates.head.infoDate == infoDate)
assert(dates.head.reason == TaskRunReason.Rerun)
}

"info date is not defined" in {
val strategy = new ScheduleStrategyIncremental(Some(infoDate.plusDays(5)), false)
val params = ScheduleParams.Rerun(infoDate)

val dates = strategy.getDaysToRun("table1", Seq.empty, null, "@runDate", Schedule.Incremental, params, null, minimumDate)

assert(dates.length == 1)
assert(dates.head.infoDate == infoDate)
assert(dates.head.reason == TaskRunReason.Rerun)
}
}

"historical run" when {
val bk = new BookkeeperNull

"info date is defined" in {
val strategy = new ScheduleStrategyIncremental(Some(infoDate.plusDays(5)), true)
val params = ScheduleParams.Historical(infoDate.minusDays(2), infoDate, inverseDateOrder = false, RunMode.ForceRun)

val dates = strategy.getDaysToRun("table1", Seq.empty, bk, "@runDate", Schedule.Incremental, params, null, minimumDate)

dates.foreach(println)

assert(dates.length == 3)
assert(dates.head.infoDate == infoDate.minusDays(2))
assert(dates.head.reason == TaskRunReason.New)
assert(dates(1).infoDate == infoDate.minusDays(1))
assert(dates(1).reason == TaskRunReason.New)
assert(dates(2).infoDate == infoDate)
assert(dates(2).reason == TaskRunReason.New)
}

"info date is not defined" in {
val strategy = new ScheduleStrategyIncremental(Some(infoDate.plusDays(5)), false)
val params = ScheduleParams.Historical(infoDate.minusDays(2), infoDate, inverseDateOrder = true, RunMode.ForceRun)

val dates = strategy.getDaysToRun("table1", Seq.empty, bk, "@runDate", Schedule.Incremental, params, null, minimumDate)

assert(dates.length == 3)
assert(dates.head.infoDate == infoDate)
assert(dates.head.reason == TaskRunReason.New)
assert(dates(1).infoDate == infoDate.minusDays(1))
assert(dates(1).reason == TaskRunReason.New)
assert(dates(2).infoDate == infoDate.minusDays(2))
assert(dates(2).reason == TaskRunReason.New)
}
}
}

0 comments on commit 7df79cd

Please sign in to comment.