diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala index 4619c577e..41f046e8e 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala @@ -30,7 +30,7 @@ import za.co.absa.pramen.core.metastore.model.MetaTable import za.co.absa.pramen.core.runner.splitter.{ScheduleStrategy, ScheduleStrategyIncremental} import za.co.absa.pramen.core.utils.SparkUtils._ -import java.time.{Instant, LocalDate} +import java.time.{Instant, LocalDate, ZoneId} class IncrementalIngestionJob(operationDef: OperationDef, metastore: Metastore, @@ -91,12 +91,16 @@ class IncrementalIngestionJob(operationDef: OperationDef, } } } else { - latestOffset match { - case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) => - log.warn(s"Cannot run '${outputTable.name}' for '$infoDate' since offsets exists for ${offset.maximumInfoDate}.") - Reason.Skip("Incremental ingestion cannot be retrospective") - case _ => - Reason.Ready + if (hasInfoDate) { + Reason.Ready + } else { + latestOffset match { + case Some(offset) if offset.maximumInfoDate.isAfter(infoDate) => + log.warn(s"Cannot run '${outputTable.name}' for '$infoDate' since offsets exists for ${offset.maximumInfoDate}.") + Reason.Skip("Incremental ingestion cannot be retrospective") + case _ => + Reason.Ready + } } } } else { @@ -117,10 +121,14 @@ class IncrementalIngestionJob(operationDef: OperationDef, if (runReason == TaskRunReason.Rerun) { source.getData(sourceTable.query, infoDate, infoDate, columns) } else { - latestOffset match { + val om = bookkeeper.getOffsetManager + val infoDateLatestOffset = om.getMaxInfoDateAndOffset(outputTable.name, Some(infoDate)) + infoDateLatestOffset match { case Some(maxOffset) => + log.info(s"Running ingestion to '${outputTable.name}' at '$infoDate' for offset > ${maxOffset.maximumOffset.valueString}.") source.getDataIncremental(sourceTable.query, Option(infoDate), Option(maxOffset.maximumOffset), None, columns) case None => + log.info(s"Running ingestion to '${outputTable.name}' at '$infoDate' for all data available at the day.") source.getData(sourceTable.query, infoDate, infoDate, columns) } } diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/sql/SqlGeneratorMicrosoft.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/sql/SqlGeneratorMicrosoft.scala index e0d5e7740..81b101075 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/sql/SqlGeneratorMicrosoft.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/sql/SqlGeneratorMicrosoft.scala @@ -145,7 +145,11 @@ class SqlGeneratorMicrosoft(sqlConfig: SqlConfig) extends SqlGenerator { val offsetWhere = getOffsetWhereClause(sqlConfig.offsetInfo.get, offsetFromOpt, offsetToOpt) if (offsetWhere.nonEmpty) { - s"$dataQuery AND $offsetWhere" + if (onlyForInfoDate.isEmpty) { + s"$dataQuery WHERE $offsetWhere" + } else { + s"$dataQuery AND $offsetWhere" + } } else { dataQuery }