#366 Track raw files at sources and in the metastore by size instead of by count #371

Merged (2 commits, Mar 13, 2024)
@@ -16,7 +16,7 @@
 
 package za.co.absa.pramen.core.metastore.peristence
 
-import org.apache.hadoop.fs.{FileUtil, Path}
+import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.slf4j.LoggerFactory
 import za.co.absa.pramen.core.metastore.MetaTableStats
@@ -39,9 +39,9 @@ class MetastorePersistenceRaw(path: String,
 
     (infoDateFrom, infoDateTo) match {
       case (Some(from), Some(to)) if from.isEqual(to) =>
-        getListOfFiles(from).toDF("path")
+        getListOfFiles(from).map(_.getPath.toString).toDF("path")
       case (Some(from), Some(to)) =>
-        getListOfFilesRange(from, to).toDF("path")
+        getListOfFilesRange(from, to).map(_.getPath.toString).toDF("path")
       case _ =>
         throw new IllegalArgumentException("Metastore 'raw' format requires info date for querying its contents.")
     }
@@ -82,7 +82,7 @@ class MetastorePersistenceRaw(path: String,
     }
 
     MetaTableStats(
-      files.length,
+      totalSize,
       Some(totalSize)
     )
   }
@@ -97,7 +97,7 @@ class MetastorePersistenceRaw(path: String,
     var totalSize = 0L
 
     files.foreach(file => {
-      totalSize += fsUtils.fs.getContentSummary(new Path(file)).getLength
+      totalSize += file.getLen
     })
 
     MetaTableStats(
@@ -119,13 +119,13 @@ class MetastorePersistenceRaw(path: String,
     throw new UnsupportedOperationException("Raw format does not support Hive tables.")
   }
 
-  private def getListOfFilesRange(infoDateFrom: LocalDate, infoDateTo: LocalDate): Seq[String] = {
+  private def getListOfFilesRange(infoDateFrom: LocalDate, infoDateTo: LocalDate): Seq[FileStatus] = {
     if (infoDateFrom.isAfter(infoDateTo))
-      Seq.empty[String]
+      Seq.empty[FileStatus]
     else {
       val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, path)
       var d = infoDateFrom
-      val files = mutable.ArrayBuffer.empty[String]
+      val files = mutable.ArrayBuffer.empty[FileStatus]
 
       while (d.isBefore(infoDateTo) || d.isEqual(infoDateTo)) {
         val subPath = SparkUtils.getPartitionPath(d, infoDateColumn, infoDateFormat, path)
@@ -138,15 +138,15 @@ class MetastorePersistenceRaw(path: String,
     }
   }
 
-  private def getListOfFiles(infoDate: LocalDate): Seq[String] = {
+  private def getListOfFiles(infoDate: LocalDate): Seq[FileStatus] = {
     val fsUtils = new FsUtils(spark.sparkContext.hadoopConfiguration, path)
 
     val subPath = SparkUtils.getPartitionPath(infoDate, infoDateColumn, infoDateFormat, path)
 
     if (fsUtils.exists(new Path(path)) && !fsUtils.exists(subPath)) {
       // The absence of the partition folder means there is no data, which is often okay.
       // But fsUtils.getHadoopFiles() throws an exception in this case, failing the job and its dependent jobs.
-      Seq.empty[String]
+      Seq.empty[FileStatus]
     } else {
       fsUtils.getHadoopFiles(subPath).toSeq
     }
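
The switch from Seq[String] to Seq[FileStatus] above is what makes size tracking cheap: the listing that enumerates the partition's files already carries each file's length, so the total is a local sum instead of one getContentSummary() round trip per file. A minimal standalone sketch of the same idea against the plain Hadoop FileSystem API (not Pramen's FsUtils wrapper):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// One listStatus() call returns FileStatus entries that already carry
// each file's length, so summing sizes needs no per-file RPC.
def totalSizeBytes(dir: Path, conf: Configuration): Long = {
  val fs = FileSystem.get(dir.toUri, conf)
  fs.listStatus(dir).filter(_.isFile).map(_.getLen).sum
}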
@@ -22,7 +22,7 @@ import za.co.absa.pramen.api.notification._
 import za.co.absa.pramen.core.config.Keys.TIMEZONE
 import za.co.absa.pramen.core.exceptions.{CmdFailedException, ProcessFailedException}
 import za.co.absa.pramen.core.notify.message._
-import za.co.absa.pramen.core.notify.pipeline.PipelineNotificationBuilderHtml.{MIN_RPS_JOB_DURATION_SECONDS, MIN_RPS_RECORDS}
+import za.co.absa.pramen.core.notify.pipeline.PipelineNotificationBuilderHtml.{MIN_MEGABYTES, MIN_RPS_JOB_DURATION_SECONDS, MIN_RPS_RECORDS}
 import za.co.absa.pramen.core.pipeline.TaskRunReason
 import za.co.absa.pramen.core.runner.task.RunStatus._
 import za.co.absa.pramen.core.runner.task.{NotificationFailure, RunStatus, TaskResult}
@@ -37,6 +37,7 @@ import scala.collection.mutable.ListBuffer
 object PipelineNotificationBuilderHtml {
   val MIN_RPS_JOB_DURATION_SECONDS = 60
   val MIN_RPS_RECORDS = 1000
+  val MIN_MEGABYTES = 10
 }
 
 class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNotificationBuilder {
@@ -321,10 +322,13 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNotificationBuilder {
       case _ => false
     })
 
-    val outputSizeKnown = tasks.exists(t => t.runStatus match {
-      case s: Succeeded => s.sizeBytes.isDefined
-      case _ => false
-    })
+    val outputSizeKnown = tasks.exists { t =>
+      val hasExplicitSize = t.runStatus match {
+        case s: Succeeded => s.sizeBytes.isDefined
+        case _ => false
+      }
+      t.isRawFilesJob || hasExplicitSize
+    }
 
     val haveReasonColumn = tasks.exists(t => t.runStatus.getReason().nonEmpty || t.dependencyWarnings.nonEmpty)
     val haveHiveColumn = tasks.exists(t => t.runStatus.isInstanceOf[Succeeded] && t.runStatus.asInstanceOf[Succeeded].hiveTablesUpdated.nonEmpty)
@@ -343,7 +347,7 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNotificationBuilder {
     tableHeaders.append(TableHeader(TextElement("Elapsed Time"), Align.Center))
     if (outputSizeKnown)
       tableHeaders.append(TableHeader(TextElement("Size"), Align.Right))
-    tableHeaders.append(TableHeader(TextElement("RPS"), Align.Right))
+    tableHeaders.append(TableHeader(TextElement("Throughput"), Align.Right))
     tableHeaders.append(TableHeader(TextElement("Saved at"), Align.Center))
     tableHeaders.append(TableHeader(TextElement("Status"), Align.Center))
     if (haveReasonColumn)
@@ -375,8 +379,13 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNotificationBuilder {
 
       row.append(TextElement(getElapsedTime(task)))
 
-      if (outputSizeKnown)
-        row.append(TextElement(getOutputSize(task)))
+      if (outputSizeKnown) {
+        if (task.isRawFilesJob) {
+          row.append(TextElement(getSizeText(task)))
+        } else {
+          row.append(TextElement(getOutputSize(task)))
+        }
+      }
 
       row.append(getThroughputRps(task))
       row.append(TextElement(getFinishTime(task)))
@@ -452,19 +461,41 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNotificationBuilder {
       case Some(runInfo) =>
         val jobDuration = Duration.between(runInfo.started, runInfo.finished).getSeconds
         if (jobDuration > MIN_RPS_JOB_DURATION_SECONDS && recordCount >= MIN_RPS_RECORDS) {
-          val throughput = recordCount / jobDuration
-
-          throughput match {
-            case n if n < minRps => TextElement(throughput.toString, Style.Warning)
-            case n if n >= goodRps => TextElement(throughput.toString, Style.Success)
-            case _ => TextElement(throughput.toString)
+          if (task.isRawFilesJob) {
+            getBytesPerSecondsText(recordCount, jobDuration)
+          } else {
+            getRpsText(recordCount, jobDuration)
           }
         } else
           TextElement("")
       case None => TextElement("")
     }
   }
 
+  private[core] def getRpsText(recordOrByteCount: Long, jobDurationSeconds: Long): TextElement = {
+    val throughput = recordOrByteCount / jobDurationSeconds
+    val rps = s"${throughput.toString} r/s"
+
+    throughput match {
+      case n if n < minRps => TextElement(rps, Style.Warning)
+      case n if n >= goodRps => TextElement(rps, Style.Success)
+      case _ => TextElement(rps)
+    }
+  }
+
+  private[core] def getBytesPerSecondsText(totalBytesCount: Long, jobDurationSeconds: Long): TextElement = {
+    val MEGABYTE = 1024L * 1024L
+
+    val sizeMb = totalBytesCount / MEGABYTE
+
+    if (sizeMb < MIN_MEGABYTES) {
+      TextElement("")
+    } else {
+      val throughput = totalBytesCount / jobDurationSeconds
+      TextElement(s"${StringUtils.prettySize(throughput)}/s")
+    }
+  }
+
   private[core] def getRecordCountText(task: TaskResult): String = {
     def renderDifference(numRecords: Long, numRecordsOld: Option[Long]): String = {
       numRecordsOld match {
@@ -473,17 +504,44 @@ class PipelineNotificationBuilderHtml(implicit conf: Config) extends PipelineNotificationBuilder {
           if (diff > 0)
             s"$numRecords (+$diff)"
           else if (diff < 0)
-            s"$numRecords (-$diff)"
+            s"$numRecords ($diff)"
           else {
             numRecords.toString
           }
         case _ => numRecords.toString
       }
     }
 
+    if (task.isRawFilesJob) {
+      "-"
+    } else {
+      task.runStatus match {
+        case s: Succeeded => renderDifference(s.recordCount, s.recordCountOld)
+        case d: InsufficientData => renderDifference(d.actual, d.recordCountOld)
+        case _ => ""
+      }
+    }
+  }
+
+  private[core] def getSizeText(task: TaskResult): String = {
+    def renderDifferenceSize(numBytes: Long, numBytesOld: Option[Long]): String = {
+      numBytesOld match {
+        case Some(old) if old > 0 =>
+          val diff = numBytes - old
+          if (diff > 0)
+            s"${StringUtils.prettySize(numBytes)} (+${StringUtils.prettySize(diff)})"
+          else if (diff < 0)
+            s"${StringUtils.prettySize(numBytes)} (-${StringUtils.prettySize(Math.abs(diff))})"
+          else {
+            StringUtils.prettySize(numBytes)
+          }
+        case _ => StringUtils.prettySize(numBytes)
+      }
+    }
+
     task.runStatus match {
-      case s: Succeeded => renderDifference(s.recordCount, s.recordCountOld)
-      case d: InsufficientData => renderDifference(d.actual, d.recordCountOld)
+      case s: Succeeded => renderDifferenceSize(s.recordCount, s.recordCountOld)
+      case d: InsufficientData => renderDifferenceSize(d.actual, d.recordCountOld)
       case _ => ""
     }
   }
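
For raw-format tables the recordCount slot of Succeeded now carries a byte count (MetaTableStats puts totalSize there, as in the persistence diff above), so the throughput cell branches: regular jobs keep the r/s rendering, while raw-file jobs render bytes per second, and only once at least MIN_MEGABYTES (10 MB) were written. A small arithmetic sketch of that rule with illustrative numbers (the exact prettySize output is Pramen's; the "MiB" rendering below is an assumption):

// Hypothetical raw job: 512 MB written in 128 seconds.
val MEGABYTE = 1024L * 1024L
val totalBytes = 512L * MEGABYTE
val jobDurationSeconds = 128L

val sizeMb = totalBytes / MEGABYTE                   // 512, above the MIN_MEGABYTES = 10 floor
val bytesPerSecond = totalBytes / jobDurationSeconds // 4194304 bytes/s, i.e. 4 MiB/s
// getBytesPerSecondsText renders s"${StringUtils.prettySize(bytesPerSecond)}/s"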
@@ -18,6 +18,7 @@ package za.co.absa.pramen.core.runner.jobrunner
 
 import com.github.yruslan.channel.{Channel, ReadChannel}
 import org.slf4j.LoggerFactory
+import za.co.absa.pramen.api.DataFormat
 import za.co.absa.pramen.core.app.config.RuntimeConfig
 import za.co.absa.pramen.core.bookkeeper.Bookkeeper
 import za.co.absa.pramen.core.exceptions.FatalErrorWrapper
@@ -103,7 +104,8 @@ class ConcurrentJobRunnerImpl(runtimeConfig: RuntimeConfig,
   }
 
   private[core] def sendFailure(ex: Throwable, job: Job, isTransient: Boolean): Unit = {
-    completedJobsChannel.send((job, TaskResult(job, RunStatus.Failed(ex), None, applicationId, isTransient, Nil, Nil, Nil) :: Nil, false))
+    completedJobsChannel.send((job, TaskResult(job, RunStatus.Failed(ex), None, applicationId, isTransient,
+      job.outputTable.format.isInstanceOf[DataFormat.Raw], Nil, Nil, Nil) :: Nil, false))
   }
 
   private[core] def runJob(job: Job): Boolean = {
@@ -20,6 +20,7 @@ import com.github.yruslan.channel.Channel
 import com.typesafe.config.Config
 import org.apache.spark.sql.SparkSession
 import org.slf4j.LoggerFactory
+import za.co.absa.pramen.api.DataFormat
 import za.co.absa.pramen.core.app.AppContext
 import za.co.absa.pramen.core.exceptions.{FatalErrorWrapper, ValidationException}
 import za.co.absa.pramen.core.pipeline.{Job, JobDependency, OperationType}
@@ -124,7 +125,8 @@ class OrchestratorImpl extends Orchestrator {
       val isTransient = job.outputTable.format.isTransient
       val isFailure = hasNonPassiveNonOptionalDeps(job, missingTables)
 
-      val taskResult = TaskResult(job, RunStatus.MissingDependencies(isFailure, missingTables), None, applicationId, isTransient, Nil, Nil, Nil)
+      val taskResult = TaskResult(job, RunStatus.MissingDependencies(isFailure, missingTables), None, applicationId,
+        isTransient, job.outputTable.format.isInstanceOf[DataFormat.Raw], Nil, Nil, Nil)
 
       state.addTaskCompletion(taskResult :: Nil)
     })
@@ -25,6 +25,7 @@ case class TaskResult(
                        runInfo: Option[RunInfo],
                        applicationId: String,
                        isTransient: Boolean,
+                       isRawFilesJob: Boolean,
                        schemaChanges: Seq[SchemaDifference],
                        dependencyWarnings: Seq[DependencyWarning],
                        notificationTargetErrors: Seq[NotificationFailure]
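
Both TaskResult construction sites in this PR derive the new flag with the same expression on the job's output format. A hypothetical helper (not part of this PR) showing that derivation in one place:

import za.co.absa.pramen.api.DataFormat

// Hypothetical convenience method, not in the PR: mirrors the expression
// used at both call sites when constructing TaskResult.
def isRawFilesJob(job: Job): Boolean =
  job.outputTable.format.isInstanceOf[DataFormat.Raw]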