Commit fe94be8

#520 Fix handling of incremental ingestion for 'raw' format of metastore tables. (#521)

1 parent dfd3322 commit fe94be8

File tree

7 files changed: +48 -20 lines changed

pramen/api/src/main/scala/za/co/absa/pramen/api/offset/OffsetValue.scala

Lines changed: 2 additions & 2 deletions
@@ -77,9 +77,9 @@ object OffsetValue {
   }
 
   def fromString(dataType: String, value: String): Option[OffsetValue] = {
-    if (value.isEmpty)
+    if (value == null || value.isEmpty) {
       None
-    else
+    } else
       dataType match {
         case DATETIME_TYPE_STR => Some(DateTimeValue(Instant.ofEpochMilli(value.toLong)))
         case INTEGRAL_TYPE_STR => Some(IntegralValue(value.toLong))
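
The null check matters because fromString is fed values read back from Spark aggregation rows, and min/max over an empty DataFrame come back as null; calling isEmpty on that null used to throw a NullPointerException. A minimal standalone sketch of the new behavior (the object below is an illustrative stand-in, not the full OffsetValue API; the type-string constant values are assumptions):

import java.time.Instant

sealed trait OffsetValue
case class DateTimeValue(t: Instant) extends OffsetValue
case class IntegralValue(v: Long) extends OffsetValue

object OffsetValue {
  // Constant values are illustrative; the real ones live in pramen-api.
  val DATETIME_TYPE_STR = "datetime"
  val INTEGRAL_TYPE_STR = "integral"

  // Null is now treated the same as the empty string: no offset available.
  def fromString(dataType: String, value: String): Option[OffsetValue] = {
    if (value == null || value.isEmpty) {
      None
    } else
      dataType match {
        case DATETIME_TYPE_STR => Some(DateTimeValue(Instant.ofEpochMilli(value.toLong)))
        case INTEGRAL_TYPE_STR => Some(IntegralValue(value.toLong))
      }
  }
}

object FromStringExample extends App {
  assert(OffsetValue.fromString(OffsetValue.INTEGRAL_TYPE_STR, null).isEmpty) // previously an NPE
  assert(OffsetValue.fromString(OffsetValue.INTEGRAL_TYPE_STR, "").isEmpty)
  assert(OffsetValue.fromString(OffsetValue.INTEGRAL_TYPE_STR, "42").contains(IntegralValue(42L)))
}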

pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/MetastoreImpl.scala

Lines changed: 2 additions & 2 deletions
@@ -79,7 +79,7 @@ class MetastoreImpl(appConfig: Config,
   override def getTable(tableName: String, infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
     val mt = getTableDef(tableName)
 
-    MetastorePersistence.fromMetaTable(mt, appConfig, batchId = batchId).loadTable(infoDateFrom, infoDateTo)
+    MetastorePersistence.fromMetaTable(mt, appConfig, batchId).loadTable(infoDateFrom, infoDateTo)
   }
 
   override def getBatch(tableName: String, infoDate: LocalDate, batchIdOpt: Option[Long]): DataFrame = {
@@ -116,7 +116,7 @@ class MetastoreImpl(appConfig: Config,
     var stats = MetaTableStats(Some(0), None, None)
 
     withSparkConfig(mt.sparkConfig) {
-      stats = MetastorePersistence.fromMetaTable(mt, appConfig, saveModeOverride, batchId).saveTable(infoDate, df, inputRecordCount)
+      stats = MetastorePersistence.fromMetaTable(mt, appConfig, batchId, saveModeOverride).saveTable(infoDate, df, inputRecordCount)
    }
 
     val finish = Instant.now.getEpochSecond

pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistence.scala

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ trait MetastorePersistence {
 }
 
 object MetastorePersistence {
-  def fromMetaTable(metaTable: MetaTable, conf: Config, saveModeOverride: Option[SaveMode] = None, batchId: Long)(implicit spark: SparkSession): MetastorePersistence = {
+  def fromMetaTable(metaTable: MetaTable, conf: Config, batchId: Long, saveModeOverride: Option[SaveMode] = None)(implicit spark: SparkSession): MetastorePersistence = {
     val saveModeOpt = saveModeOverride.orElse(metaTable.saveModeOpt)
 
     metaTable.format match {
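
Context for the reorder: Scala binds positional arguments left to right, so with the old order (the defaulted saveModeOverride before the required batchId) every caller had to name batchId explicitly. Placing the required parameter first lets common call sites stay positional. A small hedged sketch with placeholder types, not the real Pramen signatures:

final case class MetaTable(name: String)
final case class Config(entries: Map[String, String])
trait MetastorePersistence

object ReorderExample extends App {
  // Defaulted parameter last, matching the new signature shape.
  def fromMetaTable(metaTable: MetaTable,
                    conf: Config,
                    batchId: Long,
                    saveModeOverride: Option[String] = None): MetastorePersistence =
    new MetastorePersistence {}

  val mt = MetaTable("users")
  val conf = Config(Map.empty)

  fromMetaTable(mt, conf, 42L)                 // read path: default override, no named args needed
  fromMetaTable(mt, conf, 42L, Some("append")) // write path: override passed positionally
}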

pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/MetastorePersistenceRaw.scala

Lines changed: 29 additions & 9 deletions
@@ -17,10 +17,12 @@
 package za.co.absa.pramen.core.metastore.peristence
 
 import org.apache.hadoop.fs.{FileStatus, Path}
-import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SaveMode, SparkSession}
 import org.slf4j.LoggerFactory
 import za.co.absa.pramen.core.metastore.MetaTableStats
 import za.co.absa.pramen.core.metastore.model.HiveConfig
+import za.co.absa.pramen.core.metastore.peristence.TransientTableManager.{RAW_OFFSET_FIELD_KEY, RAW_PATH_FIELD_KEY}
 import za.co.absa.pramen.core.utils.hive.QueryExecutor
 import za.co.absa.pramen.core.utils.{FsUtils, SparkUtils}
 
@@ -30,30 +32,30 @@ import scala.collection.mutable
 class MetastorePersistenceRaw(path: String,
                               infoDateColumn: String,
                               infoDateFormat: String,
-                              saveModeOpt: Option[SaveMode]
-                             )(implicit spark: SparkSession) extends MetastorePersistence {
+                              saveModeOpt: Option[SaveMode])
+                             (implicit spark: SparkSession) extends MetastorePersistence {
+
+  import spark.implicits._
 
   private val log = LoggerFactory.getLogger(this.getClass)
 
   override def loadTable(infoDateFrom: Option[LocalDate], infoDateTo: Option[LocalDate]): DataFrame = {
-    import spark.implicits._
-
     (infoDateFrom, infoDateTo) match {
       case (Some(from), Some(to)) if from.isEqual(to) =>
-        getListOfFiles(from).map(_.getPath.toString).toDF("path")
+        listOfPathsToDf(getListOfFiles(from))
       case (Some(from), Some(to)) =>
-        getListOfFilesRange(from, to).map(_.getPath.toString).toDF("path")
+        listOfPathsToDf(getListOfFilesRange(from, to))
       case _ =>
         throw new IllegalArgumentException("Metastore 'raw' format requires info date for querying its contents.")
     }
   }
 
   override def saveTable(infoDate: LocalDate, df: DataFrame, numberOfRecordsEstimate: Option[Long]): MetaTableStats = {
-    if (!df.schema.exists(_.name == "path")) {
+    if (!df.schema.exists(_.name == RAW_PATH_FIELD_KEY)) {
       throw new IllegalArgumentException("The 'raw' persistent format data frame should have 'path' column.")
     }
 
-    val files = df.select("path").collect().map(_.getString(0))
+    val files = df.select(RAW_PATH_FIELD_KEY).collect().map(_.getString(0))
 
     val outputDir = SparkUtils.getPartitionPath(infoDate, infoDateColumn, infoDateFormat, path)
 
@@ -159,4 +161,22 @@ class MetastorePersistenceRaw(path: String,
     fsUtils.getHadoopFiles(subPath).toSeq
   }
+
+  private def listOfPathsToDf(listOfPaths: Seq[FileStatus]): DataFrame = {
+    val list = listOfPaths.map { path =>
+      (path.getPath.toString, path.getPath.getName)
+    }
+    if (list.isEmpty)
+      getEmptyRawDf
+    else {
+      list.toDF(RAW_PATH_FIELD_KEY, RAW_OFFSET_FIELD_KEY)
+    }
+  }
+
+  private def getEmptyRawDf(implicit spark: SparkSession): DataFrame = {
+    val schema = StructType(Seq(StructField(RAW_PATH_FIELD_KEY, StringType), StructField(RAW_OFFSET_FIELD_KEY, StringType)))
+
+    val emptyRDD = spark.sparkContext.emptyRDD[Row]
+    spark.createDataFrame(emptyRDD, schema)
+  }
 }
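
The new getEmptyRawDf is what keeps loadTable's schema stable when a date range contains no files: an empty RDD[Row] plus an explicit StructType yields a zero-row DataFrame that still has the path and file_name columns, so downstream selects do not fail with an AnalysisException. A self-contained sketch of the same pattern, assuming only a local SparkSession:

import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object EmptyRawDfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("raw-empty-df").getOrCreate()

    // Same column names as the constants added to TransientTableManager.
    val RAW_PATH_FIELD_KEY = "path"
    val RAW_OFFSET_FIELD_KEY = "file_name"

    // An empty RDD[Row] plus an explicit schema gives a DataFrame whose
    // columns exist even though it contains zero rows.
    val schema = StructType(Seq(
      StructField(RAW_PATH_FIELD_KEY, StringType),
      StructField(RAW_OFFSET_FIELD_KEY, StringType)
    ))
    val emptyDf = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], schema)

    emptyDf.select(RAW_PATH_FIELD_KEY, RAW_OFFSET_FIELD_KEY).show() // no rows, but no error
    spark.stop()
  }
}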

pramen/core/src/main/scala/za/co/absa/pramen/core/metastore/peristence/TransientTableManager.scala

Lines changed: 3 additions & 0 deletions
@@ -32,6 +32,9 @@ import scala.util.Random
 object TransientTableManager {
   private val log = LoggerFactory.getLogger(this.getClass)
 
+  val RAW_PATH_FIELD_KEY = "path"
+  val RAW_OFFSET_FIELD_KEY = "file_name"
+
   private val rawDataframes = new mutable.HashMap[MetastorePartition, DataFrame]()
   private val cachedDataframes = new mutable.HashMap[MetastorePartition, DataFrame]()
   private val persistedLocations = new mutable.HashMap[MetastorePartition, String]()

pramen/core/src/main/scala/za/co/absa/pramen/core/notify/pipeline/PipelineNotificationBuilderHtml.scala

Lines changed: 1 addition & 0 deletions
@@ -308,6 +308,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNot
     val errorMessage = ex.getMessage
 
     val errorMessageTruncated = maxReasonLength match {
+      case _ if errorMessage == null => "<null error message>"
       case Some(maxLength) if errorMessage.length > maxLength => StringUtils.escapeHTML(errorMessage.substring(0, maxLength)) + "..."
       case _ => StringUtils.escapeHTML(errorMessage)
     }
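
This guard exists because Throwable.getMessage returns null for exceptions constructed without a message, and the previous first case would then call length on null. A minimal sketch of the truncation logic, with the Pramen-specific HTML escaping left out:

object TruncateExample extends App {
  // Standalone version of the match from the diff, minus escapeHTML.
  def truncateErrorMessage(errorMessage: String, maxReasonLength: Option[Int]): String =
    maxReasonLength match {
      case _ if errorMessage == null => "<null error message>"
      case Some(maxLength) if errorMessage.length > maxLength => errorMessage.substring(0, maxLength) + "..."
      case _ => errorMessage
    }

  // getMessage is null for exceptions constructed without a message:
  assert(truncateErrorMessage(new RuntimeException().getMessage, Some(10)) == "<null error message>")
  assert(truncateErrorMessage("boom boom boom", Some(4)) == "boom...")
}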

pramen/core/src/main/scala/za/co/absa/pramen/core/pipeline/IncrementalIngestionJob.scala

Lines changed: 10 additions & 6 deletions
@@ -24,8 +24,8 @@ import za.co.absa.pramen.api.jobdef.SourceTable
 import za.co.absa.pramen.api.offset.DataOffset.UncommittedOffset
 import za.co.absa.pramen.api.offset.{OffsetInfo, OffsetType, OffsetValue}
 import za.co.absa.pramen.api.sql.SqlGeneratorBase
-import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskDef, TaskRunReason}
-import za.co.absa.pramen.api.{Reason, Source}
+import za.co.absa.pramen.api.status.{DependencyWarning, TaskRunReason}
+import za.co.absa.pramen.api.{DataFormat, Reason, Source}
 import za.co.absa.pramen.core.bookkeeper.model.{DataOffsetAggregated, DataOffsetRequest}
 import za.co.absa.pramen.core.bookkeeper.{Bookkeeper, OffsetManager}
 import za.co.absa.pramen.core.metastore.Metastore
@@ -151,12 +151,15 @@ class IncrementalIngestionJob(operationDef: OperationDef,
       metastore.saveTable(outputTable.name, infoDate, dfToSave, inputRecordCount, saveModeOverride = Some(SaveMode.Append))
     }
 
-    val updatedDf = metastore.getBatch(outputTable.name, infoDate, None)
+    val updatedDf = if (outputTable.format.isInstanceOf[DataFormat.Raw])
+      df
+    else
+      metastore.getBatch(outputTable.name, infoDate, None)
 
     if (updatedDf.isEmpty) {
       om.rollbackOffsets(req)
     } else {
-      val (minOffset, maxOffset) = getMinMaxOffsetFromDf(df, offsetInfo)
+      val (minOffset, maxOffset) = getMinMaxOffsetFromDf(updatedDf, offsetInfo)
 
       if (isRerun) {
         om.commitRerun(req, minOffset, maxOffset)
@@ -291,8 +294,9 @@ class IncrementalIngestionJob(operationDef: OperationDef,
     val row = df.agg(min(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn)).cast(StringType)),
       max(offsetInfo.offsetType.getSparkCol(col(offsetInfo.offsetColumn))).cast(StringType))
       .collect()(0)
-    val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).get
-    val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).get
+
+    val minValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(0).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(0)}"))
+    val maxValue = OffsetValue.fromString(offsetInfo.offsetType.dataTypeString, row(1).asInstanceOf[String]).getOrElse(throw new IllegalArgumentException(s"Can't parse offset: ${row(1)}"))
 
     SqlGeneratorBase.validateOffsetValue(minValue)
     SqlGeneratorBase.validateOffsetValue(maxValue)
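
The key fix above: for DataFormat.Raw tables the metastore stores files as-is, so reading the batch back via getBatch does not return the ingested records; the job now reuses the DataFrame it already holds, and computes offsets from updatedDf rather than the pre-save df. A hedged sketch of the min/max-offset extraction, simplified to a plain long column named offset standing in for offsetInfo.offsetColumn:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, max, min}
import org.apache.spark.sql.types.StringType

object MinMaxOffsetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("min-max-offset").getOrCreate()
    import spark.implicits._

    val df = Seq(100L, 250L, 175L).toDF("offset")

    // Cast to string as in the diff, then read both aggregates from one row.
    // On an empty DataFrame these come back as null, which the null-safe
    // OffsetValue.fromString turns into None instead of throwing.
    val row = df.agg(
      min(col("offset")).cast(StringType),
      max(col("offset")).cast(StringType)
    ).collect()(0)

    val minStr = row.getString(0) // "100"
    val maxStr = row.getString(1) // "250"
    println(s"min=$minStr, max=$maxStr")
    spark.stop()
  }
}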
