
Commit

#188: final implementation of desired functionality with adjusted data model and tests
lsulak committed May 23, 2024
1 parent c1c10ce commit a8c8ead
Showing 34 changed files with 203 additions and 192 deletions.
6 changes: 3 additions & 3 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala
@@ -19,7 +19,7 @@ package za.co.absa.atum.agent
import com.typesafe.config.{Config, ConfigFactory}
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.dispatcher.{CapturingDispatcher, ConsoleDispatcher, Dispatcher, HttpDispatcher}
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, CheckpointSubmitDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, CheckpointDTO, PartitioningSubmitDTO}

/**
* Entity that communicate with the API, primarily focused on spawning Atum Context(s).
@@ -41,11 +41,11 @@ trait AtumAgent {
private[agent] def currentUser: String = System.getProperty("user.name") // platform independent

/**
* Sends `CheckpointSubmitDTO` to the AtumService API
* Sends `CheckpointDTO` to the AtumService API
*
* @param checkpoint Already initialized Checkpoint object to store
*/
private[agent] def saveCheckpoint(checkpoint: CheckpointSubmitDTO): Unit = {
private[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
dispatcher.saveCheckpoint(checkpoint)
}

4 changes: 2 additions & 2 deletions agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala
@@ -81,7 +81,7 @@ class AtumContext private[agent] (
val measurementDTOs = takeMeasurements(dataToMeasure)
val endTime = ZonedDateTime.now()

val checkpointDTO = CheckpointSubmitDTO(
val checkpointDTO = CheckpointDTO(
id = UUID.randomUUID(),
name = checkpointName,
author = agent.currentUser,
@@ -106,7 +106,7 @@ def createCheckpointOnProvidedData(checkpointName: String, measurements: Map[Measure, MeasureResult]): AtumContext = {
def createCheckpointOnProvidedData(checkpointName: String, measurements: Map[Measure, MeasureResult]): AtumContext = {
val dateTimeNow = ZonedDateTime.now()

val checkpointDTO = CheckpointSubmitDTO(
val checkpointDTO = CheckpointDTO(
id = UUID.randomUUID(),
name = checkpointName,
author = agent.currentUser,
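For orientation, a minimal usage sketch of the agent-and-context flow touched above, not taken from this commit: the AtumAgent companion object, getOrCreateAtumContext, the AtumPartitions constructor and the input path are assumptions about the agent's public API, while createCheckpoint(name, df) itself is the method changed in this diff.

import org.apache.spark.sql.SparkSession
import za.co.absa.atum.agent.{AtumAgent, AtumContext}
import za.co.absa.atum.agent.AtumContext.AtumPartitions

val spark = SparkSession.builder().master("local[*]").appName("atum-sketch").getOrCreate()

// Assumed factory: the agent spawns (and caches) a context for a given partitioning.
val atumContext: AtumContext =
  AtumAgent.getOrCreateAtumContext(AtumPartitions("dataset" -> "example"))

// Measures the DataFrame, builds a CheckpointDTO and hands it to the configured dispatcher.
val df = spark.read.parquet("/tmp/example-input")
atumContext.createCheckpoint("exampleCheckpoint", df)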
@@ -110,9 +110,9 @@ class CapturingDispatcher(config: Config) extends Dispatcher(config) {
/**
* This method is used to save checkpoint to server.
*
* @param checkpoint : CheckpointSubmitDTO to be saved.
* @param checkpoint : CheckpointDTO to be saved.
*/
override protected[agent] def saveCheckpoint(checkpoint: CheckpointSubmitDTO): Unit = {
override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
captureFunctionCall(checkpoint, ())
}

@@ -18,7 +18,7 @@ package za.co.absa.atum.agent.dispatcher

import com.typesafe.config.Config
import org.apache.spark.internal.Logging
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointSubmitDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}

/**
* dispatcher useful for development, testing and debugging
@@ -32,7 +32,7 @@ class ConsoleDispatcher(config: Config) extends Dispatcher(config: Config) with
AtumContextDTO(partitioning = partitioning.partitioning)
}

override protected[agent] def saveCheckpoint(checkpoint: CheckpointSubmitDTO): Unit = {
override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
println(s"Saving checkpoint to server. $checkpoint")
}

@@ -17,7 +17,7 @@
package za.co.absa.atum.agent.dispatcher

import com.typesafe.config.Config
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointSubmitDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}

/**
* This class provides a contract for different dispatchers. It has a constructor foe eventual creation via reflection.
@@ -35,9 +35,9 @@ abstract class Dispatcher(config: Config) {

/**
* This method is used to save checkpoint to server.
* @param checkpoint: CheckpointSubmitDTO to be saved.
* @param checkpoint: CheckpointDTO to be saved.
*/
protected[agent] def saveCheckpoint(checkpoint: CheckpointSubmitDTO): Unit
protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit

/**
* This method is used to save the additional data to the server.
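To illustrate the contract above, a sketch of a custom dispatcher, not code from this commit: only saveCheckpoint is overridden, and because the remaining abstract members of Dispatcher (partitioning and additional-data handling) are not fully shown in this diff, the class is deliberately left abstract.

package za.co.absa.atum.agent.dispatcher

import com.typesafe.config.Config
import za.co.absa.atum.model.dto.CheckpointDTO

// Logs checkpoints instead of sending them; a local debugging stand-in.
abstract class LoggingDispatcher(config: Config) extends Dispatcher(config) {
  override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit =
    println(s"Would send checkpoint '${checkpoint.name}': $checkpoint")
}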
@@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging
import sttp.client3._
import sttp.model.Uri
import za.co.absa.atum.agent.exception.AtumAgentException.HttpException
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointSubmitDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.utils.SerializationUtils

class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Logging {
@@ -55,7 +55,7 @@ class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Log
)
}

override protected[agent] def saveCheckpoint(checkpoint: CheckpointSubmitDTO): Unit = {
override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = {
val request = commonAtumRequest
.post(createCheckpointEndpoint)
.body(SerializationUtils.asJson(checkpoint))
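For context, a sketch of the kind of HTTP call saveCheckpoint issues via sttp, not taken from this commit: the backend choice, server URL and endpoint path are assumptions, and the hard-coded JSON stands in for the serialized checkpoint produced by SerializationUtils.asJson.

import sttp.client3._

val backend = HttpURLConnectionBackend()
val endpoint = uri"http://localhost:8080/api/v1/createCheckpoint"  // assumed URL, normally read from config

val response = basicRequest
  .post(endpoint)
  .header("Content-Type", "application/json")
  .body("""{"name": "example-checkpoint"}""")  // in the dispatcher this is the serialized CheckpointDTO
  .send(backend)

println(response.code)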
10 changes: 5 additions & 5 deletions agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala
@@ -25,7 +25,7 @@ import org.scalatest.matchers.should.Matchers
import za.co.absa.atum.agent.AtumContext.AtumPartitions
import za.co.absa.atum.agent.model.AtumMeasure.{RecordCount, SumOfValuesOfColumn}
import za.co.absa.atum.agent.model.{Measure, MeasureResult, MeasurementBuilder, UnknownMeasure}
import za.co.absa.atum.model.dto.CheckpointSubmitDTO
import za.co.absa.atum.model.dto.CheckpointDTO
import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType

class AtumContextTest extends AnyFlatSpec with Matchers {
@@ -93,7 +93,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers {

atumContext.createCheckpoint("testCheckpoint", df)

val argument = ArgumentCaptor.forClass(classOf[CheckpointSubmitDTO])
val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO])
verify(mockAgent).saveCheckpoint(argument.capture())

assert(argument.getValue.name == "testCheckpoint")
@@ -121,7 +121,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers {
atumContext.createCheckpointOnProvidedData(
checkpointName = "name", measurements = measurements)

val argument = ArgumentCaptor.forClass(classOf[CheckpointSubmitDTO])
val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO])
verify(mockAgent).saveCheckpoint(argument.capture())

assert(argument.getValue.name == "name")
@@ -165,7 +165,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers {
val df = spark.createDataFrame(rdd, schema)
.createCheckpoint("checkPointNameCount")

val argumentFirst = ArgumentCaptor.forClass(classOf[CheckpointSubmitDTO])
val argumentFirst = ArgumentCaptor.forClass(classOf[CheckpointDTO])
verify(mockAgent, times(1)).saveCheckpoint(argumentFirst.capture())

assert(argumentFirst.getValue.name == "checkPointNameCount")
@@ -178,7 +178,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers {
when(mockAgent.currentUser).thenReturn(authorTest + "Another") // maybe a process changed the author / current user
df.createCheckpoint("checkPointNameSum")

val argumentSecond = ArgumentCaptor.forClass(classOf[CheckpointSubmitDTO])
val argumentSecond = ArgumentCaptor.forClass(classOf[CheckpointDTO])
verify(mockAgent, times(2)).saveCheckpoint(argumentSecond.capture())

assert(argumentSecond.getValue.name == "checkPointNameSum")
@@ -19,7 +19,7 @@ package za.co.absa.atum.agent.dispatcher
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointSubmitDTO, PartitionDTO, PartitioningDTO, PartitioningSubmitDTO}
import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitionDTO, PartitioningDTO, PartitioningSubmitDTO}

import java.time.ZonedDateTime
import java.util.UUID
@@ -35,8 +35,8 @@ class CapturingDispatcherTest extends AnyWordSpec with Matchers {
emptyCfg.withValue("atum.dispatcher.capture.capture-limit", value)
}

private def createCheckpoint(partition: PartitioningDTO): CheckpointSubmitDTO =
CheckpointSubmitDTO(
private def createCheckpoint(partition: PartitioningDTO): CheckpointDTO =
CheckpointDTO(
id = UUID.randomUUID(),
name = "name",
author = "author",
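For reference, a sketch of building the kind of configuration the test above feeds to the CapturingDispatcher; the capture-limit key is taken from the test, but whether further keys are required is not shown in this diff.

import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}

// An in-memory config carrying only the capture limit read by CapturingDispatcher.
val captureConfig: Config = ConfigFactory.empty()
  .withValue("atum.dispatcher.capture.capture-limit", ConfigValueFactory.fromAnyRef(10))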
46 changes: 26 additions & 20 deletions database/src/main/postgres/flows/V1.9.1__get_flow_checkpoints.sql
@@ -15,18 +15,20 @@
*/

CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints(
IN i_partitioning_of_flow JSONB,
IN i_limit INT DEFAULT 5,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_checkpoint UUID,
OUT checkpoint_name TEXT,
OUT measure_name TEXT,
OUT measure_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
IN i_partitioning_of_flow JSONB,
IN i_limit INT DEFAULT 5,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_checkpoint UUID,
OUT checkpoint_name TEXT,
OUT author TEXT,
OUT measured_by_atum_agent BOOLEAN,
OUT measure_name TEXT,
OUT measured_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
) RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
@@ -53,14 +55,17 @@
-- specifying `i_checkpoint_name` parameter
--
-- Returns:
-- status - Status code
-- status_text - Status text
-- id_checkpoint - id of retrieved checkpoint
-- checkpoint_name - name of retrieved checkpoint
-- measure_name - measure name associated with a given checkpoint
-- measure_columns - measure columns associated with a given checkpoint
-- measurement_value - measurement details associated with a given checkpoint
-- checkpoint_time - time
-- status - Status code
-- status_text - Status text
-- id_checkpoint - ID of retrieved checkpoint
-- checkpoint_name - Name of the retrieved checkpoint
-- author - Author of the checkpoint
-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by Atum Agent
-- (if false, data supplied manually)
-- measure_name - measure name associated with a given checkpoint
-- measured_columns - measure columns associated with a given checkpoint
-- measurement_value - measurement details associated with a given checkpoint
-- checkpoint_time - time
--
-- Status codes:
-- 11 - OK
@@ -89,6 +94,7 @@ BEGIN
RETURN QUERY
SELECT 11 AS status, 'OK' AS status_text,
CP.id_checkpoint, CP.checkpoint_name,
CP.created_by AS author, CP.measured_by_atum_agent,
MD.measure_name, MD.measured_columns,
M.measurement_value,
CP.process_start_time AS checkpoint_start_time, CP.process_end_time AS checkpoint_end_time
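A sketch of how the function, including the new author and measured_by_atum_agent columns, might be consumed from Scala; the use of doobie and the column subset are assumptions made for illustration, as the repository itself exercises the function through SQL-level tests like the one below.

import doobie._
import doobie.implicits._

final case class FlowCheckpointRow(
  status: Int,
  statusText: String,
  checkpointName: String,
  author: String,
  measuredByAtumAgent: Boolean,
  measureName: String
)

// Returns a program to be run through a Transactor against the Atum database.
def flowCheckpoints(partitioningOfFlow: String, limit: Int): ConnectionIO[List[FlowCheckpointRow]] =
  sql"""SELECT status, status_text, checkpoint_name, author, measured_by_atum_agent, measure_name
        FROM flows.get_flow_checkpoints($partitioningOfFlow::jsonb, $limit)"""
    .query[FlowCheckpointRow]
    .to[List]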
@@ -229,12 +229,14 @@ class GetFlowCheckpointsTest extends DBTestSuite {
assert(row1.getString("status_text").contains("OK"))
assert(row1.getUUID("id_checkpoint").contains(checkpointId))
assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg"))
assert(row1.getString("author").contains("ObviouslySomeTest"))
assert(row1.getBoolean("measured_by_atum_agent").contains(true))
assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime))
assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime))

val measure1 = MeasuredDetails(
row1.getString("measure_name").get,
row1.getArray[String]("measure_columns").map(_.toList).get,
row1.getArray[String]("measured_columns").map(_.toList).get,
row1.getJsonB("measurement_value").get
)

@@ -243,12 +245,14 @@
assert(row2.getString("status_text").contains("OK"))
assert(row2.getUUID("id_checkpoint").contains(checkpointId))
assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg"))
assert(row1.getString("author").contains("ObviouslySomeTest"))
assert(row1.getBoolean("measured_by_atum_agent").contains(true))
assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTime))
assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTime))

val measure2 = MeasuredDetails(
row2.getString("measure_name").get,
row2.getArray[String]("measure_columns").map(_.toList).get,
row2.getArray[String]("measured_columns").map(_.toList).get,
row2.getJsonB("measurement_value").get
)

@@ -19,7 +19,7 @@ package za.co.absa.atum.model.dto
import java.time.ZonedDateTime
import java.util.UUID

case class CheckpointSubmitDTO(
case class CheckpointDTO(
id: UUID,
name: String,
author: String,
@@ -16,40 +16,19 @@

package za.co.absa.atum.model.dto

import io.circe.Json
import java.time.ZonedDateTime
import java.util.UUID

// TODO REMOVE
case class MeasureResultDTO1(
mainValue: MeasureResultDTO1.TypedValue1,
// TODO READ doobie Map[String, MeasureResultDTO1.TypedValue1]
supportValues: Map[String, String /*MeasureResultDTO1.TypedValue1*/] = Map.empty
)

object MeasureResultDTO1 {
case class TypedValue1(
value: String,
// TODO READ doobie sealed trait ResultValueType
valueType: String//ResultValueType1
)

sealed trait ResultValueType1

object ResultValueType1 {
case object String extends ResultValueType1
case object Long extends ResultValueType1
case object BigDecimal extends ResultValueType1
case object Double extends ResultValueType1
}

}

// TODO move to server?! It's not gonna be used for API communication - it's only for internal DB model (db -> scala)
case class CheckpointQueryResultDTO(
idCheckpoint: UUID,
checkpointName: String,
measureName: String,
measureColumns: Seq[String],
measurementValue: MeasureResultDTO1, // TODO MeasureResultDTO
checkpointStartTime: ZonedDateTime,
checkpointEndTime: Option[ZonedDateTime],
)
idCheckpoint: UUID,
checkpointName: String,
author: String,
measuredByAtumAgent: Boolean = false,
measureName: String,
measuredColumns: Seq[String],
measurementValue: Json,
checkpointStartTime: ZonedDateTime,
checkpointEndTime: Option[ZonedDateTime]
)
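A construction sketch for the reshaped DTO above, with illustrative values only; the nested layout of measurementValue (mainValue with value and valueType) is assumed to mirror MeasureResultDTO and is not prescribed by this diff.

import io.circe.Json
import java.time.ZonedDateTime
import java.util.UUID

val queryResult = CheckpointQueryResultDTO(
  idCheckpoint = UUID.randomUUID(),
  checkpointName = "CheckpointNameCntAndAvg",
  author = "ObviouslySomeTest",
  measuredByAtumAgent = true,
  measureName = "count",
  measuredColumns = Seq("id"),
  measurementValue = Json.obj(
    "mainValue" -> Json.obj(
      "value" -> Json.fromString("3"),
      "valueType" -> Json.fromString("Long")
    )
  ),
  checkpointStartTime = ZonedDateTime.now(),
  checkpointEndTime = Some(ZonedDateTime.now())
)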
@@ -149,8 +149,8 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
assert(actualAtumContextDTO == expectedAtumContextDTO)
}

// CheckpointSubmitDTO
"asJson" should "serialize CheckpointSubmitDTO into json string" in {
// CheckpointDTO
"asJson" should "serialize CheckpointDTO into json string" in {
val uuid = UUID.randomUUID()
val seqPartitionDTO = Seq(PartitionDTO("key", "val"))
val timeWithZone = ZonedDateTime.of(2023, 10, 24, 10, 20, 59, 5000000, ZoneId.of("CET"))
@@ -163,7 +163,7 @@
)
)

val checkpointDTO = CheckpointSubmitDTO(
val checkpointDTO = CheckpointDTO(
id = uuid,
name = "checkpoint",
author = "author",
@@ -193,7 +193,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
assert(actualCheckpointDTOJson == expectedCheckpointDTOJson)
}

"fromJson" should "deserialize CheckpointSubmitDTO from json string" in {
"fromJson" should "deserialize CheckpointDTO from json string" in {
val uuid = UUID.randomUUID()
val seqPartitionDTO = Seq(PartitionDTO("key", "val"))
val timeWithZone = ZonedDateTime.of(2023, 10, 24, 10, 20, 59, 5000000, ZoneOffset.ofHours(2))
@@ -222,7 +222,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
)
)

val expectedCheckpointDTO = CheckpointSubmitDTO(
val expectedCheckpointDTO = CheckpointDTO(
id = uuid,
name = "checkpoint",
author = "author",
@@ -233,7 +233,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike {
measurements = setMeasurementDTO
)

val actualCheckpointDTO = SerializationUtils.fromJson[CheckpointSubmitDTO](checkpointDTOJson)
val actualCheckpointDTO = SerializationUtils.fromJson[CheckpointDTO](checkpointDTOJson)

assert(actualCheckpointDTO == expectedCheckpointDTO)
}
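A minimal round-trip sketch in the spirit of the tests above; applying the helpers to PartitionDTO rather than a full CheckpointDTO is an assumption made to keep the example self-contained, since the complete CheckpointDTO constructor is not shown in this diff.

import za.co.absa.atum.model.dto.PartitionDTO
import za.co.absa.atum.model.utils.SerializationUtils

val partition = PartitionDTO("key", "val")
val json = SerializationUtils.asJson(partition)                 // serialize to a JSON string
val restored = SerializationUtils.fromJson[PartitionDTO](json)  // and back
assert(restored == partition)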