diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala index 1f3297a1b..9e375b9e8 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumAgent.scala @@ -19,7 +19,7 @@ package za.co.absa.atum.agent import com.typesafe.config.{Config, ConfigFactory} import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.dispatcher.{CapturingDispatcher, ConsoleDispatcher, Dispatcher, HttpDispatcher} -import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, CheckpointSubmitDTO, PartitioningSubmitDTO} +import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, CheckpointDTO, PartitioningSubmitDTO} /** * Entity that communicate with the API, primarily focused on spawning Atum Context(s). @@ -41,11 +41,11 @@ trait AtumAgent { private[agent] def currentUser: String = System.getProperty("user.name") // platform independent /** - * Sends `CheckpointSubmitDTO` to the AtumService API + * Sends `CheckpointDTO` to the AtumService API * * @param checkpoint Already initialized Checkpoint object to store */ - private[agent] def saveCheckpoint(checkpoint: CheckpointSubmitDTO): Unit = { + private[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = { dispatcher.saveCheckpoint(checkpoint) } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala index 22fd68d2d..d4a51ba83 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/AtumContext.scala @@ -81,7 +81,7 @@ class AtumContext private[agent] ( val measurementDTOs = takeMeasurements(dataToMeasure) val endTime = ZonedDateTime.now() - val checkpointDTO = CheckpointSubmitDTO( + val checkpointDTO = CheckpointDTO( id = UUID.randomUUID(), name = checkpointName, author = agent.currentUser, @@ -106,7 +106,7 @@ class AtumContext private[agent] ( def createCheckpointOnProvidedData(checkpointName: String, measurements: Map[Measure, MeasureResult]): AtumContext = { val dateTimeNow = ZonedDateTime.now() - val checkpointDTO = CheckpointSubmitDTO( + val checkpointDTO = CheckpointDTO( id = UUID.randomUUID(), name = checkpointName, author = agent.currentUser, diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala index 1b04f7c37..6684b7253 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcher.scala @@ -110,9 +110,9 @@ class CapturingDispatcher(config: Config) extends Dispatcher(config) { /** * This method is used to save checkpoint to server. * - * @param checkpoint : CheckpointSubmitDTO to be saved. + * @param checkpoint : CheckpointDTO to be saved. */ - override protected[agent] def saveCheckpoint(checkpoint: CheckpointSubmitDTO): Unit = { + override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = { captureFunctionCall(checkpoint, ()) } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/ConsoleDispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/ConsoleDispatcher.scala index 30438a4ab..cbffd2ff4 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/ConsoleDispatcher.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/ConsoleDispatcher.scala @@ -18,7 +18,7 @@ package za.co.absa.atum.agent.dispatcher import com.typesafe.config.Config import org.apache.spark.internal.Logging -import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointSubmitDTO, PartitioningSubmitDTO} +import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO} /** * dispatcher useful for development, testing and debugging @@ -32,7 +32,7 @@ class ConsoleDispatcher(config: Config) extends Dispatcher(config: Config) with AtumContextDTO(partitioning = partitioning.partitioning) } - override protected[agent] def saveCheckpoint(checkpoint: CheckpointSubmitDTO): Unit = { + override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = { println(s"Saving checkpoint to server. $checkpoint") } diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala index 718cdb7c0..f0ea03812 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/Dispatcher.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.agent.dispatcher import com.typesafe.config.Config -import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointSubmitDTO, PartitioningSubmitDTO} +import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO} /** * This class provides a contract for different dispatchers. It has a constructor foe eventual creation via reflection. @@ -35,9 +35,9 @@ abstract class Dispatcher(config: Config) { /** * This method is used to save checkpoint to server. - * @param checkpoint: CheckpointSubmitDTO to be saved. + * @param checkpoint: CheckpointDTO to be saved. */ - protected[agent] def saveCheckpoint(checkpoint: CheckpointSubmitDTO): Unit + protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit /** * This method is used to save the additional data to the server. diff --git a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala index 335625c56..13548a18c 100644 --- a/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala +++ b/agent/src/main/scala/za/co/absa/atum/agent/dispatcher/HttpDispatcher.scala @@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging import sttp.client3._ import sttp.model.Uri import za.co.absa.atum.agent.exception.AtumAgentException.HttpException -import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointSubmitDTO, PartitioningSubmitDTO} +import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointDTO, PartitioningSubmitDTO} import za.co.absa.atum.model.utils.SerializationUtils class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Logging { @@ -55,7 +55,7 @@ class HttpDispatcher(config: Config) extends Dispatcher(config: Config) with Log ) } - override protected[agent] def saveCheckpoint(checkpoint: CheckpointSubmitDTO): Unit = { + override protected[agent] def saveCheckpoint(checkpoint: CheckpointDTO): Unit = { val request = commonAtumRequest .post(createCheckpointEndpoint) .body(SerializationUtils.asJson(checkpoint)) diff --git a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala index eef9f84da..f3aba4731 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/AtumContextTest.scala @@ -25,7 +25,7 @@ import org.scalatest.matchers.should.Matchers import za.co.absa.atum.agent.AtumContext.AtumPartitions import za.co.absa.atum.agent.model.AtumMeasure.{RecordCount, SumOfValuesOfColumn} import za.co.absa.atum.agent.model.{Measure, MeasureResult, MeasurementBuilder, UnknownMeasure} -import za.co.absa.atum.model.dto.CheckpointSubmitDTO +import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.model.dto.MeasureResultDTO.ResultValueType class AtumContextTest extends AnyFlatSpec with Matchers { @@ -93,7 +93,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers { atumContext.createCheckpoint("testCheckpoint", df) - val argument = ArgumentCaptor.forClass(classOf[CheckpointSubmitDTO]) + val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO]) verify(mockAgent).saveCheckpoint(argument.capture()) assert(argument.getValue.name == "testCheckpoint") @@ -121,7 +121,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers { atumContext.createCheckpointOnProvidedData( checkpointName = "name", measurements = measurements) - val argument = ArgumentCaptor.forClass(classOf[CheckpointSubmitDTO]) + val argument = ArgumentCaptor.forClass(classOf[CheckpointDTO]) verify(mockAgent).saveCheckpoint(argument.capture()) assert(argument.getValue.name == "name") @@ -165,7 +165,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers { val df = spark.createDataFrame(rdd, schema) .createCheckpoint("checkPointNameCount") - val argumentFirst = ArgumentCaptor.forClass(classOf[CheckpointSubmitDTO]) + val argumentFirst = ArgumentCaptor.forClass(classOf[CheckpointDTO]) verify(mockAgent, times(1)).saveCheckpoint(argumentFirst.capture()) assert(argumentFirst.getValue.name == "checkPointNameCount") @@ -178,7 +178,7 @@ class AtumContextTest extends AnyFlatSpec with Matchers { when(mockAgent.currentUser).thenReturn(authorTest + "Another") // maybe a process changed the author / current user df.createCheckpoint("checkPointNameSum") - val argumentSecond = ArgumentCaptor.forClass(classOf[CheckpointSubmitDTO]) + val argumentSecond = ArgumentCaptor.forClass(classOf[CheckpointDTO]) verify(mockAgent, times(2)).saveCheckpoint(argumentSecond.capture()) assert(argumentSecond.getValue.name == "checkPointNameSum") diff --git a/agent/src/test/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcherTest.scala b/agent/src/test/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcherTest.scala index 4732c3b2e..a4245dd3b 100644 --- a/agent/src/test/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcherTest.scala +++ b/agent/src/test/scala/za/co/absa/atum/agent/dispatcher/CapturingDispatcherTest.scala @@ -19,7 +19,7 @@ package za.co.absa.atum.agent.dispatcher import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec -import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointSubmitDTO, PartitionDTO, PartitioningDTO, PartitioningSubmitDTO} +import za.co.absa.atum.model.dto.{AtumContextDTO, CheckpointDTO, PartitionDTO, PartitioningDTO, PartitioningSubmitDTO} import java.time.ZonedDateTime import java.util.UUID @@ -35,8 +35,8 @@ class CapturingDispatcherTest extends AnyWordSpec with Matchers { emptyCfg.withValue("atum.dispatcher.capture.capture-limit", value) } - private def createCheckpoint(partition: PartitioningDTO): CheckpointSubmitDTO = - CheckpointSubmitDTO( + private def createCheckpoint(partition: PartitioningDTO): CheckpointDTO = + CheckpointDTO( id = UUID.randomUUID(), name = "name", author = "author", diff --git a/database/src/main/postgres/flows/V1.9.1__get_flow_checkpoints.sql b/database/src/main/postgres/flows/V1.9.1__get_flow_checkpoints.sql index 2266c1481..06a2e036b 100644 --- a/database/src/main/postgres/flows/V1.9.1__get_flow_checkpoints.sql +++ b/database/src/main/postgres/flows/V1.9.1__get_flow_checkpoints.sql @@ -15,18 +15,20 @@ */ CREATE OR REPLACE FUNCTION flows.get_flow_checkpoints( - IN i_partitioning_of_flow JSONB, - IN i_limit INT DEFAULT 5, - IN i_checkpoint_name TEXT DEFAULT NULL, - OUT status INTEGER, - OUT status_text TEXT, - OUT id_checkpoint UUID, - OUT checkpoint_name TEXT, - OUT measure_name TEXT, - OUT measure_columns TEXT[], - OUT measurement_value JSONB, - OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, - OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE + IN i_partitioning_of_flow JSONB, + IN i_limit INT DEFAULT 5, + IN i_checkpoint_name TEXT DEFAULT NULL, + OUT status INTEGER, + OUT status_text TEXT, + OUT id_checkpoint UUID, + OUT checkpoint_name TEXT, + OUT author TEXT, + OUT measured_by_atum_agent BOOLEAN, + OUT measure_name TEXT, + OUT measured_columns TEXT[], + OUT measurement_value JSONB, + OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE, + OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE ) RETURNS SETOF record AS $$ ------------------------------------------------------------------------------- @@ -53,14 +55,17 @@ $$ -- specifying `i_checkpoint_name` parameter -- -- Returns: --- status - Status code --- status_text - Status text --- id_checkpoint - id of retrieved checkpoint --- checkpoint_name - name of retrieved checkpoint --- measure_name - measure name associated with a given checkpoint --- measure_columns - measure columns associated with a given checkpoint --- measurement_value - measurement details associated with a given checkpoint --- checkpoint_time - time +-- status - Status code +-- status_text - Status text +-- id_checkpoint - ID of retrieved checkpoint +-- checkpoint_name - Name of the retrieved checkpoint +-- author - Author of the checkpoint +-- measured_by_atum_agent - Flag indicating whether the checkpoint was measured by Atum Agent +-- (if false, data supplied manually) +-- measure_name - measure name associated with a given checkpoint +-- measured_columns - measure columns associated with a given checkpoint +-- measurement_value - measurement details associated with a given checkpoint +-- checkpoint_time - time -- -- Status codes: -- 11 - OK @@ -89,6 +94,7 @@ BEGIN RETURN QUERY SELECT 11 AS status, 'OK' AS status_text, CP.id_checkpoint, CP.checkpoint_name, + CP.created_by AS author, CP.measured_by_atum_agent, MD.measure_name, MD.measured_columns, M.measurement_value, CP.process_start_time AS checkpoint_start_time, CP.process_end_time AS checkpoint_end_time diff --git a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsTest.scala b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsTest.scala index bebe82c13..e56b1799f 100644 --- a/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsTest.scala +++ b/database/src/test/scala/za/co/absa/atum/database/flows/GetFlowCheckpointsTest.scala @@ -229,12 +229,14 @@ class GetFlowCheckpointsTest extends DBTestSuite { assert(row1.getString("status_text").contains("OK")) assert(row1.getUUID("id_checkpoint").contains(checkpointId)) assert(row1.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) + assert(row1.getString("author").contains("ObviouslySomeTest")) + assert(row1.getBoolean("measured_by_atum_agent").contains(true)) assert(row1.getOffsetDateTime("checkpoint_start_time").contains(startTime)) assert(row1.getOffsetDateTime("checkpoint_end_time").contains(endTime)) val measure1 = MeasuredDetails( row1.getString("measure_name").get, - row1.getArray[String]("measure_columns").map(_.toList).get, + row1.getArray[String]("measured_columns").map(_.toList).get, row1.getJsonB("measurement_value").get ) @@ -243,12 +245,14 @@ class GetFlowCheckpointsTest extends DBTestSuite { assert(row2.getString("status_text").contains("OK")) assert(row2.getUUID("id_checkpoint").contains(checkpointId)) assert(row2.getString("checkpoint_name").contains("CheckpointNameCntAndAvg")) + assert(row1.getString("author").contains("ObviouslySomeTest")) + assert(row1.getBoolean("measured_by_atum_agent").contains(true)) assert(row2.getOffsetDateTime("checkpoint_start_time").contains(startTime)) assert(row2.getOffsetDateTime("checkpoint_end_time").contains(endTime)) val measure2 = MeasuredDetails( row2.getString("measure_name").get, - row2.getArray[String]("measure_columns").map(_.toList).get, + row2.getArray[String]("measured_columns").map(_.toList).get, row2.getJsonB("measurement_value").get ) diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointSubmitDTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointDTO.scala similarity index 96% rename from model/src/main/scala/za/co/absa/atum/model/dto/CheckpointSubmitDTO.scala rename to model/src/main/scala/za/co/absa/atum/model/dto/CheckpointDTO.scala index a0a6187f5..b833e128f 100644 --- a/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointSubmitDTO.scala +++ b/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointDTO.scala @@ -19,7 +19,7 @@ package za.co.absa.atum.model.dto import java.time.ZonedDateTime import java.util.UUID -case class CheckpointSubmitDTO( +case class CheckpointDTO( id: UUID, name: String, author: String, diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointQueryResultDTO.scala b/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointQueryResultDTO.scala index c2df1acc1..851ed793e 100644 --- a/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointQueryResultDTO.scala +++ b/model/src/main/scala/za/co/absa/atum/model/dto/CheckpointQueryResultDTO.scala @@ -16,40 +16,19 @@ package za.co.absa.atum.model.dto +import io.circe.Json import java.time.ZonedDateTime import java.util.UUID -// TODO REMOVE -case class MeasureResultDTO1( - mainValue: MeasureResultDTO1.TypedValue1, - // TODO READ doobie Map[String, MeasureResultDTO1.TypedValue1] - supportValues: Map[String, String /*MeasureResultDTO1.TypedValue1*/] = Map.empty - ) - -object MeasureResultDTO1 { - case class TypedValue1( - value: String, - // TODO READ doobie sealed trait ResultValueType - valueType: String//ResultValueType1 - ) - - sealed trait ResultValueType1 - - object ResultValueType1 { - case object String extends ResultValueType1 - case object Long extends ResultValueType1 - case object BigDecimal extends ResultValueType1 - case object Double extends ResultValueType1 - } - -} - +// TODO move to server?! It's not gonna be used for API communication - it's only for internal DB model (db -> scala) case class CheckpointQueryResultDTO( - idCheckpoint: UUID, - checkpointName: String, - measureName: String, - measureColumns: Seq[String], - measurementValue: MeasureResultDTO1, // TODO MeasureResultDTO - checkpointStartTime: ZonedDateTime, - checkpointEndTime: Option[ZonedDateTime], -) + idCheckpoint: UUID, + checkpointName: String, + author: String, + measuredByAtumAgent: Boolean = false, + measureName: String, + measuredColumns: Seq[String], + measurementValue: Json, + checkpointStartTime: ZonedDateTime, + checkpointEndTime: Option[ZonedDateTime] + ) diff --git a/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsTest.scala b/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsTest.scala index da07fe918..0c0d1718c 100644 --- a/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsTest.scala +++ b/model/src/test/scala/za/co/absa/atum/model/utils/SerializationUtilsTest.scala @@ -149,8 +149,8 @@ class SerializationUtilsTest extends AnyFlatSpecLike { assert(actualAtumContextDTO == expectedAtumContextDTO) } - // CheckpointSubmitDTO - "asJson" should "serialize CheckpointSubmitDTO into json string" in { + // CheckpointDTO + "asJson" should "serialize CheckpointDTO into json string" in { val uuid = UUID.randomUUID() val seqPartitionDTO = Seq(PartitionDTO("key", "val")) val timeWithZone = ZonedDateTime.of(2023, 10, 24, 10, 20, 59, 5000000, ZoneId.of("CET")) @@ -163,7 +163,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike { ) ) - val checkpointDTO = CheckpointSubmitDTO( + val checkpointDTO = CheckpointDTO( id = uuid, name = "checkpoint", author = "author", @@ -193,7 +193,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike { assert(actualCheckpointDTOJson == expectedCheckpointDTOJson) } - "fromJson" should "deserialize CheckpointSubmitDTO from json string" in { + "fromJson" should "deserialize CheckpointDTO from json string" in { val uuid = UUID.randomUUID() val seqPartitionDTO = Seq(PartitionDTO("key", "val")) val timeWithZone = ZonedDateTime.of(2023, 10, 24, 10, 20, 59, 5000000, ZoneOffset.ofHours(2)) @@ -222,7 +222,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike { ) ) - val expectedCheckpointDTO = CheckpointSubmitDTO( + val expectedCheckpointDTO = CheckpointDTO( id = uuid, name = "checkpoint", author = "author", @@ -233,7 +233,7 @@ class SerializationUtilsTest extends AnyFlatSpecLike { measurements = setMeasurementDTO ) - val actualCheckpointDTO = SerializationUtils.fromJson[CheckpointSubmitDTO](checkpointDTOJson) + val actualCheckpointDTO = SerializationUtils.fromJson[CheckpointDTO](checkpointDTOJson) assert(actualCheckpointDTO == expectedCheckpointDTO) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 4bb377157..dfc97afe1 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -28,6 +28,7 @@ object Dependencies { val balta = "0.1.0" val jacksonModuleScala = "2.14.2" + val circeVersion = "0.14.5" val specs2 = "4.10.0" val typesafeConfig = "1.4.2" @@ -115,12 +116,17 @@ object Dependencies { lazy val json4sJackson = "org.json4s" %% "json4s-jackson" % json4sVersion lazy val json4sNative = "org.json4s" %% "json4s-native" % json4sVersion % Provided + lazy val circeCore = "io.circe" %% "circe-core" % Versions.circeVersion + lazy val circeParser = "io.circe" %% "circe-parser" % Versions.circeVersion + Seq( jacksonModuleScala, json4sExt, json4sCore, json4sJackson, - json4sNative + json4sNative, + circeCore, + circeParser, ) } @@ -163,6 +169,7 @@ object Dependencies { // Fa-db lazy val faDbDoobie = faDbOrg %% "doobie" % Versions.fadb + lazy val pgCirceDoobie = "org.tpolecat" %% "doobie-postgres-circe" % "1.0.0-RC2" // aws lazy val awsSecretsManagerSdk = awsSdkOrg % "secretsmanager" % Versions.awssdk @@ -175,6 +182,7 @@ object Dependencies { Seq( faDbDoobie, + pgCirceDoobie, zioCore, zioMacros, zioLogging, @@ -244,7 +252,7 @@ object Dependencies { jsonSerdeDependencies(scalaVersion) } - def databaseDependencies: Seq[ModuleID] = { + def databaseDependencies: Seq[ModuleID] = { lazy val scalaTest = "org.scalatest" %% "scalatest" % Versions.scalatest % Test lazy val balta = "za.co.absa" %% "balta" % Versions.balta % Test diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala index 822220e11..082e18b52 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointController.scala @@ -16,12 +16,12 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.CheckpointSubmitDTO +import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.model.ErrorResponse import zio.IO import zio.macros.accessible @accessible trait CheckpointController { - def createCheckpoint(checkpointDTO: CheckpointSubmitDTO): IO[ErrorResponse, CheckpointSubmitDTO] + def createCheckpoint(checkpointDTO: CheckpointDTO): IO[ErrorResponse, CheckpointDTO] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala index 880565a38..f1e3330c0 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.CheckpointSubmitDTO +import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.service.CheckpointService import za.co.absa.atum.server.model.ErrorResponse import zio._ @@ -24,8 +24,8 @@ import zio._ class CheckpointControllerImpl(checkpointService: CheckpointService) extends CheckpointController with BaseController { - override def createCheckpoint(checkpointDTO: CheckpointSubmitDTO): IO[ErrorResponse, CheckpointSubmitDTO] = { - serviceCallWithStatus[Unit, CheckpointSubmitDTO]( + override def createCheckpoint(checkpointDTO: CheckpointDTO): IO[ErrorResponse, CheckpointDTO] = { + serviceCallWithStatus[Unit, CheckpointDTO]( checkpointService.saveCheckpoint(checkpointDTO), _ => checkpointDTO ) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala index a0a790bb8..dae56d6ce 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala @@ -16,12 +16,12 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.{CheckpointQueryDTO, CheckpointQueryResultDTO} +import za.co.absa.atum.model.dto.{CheckpointQueryDTO, CheckpointDTO} import za.co.absa.atum.server.model.ErrorResponse import zio.IO import zio.macros.accessible @accessible trait FlowController { - def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, Seq[CheckpointQueryResultDTO]] + def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, Seq[CheckpointDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala index a536070f9..74d227170 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowControllerImpl.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.controller -import za.co.absa.atum.model.dto.{CheckpointQueryDTO, CheckpointQueryResultDTO, CheckpointSubmitDTO} +import za.co.absa.atum.model.dto.{CheckpointQueryDTO, CheckpointDTO} import za.co.absa.atum.server.api.service.FlowService import za.co.absa.atum.server.model.ErrorResponse import zio._ @@ -24,8 +24,8 @@ import zio._ class FlowControllerImpl(flowService: FlowService) extends FlowController with BaseController { - override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, Seq[CheckpointQueryResultDTO]] = { - serviceCall[Seq[CheckpointQueryResultDTO], Seq[CheckpointQueryResultDTO]]( + override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, Seq[CheckpointDTO]] = { + serviceCall[Seq[CheckpointDTO], Seq[CheckpointDTO]]( flowService.getFlowCheckpoints(checkpointQueryDTO), r => r ) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/DoobieImplicits.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/DoobieImplicits.scala index 40687a226..eb59dacf7 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/DoobieImplicits.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/DoobieImplicits.scala @@ -18,10 +18,12 @@ package za.co.absa.atum.server.api.database import cats.Show import cats.data.NonEmptyList -import doobie.{Get, Put} import doobie.postgres.implicits._ +import doobie.{Get, Put} +import io.circe.Decoder import org.postgresql.jdbc.PgArray import org.postgresql.util.PGobject +import za.co.absa.atum.model.dto.MeasureResultDTO import scala.util.{Failure, Success, Try} @@ -156,4 +158,19 @@ object DoobieImplicits { } + implicit val decodeResultValueType: Decoder[MeasureResultDTO.ResultValueType] = Decoder.decodeString.emap { + case "String" => Right(MeasureResultDTO.ResultValueType.String) + case "Long" => Right(MeasureResultDTO.ResultValueType.Long) + case "BigDecimal" => Right(MeasureResultDTO.ResultValueType.BigDecimal) + case "Double" => Right(MeasureResultDTO.ResultValueType.Double) + case other => Left(s"Cannot decode $other as ResultValueType") + } + + implicit val decodeTypedValue: Decoder[MeasureResultDTO.TypedValue] = + Decoder.forProduct2("value", "valueType")(MeasureResultDTO.TypedValue.apply) + + implicit val decodeMeasureResultDTO: Decoder[MeasureResultDTO] = + Decoder.forProduct2("mainValue", "supportValues")(MeasureResultDTO.apply) + + } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala index a1a5f75ba..b30d4b3b9 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala @@ -23,6 +23,7 @@ import play.api.libs.json.Json import za.co.absa.atum.model.dto.{CheckpointQueryDTO, CheckpointQueryResultDTO} import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.flows.Flows +import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get import za.co.absa.atum.server.model.PartitioningForDB import za.co.absa.fadb.DBSchema import za.co.absa.fadb.doobie.DoobieEngine @@ -31,7 +32,9 @@ import zio._ import zio.interop.catz._ import doobie.postgres.implicits._ -import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get +import doobie.postgres.circe.jsonb.implicits._ +import io.circe.syntax.EncoderOps +import io.circe.generic.auto._ class GetFlowCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) extends DoobieMultipleResultFunction[CheckpointQueryDTO, CheckpointQueryResultDTO, Task] { @@ -39,7 +42,9 @@ class GetFlowCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task] override val fieldsToSelect: Seq[String] = Seq( "id_checkpoint", "checkpoint_name", - "measure_name", "measure_columns", "measurement_value", + "author", + "measured_by_atum_agent", + "measure_name", "measured_columns", "measurement_value", "checkpoint_start_time", "checkpoint_end_time", ) @@ -54,7 +59,7 @@ class GetFlowCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task] partitioningNormalized }, ${values.limit}, - ${values.checkpointName}, + ${values.checkpointName} ) AS ${Fragment.const(alias)};""" } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpoint.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpoint.scala index bbe1ca7f3..35b615dad 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpoint.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpoint.scala @@ -19,7 +19,7 @@ package za.co.absa.atum.server.api.database.runs.functions import doobie.Fragment import doobie.implicits._ import doobie.util.Read -import za.co.absa.atum.model.dto.CheckpointSubmitDTO +import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.model.PartitioningForDB import za.co.absa.fadb.DBSchema import za.co.absa.fadb.doobie.{DoobieEngine, StatusWithData} @@ -35,10 +35,10 @@ import za.co.absa.atum.server.model.PlayJsonImplicits.writesMeasurementDTO import doobie.postgres.implicits._ class WriteCheckpoint(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieSingleResultFunctionWithStatus[CheckpointSubmitDTO, Unit, Task] + extends DoobieSingleResultFunctionWithStatus[CheckpointDTO, Unit, Task] with StandardStatusHandling { - override def sql(values: CheckpointSubmitDTO)(implicit read: Read[StatusWithData[Unit]]): Fragment = { + override def sql(values: CheckpointDTO)(implicit read: Read[StatusWithData[Unit]]): Fragment = { val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning) val partitioningNormalized = Json.toJson(partitioning).toString // List[String] containing json data has to be properly escaped diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index dc8ae3af5..521a82fcc 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -21,19 +21,19 @@ import sttp.tapir.{PublicEndpoint, endpoint} import sttp.tapir.generic.auto.schemaForCaseClass import sttp.tapir.json.play.jsonBody import sttp.tapir.ztapir._ -import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, AtumContextDTO, CheckpointQueryDTO, CheckpointQueryResultDTO, CheckpointSubmitDTO, PartitioningSubmitDTO} +import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.Constants.Endpoints._ import za.co.absa.atum.server.model.ErrorResponse import za.co.absa.atum.server.model.PlayJsonImplicits._ trait Endpoints extends BaseEndpoints { - protected val createCheckpointEndpoint: PublicEndpoint[CheckpointSubmitDTO, ErrorResponse, CheckpointSubmitDTO, Any] = { + protected val createCheckpointEndpoint: PublicEndpoint[CheckpointDTO, ErrorResponse, CheckpointDTO, Any] = { apiV1.post .in(CreateCheckpoint) - .in(jsonBody[CheckpointSubmitDTO]) + .in(jsonBody[CheckpointDTO]) .out(statusCode(StatusCode.Created)) - .out(jsonBody[CheckpointSubmitDTO]) + .out(jsonBody[CheckpointDTO]) } protected val createPartitioningEndpoint @@ -55,12 +55,12 @@ trait Endpoints extends BaseEndpoints { } protected val getFlowCheckpointsEndpoint - : PublicEndpoint[CheckpointQueryDTO, ErrorResponse, Seq[CheckpointQueryResultDTO], Any] = { + : PublicEndpoint[CheckpointQueryDTO, ErrorResponse, Seq[CheckpointDTO], Any] = { apiV2.post .in(GetFlowCheckpoints) .in(jsonBody[CheckpointQueryDTO]) .out(statusCode(StatusCode.Ok)) - .out(jsonBody[Seq[CheckpointQueryResultDTO]]) + .out(jsonBody[Seq[CheckpointDTO]]) } protected val zioMetricsEndpoint: PublicEndpoint[Unit, Unit, String, Any] = { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala index f88bbd011..bf1a681a2 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.repository -import za.co.absa.atum.model.dto.CheckpointSubmitDTO +import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.fadb.exceptions.StatusException import zio._ @@ -24,5 +24,5 @@ import zio.macros.accessible @accessible trait CheckpointRepository { - def writeCheckpoint(checkpointDTO: CheckpointSubmitDTO): IO[DatabaseError, Either[StatusException, Unit]] + def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Either[StatusException, Unit]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala index 269642935..e033f7f5f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.repository -import za.co.absa.atum.model.dto.CheckpointSubmitDTO +import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpoint import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.fadb.exceptions.StatusException @@ -24,7 +24,7 @@ import zio._ class CheckpointRepositoryImpl(writeCheckpointFn: WriteCheckpoint) extends CheckpointRepository with BaseRepository { - override def writeCheckpoint(checkpointDTO: CheckpointSubmitDTO): IO[DatabaseError, Either[StatusException, Unit]] = { + override def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Either[StatusException, Unit]] = { dbCallWithStatus(writeCheckpointFn(checkpointDTO), "writeCheckpoint") } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala index f2f4b35b5..fd96f8b46 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.service -import za.co.absa.atum.model.dto.CheckpointSubmitDTO +import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.fadb.exceptions.StatusException import zio._ @@ -24,5 +24,5 @@ import zio.macros.accessible @accessible trait CheckpointService { - def saveCheckpoint(checkpointDTO: CheckpointSubmitDTO): IO[ServiceError, Either[StatusException, Unit]] + def saveCheckpoint(checkpointDTO: CheckpointDTO): IO[ServiceError, Either[StatusException, Unit]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala index 63b6f94de..dba47bcd7 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.service -import za.co.absa.atum.model.dto.CheckpointSubmitDTO +import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.repository.CheckpointRepository import za.co.absa.fadb.exceptions.StatusException @@ -25,7 +25,7 @@ import zio._ class CheckpointServiceImpl(checkpointRepository: CheckpointRepository) extends CheckpointService with BaseService { - override def saveCheckpoint(checkpointDTO: CheckpointSubmitDTO): IO[ServiceError, Either[StatusException, Unit]] = { + override def saveCheckpoint(checkpointDTO: CheckpointDTO): IO[ServiceError, Either[StatusException, Unit]] = { repositoryCallWithStatus( checkpointRepository.writeCheckpoint(checkpointDTO), "saveCheckpoint" ) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala index 93ef12c0a..34781cbdf 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala @@ -16,12 +16,12 @@ package za.co.absa.atum.server.api.service -import za.co.absa.atum.model.dto.{CheckpointQueryDTO, CheckpointQueryResultDTO} +import za.co.absa.atum.model.dto.{CheckpointQueryDTO, CheckpointDTO} import za.co.absa.atum.server.api.exception.ServiceError import zio._ import zio.macros.accessible @accessible trait FlowService { - def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointQueryResultDTO]] + def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala index 6373b4d1e..e00aa3322 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.service -import za.co.absa.atum.model.dto.{CheckpointQueryDTO, CheckpointQueryResultDTO} +import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO, MeasurementDTO, MeasureDTO, MeasureResultDTO} import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.repository.FlowRepository import zio._ @@ -24,10 +24,44 @@ import zio._ class FlowServiceImpl(flowRepository: FlowRepository) extends FlowService with BaseService { - override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointQueryResultDTO]] = { + override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] = { repositoryCall( flowRepository.getFlowCheckpoints(checkpointQueryDTO), "getFlowCheckpoints" - ) + ).map({ + checkpointMeasurementsSeq => + checkpointMeasurementsSeq.map { cm => + CheckpointDTO( + id = cm.idCheckpoint, + name = cm.checkpointName, + author = cm.author, + measuredByAtumAgent = cm.measuredByAtumAgent, + partitioning = checkpointQueryDTO.partitioning, + processStartTime = cm.checkpointStartTime, + processEndTime = cm.checkpointEndTime, + measurements = Set( + MeasurementDTO( + measure = MeasureDTO( + measureName = cm.measureName, + measuredColumns = cm.measuredColumns + ), + result = MeasureResultDTO( + mainValue = MeasureResultDTO.TypedValue( + value = cm.measurementValue.hcursor.downField("value").as[String].getOrElse(""), + valueType = cm.measurementValue.hcursor.downField("valueType").as[String].getOrElse("") match { + case "String" => MeasureResultDTO.ResultValueType.String + case "Long" => MeasureResultDTO.ResultValueType.Long + case "BigDecimal" => MeasureResultDTO.ResultValueType.BigDecimal + case "Double" => MeasureResultDTO.ResultValueType.Double + case _ => MeasureResultDTO.ResultValueType.String + } + ), + supportValues = Map.empty + ) + ) + ) + ) + } + }) } } diff --git a/server/src/main/scala/za/co/absa/atum/server/model/PlayJsonImplicits.scala b/server/src/main/scala/za/co/absa/atum/server/model/PlayJsonImplicits.scala index 9ea83ad2c..343658742 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/PlayJsonImplicits.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/PlayJsonImplicits.scala @@ -19,7 +19,6 @@ package za.co.absa.atum.server.model import play.api.libs.functional.syntax.toFunctionalBuilderOps import play.api.libs.json._ import za.co.absa.atum.model.dto.MeasureResultDTO.{ResultValueType, TypedValue} -import za.co.absa.atum.model.dto.MeasureResultDTO1.ResultValueType1 import za.co.absa.atum.model.dto._ object PlayJsonImplicits { @@ -58,6 +57,9 @@ object PlayJsonImplicits { } } + implicit val readsMeasureDTO: Reads[MeasureDTO] = Json.reads[MeasureDTO] + implicit val writesMeasureDTO: Writes[MeasureDTO] = Json.writes[MeasureDTO] + implicit val readsTypedValue: Reads[MeasureResultDTO.TypedValue] = Json.reads[MeasureResultDTO.TypedValue] implicit val writesTypedValue: Writes[MeasureResultDTO.TypedValue] = Json.writes[MeasureResultDTO.TypedValue] @@ -69,17 +71,14 @@ object PlayJsonImplicits { implicit val writesMeasureResultDTO: Writes[MeasureResultDTO] = Json.writes[MeasureResultDTO] - implicit val readsMeasureDTO: Reads[MeasureDTO] = Json.reads[MeasureDTO] - implicit val writesMeasureDTO: Writes[MeasureDTO] = Json.writes[MeasureDTO] - implicit val readsMeasurementDTO: Reads[MeasurementDTO] = Json.reads[MeasurementDTO] implicit val writesMeasurementDTO: Writes[MeasurementDTO] = Json.writes[MeasurementDTO] implicit val readsPartitionDTO: Reads[PartitionDTO] = Json.reads[PartitionDTO] implicit val writesPartitionDTO: Writes[PartitionDTO] = Json.writes[PartitionDTO] - implicit val readsCheckpointDTO: Reads[CheckpointSubmitDTO] = Json.reads[CheckpointSubmitDTO] - implicit val writesCheckpointDTO: Writes[CheckpointSubmitDTO] = Json.writes[CheckpointSubmitDTO] + implicit val readsCheckpointDTO: Reads[CheckpointDTO] = Json.reads[CheckpointDTO] + implicit val writesCheckpointDTO: Writes[CheckpointDTO] = Json.writes[CheckpointDTO] implicit val readsPartitioningSubmitDTO: Reads[PartitioningSubmitDTO] = Json.reads[PartitioningSubmitDTO] implicit val writesPartitioningSubmitDTO: Writes[PartitioningSubmitDTO] = Json.writes[PartitioningSubmitDTO] @@ -96,46 +95,4 @@ object PlayJsonImplicits { implicit val readsCheckpointQueryDTO: Reads[CheckpointQueryDTO] = Json.reads[CheckpointQueryDTO] implicit val writesCheckpointQueryDTO: Writes[CheckpointQueryDTO] = Json.writes[CheckpointQueryDTO] - - - // TODO REMOVE - - - implicit val resultValueTypeReads1: Reads[ResultValueType1] = new Reads[ResultValueType1] { - override def reads(json: JsValue): JsResult[ResultValueType1] = json match { - case JsString("String") => JsSuccess(ResultValueType1.String) - case JsString("Long") => JsSuccess(ResultValueType1.Long) - case JsString("BigDecimal") => JsSuccess(ResultValueType1.BigDecimal) - case JsString("Double") => JsSuccess(ResultValueType1.Double) - case _ => JsError("Invalid ResultValueType1") - } - } - - implicit val resultValueTypeWrites1: Writes[ResultValueType1] = new Writes[ResultValueType1] { - def writes(resultValueType: ResultValueType1): JsValue = resultValueType match { - case ResultValueType1.String => Json.toJson("String") - case ResultValueType1.Long => Json.toJson("Long") - case ResultValueType1.BigDecimal => Json.toJson("BigDecimal") - case ResultValueType1.Double => Json.toJson("Double") - } - } - - implicit val readsTypedValue1: Reads[MeasureResultDTO1.TypedValue1] = Json.reads[MeasureResultDTO1.TypedValue1] - implicit val writesTypedValue1: Writes[MeasureResultDTO1.TypedValue1] = Json.writes[MeasureResultDTO1.TypedValue1] - - implicit val readsMeasureResultDTO1: Reads[MeasureResultDTO1] = { - ((__ \ "mainValue").read[MeasureResultDTO1.TypedValue1] and - (__ \ "supportValues").readNullable[Map[String, String /* MeasureResultDTO1.TypedValue1 */]].map(_.getOrElse(Map.empty)) - )(MeasureResultDTO1.apply _) - } - - implicit val writesMeasureResultDTO1: Writes[MeasureResultDTO1] = Json.writes[MeasureResultDTO1] - - - - - implicit val readsCheckpointQueryResultDTO: Reads[CheckpointQueryResultDTO] = Json.reads[CheckpointQueryResultDTO] - implicit val writesCheckpointQueryResultDTO: Writes[CheckpointQueryResultDTO] = Json.writes[CheckpointQueryResultDTO] - - } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 0884dbd19..4e8f84500 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -94,7 +94,7 @@ trait TestData { ) // Checkpoint - protected val checkpointDTO1: CheckpointSubmitDTO = CheckpointSubmitDTO( + protected val checkpointDTO1: CheckpointDTO = CheckpointDTO( id = UUID.randomUUID(), name = "name", author = "author", @@ -103,9 +103,9 @@ trait TestData { processEndTime = None, measurements = Set.empty ) - protected val checkpointDTO2: CheckpointSubmitDTO = checkpointDTO1.copy(id = UUID.randomUUID()) + protected val checkpointDTO2: CheckpointDTO = checkpointDTO1.copy(id = UUID.randomUUID()) - protected val checkpointDTO3: CheckpointSubmitDTO = checkpointDTO1.copy(id = UUID.randomUUID()) + protected val checkpointDTO3: CheckpointDTO = checkpointDTO1.copy(id = UUID.randomUUID()) protected def createAtumContextDTO(partitioningSubmitDTO: PartitioningSubmitDTO): AtumContextDTO = { val measures: Set[MeasureDTO] = Set(MeasureDTO("count", Seq("*"))) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerSpec.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerSpec.scala index 66f61cd8f..04f6ba670 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerSpec.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerSpec.scala @@ -46,7 +46,7 @@ class CheckpointControllerSpec extends ZIOSpecDefault with TestData { suite("CheckpointControllerSuite")( suite("CreateCheckpointSuite")( - test("Returns expected CheckpointSubmitDTO") { + test("Returns expected CheckpointDTO") { for { result <- CheckpointController.createCheckpoint(checkpointDTO1) } yield assertTrue(result == checkpointDTO1) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerSpec.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerSpec.scala index 2295a5b24..cebda9f67 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerSpec.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerSpec.scala @@ -1,18 +1,19 @@ /* -* Copyright 2021 ABSA Group Limited -* -* Licensed under the Apache License, Version 2.0 (the "License"); -* you may not use this file except in compliance with the License. -* You may obtain a copy of the License at -* -* http://www.apache.org/licenses/LICENSE-2.0 -* -* Unless required by applicable law or agreed to in writing, software -* distributed under the License is distributed on an "AS IS" BASIS, -* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -* See the License for the specific language governing permissions and -* limitations under the License. -*/ + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package za.co.absa.atum.server.api.controller import org.mockito.Mockito.{mock, when} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointSpec.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointSpec.scala index 091769df0..dd5383d1f 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointSpec.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointSpec.scala @@ -38,7 +38,7 @@ class WriteCheckpointSpec extends ConfigProviderSpec { suite("WriteCheckpointSuite")( test("Returns expected Left with DataNotFoundException as related partitioning is not in the database") { - val checkpointDTO = CheckpointSubmitDTO( + val checkpointDTO = CheckpointDTO( id = UUID.randomUUID(), name = "name", author = "author", diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointSpec.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointSpec.scala index 1370b87df..51c3bb8d2 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointSpec.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/CreateCheckpointEndpointSpec.scala @@ -23,7 +23,7 @@ import sttp.client3.playJson._ import sttp.model.StatusCode import sttp.tapir.server.stub.TapirStubInterpreter import sttp.tapir.ztapir.{RIOMonadError, RichZEndpoint} -import za.co.absa.atum.model.dto.CheckpointSubmitDTO +import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.controller.CheckpointController import za.co.absa.atum.server.model.{GeneralErrorResponse, InternalServerErrorResponse} @@ -56,10 +56,10 @@ object CreateCheckpointEndpointSpec extends ZIOSpecDefault with Endpoints with T val request = basicRequest .post(uri"https://test.com/api/v1/createCheckpoint") - .response(asJson[CheckpointSubmitDTO]) + .response(asJson[CheckpointDTO]) suite("CreateCheckpointEndpointSuite")( - test("Returns expected CheckpointSubmitDTO") { + test("Returns expected CheckpointDTO") { val response = request .body(checkpointDTO1) .send(backendStub)