diff --git a/model/src/main/scala/za/co/absa/atum/model/dto/package.scala b/model/src/main/scala/za/co/absa/atum/model/dto/package.scala index bca1a6c77..b025d2b27 100644 --- a/model/src/main/scala/za/co/absa/atum/model/dto/package.scala +++ b/model/src/main/scala/za/co/absa/atum/model/dto/package.scala @@ -17,15 +17,18 @@ package za.co.absa.atum.model -import io.circe.generic.semiauto._ import io.circe._ package object dto { type PartitioningDTO = Seq[PartitionDTO] type AdditionalDataDTO = Map[String, Option[String]] + // Todo. This implicit definition should not be defined here, so it is to be addressed in Ticket #221 // Implicit encoders and decoders for AdditionalDataDTO implicit val decodeAdditionalDataDTO: Decoder[AdditionalDataDTO] = Decoder.decodeMap[String, Option[String]] implicit val encodeAdditionalDataDTO: Encoder[AdditionalDataDTO] = Encoder.encodeMap[String, Option[String]] + // Implicit encoders and decoders for PartitioningDTO + implicit val decodePartitioningDTO: Decoder[PartitioningDTO] = Decoder.decodeSeq[PartitionDTO] + implicit val encodePartitioningDTO: Encoder[PartitioningDTO] = Encoder.encodeSeq[PartitionDTO] } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index b391a87fa..6ca200e35 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -42,7 +42,7 @@ object Dependencies { val postgresql = "42.6.0" - val fadb = "0.3.0" + val fadb = "0.5.0" val logback = "1.2.3" @@ -105,7 +105,7 @@ object Dependencies { val zioOrg = "dev.zio" val tapirOrg = "com.softwaremill.sttp.tapir" val http4sOrg = "org.http4s" - val faDbOrg = "za.co.absa.fa-db" + val faDbOrg = "za.co.absa.db.fa-db" val sbtOrg = "com.github.sbt" val logbackOrg = "ch.qos.logback" val awsSdkOrg = "software.amazon.awssdk" @@ -147,7 +147,6 @@ object Dependencies { // Fa-db lazy val faDbDoobie = faDbOrg %% "doobie" % Versions.fadb - lazy val pgCirceDoobie = "org.tpolecat" %% "doobie-postgres-circe" % "1.0.0-RC2" // aws lazy val awsSecretsManagerSdk = awsSdkOrg % "secretsmanager" % Versions.awssdk @@ -160,7 +159,6 @@ object Dependencies { Seq( faDbDoobie, - pgCirceDoobie, zioCore, zioMacros, zioLogging, diff --git a/server/README.md b/server/README.md index 5dd693703..35be75d25 100644 --- a/server/README.md +++ b/server/README.md @@ -9,21 +9,24 @@ To create a jar file that can be executed: > java -jar server/target/jvm-2.13/*.jar ``` -If you want to quickly build and run from sbt you can run using the command below (alternatively you can execute za.co.absa.atum.server.Main within your IDE). This deploys it to `localhost:8080`. +If you want to quickly build and run from sbt you can run using the command below (alternatively you can execute +za.co.absa.atum.server.Main within your IDE). This deploys it to `localhost:8080`. ```shell sbt "server/runMain za.co.absa.atum.server.Main" ``` -### REST API Reference +### REST API Reference The REST API exposes a Swagger Documentation UI which documents all the HTTP endpoints exposed. It can be found at **{REST_API_HOST}/docs/** (e.g. `http://localhost:8080/docs/`) ### Observability metrics -Optionally you can run server with monitoring that collects metrics about http communication and/or jvm/zio runtime. `intervalInSeconds` parameter refers to frequency of data collection from its runtime environment. -Monitoring of http communication is based on intercepting of http calls therefore `intervalInSeconds` parameter does not apply. +Optionally you can run server with monitoring that collects metrics about http communication and/or jvm/zio +runtime. `intervalInSeconds` parameter refers to frequency of data collection from its runtime environment. +Monitoring of http communication is based on intercepting of http calls therefore `intervalInSeconds` parameter does not +apply. ``` { @@ -41,5 +44,7 @@ Monitoring of http communication is based on intercepting of http calls therefor } ``` -When monitoring enabled, the application exposes `http://localhost:8080/metrics` and/or `http://localhost:8080/zio-metrics` endpoints which can be scraped by Prometheus. -For testing purposes there is [docker-compose.yml](./docker-compose.yml) file which can be used to start up dockerized Prometheus and Grafana instances. Prometheus scraping configs are defined in [prometheus.yml](./prometheus.yml) file. +When monitoring enabled, the application exposes `http://localhost:8080/metrics` +and/or `http://localhost:8080/zio-metrics` endpoints which can be scraped by Prometheus. +For testing purposes there is [docker-compose.yml](./docker-compose.yml) file which can be used to start up dockerized +Prometheus and Grafana instances. Prometheus scraping configs are defined in [prometheus.yml](./prometheus.yml) file. diff --git a/server/prometheus.yml b/server/prometheus.yml index 5abcfa42c..7d0a27cad 100644 --- a/server/prometheus.yml +++ b/server/prometheus.yml @@ -19,14 +19,14 @@ scrape_configs: - job_name: 'atum_server_http4s' metrics_path: /metrics static_configs: - - targets: ['host.docker.internal:8080'] + - targets: [ 'host.docker.internal:8080' ] labels: env: 'local' app: 'atum' - job_name: 'atum_server_zio_runtime' metrics_path: /zio-metrics static_configs: - - targets: ['host.docker.internal:8080'] + - targets: [ 'host.docker.internal:8080' ] labels: env: 'local' app: 'atum' diff --git a/server/src/main/scala/za/co/absa/atum/server/Main.scala b/server/src/main/scala/za/co/absa/atum/server/Main.scala index 34295349e..19ab07fb8 100644 --- a/server/src/main/scala/za/co/absa/atum/server/Main.scala +++ b/server/src/main/scala/za/co/absa/atum/server/Main.scala @@ -67,7 +67,7 @@ object Main extends ZIOAppDefault with Server { // enabling conditionally collection of ZIO runtime metrics and default JVM metrics if (jvmMonitoringConfig.enabled) { ZLayer.succeed(MetricsConfig(Duration.ofSeconds(jvmMonitoringConfig.intervalInSeconds))) ++ - Runtime.enableRuntimeMetrics.unit ++ DefaultJvmMetrics.live.unit + Runtime.enableRuntimeMetrics.unit ++ DefaultJvmMetrics.live.unit } else { ZLayer.succeed(MetricsConfig(Duration.ofSeconds(Long.MaxValue))) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala index 9bfe5c83e..88c133487 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/BaseController.scala @@ -17,9 +17,8 @@ package za.co.absa.atum.server.api.controller import za.co.absa.atum.server.api.exception.ServiceError -import za.co.absa.atum.server.model.{ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse} +import za.co.absa.atum.server.model.{ErrorResponse, InternalServerErrorResponse} import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} -import za.co.absa.fadb.exceptions.StatusException import zio._ trait BaseController { @@ -33,26 +32,8 @@ trait BaseController { .mapError { serviceError: ServiceError => InternalServerErrorResponse(serviceError.message) } - .flatMap { - result => ZIO.succeed(onSuccessFnc(result)) - } - - } - - def serviceCallWithStatus[A, B]( - serviceCall: IO[ServiceError, Either[StatusException, A]], - onSuccessFnc: A => B - ): IO[ErrorResponse, B] = { - - serviceCall - .mapError { serviceError: ServiceError => - InternalServerErrorResponse(serviceError.message) - } - .flatMap { - case Left(statusException) => - ZIO.fail(GeneralErrorResponse(s"(${statusException.status.statusCode}) ${statusException.status.statusText}")) - case Right(result) => - ZIO.succeed(onSuccessFnc(result)) + .flatMap { result => + ZIO.succeed(onSuccessFnc(result)) } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala index 8815a89c2..b64c825a7 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/CheckpointControllerImpl.scala @@ -27,7 +27,7 @@ class CheckpointControllerImpl(checkpointService: CheckpointService) extends Che override def createCheckpointV1( checkpointDTO: CheckpointDTO ): IO[ErrorResponse, CheckpointDTO] = { - serviceCallWithStatus[Unit, CheckpointDTO]( + serviceCall[Unit, CheckpointDTO]( checkpointService.saveCheckpoint(checkpointDTO), _ => checkpointDTO ) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala index d122a93c2..684c8e8ac 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/FlowController.scala @@ -24,5 +24,7 @@ import zio.macros.accessible @accessible trait FlowController { - def getFlowCheckpointsV2(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] + def getFlowCheckpointsV2( + checkpointQueryDTO: CheckpointQueryDTO + ): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala index d37cc94a1..6fe9efff9 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/controller/PartitioningControllerImpl.scala @@ -59,7 +59,7 @@ class PartitioningControllerImpl(partitioningService: PartitioningService) additionalData: AdditionalDataSubmitDTO ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO]] = { mapToSingleSuccessResponse( - serviceCallWithStatus[Unit, AdditionalDataSubmitDTO]( + serviceCall[Unit, AdditionalDataSubmitDTO]( partitioningService.createOrUpdateAdditionalData(additionalData), _ => additionalData ) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/DoobieImplicits.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/DoobieImplicits.scala index 28163b264..a37c3629c 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/DoobieImplicits.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/DoobieImplicits.scala @@ -16,97 +16,17 @@ package za.co.absa.atum.server.api.database -import cats.Show -import cats.data.NonEmptyList import doobie.postgres.implicits._ import doobie.{Get, Put} -import io.circe.{Json => CirceJson} -import org.postgresql.jdbc.PgArray -import org.postgresql.util.PGobject -import io.circe.parser._ -import scala.util.Try - -package object DoobieImplicits { - - private implicit val showPgArray: Show[PgArray] = Show.fromToString +object DoobieImplicits { implicit val getMapWithOptionStringValues: Get[Map[String, Option[String]]] = Get[Map[String, String]] .tmap(map => map.map { case (k, v) => k -> Option(v) }) - private def circeJsonListToPGJsonArrayString(jsonList: List[CirceJson]): String = { - val arrayElements = jsonList.map { x => - // Convert to compact JSON string and escape inner quotes - val escapedJsonString = x.noSpaces.replace("\"", "\\\"") - // Wrap in double quotes for the array element - s""""$escapedJsonString"""" - } - - arrayElements.mkString("{", ",", "}") - } - - private def pgArrayToListOfCirceJson(pgArray: PgArray): Either[String, List[CirceJson]] = { - Try { - Option(pgArray.getArray) match { - case Some(array: Array[_]) => array.collect { - case str: String => parse(str).toTry.get - case other => parse(other.toString).toTry.get - }.toList - case None => List.empty[CirceJson] - case _ => throw new IllegalArgumentException("Unexpected type encountered.") - } - } - .toEither - .left.map(_.getMessage) - } - object Sequence { - implicit val get: Get[Seq[String]] = Get[List[String]].map(_.toSeq) implicit val put: Put[Seq[String]] = Put[List[String]].contramap(_.toList) - - } - - object Json { - - implicit val jsonArrayPut: Put[List[CirceJson]] = { - Put.Advanced - .other[PGobject]( - NonEmptyList.of("json[]") - ) - .tcontramap { a => - val o = new PGobject - o.setType("json[]") - o.setValue(circeJsonListToPGJsonArrayString(a)) - o - } - } - - implicit val jsonArrayGet: Get[List[CirceJson]] = { - Get.Advanced - .other[PgArray]( - NonEmptyList.of("json[]") - ) - .temap(pgArray => pgArrayToListOfCirceJson(pgArray)) - } - - } - - object Jsonb { - - implicit val jsonbArrayPut: Put[List[CirceJson]] = { - Put.Advanced - .other[PGobject]( - NonEmptyList.of("jsonb[]") - ) - .tcontramap { a => - val o = new PGobject - o.setType("jsonb[]") - o.setValue(circeJsonListToPGJsonArrayString(a)) - o - } - } - } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/PostgresDatabaseProvider.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/PostgresDatabaseProvider.scala index 68ab05c56..27d1bdf87 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/PostgresDatabaseProvider.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/PostgresDatabaseProvider.scala @@ -17,7 +17,7 @@ package za.co.absa.atum.server.api.database import doobie.Transactor -import za.co.absa.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieEngine import zio._ import zio.interop.catz._ diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/TransactorProvider.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/TransactorProvider.scala index a82a07aa5..2ce29fd26 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/TransactorProvider.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/TransactorProvider.scala @@ -32,10 +32,12 @@ object TransactorProvider { awsConfig <- ZIO.config[AwsConfig](AwsConfig.config) awsSecretsProvider <- ZIO.service[AwsSecretsProvider] - password <- awsSecretsProvider.getSecretValue(awsConfig.dbPasswordSecretName) + password <- awsSecretsProvider + .getSecretValue(awsConfig.dbPasswordSecretName) // fallback to password property's value from postgres section of reference.conf; useful for local testing .orElse { - ZIO.logError("Credentials were not retrieved from AWS, falling back to config value.") + ZIO + .logError("Credentials were not retrieved from AWS, falling back to config value.") .as(postgresConfig.password) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/Flows.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/Flows.scala index 94ca4c13e..4e993a7c3 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/Flows.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/Flows.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.database.flows -import za.co.absa.fadb.DBSchema -import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._ +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.naming.implementations.SnakeCaseNaming.Implicits._ object Flows extends DBSchema diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala index a145d55c3..7d0c8e079 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala @@ -16,50 +16,44 @@ package za.co.absa.atum.server.api.database.flows.functions -import doobie.Fragment import doobie.implicits.toSqlInterpolator -import doobie.util.Read import za.co.absa.atum.model.dto.CheckpointQueryDTO import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.flows.Flows import za.co.absa.atum.server.model.{CheckpointFromDB, PartitioningForDB} -import za.co.absa.fadb.DBSchema -import za.co.absa.fadb.doobie.DoobieEngine -import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunction +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus import zio._ -import zio.interop.catz._ - import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get - import doobie.postgres.implicits._ -import doobie.postgres.circe.jsonb.implicits.jsonbPut -import doobie.postgres.circe.json.implicits.jsonGet +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.{jsonbGet, jsonbPut} import io.circe.syntax.EncoderOps +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling class GetFlowCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieMultipleResultFunction[CheckpointQueryDTO, CheckpointFromDB, Task] { + extends DoobieMultipleResultFunctionWithAggStatus[CheckpointQueryDTO, CheckpointFromDB, Task](values => + Seq( + fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}", + fr"${values.limit}", + fr"${values.checkpointName}" + ) + ) + with StandardStatusHandling + with ByFirstErrorStatusAggregator { - override val fieldsToSelect: Seq[String] = Seq( + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq( "id_checkpoint", "checkpoint_name", "author", "measured_by_atum_agent", - "measure_name", "measured_columns", "measurement_value", - "checkpoint_start_time", "checkpoint_end_time", + "measure_name", + "measured_columns", + "measurement_value", + "checkpoint_start_time", + "checkpoint_end_time" ) - - override def sql(values: CheckpointQueryDTO)(implicit read: Read[CheckpointFromDB]): Fragment = { - val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning) - val partitioningNormalized = partitioning.asJson - - sql"""SELECT ${Fragment.const(selectEntry)} - FROM ${Fragment.const(functionName)}( - $partitioningNormalized, - ${values.limit}, - ${values.checkpointName} - ) AS ${Fragment.const(alias)};""" - } - } object GetFlowCheckpoints { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/Runs.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/Runs.scala index 13b9fd807..c950cbf54 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/Runs.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/Runs.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.database.runs -import za.co.absa.fadb.DBSchema -import za.co.absa.fadb.naming.implementations.SnakeCaseNaming.Implicits._ +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.naming.implementations.SnakeCaseNaming.Implicits._ object Runs extends DBSchema diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/CreateOrUpdateAdditionalData.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/CreateOrUpdateAdditionalData.scala index c28c74627..ca8281a55 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/CreateOrUpdateAdditionalData.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/CreateOrUpdateAdditionalData.scala @@ -16,42 +16,30 @@ package za.co.absa.atum.server.api.database.runs.functions -import doobie.Fragment import doobie.implicits.toSqlInterpolator -import doobie.util.Read import za.co.absa.atum.model.dto.AdditionalDataSubmitDTO import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.runs.Runs import za.co.absa.atum.server.model.PartitioningForDB -import za.co.absa.fadb.DBSchema -import za.co.absa.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus -import za.co.absa.fadb.doobie.{DoobieEngine, StatusWithData} -import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling import zio._ -import zio.interop.catz._ import io.circe.syntax._ import doobie.postgres.implicits._ -import doobie.postgres.circe.jsonb.implicits.jsonbPut +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut class CreateOrUpdateAdditionalData(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieSingleResultFunctionWithStatus[AdditionalDataSubmitDTO, Unit, Task] - with StandardStatusHandling { - - override def sql(values: AdditionalDataSubmitDTO)(implicit read: Read[StatusWithData[Unit]]): Fragment = { - val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning) - val partitioningJson = partitioning.asJson - - // implicits from Doobie can't handle Map[String, Option[String]] -> HStore, so we converted None to null basically - val additionalDataNormalized = values.additionalData.map{ case (k, v) => (k, v.orNull)} - - sql"""SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}( - $partitioningJson, - $additionalDataNormalized, - ${values.author} - ) ${Fragment.const(alias)};""" - } -} + extends DoobieSingleResultFunctionWithStatus[AdditionalDataSubmitDTO, Unit, Task](values => + Seq( + fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}", + fr"${values.additionalData.map { case (k, v) => (k, v.orNull) }}", + fr"${values.author}" + ) + ) + with StandardStatusHandling object CreateOrUpdateAdditionalData { val layer: URLayer[PostgresDatabaseProvider, CreateOrUpdateAdditionalData] = ZLayer { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/CreatePartitioningIfNotExists.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/CreatePartitioningIfNotExists.scala index 65cab3a1e..a9f1e8b13 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/CreatePartitioningIfNotExists.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/CreatePartitioningIfNotExists.scala @@ -16,44 +16,29 @@ package za.co.absa.atum.server.api.database.runs.functions - -import doobie.Fragment import doobie.implicits.toSqlInterpolator -import doobie.util.Read import za.co.absa.atum.model.dto.PartitioningSubmitDTO import za.co.absa.atum.server.model.PartitioningForDB -import za.co.absa.fadb.DBSchema -import za.co.absa.fadb.doobie.{DoobieEngine, StatusWithData} -import za.co.absa.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus -import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.runs.Runs import zio._ -import zio.interop.catz._ import io.circe.syntax._ -import doobie.postgres.circe.jsonb.implicits.jsonbPut +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut class CreatePartitioningIfNotExists(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieSingleResultFunctionWithStatus[PartitioningSubmitDTO, Unit, Task] - with StandardStatusHandling { - - override def sql(values: PartitioningSubmitDTO)(implicit read: Read[StatusWithData[Unit]]): Fragment = { - val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning) - val partitioningJson = partitioning.asJson - - val parentPartitioningJson = values.parentPartitioning.map { parentPartitioning => - val parentPartitioningForDB = PartitioningForDB.fromSeqPartitionDTO(parentPartitioning) - parentPartitioningForDB.asJson - } - - sql"""SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}( - $partitioningJson, - ${values.authorIfNew}, - $parentPartitioningJson - ) ${Fragment.const(alias)};""" - } -} + extends DoobieSingleResultFunctionWithStatus[PartitioningSubmitDTO, Unit, Task](values => + Seq( + fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}", + fr"${values.authorIfNew}", + fr"${values.parentPartitioning.map(PartitioningForDB.fromSeqPartitionDTO).map(_.asJson)}" + ) + ) + with StandardStatusHandling object CreatePartitioningIfNotExists { val layer: URLayer[PostgresDatabaseProvider, CreatePartitioningIfNotExists] = ZLayer { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningAdditionalData.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningAdditionalData.scala index e3d4ac86d..968e1081b 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningAdditionalData.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningAdditionalData.scala @@ -16,38 +16,29 @@ package za.co.absa.atum.server.api.database.runs.functions -import doobie.Fragment import doobie.implicits.toSqlInterpolator -import doobie.util.Read import za.co.absa.atum.model.dto.PartitioningDTO import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.runs.Runs -import za.co.absa.atum.server.model.PartitioningForDB -import za.co.absa.fadb.DBSchema -import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunction -import za.co.absa.fadb.doobie.DoobieEngine -import zio.interop.catz.asyncInstance +import za.co.absa.atum.server.model.{AdditionalDataFromDB, PartitioningForDB} +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus +import za.co.absa.db.fadb.doobie.DoobieEngine import zio.{Task, URLayer, ZIO, ZLayer} import io.circe.syntax._ - import za.co.absa.atum.server.api.database.DoobieImplicits.getMapWithOptionStringValues -import doobie.postgres.circe.jsonb.implicits.jsonbPut - -class GetPartitioningAdditionalData (implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieMultipleResultFunction[PartitioningDTO, (String, Option[String]), Task] - { - - override val fieldsToSelect: Seq[String] = Seq("ad_name", "ad_value") - - override def sql(values: PartitioningDTO)(implicit read: Read[(String, Option[String])]): Fragment = { - val partitioning: PartitioningForDB = PartitioningForDB.fromSeqPartitionDTO(values) - val partitioningJson = partitioning.asJson - - sql"""SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}( - $partitioningJson - ) ${Fragment.const(alias)};""" - } - +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling + +class GetPartitioningAdditionalData(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieMultipleResultFunctionWithAggStatus[PartitioningDTO, AdditionalDataFromDB, Task](values => + Seq(fr"${PartitioningForDB.fromSeqPartitionDTO(values).asJson}") + ) + with StandardStatusHandling + with ByFirstErrorStatusAggregator { + + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("ad_name", "ad_value") } object GetPartitioningAdditionalData { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala index 4427c4797..4901b867a 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpoints.scala @@ -16,29 +16,34 @@ package za.co.absa.atum.server.api.database.runs.functions -import doobie.Fragment import doobie.implicits.toSqlInterpolator -import doobie.util.Read import za.co.absa.atum.model.dto.CheckpointQueryDTO import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.runs.Runs import za.co.absa.atum.server.model.{CheckpointFromDB, PartitioningForDB} -import za.co.absa.fadb.DBSchema -import za.co.absa.fadb.doobie.DoobieEngine -import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunction +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus import zio._ -import zio.interop.catz._ -import io.circe.syntax._ - +import io.circe.syntax.EncoderOps import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get import doobie.postgres.implicits._ -import doobie.postgres.circe.jsonb.implicits.jsonbPut -import doobie.postgres.circe.json.implicits.jsonGet +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.{jsonbGet, jsonbPut} +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling -class GetPartitioningCheckpoints (implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieMultipleResultFunction[CheckpointQueryDTO, CheckpointFromDB, Task] { +class GetPartitioningCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieMultipleResultFunctionWithAggStatus[CheckpointQueryDTO, CheckpointFromDB, Task](values => + Seq( + fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}", + fr"${values.limit}", + fr"${values.checkpointName}" + ) + ) + with StandardStatusHandling + with ByFirstErrorStatusAggregator { - override val fieldsToSelect: Seq[String] = Seq( + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq( "id_checkpoint", "checkpoint_name", "author", @@ -47,20 +52,8 @@ class GetPartitioningCheckpoints (implicit schema: DBSchema, dbEngine: DoobieEng "measured_columns", "measurement_value", "checkpoint_start_time", - "checkpoint_end_time", + "checkpoint_end_time" ) - - override def sql(values: CheckpointQueryDTO)(implicit read: Read[CheckpointFromDB]): Fragment = { - val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning) - val partitioningNormalized = partitioning.asJson - - sql"""SELECT ${Fragment.const(selectEntry)} - FROM ${Fragment.const(functionName)}( - $partitioningNormalized, - ${values.limit}, - ${values.checkpointName} - ) AS ${Fragment.const(alias)};""" - } } object GetPartitioningCheckpoints { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasures.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasures.scala index ed1694f3d..9b653365d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasures.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasures.scala @@ -16,38 +16,30 @@ package za.co.absa.atum.server.api.database.runs.functions -import doobie.Fragment import doobie.implicits.toSqlInterpolator -import doobie.util.Read -import za.co.absa.atum.model.dto.{MeasureDTO, PartitioningDTO} +import za.co.absa.atum.model.dto.PartitioningDTO import za.co.absa.atum.server.model.PartitioningForDB -import za.co.absa.fadb.DBSchema -import za.co.absa.fadb.doobie.DoobieEngine -import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunction +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.runs.Runs import zio._ -import zio.interop.catz._ import io.circe.syntax._ - import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get import doobie.postgres.circe.jsonb.implicits.jsonbPut - -class GetPartitioningMeasures (implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieMultipleResultFunction[PartitioningDTO, MeasureDTO, Task] - { - - override val fieldsToSelect: Seq[String] = Seq("measure_name", "measured_columns") - - override def sql(values: PartitioningDTO)(implicit read: Read[MeasureDTO]): Fragment = { - val partitioning = PartitioningForDB.fromSeqPartitionDTO(values) - val partitioningJson = partitioning.asJson - - sql"""SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}( - $partitioningJson - ) ${Fragment.const(alias)};""" - } - +import za.co.absa.atum.server.model.MeasureFromDB +import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling + +class GetPartitioningMeasures(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) + extends DoobieMultipleResultFunctionWithAggStatus[PartitioningDTO, MeasureFromDB, Task](values => + Seq(fr"${PartitioningForDB.fromSeqPartitionDTO(values).asJson}") + ) + with StandardStatusHandling + with ByFirstErrorStatusAggregator { + + override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("measure_name", "measured_columns") } object GetPartitioningMeasures { @@ -57,4 +49,3 @@ object GetPartitioningMeasures { } yield new GetPartitioningMeasures()(Runs, dbProvider.dbEngine) } } - diff --git a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpoint.scala b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpoint.scala index a196038c1..ce6c5c129 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpoint.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpoint.scala @@ -16,53 +16,37 @@ package za.co.absa.atum.server.api.database.runs.functions -import doobie.Fragment import doobie.implicits.toSqlInterpolator -import doobie.util.Read import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.model.PartitioningForDB -import za.co.absa.fadb.DBSchema -import za.co.absa.fadb.doobie.{DoobieEngine, StatusWithData} -import za.co.absa.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus -import za.co.absa.fadb.status.handling.implementations.StandardStatusHandling +import za.co.absa.db.fadb.DBSchema +import za.co.absa.db.fadb.doobie.DoobieEngine +import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieSingleResultFunctionWithStatus +import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import za.co.absa.atum.server.api.database.runs.Runs import zio._ -import zio.interop.catz._ import io.circe.syntax._ import za.co.absa.atum.model.dto.MeasureResultDTO._ import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get -import za.co.absa.atum.server.api.database.DoobieImplicits.Jsonb.jsonbArrayPut -import doobie.postgres.circe.jsonb.implicits.jsonbGet -import doobie.postgres.circe.jsonb.implicits.jsonbPut +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut +import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbArrayPut import doobie.postgres.implicits._ class WriteCheckpoint(implicit schema: DBSchema, dbEngine: DoobieEngine[Task]) - extends DoobieSingleResultFunctionWithStatus[CheckpointDTO, Unit, Task] - with StandardStatusHandling { - - override def sql(values: CheckpointDTO)(implicit read: Read[StatusWithData[Unit]]): Fragment = { - val partitioning = PartitioningForDB.fromSeqPartitionDTO(values.partitioning) - val partitioningNormalized = partitioning.asJson - - // List[String] containing json data has to be properly escaped - // It would be safer to use Json data type and derive Put instance - val measurementsNormalized = { - values.measurements.toList.map(_.asJson) - } - - sql"""SELECT ${Fragment.const(selectEntry)} FROM ${Fragment.const(functionName)}( - $partitioningNormalized, - ${values.id}, - ${values.name}, - ${values.processStartTime}, - ${values.processEndTime}, - $measurementsNormalized, - ${values.measuredByAtumAgent}, - ${values.author} - ) ${Fragment.const(alias)};""" - } -} + extends DoobieSingleResultFunctionWithStatus[CheckpointDTO, Unit, Task](values => + Seq( + fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}", + fr"${values.id}", + fr"${values.name}", + fr"${values.processStartTime}", + fr"${values.processEndTime}", + fr"${values.measurements.toList.map(_.asJson)}", + fr"${values.measuredByAtumAgent}", + fr"${values.author}" + ) + ) + with StandardStatusHandling object WriteCheckpoint { val layer: URLayer[PostgresDatabaseProvider, WriteCheckpoint] = ZLayer { diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala index 422f044ef..3ef8edfb1 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/BaseEndpoints.scala @@ -19,7 +19,12 @@ package za.co.absa.atum.server.api.http import sttp.model.StatusCode import sttp.tapir.generic.auto.schemaForCaseClass import sttp.tapir.json.circe.jsonBody -import za.co.absa.atum.server.model.{BadRequestResponse, ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse} +import za.co.absa.atum.server.model.{ + BadRequestResponse, + ErrorResponse, + GeneralErrorResponse, + InternalServerErrorResponse +} import sttp.tapir.typelevel.MatchType import sttp.tapir.ztapir._ import sttp.tapir.{EndpointOutput, PublicEndpoint} diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala index f86103204..792d68a25 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Endpoints.scala @@ -16,7 +16,6 @@ package za.co.absa.atum.server.api.http - import sttp.model.StatusCode import sttp.tapir.generic.auto.schemaForCaseClass import sttp.tapir.ztapir._ @@ -27,7 +26,6 @@ import za.co.absa.atum.server.model.ErrorResponse import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse} import sttp.tapir.{PublicEndpoint, endpoint} - trait Endpoints extends BaseEndpoints { protected val createCheckpointEndpointV1: PublicEndpoint[CheckpointDTO, ErrorResponse, CheckpointDTO, Any] = { @@ -38,7 +36,8 @@ trait Endpoints extends BaseEndpoints { .out(jsonBody[CheckpointDTO]) } - protected val createCheckpointEndpointV2: PublicEndpoint[CheckpointDTO, ErrorResponse, SingleSuccessResponse[CheckpointDTO], Any] = { + protected val createCheckpointEndpointV2 + : PublicEndpoint[CheckpointDTO, ErrorResponse, SingleSuccessResponse[CheckpointDTO], Any] = { apiV2.post .in(CreateCheckpoint) .in(jsonBody[CheckpointDTO]) @@ -46,7 +45,8 @@ trait Endpoints extends BaseEndpoints { .out(jsonBody[SingleSuccessResponse[CheckpointDTO]]) } - protected val createPartitioningEndpointV1: PublicEndpoint[PartitioningSubmitDTO, ErrorResponse, AtumContextDTO, Any] = { + protected val createPartitioningEndpointV1 + : PublicEndpoint[PartitioningSubmitDTO, ErrorResponse, AtumContextDTO, Any] = { apiV1.post .in(pathToAPIv1CompatibleFormat(CreatePartitioning)) .in(jsonBody[PartitioningSubmitDTO]) @@ -54,7 +54,8 @@ trait Endpoints extends BaseEndpoints { .out(jsonBody[AtumContextDTO]) } - protected val createPartitioningEndpointV2: PublicEndpoint[PartitioningSubmitDTO, ErrorResponse, SingleSuccessResponse[AtumContextDTO], Any] = { + protected val createPartitioningEndpointV2 + : PublicEndpoint[PartitioningSubmitDTO, ErrorResponse, SingleSuccessResponse[AtumContextDTO], Any] = { apiV2.post .in(CreatePartitioning) .in(jsonBody[PartitioningSubmitDTO]) @@ -62,7 +63,8 @@ trait Endpoints extends BaseEndpoints { .out(jsonBody[SingleSuccessResponse[AtumContextDTO]]) } - protected val createOrUpdateAdditionalDataEndpointV2: PublicEndpoint[AdditionalDataSubmitDTO, ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO], Any] = { + protected val createOrUpdateAdditionalDataEndpointV2 + : PublicEndpoint[AdditionalDataSubmitDTO, ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO], Any] = { apiV2.post .in(CreateOrUpdateAdditionalData) .in(jsonBody[AdditionalDataSubmitDTO]) @@ -70,7 +72,8 @@ trait Endpoints extends BaseEndpoints { .out(jsonBody[SingleSuccessResponse[AdditionalDataSubmitDTO]]) } - protected val getPartitioningCheckpointsEndpointV2: PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = { + protected val getPartitioningCheckpointsEndpointV2 + : PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = { apiV2.get .in(GetPartitioningCheckpoints) .in(jsonBody[CheckpointQueryDTO]) @@ -78,7 +81,8 @@ trait Endpoints extends BaseEndpoints { .out(jsonBody[MultiSuccessResponse[CheckpointDTO]]) } - protected val getFlowCheckpointsEndpointV2: PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = { + protected val getFlowCheckpointsEndpointV2 + : PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = { apiV2.post .in(GetFlowCheckpoints) .in(jsonBody[CheckpointQueryDTO]) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala index 8f03690c1..1da88efce 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/http/Routes.scala @@ -42,10 +42,13 @@ trait Routes extends Endpoints with ServerOptions { createServerEndpoint(createCheckpointEndpointV2, CheckpointController.createCheckpointV2), createServerEndpoint(createPartitioningEndpointV1, PartitioningController.createPartitioningIfNotExistsV1), createServerEndpoint(createPartitioningEndpointV2, PartitioningController.createPartitioningIfNotExistsV2), - createServerEndpoint(createOrUpdateAdditionalDataEndpointV2, PartitioningController.createOrUpdateAdditionalDataV2), + createServerEndpoint( + createOrUpdateAdditionalDataEndpointV2, + PartitioningController.createOrUpdateAdditionalDataV2 + ), createServerEndpoint(getPartitioningCheckpointsEndpointV2, PartitioningController.getPartitioningCheckpointsV2), createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2), - createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit), + createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit) ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes } @@ -61,7 +64,7 @@ trait Routes extends Endpoints with ServerOptions { createPartitioningEndpointV2, createOrUpdateAdditionalDataEndpointV2, getPartitioningCheckpointsEndpointV2, - getFlowCheckpointsEndpointV2, + getFlowCheckpointsEndpointV2 ) ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None)) .from(SwaggerInterpreter().fromEndpoints[HttpEnv.F](endpoints, SwaggerApiName, SwaggerApiVersion)) diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/BaseRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/BaseRepository.scala index 31116e7ea..9f9764f91 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/BaseRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/BaseRepository.scala @@ -17,35 +17,62 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.fadb.exceptions.StatusException +import za.co.absa.db.fadb.exceptions.StatusException +import za.co.absa.db.fadb.status.{FailedOrRow, FailedOrRows} import zio._ trait BaseRepository { - def dbCall[R]( - dbFuncCall: Task[R], - operationName: String - ): IO[DatabaseError, R] = { - dbFuncCall - .zipLeft(ZIO.logDebug(s"Operation '$operationName' succeeded in database")) - .mapError(error => DatabaseError(error.getMessage)) - .tapError(error => ZIO.logError(s"Operation '$operationName' failed: ${error.message}")) - } - - def dbCallWithStatus[R]( - dbFuncCall: Task[Either[StatusException, R]], - operationName: String - ): IO[DatabaseError, Either[StatusException, R]] = { + private def logAndReturn[R]( + operationName: String, + dbFuncCall: Task[Either[StatusException, R]] + ): ZIO[Any, Throwable, Either[StatusException, R]] = { dbFuncCall .tap { - case Left(statusException) => + case Left(statusException: StatusException) => ZIO.logError( s"Exception caused by operation: '$operationName': " + - s"(${statusException.status.statusCode}) ${statusException.status.statusText}" + s"(${statusException.status.statusCode}), ${statusException.status.statusText}" ) case Right(_) => ZIO.logDebug(s"Operation '$operationName' succeeded in database") } - .mapError(error => DatabaseError(error.getMessage)) + } + + private def defaultErrorHandler(operationName: String): PartialFunction[Throwable, DatabaseError] = { + case statusException: StatusException => + DatabaseError( + s"Exception caused by operation: '$operationName': " + + s"(${statusException.status.statusCode}) ${statusException.status.statusText}" + ) + case error => + DatabaseError(s"Operation '$operationName' failed with unexpected error: ${error.getMessage}") + } + + def dbSingleResultCallWithStatus[R](dbFuncCall: Task[FailedOrRow[R]], operationName: String): IO[DatabaseError, R] = { + logAndReturn(operationName, dbFuncCall) + .flatMap { + case Left(statusException) => ZIO.fail(statusException) + case Right(value) => ZIO.succeed(value.data) + } + .mapError { + defaultErrorHandler(operationName) + } .tapError(error => ZIO.logError(s"Operation '$operationName' failed: ${error.message}")) } + + def dbMultipleResultCallWithAggregatedStatus[R]( + dbFuncCall: Task[FailedOrRows[R]], + operationName: String + ): IO[DatabaseError, Seq[R]] = { + logAndReturn(operationName, dbFuncCall) + .flatMap { + case Left(statusException) => ZIO.fail(statusException) + case Right(value) => ZIO.succeed(value.map(_.data)) + } + .mapError { + defaultErrorHandler(operationName) + } + .tapError(error => ZIO.logError(s"Operation '$operationName' failed: ${error.message}")) + } + } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala index 26b551fcf..59c33d1b6 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepository.scala @@ -18,12 +18,11 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.fadb.exceptions.StatusException import zio._ import zio.macros.accessible @accessible trait CheckpointRepository { - def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Either[StatusException, Unit]] + def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Unit] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala index e033f7f5f..e63d92c9f 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryImpl.scala @@ -19,13 +19,13 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpoint import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.fadb.exceptions.StatusException import zio._ +import zio.interop.catz.asyncInstance class CheckpointRepositoryImpl(writeCheckpointFn: WriteCheckpoint) extends CheckpointRepository with BaseRepository { - override def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Either[StatusException, Unit]] = { - dbCallWithStatus(writeCheckpointFn(checkpointDTO), "writeCheckpoint") + override def writeCheckpoint(checkpointDTO: CheckpointDTO): IO[DatabaseError, Unit] = { + dbSingleResultCallWithStatus(writeCheckpointFn(checkpointDTO), "writeCheckpoint") } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala index 3a8e96a88..71152b335 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/FlowRepositoryImpl.scala @@ -21,11 +21,12 @@ import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.model.CheckpointFromDB import zio._ +import zio.interop.catz.asyncInstance class FlowRepositoryImpl(getFlowCheckpointsFn: GetFlowCheckpoints) extends FlowRepository with BaseRepository { override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] = { - dbCall(getFlowCheckpointsFn(checkpointQueryDTO), "getFlowCheckpoints") + dbMultipleResultCallWithAggregatedStatus(getFlowCheckpointsFn(checkpointQueryDTO), "getFlowCheckpoints") } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala index d61c13a6f..b34bda040 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepository.scala @@ -16,30 +16,28 @@ package za.co.absa.atum.server.api.repository -import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataSubmitDTO, CheckpointQueryDTO, MeasureDTO, PartitioningDTO, PartitioningSubmitDTO} +import za.co.absa.atum.model.dto.{ + AdditionalDataDTO, + AdditionalDataSubmitDTO, + CheckpointQueryDTO, + MeasureDTO, + PartitioningDTO, + PartitioningSubmitDTO +} import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.model.CheckpointFromDB -import za.co.absa.fadb.exceptions.StatusException import zio.IO import zio.macros.accessible @accessible trait PartitioningRepository { - def createPartitioningIfNotExists( - partitioningSubmitDTO: PartitioningSubmitDTO - ): IO[DatabaseError, Either[StatusException, Unit]] + def createPartitioningIfNotExists(partitioningSubmitDTO: PartitioningSubmitDTO): IO[DatabaseError, Unit] - def getPartitioningMeasures( - partitioning: PartitioningDTO - ): IO[DatabaseError, Seq[MeasureDTO]] + def getPartitioningMeasures(partitioning: PartitioningDTO): IO[DatabaseError, Seq[MeasureDTO]] - def getPartitioningAdditionalData( - partitioning: PartitioningDTO - ): IO[DatabaseError, AdditionalDataDTO] + def getPartitioningAdditionalData(partitioning: PartitioningDTO): IO[DatabaseError, AdditionalDataDTO] - def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): - IO[DatabaseError, Either[StatusException, Unit]] + def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): IO[DatabaseError, Unit] - def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): - IO[DatabaseError, Seq[CheckpointFromDB]] + def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala index a9441d66b..fd9fe8cb0 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryImpl.scala @@ -17,20 +17,26 @@ package za.co.absa.atum.server.api.repository import za.co.absa.atum.model.dto.{ - AdditionalDataDTO, AdditionalDataSubmitDTO, - CheckpointQueryDTO, MeasureDTO, PartitioningDTO, - PartitioningSubmitDTO} + AdditionalDataDTO, + AdditionalDataSubmitDTO, + CheckpointQueryDTO, + MeasureDTO, + PartitioningDTO, + PartitioningSubmitDTO +} +import za.co.absa.atum.server.model.MeasureFromDB import za.co.absa.atum.server.api.database.runs.functions.{ CreateOrUpdateAdditionalData, CreatePartitioningIfNotExists, GetPartitioningAdditionalData, GetPartitioningCheckpoints, - GetPartitioningMeasures } + GetPartitioningMeasures +} import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.model.CheckpointFromDB -import za.co.absa.fadb.exceptions.StatusException import zio._ -import zio.prelude.ZivariantOps +import zio.interop.catz.asyncInstance +import za.co.absa.atum.server.model.AdditionalDataFromDB class PartitioningRepositoryImpl( createPartitioningIfNotExistsFn: CreatePartitioningIfNotExists, @@ -38,36 +44,43 @@ class PartitioningRepositoryImpl( getPartitioningAdditionalDataFn: GetPartitioningAdditionalData, createOrUpdateAdditionalDataFn: CreateOrUpdateAdditionalData, getPartitioningCheckpointsFn: GetPartitioningCheckpoints -) extends PartitioningRepository with BaseRepository { +) extends PartitioningRepository + with BaseRepository { - override def createPartitioningIfNotExists( - partitioningSubmitDTO: PartitioningSubmitDTO - ): IO[DatabaseError, Either[StatusException, Unit]] = { - dbCallWithStatus(createPartitioningIfNotExistsFn(partitioningSubmitDTO), "createPartitioningIfNotExists") + override def createPartitioningIfNotExists(partitioningSubmitDTO: PartitioningSubmitDTO): IO[DatabaseError, Unit] = { + dbSingleResultCallWithStatus( + createPartitioningIfNotExistsFn(partitioningSubmitDTO), + "createPartitioningIfNotExists" + ) } - override def createOrUpdateAdditionalData( - additionalData: AdditionalDataSubmitDTO - ): IO[DatabaseError, Either[StatusException, Unit]] = { - dbCallWithStatus(createOrUpdateAdditionalDataFn(additionalData), "createOrUpdateAdditionalData") + override def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): IO[DatabaseError, Unit] = { + dbSingleResultCallWithStatus(createOrUpdateAdditionalDataFn(additionalData), "createOrUpdateAdditionalData") } - override def getPartitioningMeasures( - partitioning: PartitioningDTO - ): IO[DatabaseError, Seq[MeasureDTO]] = { - val m = getPartitioningMeasuresFn(partitioning) - m.mapLeft(err => DatabaseError(err.getMessage)) + override def getPartitioningMeasures(partitioning: PartitioningDTO): IO[DatabaseError, Seq[MeasureDTO]] = { + dbMultipleResultCallWithAggregatedStatus(getPartitioningMeasuresFn(partitioning), "getPartitioningMeasures") + .map(_.map { case MeasureFromDB(measureName, measuredColumns) => + MeasureDTO(measureName.get, measuredColumns.get) + }) } - override def getPartitioningAdditionalData(partitioning: PartitioningDTO): - IO[DatabaseError, AdditionalDataDTO] = { - getPartitioningAdditionalDataFn(partitioning).mapBoth(err => DatabaseError(err.getMessage), _.toMap) + override def getPartitioningAdditionalData(partitioning: PartitioningDTO): IO[DatabaseError, AdditionalDataDTO] = { + dbMultipleResultCallWithAggregatedStatus( + getPartitioningAdditionalDataFn(partitioning), + "getPartitioningAdditionalData" + ).map(_.map { case AdditionalDataFromDB(adName, adValue) => adName.get -> adValue }.toMap) } - override def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): - IO[DatabaseError, Seq[CheckpointFromDB]] = { - dbCall(getPartitioningCheckpointsFn(checkpointQueryDTO), "getPartitioningCheckpoints") + override def getPartitioningCheckpoints( + checkpointQueryDTO: CheckpointQueryDTO + ): IO[DatabaseError, Seq[CheckpointFromDB]] = { + dbMultipleResultCallWithAggregatedStatus( + getPartitioningCheckpointsFn(checkpointQueryDTO), + "getPartitioningCheckpoints" + ) } + } object PartitioningRepositoryImpl { @@ -90,6 +103,7 @@ object PartitioningRepositoryImpl { getPartitioningMeasures, getPartitioningAdditionalData, createOrUpdateAdditionalData, - getPartitioningCheckpoints) + getPartitioningCheckpoints + ) } } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/BaseService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/BaseService.scala index 3b070df36..680dcc22d 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/BaseService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/BaseService.scala @@ -17,25 +17,11 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.server.api.exception.{DatabaseError, ServiceError} -import za.co.absa.fadb.exceptions.StatusException import zio._ trait BaseService { - def repositoryCall[R]( - repositoryCall: IO[DatabaseError, R], - operationName: String - ): IO[ServiceError, R] = { - repositoryCall - .mapError { case DatabaseError(message) => - ServiceError(s"Failed to perform '$operationName': $message") - } - } - - def repositoryCallWithStatus[R]( - repositoryCall: IO[DatabaseError, Either[StatusException, R]], - operationName: String - ): IO[ServiceError, Either[StatusException, R]] = { + def repositoryCall[R](repositoryCall: IO[DatabaseError, R], operationName: String): IO[ServiceError, R] = { repositoryCall .mapError { case DatabaseError(message) => ServiceError(s"Failed to perform '$operationName': $message") diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala index 9d5bce9d1..a38811890 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointService.scala @@ -18,12 +18,11 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.exception.ServiceError -import za.co.absa.fadb.exceptions.StatusException -import zio._ +import zio.IO import zio.macros.accessible @accessible trait CheckpointService { - def saveCheckpoint(checkpointDTO: CheckpointDTO): IO[ServiceError, Either[StatusException, Unit]] + def saveCheckpoint(checkpointDTO: CheckpointDTO): IO[ServiceError, Unit] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala index dba47bcd7..aae123ea4 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/CheckpointServiceImpl.scala @@ -19,15 +19,14 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto.CheckpointDTO import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.repository.CheckpointRepository -import za.co.absa.fadb.exceptions.StatusException import zio._ -class CheckpointServiceImpl(checkpointRepository: CheckpointRepository) - extends CheckpointService with BaseService { +class CheckpointServiceImpl(checkpointRepository: CheckpointRepository) extends CheckpointService with BaseService { - override def saveCheckpoint(checkpointDTO: CheckpointDTO): IO[ServiceError, Either[StatusException, Unit]] = { - repositoryCallWithStatus( - checkpointRepository.writeCheckpoint(checkpointDTO), "saveCheckpoint" + override def saveCheckpoint(checkpointDTO: CheckpointDTO): IO[ServiceError, Unit] = { + repositoryCall( + checkpointRepository.writeCheckpoint(checkpointDTO), + "saveCheckpoint" ) } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala index 34781cbdf..72945d705 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowService.scala @@ -16,7 +16,7 @@ package za.co.absa.atum.server.api.service -import za.co.absa.atum.model.dto.{CheckpointQueryDTO, CheckpointDTO} +import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO} import za.co.absa.atum.server.api.exception.ServiceError import zio._ import zio.macros.accessible diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala index 93e333c67..5f788b86c 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/FlowServiceImpl.scala @@ -22,19 +22,18 @@ import za.co.absa.atum.server.api.repository.FlowRepository import za.co.absa.atum.server.model.CheckpointFromDB import zio._ - -class FlowServiceImpl(flowRepository: FlowRepository) - extends FlowService with BaseService { +class FlowServiceImpl(flowRepository: FlowRepository) extends FlowService with BaseService { override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] = { for { checkpointsFromDB <- repositoryCall( - flowRepository.getFlowCheckpoints(checkpointQueryDTO), "getFlowCheckpoints" + flowRepository.getFlowCheckpoints(checkpointQueryDTO), + "getFlowCheckpoints" ) - checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { - checkpointFromDB => - ZIO.fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) - .mapError(error => ServiceError(error.getMessage)) + checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB => + ZIO + .fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) + .mapError(error => ServiceError(error.getMessage)) } } yield checkpointDTOs } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala index dd2e7c5c1..2f5d06a20 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningService.scala @@ -16,26 +16,28 @@ package za.co.absa.atum.server.api.service -import za.co.absa.atum.model.dto.{AdditionalDataDTO, AdditionalDataSubmitDTO, CheckpointDTO, CheckpointQueryDTO, MeasureDTO, PartitioningDTO, PartitioningSubmitDTO} +import za.co.absa.atum.model.dto.{ + AdditionalDataDTO, + AdditionalDataSubmitDTO, + CheckpointDTO, + CheckpointQueryDTO, + MeasureDTO, + PartitioningDTO, + PartitioningSubmitDTO +} import za.co.absa.atum.server.api.exception.ServiceError -import za.co.absa.fadb.exceptions.StatusException import zio.IO import zio.macros.accessible @accessible trait PartitioningService { - def createPartitioningIfNotExists(partitioningSubmitDTO: PartitioningSubmitDTO): - IO[ServiceError, Either[StatusException, Unit]] + def createPartitioningIfNotExists(partitioningSubmitDTO: PartitioningSubmitDTO): IO[ServiceError, Unit] - def getPartitioningMeasures(partitioning: PartitioningDTO): - IO[ServiceError, Seq[MeasureDTO]] + def getPartitioningMeasures(partitioning: PartitioningDTO): IO[ServiceError, Seq[MeasureDTO]] - def getPartitioningAdditionalData(partitioning: PartitioningDTO): - IO[ServiceError, AdditionalDataDTO] + def getPartitioningAdditionalData(partitioning: PartitioningDTO): IO[ServiceError, AdditionalDataDTO] - def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): - IO[ServiceError, Either[StatusException, Unit]] + def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): IO[ServiceError, Unit] - def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): - IO[ServiceError, Seq[CheckpointDTO]] + def getPartitioningCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] } diff --git a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala index 687e1d629..f764d0249 100644 --- a/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala +++ b/server/src/main/scala/za/co/absa/atum/server/api/service/PartitioningServiceImpl.scala @@ -19,54 +19,53 @@ package za.co.absa.atum.server.api.service import za.co.absa.atum.model.dto._ import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.repository.PartitioningRepository -import za.co.absa.fadb.exceptions.StatusException -import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.model.CheckpointFromDB import zio._ class PartitioningServiceImpl(partitioningRepository: PartitioningRepository) - extends PartitioningService with BaseService { + extends PartitioningService + with BaseService { - override def createPartitioningIfNotExists(partitioningSubmitDTO: PartitioningSubmitDTO): - IO[ServiceError, Either[StatusException, Unit]] = { - repositoryCallWithStatus( - partitioningRepository.createPartitioningIfNotExists(partitioningSubmitDTO), "createPartitioningIfNotExists" - ).mapError(error => ServiceError(error.message)) + override def createPartitioningIfNotExists(partitioningSubmitDTO: PartitioningSubmitDTO): IO[ServiceError, Unit] = { + repositoryCall( + partitioningRepository.createPartitioningIfNotExists(partitioningSubmitDTO), + "createPartitioningIfNotExists" + ) } - override def createOrUpdateAdditionalData( - additionalData: AdditionalDataSubmitDTO - ): IO[ServiceError, Either[StatusException, Unit]] = { - repositoryCallWithStatus( - partitioningRepository.createOrUpdateAdditionalData(additionalData), "createOrUpdateAdditionalData" - ).mapError(error => ServiceError(error.message)) + override def createOrUpdateAdditionalData(additionalData: AdditionalDataSubmitDTO): IO[ServiceError, Unit] = { + repositoryCall( + partitioningRepository.createOrUpdateAdditionalData(additionalData), + "createOrUpdateAdditionalData" + ) } override def getPartitioningMeasures(partitioning: PartitioningDTO): IO[ServiceError, Seq[MeasureDTO]] = { - partitioningRepository.getPartitioningMeasures(partitioning) - .mapError { case DatabaseError(message) => - ServiceError(s"Failed to retrieve partitioning measures': $message") - } + repositoryCall( + partitioningRepository.getPartitioningMeasures(partitioning), + "getPartitioningMeasures" + ) } override def getPartitioningAdditionalData(partitioning: PartitioningDTO): IO[ServiceError, AdditionalDataDTO] = { - partitioningRepository.getPartitioningAdditionalData(partitioning) - .mapError { case DatabaseError(message) => - ServiceError(s"Failed to retrieve partitioning additional data': $message") - } + repositoryCall( + partitioningRepository.getPartitioningAdditionalData(partitioning), + "getPartitioningAdditionalData" + ) } override def getPartitioningCheckpoints( - checkpointQueryDTO: CheckpointQueryDTO - ): IO[ServiceError, Seq[CheckpointDTO]] = { + checkpointQueryDTO: CheckpointQueryDTO + ): IO[ServiceError, Seq[CheckpointDTO]] = { for { checkpointsFromDB <- repositoryCall( - partitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO), "getPartitioningCheckpoints" + partitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO), + "getPartitioningCheckpoints" ) - checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { - checkpointFromDB => - ZIO.fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) - .mapError(error => ServiceError(error.getMessage)) + checkpointDTOs <- ZIO.foreach(checkpointsFromDB) { checkpointFromDB => + ZIO + .fromEither(CheckpointFromDB.toCheckpointDTO(checkpointQueryDTO.partitioning, checkpointFromDB)) + .mapError(error => ServiceError(error.getMessage)) } } yield checkpointDTOs diff --git a/server/src/main/scala/za/co/absa/atum/server/model/AdditionalDataFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/AdditionalDataFromDB.scala new file mode 100644 index 000000000..4816ab2bb --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/model/AdditionalDataFromDB.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.model + +case class AdditionalDataFromDB( + adName: Option[String], + adValue: Option[String] +) diff --git a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointFromDB.scala index 705e6c319..0eba1e01c 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/CheckpointFromDB.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/CheckpointFromDB.scala @@ -23,15 +23,17 @@ import java.time.ZonedDateTime import java.util.UUID case class CheckpointFromDB( - idCheckpoint: UUID, - checkpointName: String, - author: String, - measuredByAtumAgent: Boolean = false, - measureName: String, - measuredColumns: Seq[String], - measurementValue: Json, // it's easier to convert this attribute to our `MeasurementDTO` after we received this as JSON from DB - checkpointStartTime: ZonedDateTime, - checkpointEndTime: Option[ZonedDateTime] + idCheckpoint: Option[UUID], + checkpointName: Option[String], + author: Option[String], + measuredByAtumAgent: Option[Boolean], + measureName: Option[String], + measuredColumns: Option[Seq[String]], + measurementValue: Option[ + Json + ], // it's easier to convert this attribute to our `MeasurementDTO` after we received this as JSON from DB + checkpointStartTime: Option[ZonedDateTime], + checkpointEndTime: Option[ZonedDateTime] ) object CheckpointFromDB { @@ -40,25 +42,25 @@ object CheckpointFromDB { partitioning: PartitioningDTO, checkpointQueryResult: CheckpointFromDB ): Either[DecodingFailure, CheckpointDTO] = { - val measureResultOrErr = checkpointQueryResult.measurementValue.as[MeasureResultDTO] + val measureResultOrErr = checkpointQueryResult.measurementValue.get.as[MeasureResultDTO] measureResultOrErr match { case Left(err) => Left(err) case Right(measureResult) => Right( CheckpointDTO( - id = checkpointQueryResult.idCheckpoint, - name = checkpointQueryResult.checkpointName, - author = checkpointQueryResult.author, - measuredByAtumAgent = checkpointQueryResult.measuredByAtumAgent, + id = checkpointQueryResult.idCheckpoint.get, + name = checkpointQueryResult.checkpointName.get, + author = checkpointQueryResult.author.get, + measuredByAtumAgent = checkpointQueryResult.measuredByAtumAgent.get, partitioning = partitioning, - processStartTime = checkpointQueryResult.checkpointStartTime, + processStartTime = checkpointQueryResult.checkpointStartTime.get, processEndTime = checkpointQueryResult.checkpointEndTime, measurements = Set( MeasurementDTO( measure = MeasureDTO( - measureName = checkpointQueryResult.measureName, - measuredColumns = checkpointQueryResult.measuredColumns + measureName = checkpointQueryResult.measureName.get, + measuredColumns = checkpointQueryResult.measuredColumns.get ), result = measureResult ) diff --git a/server/src/main/scala/za/co/absa/atum/server/model/ErrorResponse.scala b/server/src/main/scala/za/co/absa/atum/server/model/ErrorResponse.scala index bf17272b4..65d316931 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/ErrorResponse.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/ErrorResponse.scala @@ -16,7 +16,6 @@ package za.co.absa.atum.server.model - import io.circe._ import io.circe.generic.semiauto._ @@ -27,28 +26,25 @@ object ErrorResponse { implicit val encodeErrorResponse: Encoder[ErrorResponse] = deriveEncoder } - sealed trait ErrorResponse extends ResponseEnvelope { - def message: String - } +sealed trait ErrorResponse extends ResponseEnvelope { + def message: String +} - final case class BadRequestResponse(message: String, requestId: UUID = UUID.randomUUID()) - extends ErrorResponse +final case class BadRequestResponse(message: String, requestId: UUID = UUID.randomUUID()) extends ErrorResponse object BadRequestResponse { implicit val decodeBadRequestResponse: Decoder[BadRequestResponse] = deriveDecoder implicit val encodeBadRequestResponse: Encoder[BadRequestResponse] = deriveEncoder } - final case class GeneralErrorResponse(message: String, requestId: UUID = UUID.randomUUID()) - extends ErrorResponse +final case class GeneralErrorResponse(message: String, requestId: UUID = UUID.randomUUID()) extends ErrorResponse object GeneralErrorResponse { implicit val decodeGeneralErrorResponse: Decoder[GeneralErrorResponse] = deriveDecoder implicit val encodeGeneralErrorResponse: Encoder[GeneralErrorResponse] = deriveEncoder } - final case class InternalServerErrorResponse(message: String, requestId: UUID = UUID.randomUUID()) - extends ErrorResponse +final case class InternalServerErrorResponse(message: String, requestId: UUID = UUID.randomUUID()) extends ErrorResponse object InternalServerErrorResponse { implicit val decodeInternalServerErrorResponse: Decoder[InternalServerErrorResponse] = deriveDecoder diff --git a/server/src/main/scala/za/co/absa/atum/server/model/MeasureFromDB.scala b/server/src/main/scala/za/co/absa/atum/server/model/MeasureFromDB.scala new file mode 100644 index 000000000..dd122c069 --- /dev/null +++ b/server/src/main/scala/za/co/absa/atum/server/model/MeasureFromDB.scala @@ -0,0 +1,22 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.model + +case class MeasureFromDB( + measureName: Option[String], + measuredColumns: Option[Seq[String]] +) diff --git a/server/src/main/scala/za/co/absa/atum/server/model/SuccessResponse.scala b/server/src/main/scala/za/co/absa/atum/server/model/SuccessResponse.scala index f3898cc66..05d27f2f7 100644 --- a/server/src/main/scala/za/co/absa/atum/server/model/SuccessResponse.scala +++ b/server/src/main/scala/za/co/absa/atum/server/model/SuccessResponse.scala @@ -25,16 +25,14 @@ object SuccessResponse { sealed trait SuccessResponse extends ResponseEnvelope - case class SingleSuccessResponse[T](data: T, requestId: UUID = UUID.randomUUID()) - extends SuccessResponse + case class SingleSuccessResponse[T](data: T, requestId: UUID = UUID.randomUUID()) extends SuccessResponse object SingleSuccessResponse { implicit def encoder[T: Encoder]: Encoder[SingleSuccessResponse[T]] = deriveEncoder implicit def decoder[T: Decoder]: Decoder[SingleSuccessResponse[T]] = deriveDecoder } - case class MultiSuccessResponse[T](data: Seq[T], requestId: UUID = UUID.randomUUID()) - extends SuccessResponse + case class MultiSuccessResponse[T](data: Seq[T], requestId: UUID = UUID.randomUUID()) extends SuccessResponse object MultiSuccessResponse { implicit def encoder[T: Encoder]: Encoder[MultiSuccessResponse[T]] = deriveEncoder diff --git a/server/src/test/resources/logback-test.xml b/server/src/test/resources/logback-test.xml index 6e2afba1d..7b7cb3bb9 100644 --- a/server/src/test/resources/logback-test.xml +++ b/server/src/test/resources/logback-test.xml @@ -6,7 +6,7 @@ - + - + diff --git a/server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala b/server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala index bf57c9fda..eee346043 100644 --- a/server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala +++ b/server/src/test/scala/za/co/absa/atum/server/ConfigProviderTest.scala @@ -26,4 +26,3 @@ abstract class ConfigProviderTest extends ZIOSpec[Unit] { Runtime.setConfigProvider(TypesafeConfigProvider.fromResourcePath()) } - diff --git a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala index 19acbfdc9..b846e7973 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/TestData.scala @@ -24,6 +24,7 @@ import java.time.ZonedDateTime import java.util.UUID import MeasureResultDTO.TypedValue import za.co.absa.atum.model.ResultValueType +import za.co.absa.atum.server.model.MeasureFromDB trait TestData { @@ -58,6 +59,10 @@ trait TestData { protected val measureDTO1: MeasureDTO = MeasureDTO("count1", Seq("col_A1", "col_B1")) protected val measureDTO2: MeasureDTO = MeasureDTO("count2", Seq("col_A2", "col_B2")) + // Measure from DB + protected val measureFromDB1: MeasureFromDB = MeasureFromDB(Some("count1"), Some(Seq("col_A1", "col_B1"))) + protected val measureFromDB2: MeasureFromDB = MeasureFromDB(Some("count2"), Some(Seq("col_A2", "col_B2"))) + // Additional Data protected val additionalDataDTO1: AdditionalDataDTO = Map( "key1" -> Some("value1"), @@ -147,13 +152,13 @@ trait TestData { protected val checkpointQueryDTO1: CheckpointQueryDTO = CheckpointQueryDTO( partitioning = partitioningDTO1, limit = Option(2), - checkpointName = Option("checkpointName"), + checkpointName = Option("checkpointName") ) protected val checkpointQueryDTO2: CheckpointQueryDTO = CheckpointQueryDTO( partitioning = partitioningDTO2, limit = Option(5), - checkpointName = Option("noCheckpoints"), + checkpointName = Option("noCheckpoints") ) protected val checkpointQueryDTO3: CheckpointQueryDTO = CheckpointQueryDTO( @@ -189,15 +194,16 @@ trait TestData { // Checkpoint From DB protected val checkpointFromDB1: CheckpointFromDB = CheckpointFromDB( - idCheckpoint = checkpointDTO1.id, - checkpointName = checkpointQueryDTO1.checkpointName.get, - author = "author", - measuredByAtumAgent = true, - measureName = measureDTO1.measureName, - measuredColumns = measureDTO1.measuredColumns.toIndexedSeq, - measurementValue = parser - .parse( - """ + idCheckpoint = Some(checkpointDTO1.id), + checkpointName = checkpointQueryDTO1.checkpointName, + author = Some("author"), + measuredByAtumAgent = Some(true), + measureName = Some(measureDTO1.measureName), + measuredColumns = Some(measureDTO1.measuredColumns.toIndexedSeq), + measurementValue = Some( + parser + .parse( + """ |{ | "mainValue": { | "value": "123", @@ -215,28 +221,31 @@ trait TestData { | } |} |""".stripMargin - ) - .getOrElse { - throw new Exception("Failed to parse JSON") - }, - checkpointStartTime = checkpointDTO1.processStartTime, + ) + .getOrElse { + throw new Exception("Failed to parse JSON") + } + ), + checkpointStartTime = Some(checkpointDTO1.processStartTime), checkpointEndTime = checkpointDTO1.processEndTime ) protected val checkpointFromDB2: CheckpointFromDB = checkpointFromDB1 .copy( - idCheckpoint = checkpointDTO2.id, - checkpointName = checkpointQueryDTO2.checkpointName.get, - author = "author2", - measuredByAtumAgent = true, - measureName = measureDTO2.measureName, - measuredColumns = measureDTO2.measuredColumns.toIndexedSeq, - checkpointStartTime = checkpointDTO2.processStartTime, + idCheckpoint = Some(checkpointDTO2.id), + checkpointName = checkpointQueryDTO2.checkpointName, + author = Some("author2"), + measuredByAtumAgent = Some(true), + measureName = Some(measureDTO2.measureName), + measuredColumns = Some(measureDTO2.measuredColumns.toIndexedSeq), + checkpointStartTime = Some(checkpointDTO2.processStartTime), checkpointEndTime = checkpointDTO2.processEndTime - ) protected val checkpointFromDB3: CheckpointFromDB = checkpointFromDB1 - .copy(idCheckpoint = checkpointDTO3.id, checkpointStartTime = checkpointDTO3.processStartTime) + .copy( + idCheckpoint = Some(checkpointDTO3.id), + checkpointStartTime = Some(checkpointDTO3.processStartTime) + ) protected def createAtumContextDTO(partitioningSubmitDTO: PartitioningSubmitDTO): AtumContextDTO = { val measures: Set[MeasureDTO] = Set(MeasureDTO("count", Seq("*"))) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala index faa02bc01..7554742eb 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala @@ -20,9 +20,7 @@ import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.ServiceError import za.co.absa.atum.server.api.service.CheckpointService -import za.co.absa.atum.server.model.{GeneralErrorResponse, InternalServerErrorResponse} -import za.co.absa.fadb.exceptions.ErrorInDataException -import za.co.absa.fadb.status.FunctionStatus +import za.co.absa.atum.server.model.InternalServerErrorResponse import zio.test.Assertion.failsWithA import zio._ import zio.test._ @@ -31,9 +29,9 @@ object CheckpointControllerUnitTests extends ZIOSpecDefault with TestData { private val checkpointServiceMock = mock(classOf[CheckpointService]) - when(checkpointServiceMock.saveCheckpoint(checkpointDTO1)).thenReturn(ZIO.right(())) + when(checkpointServiceMock.saveCheckpoint(checkpointDTO1)).thenReturn(ZIO.succeed(())) when(checkpointServiceMock.saveCheckpoint(checkpointDTO2)) - .thenReturn(ZIO.left(ErrorInDataException(FunctionStatus(50, "error in data")))) + .thenReturn(ZIO.fail(ServiceError("error in data"))) when(checkpointServiceMock.saveCheckpoint(checkpointDTO3)) .thenReturn(ZIO.fail(ServiceError("boom!"))) @@ -49,10 +47,14 @@ object CheckpointControllerUnitTests extends ZIOSpecDefault with TestData { } yield assertTrue(result == checkpointDTO1) }, test("Returns expected InternalServerErrorResponse") { - assertZIO(CheckpointController.createCheckpointV1(checkpointDTO3).exit)(failsWithA[InternalServerErrorResponse]) + assertZIO(CheckpointController.createCheckpointV1(checkpointDTO3).exit)( + failsWithA[InternalServerErrorResponse] + ) }, test("Returns expected GeneralErrorResponse") { - assertZIO(CheckpointController.createCheckpointV1(checkpointDTO2).exit)(failsWithA[GeneralErrorResponse]) + assertZIO(CheckpointController.createCheckpointV1(checkpointDTO2).exit)( + failsWithA[InternalServerErrorResponse] + ) } ) ).provide( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala index be3f586b7..4cf152128 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/FlowControllerUnitTests.scala @@ -43,13 +43,11 @@ object FlowControllerUnitTests extends ZIOSpecDefault with TestData { failsWithA[InternalServerErrorResponse] ) }, - test("Returns expected CheckpointDTO") { for { result <- FlowController.getFlowCheckpointsV2(checkpointQueryDTO2) - } yield assertTrue (result.data == Seq(checkpointDTO2)) + } yield assertTrue(result.data == Seq(checkpointDTO2)) } - ) ).provide( FlowControllerImpl.layer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala index 7de77eb40..5a78b5012 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/controller/PartitioningControllerUnitTests.scala @@ -31,7 +31,7 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { private val partitioningServiceMock = mock(classOf[PartitioningService]) when(partitioningServiceMock.createPartitioningIfNotExists(partitioningSubmitDTO1)) - .thenReturn(ZIO.right(())) + .thenReturn(ZIO.succeed(())) when(partitioningServiceMock.createPartitioningIfNotExists(partitioningSubmitDTO2)) .thenReturn(ZIO.fail(ServiceError("boom!"))) @@ -42,7 +42,7 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { .thenReturn(ZIO.succeed(Map.empty)) when(partitioningServiceMock.createOrUpdateAdditionalData(additionalDataSubmitDTO1)) - .thenReturn(ZIO.right(())) + .thenReturn(ZIO.succeed(())) when(partitioningServiceMock.createOrUpdateAdditionalData(additionalDataSubmitDTO2)) .thenReturn(ZIO.fail(ServiceError("boom!"))) @@ -83,7 +83,6 @@ object PartitioningControllerUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetPartitioningCheckpointsSuite")( test("Returns expected Seq[MeasureDTO]") { for { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpointsIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpointsIntegrationTests.scala new file mode 100644 index 000000000..67ad6c05a --- /dev/null +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpointsIntegrationTests.scala @@ -0,0 +1,58 @@ +/* + * Copyright 2021 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.atum.server.api.database.flows.functions + +import za.co.absa.atum.server.ConfigProviderTest +import za.co.absa.atum.model.dto.{CheckpointQueryDTO, PartitionDTO, PartitioningDTO} +import za.co.absa.atum.server.api.TestTransactorProvider +import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import za.co.absa.db.fadb.status.FunctionStatus +import zio.interop.catz.asyncInstance +import zio.{Scope, ZIO} +import zio.test._ + +object GetFlowCheckpointsIntegrationTests extends ConfigProviderTest { + + override def spec: Spec[TestEnvironment with Scope, Any] = { + + val partitioningDTO1: PartitioningDTO = Seq( + PartitionDTO("stringA", "stringA"), + PartitionDTO("stringB", "stringB") + ) + + suite("GetFlowCheckpointsIntegrationTests")( + test("Returns expected sequence of flow of Checkpoints with existing partitioning") { + val partitioningQueryDTO: CheckpointQueryDTO = CheckpointQueryDTO( + partitioning = partitioningDTO1, + limit = Some(10), + checkpointName = Some("checkpointName") + ) + + for { + getFlowCheckpoints <- ZIO.service[GetFlowCheckpoints] + result <- getFlowCheckpoints(partitioningQueryDTO) + } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) + } + ).provide( + GetFlowCheckpoints.layer, + PostgresDatabaseProvider.layer, + TestTransactorProvider.layerWithRollback + ) + } + +} diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreateOrUpdateAdditionalDataIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreateOrUpdateAdditionalDataIntegrationTests.scala index a0a282109..94bb6564c 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreateOrUpdateAdditionalDataIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreateOrUpdateAdditionalDataIntegrationTests.scala @@ -20,9 +20,10 @@ import za.co.absa.atum.model.dto.{AdditionalDataSubmitDTO, PartitionDTO} import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider -import za.co.absa.fadb.exceptions.DataNotFoundException -import za.co.absa.fadb.status.FunctionStatus +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import za.co.absa.db.fadb.status.FunctionStatus import zio._ +import zio.interop.catz.asyncInstance import zio.test._ object CreateOrUpdateAdditionalDataIntegrationTests extends ConfigProviderTest { @@ -33,7 +34,7 @@ object CreateOrUpdateAdditionalDataIntegrationTests extends ConfigProviderTest { test("Returns expected Right with Unit") { val additionalDataSubmitDTO = AdditionalDataSubmitDTO( partitioning = Seq(PartitionDTO("key1", "val1"), PartitionDTO("key2", "val2")), - additionalData = Map[String, Option[String]]( + additionalData = Map[String, Option[String]]( "ownership" -> Some("total"), "role" -> Some("primary") ), diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreatePartitioningIfNotExistsIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreatePartitioningIfNotExistsIntegrationTests.scala index fa3f9dddb..32361e75f 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreatePartitioningIfNotExistsIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/CreatePartitioningIfNotExistsIntegrationTests.scala @@ -21,6 +21,7 @@ import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider import zio._ +import zio.interop.catz.asyncInstance import zio.test._ object CreatePartitioningIfNotExistsIntegrationTests extends ConfigProviderTest { diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningAdditionalDataIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningAdditionalDataIntegrationTests.scala index 045f74c69..3bced5c34 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningAdditionalDataIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningAdditionalDataIntegrationTests.scala @@ -20,24 +20,27 @@ import za.co.absa.atum.model.dto.{PartitionDTO, PartitioningDTO} import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import za.co.absa.db.fadb.status.FunctionStatus import zio._ +import zio.interop.catz.asyncInstance import zio.test._ -import zio.test.Assertion._ object GetPartitioningAdditionalDataIntegrationTests extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { suite("GetPartitioningAdditionalDataSuite")( test("Returns expected sequence of Additional data with provided partitioning") { - val partitioningDTO: PartitioningDTO = Seq(PartitionDTO("stringA", "stringB"), PartitionDTO("string2", "string2")) + val partitioningDTO: PartitioningDTO = + Seq(PartitionDTO("stringA", "stringB"), PartitionDTO("string2", "string2")) for { getPartitioningAdditionalData <- ZIO.service[GetPartitioningAdditionalData] - exit <- getPartitioningAdditionalData(partitioningDTO).exit - } yield assert(exit)(failsWithA[doobie.util.invariant.NonNullableColumnRead]) + result <- getPartitioningAdditionalData(partitioningDTO) + } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) } ).provide( GetPartitioningAdditionalData.layer, PostgresDatabaseProvider.layer, - TestTransactorProvider.layerWithRollback, + TestTransactorProvider.layerWithRollback ) } } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala index 911581c9b..ab38e39ca 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningCheckpointsIntegrationTests.scala @@ -20,7 +20,9 @@ import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.model.dto.{CheckpointQueryDTO, PartitionDTO, PartitioningDTO} import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider -import zio.test.Assertion.failsWithA +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import za.co.absa.db.fadb.status.FunctionStatus +import zio.interop.catz.asyncInstance import zio.{Scope, ZIO} import zio.test._ @@ -43,8 +45,8 @@ object GetPartitioningCheckpointsIntegrationTests extends ConfigProviderTest { for { getPartitioningCheckpoints <- ZIO.service[GetPartitioningCheckpoints] - exit <- getPartitioningCheckpoints(partitioningQueryDTO).exit - } yield assert(exit)(failsWithA[doobie.util.invariant.NonNullableColumnRead]) + result <- getPartitioningCheckpoints(partitioningQueryDTO) + } yield assertTrue(result == Left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) } ).provide( GetPartitioningCheckpoints.layer, @@ -54,4 +56,3 @@ object GetPartitioningCheckpointsIntegrationTests extends ConfigProviderTest { } } - diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresIntegrationTests.scala index 35d4c4c8a..c43bfb634 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasuresIntegrationTests.scala @@ -20,8 +20,10 @@ import za.co.absa.atum.model.dto.{PartitionDTO, PartitioningDTO} import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider -import zio.test.Assertion.failsWithA -import zio.test.{Spec, TestEnvironment, assert} +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import zio.interop.catz.asyncInstance +import za.co.absa.db.fadb.status.FunctionStatus +import zio.test.{Spec, TestEnvironment, assertTrue} import zio.{Scope, ZIO} object GetPartitioningMeasuresIntegrationTests extends ConfigProviderTest { @@ -30,11 +32,12 @@ object GetPartitioningMeasuresIntegrationTests extends ConfigProviderTest { suite("GetPartitioningMeasuresSuite")( test("Returns expected sequence of Measures with existing partitioning") { - val partitioningDTO: PartitioningDTO = Seq(PartitionDTO("string1", "string1"), PartitionDTO("string2", "string2")) + val partitioningDTO: PartitioningDTO = + Seq(PartitionDTO("string11", "string11"), PartitionDTO("string12", "string12")) for { getPartitioningMeasures <- ZIO.service[GetPartitioningMeasures] - result <- getPartitioningMeasures(partitioningDTO).exit - } yield assert(result)(failsWithA[doobie.util.invariant.NonNullableColumnRead]) + result <- getPartitioningMeasures(partitioningDTO) + } yield assertTrue (result == Left(DataNotFoundException(FunctionStatus(41, "Partitioning not found")))) } ).provide( GetPartitioningMeasures.layer, diff --git a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala index f24b05dff..2f1b8ffe5 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/database/runs/functions/WriteCheckpointIntegrationTests.scala @@ -22,9 +22,10 @@ import za.co.absa.atum.model.dto.MeasureResultDTO.TypedValue import za.co.absa.atum.server.ConfigProviderTest import za.co.absa.atum.server.api.TestTransactorProvider import za.co.absa.atum.server.api.database.PostgresDatabaseProvider -import za.co.absa.fadb.exceptions.DataNotFoundException -import za.co.absa.fadb.status.FunctionStatus +import za.co.absa.db.fadb.exceptions.DataNotFoundException +import za.co.absa.db.fadb.status.FunctionStatus import zio._ +import zio.interop.catz.asyncInstance import zio.test._ import java.time.ZonedDateTime @@ -44,8 +45,9 @@ object WriteCheckpointIntegrationTests extends ConfigProviderTest { partitioning = Seq(PartitionDTO("key4", "value4")), processStartTime = ZonedDateTime.now(), processEndTime = Option(ZonedDateTime.now()), - measurements = - Set(MeasurementDTO(MeasureDTO("count", Seq("*")), MeasureResultDTO(TypedValue("1", ResultValueType.LongValue)))) + measurements = Set( + MeasurementDTO(MeasureDTO("count", Seq("*")), MeasureResultDTO(TypedValue("1", ResultValueType.LongValue))) + ) ) for { writeCheckpoint <- ZIO.service[WriteCheckpoint] diff --git a/server/src/test/scala/za/co/absa/atum/server/api/http/BaseEndpointsUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/http/BaseEndpointsUnitTests.scala index 37d54e8bc..e8798974b 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/http/BaseEndpointsUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/http/BaseEndpointsUnitTests.scala @@ -32,54 +32,54 @@ class BaseEndpointsUnitTests extends AnyFlatSpec { "pathToAPIv1CompatibleFormat" should "successfully convert our standard API path format to format compatible with API V1 (kebab)" in { - val input = "create-checkpoint" - val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) - val expected = "createCheckpoint" - assert(actual == expected) - } + val input = "create-checkpoint" + val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) + val expected = "createCheckpoint" + assert(actual == expected) + } "pathToAPIv1CompatibleFormat" should "successfully convert our standard API path format to format compatible with API V1 (kebab2)" in { - val input = "create-check-point2" - val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) - val expected = "createCheckPoint2" - assert(actual == expected) - } + val input = "create-check-point2" + val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) + val expected = "createCheckPoint2" + assert(actual == expected) + } "pathToAPIv1CompatibleFormat" should "successfully convert our standard API path format to format compatible with API V1 (kebab3)" in { - val input = "Create-check-" - val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) - val expected = "createCheck" - assert(actual == expected) - } + val input = "Create-check-" + val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) + val expected = "createCheck" + assert(actual == expected) + } "pathToAPIv1CompatibleFormat" should "successfully convert our standard API path format to format compatible with API V1 (snake)" in { - val input = "_create_check_point" - val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) - val expected = "CreateCheckPoint" - assert(actual == expected) - } + val input = "_create_check_point" + val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) + val expected = "CreateCheckPoint" + assert(actual == expected) + } "pathToAPIv1CompatibleFormat" should "successfully convert our standard API path format to format compatible with API V1 (kebab and snake)" in { - val input = "Create-check_Point" - val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) - val expected = "createCheckPoint" - assert(actual == expected) - } + val input = "Create-check_Point" + val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) + val expected = "createCheckPoint" + assert(actual == expected) + } "pathToAPIv1CompatibleFormat" should "successfully convert our standard API path format to format compatible with API V1 (one word)" in { - val input = "createcheckpoint" - val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) - val expected = "createcheckpoint" - assert(actual == expected) - } + val input = "createcheckpoint" + val actual = BaseEndpointsForTests.pathToAPIv1CompatibleFormat(input) + val expected = "createcheckpoint" + assert(actual == expected) + } } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala index fb4097e5f..e7af6ecf1 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/CheckpointRepositoryUnitTests.scala @@ -20,21 +20,23 @@ import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.database.runs.functions.WriteCheckpoint import za.co.absa.atum.server.api.exception.DatabaseError import za.co.absa.atum.server.api.TestData -import za.co.absa.fadb.exceptions.ErrorInDataException -import za.co.absa.fadb.status.FunctionStatus +import za.co.absa.db.fadb.exceptions.ErrorInDataException +import za.co.absa.db.fadb.status.FunctionStatus import zio._ -import zio.test.Assertion.failsWithA +import zio.interop.catz.asyncInstance +import zio.test.Assertion.{failsWithA, isUnit} import zio.test._ +import za.co.absa.db.fadb.status.Row + object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { private val writeCheckpointMock: WriteCheckpoint = mock(classOf[WriteCheckpoint]) - when(writeCheckpointMock.apply(checkpointDTO1)).thenReturn(ZIO.right(())) + when(writeCheckpointMock.apply(checkpointDTO1)).thenReturn(ZIO.right(Row(FunctionStatus(0, "success"), ()))) when(writeCheckpointMock.apply(checkpointDTO2)) - .thenReturn(ZIO.left(ErrorInDataException(FunctionStatus(50, "error in data")))) - when(writeCheckpointMock.apply(checkpointDTO3)) - .thenReturn(ZIO.fail(new Exception("boom!"))) + .thenReturn(ZIO.fail(DatabaseError("Operation 'writeCheckpoint' failed with unexpected error: null"))) + when(writeCheckpointMock.apply(checkpointDTO3)).thenReturn(ZIO.fail(new Exception("boom!"))) private val writeCheckpointMockLayer = ZLayer.succeed(writeCheckpointMock) @@ -45,12 +47,14 @@ object CheckpointRepositoryUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Right with Unit") { for { result <- CheckpointRepository.writeCheckpoint(checkpointDTO1) - } yield assertTrue(result.isRight) + } yield assertTrue(result == ()) }, test("Returns expected Left with StatusException") { for { - result <- CheckpointRepository.writeCheckpoint(checkpointDTO2) - } yield assertTrue(result.isLeft) + result <- CheckpointRepository.writeCheckpoint(checkpointDTO2).exit + } yield assertTrue( + result == Exit.fail(DatabaseError("Operation 'writeCheckpoint' failed with unexpected error: null")) + ) }, test("Returns expected DatabaseError") { assertZIO(CheckpointRepository.writeCheckpoint(checkpointDTO3).exit)(failsWithA[DatabaseError]) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala index a0baa3be7..7f34bab93 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/FlowRepositoryUnitTests.scala @@ -21,15 +21,22 @@ import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.database.flows.functions.GetFlowCheckpoints import za.co.absa.atum.server.api.exception.DatabaseError import zio._ +import zio.interop.catz.asyncInstance import zio.test.Assertion.failsWithA import zio.test._ +import za.co.absa.db.fadb.status.{FunctionStatus, Row} object FlowRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getFlowCheckpointsMock = mock(classOf[GetFlowCheckpoints]) when(getFlowCheckpointsMock.apply(checkpointQueryDTO1)).thenReturn(ZIO.fail(new Exception("boom!"))) - when(getFlowCheckpointsMock.apply(checkpointQueryDTO2)).thenReturn(ZIO.succeed(Seq(checkpointFromDB1, checkpointFromDB2))) + when(getFlowCheckpointsMock.apply(checkpointQueryDTO2)) + .thenReturn( + ZIO.right( + Seq(Row(FunctionStatus(0, "success"), checkpointFromDB1), Row(FunctionStatus(0, "success"), checkpointFromDB2)) + ) + ) private val getFlowCheckpointsMockLayer = ZLayer.succeed(getFlowCheckpointsMock) @@ -46,11 +53,11 @@ object FlowRepositoryUnitTests extends ZIOSpecDefault with TestData { for { result <- FlowRepository.getFlowCheckpoints(checkpointQueryDTO2) } yield assertTrue(result == Seq(checkpointFromDB1, checkpointFromDB2)) - }, - ), + } + ) ).provide( FlowRepositoryImpl.layer, - getFlowCheckpointsMockLayer, + getFlowCheckpointsMockLayer ) } diff --git a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala index aad01953a..1c054613f 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/repository/PartitioningRepositoryUnitTests.scala @@ -20,19 +20,21 @@ import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.database.runs.functions._ import za.co.absa.atum.server.api.exception.DatabaseError -import za.co.absa.atum.server.model.CheckpointFromDB -import za.co.absa.fadb.exceptions.ErrorInDataException -import za.co.absa.fadb.status.FunctionStatus +import za.co.absa.db.fadb.exceptions.ErrorInDataException +import za.co.absa.db.fadb.status.{FunctionStatus, Row} import zio._ +import zio.interop.catz.asyncInstance import zio.test.Assertion.failsWithA import zio.test._ +import za.co.absa.atum.server.model.AdditionalDataFromDB object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { // Create Partitioning Mocks private val createPartitioningIfNotExistsMock = mock(classOf[CreatePartitioningIfNotExists]) - when(createPartitioningIfNotExistsMock.apply(partitioningSubmitDTO1)).thenReturn(ZIO.right(())) + when(createPartitioningIfNotExistsMock.apply(partitioningSubmitDTO1)) + .thenReturn(ZIO.right(Row(FunctionStatus(0, "success"), ()))) when(createPartitioningIfNotExistsMock.apply(partitioningSubmitDTO2)) .thenReturn(ZIO.left(ErrorInDataException(FunctionStatus(50, "error in Partitioning data")))) when(createPartitioningIfNotExistsMock.apply(partitioningSubmitDTO3)) @@ -43,7 +45,8 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { // Create Additional Data Mocks private val createOrUpdateAdditionalDataMock = mock(classOf[CreateOrUpdateAdditionalData]) - when(createOrUpdateAdditionalDataMock.apply(additionalDataSubmitDTO1)).thenReturn(ZIO.right(())) + when(createOrUpdateAdditionalDataMock.apply(additionalDataSubmitDTO1)) + .thenReturn(ZIO.right(Row(FunctionStatus(0, "success"), ()))) when(createOrUpdateAdditionalDataMock.apply(additionalDataSubmitDTO2)) .thenReturn(ZIO.left(ErrorInDataException(FunctionStatus(50, "error in AD data")))) when(createOrUpdateAdditionalDataMock.apply(additionalDataSubmitDTO3)) @@ -54,7 +57,12 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { // Get Partitioning Measures Mocks private val getPartitioningMeasuresMock = mock(classOf[GetPartitioningMeasures]) - when(getPartitioningMeasuresMock.apply(partitioningDTO1)).thenReturn(ZIO.succeed(Seq(measureDTO1, measureDTO2))) + when(getPartitioningMeasuresMock.apply(partitioningDTO1)) + .thenReturn( + ZIO.right( + Seq(Row(FunctionStatus(0, "success"), measureFromDB1), Row(FunctionStatus(0, "success"), measureFromDB2)) + ) + ) when(getPartitioningMeasuresMock.apply(partitioningDTO2)).thenReturn(ZIO.fail(DatabaseError("boom!"))) private val getPartitioningMeasuresMockLayer = ZLayer.succeed(getPartitioningMeasuresMock) @@ -63,7 +71,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { private val getPartitioningAdditionalDataMock = mock(classOf[GetPartitioningAdditionalData]) when(getPartitioningAdditionalDataMock.apply(partitioningDTO1)) - .thenReturn(ZIO.succeed(additionalDataDTOSeq1)) + .thenReturn(ZIO.right(Seq(Row(FunctionStatus(0, "success"), AdditionalDataFromDB(Some("key"), Some("value")))))) when(getPartitioningAdditionalDataMock.apply(partitioningDTO2)).thenReturn(ZIO.fail(DatabaseError("boom!"))) private val getPartitioningAdditionalDataMockLayer = ZLayer.succeed(getPartitioningAdditionalDataMock) @@ -71,9 +79,10 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { // Get Partitioning Checkpoints Mocks private val getPartitioningCheckpointsMock = mock(classOf[GetPartitioningCheckpoints]) - when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO1)).thenReturn(ZIO.succeed(Seq(checkpointFromDB1))) + when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO1)) + .thenReturn(ZIO.right(Seq(Row(FunctionStatus(0, "success"), checkpointFromDB1)))) + when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO3)).thenReturn(ZIO.right(Seq.empty)) when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO2)).thenReturn(ZIO.fail(DatabaseError("boom!"))) - when(getPartitioningCheckpointsMock.apply(checkpointQueryDTO3)).thenReturn(ZIO.succeed(Seq.empty)) private val getPartitioningCheckpointsMockLayer = ZLayer.succeed(getPartitioningCheckpointsMock) @@ -84,12 +93,18 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Right with Unit") { for { result <- PartitioningRepository.createPartitioningIfNotExists(partitioningSubmitDTO1) - } yield assertTrue(result.isRight) + } yield assertTrue(result == ()) }, test("Returns expected Left with StatusException") { for { - result <- PartitioningRepository.createPartitioningIfNotExists(partitioningSubmitDTO2) - } yield assertTrue(result.isLeft) + result <- PartitioningRepository.createPartitioningIfNotExists(partitioningSubmitDTO2).exit + } yield assertTrue( + result == Exit.fail( + DatabaseError( + "Exception caused by operation: 'createPartitioningIfNotExists': (50) error in Partitioning data" + ) + ) + ) }, test("Returns expected DatabaseError") { assertZIO(PartitioningRepository.createPartitioningIfNotExists(partitioningSubmitDTO3).exit)( @@ -97,17 +112,20 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("CreateOrUpdateAdditionalDataSuite")( test("Returns expected Right with Unit") { for { result <- PartitioningRepository.createOrUpdateAdditionalData(additionalDataSubmitDTO1) - } yield assertTrue(result.isRight) + } yield assertTrue(result == ()) }, test("Returns expected Left with StatusException") { for { - result <- PartitioningRepository.createOrUpdateAdditionalData(additionalDataSubmitDTO2) - } yield assertTrue(result.isLeft) + result <- PartitioningRepository.createOrUpdateAdditionalData(additionalDataSubmitDTO2).exit + } yield assertTrue( + result == Exit.fail( + DatabaseError("Exception caused by operation: 'createOrUpdateAdditionalData': (50) error in AD data") + ) + ) }, test("Returns expected DatabaseError") { assertZIO(PartitioningRepository.createOrUpdateAdditionalData(additionalDataSubmitDTO3).exit)( @@ -115,7 +133,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetPartitioningMeasuresSuite")( test("Returns expected Seq") { for { @@ -128,12 +145,11 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetPartitioningAdditionalDataSuite")( - test("Returns expected Right with empty Map") { + test("Returns expected Right with Map") { for { result <- PartitioningRepository.getPartitioningAdditionalData(partitioningDTO1) - } yield assertTrue(result == additionalDataDTO1) + } yield assertTrue(result.get("key").contains(Some("value")) && result.size == 1) }, test("Returns expected Left with DatabaseError") { assertZIO(PartitioningRepository.getPartitioningAdditionalData(partitioningDTO2).exit)( @@ -141,7 +157,6 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { ) } ), - suite("GetPartitioningCheckpointsSuite")( test("Returns expected Seq") { for { @@ -156,7 +171,7 @@ object PartitioningRepositoryUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Seq.empty") { for { result <- PartitioningRepository.getPartitioningCheckpoints(checkpointQueryDTO3) - } yield assertTrue(result.isInstanceOf[Seq[CheckpointFromDB]] && result.isEmpty) + } yield assertTrue(result.isEmpty) } ) ).provide( diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala index c68a2ede7..f73c21c8c 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/CheckpointServiceUnitTests.scala @@ -20,8 +20,6 @@ import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.{DatabaseError, ServiceError} import za.co.absa.atum.server.api.repository.CheckpointRepository -import za.co.absa.fadb.exceptions.ErrorInDataException -import za.co.absa.fadb.status.FunctionStatus import zio.test.Assertion.failsWithA import zio.test._ import zio._ @@ -30,11 +28,9 @@ object CheckpointServiceUnitTests extends ZIOSpecDefault with TestData { private val checkpointRepositoryMock = mock(classOf[CheckpointRepository]) - when(checkpointRepositoryMock.writeCheckpoint(checkpointDTO1)).thenReturn(ZIO.right(())) - when(checkpointRepositoryMock.writeCheckpoint(checkpointDTO2)) - .thenReturn(ZIO.left(ErrorInDataException(FunctionStatus(50, "error in data")))) - when(checkpointRepositoryMock.writeCheckpoint(checkpointDTO3)) - .thenReturn(ZIO.fail(DatabaseError("boom!"))) + when(checkpointRepositoryMock.writeCheckpoint(checkpointDTO1)).thenReturn(ZIO.succeed(())) + when(checkpointRepositoryMock.writeCheckpoint(checkpointDTO2)).thenReturn(ZIO.fail(DatabaseError("error in data"))) + when(checkpointRepositoryMock.writeCheckpoint(checkpointDTO3)).thenReturn(ZIO.fail(DatabaseError("boom!"))) private val checkpointRepositoryMockLayer = ZLayer.succeed(checkpointRepositoryMock) @@ -45,12 +41,12 @@ object CheckpointServiceUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Right with Unit") { for { result <- CheckpointService.saveCheckpoint(checkpointDTO1) - } yield assertTrue(result.isRight) + } yield assertTrue(result == ()) }, test("Returns expected Left with StatusException") { for { - result <- CheckpointService.saveCheckpoint(checkpointDTO2) - } yield assertTrue(result.isLeft) + result <- CheckpointService.saveCheckpoint(checkpointDTO2).exit + } yield assertTrue(result == Exit.fail(ServiceError("Failed to perform 'saveCheckpoint': error in data"))) }, test("Returns expected ServiceError") { assertZIO(CheckpointService.saveCheckpoint(checkpointDTO3).exit)(failsWithA[ServiceError]) diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala index 8b70529ae..da1bfba87 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/FlowServiceUnitTests.scala @@ -45,12 +45,11 @@ object FlowServiceUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Seq[CheckpointDTO]") { for { result <- FlowService.getFlowCheckpoints(checkpointQueryDTO2) - } yield assertTrue{ + } yield assertTrue { result == Seq(checkpointDTO2) } - }, - - ), + } + ) ).provide( FlowServiceImpl.layer, flowRepositoryMockLayer diff --git a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala index 1308faaa6..34da663c0 100644 --- a/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/api/service/PartitioningServiceUnitTests.scala @@ -20,8 +20,6 @@ import org.mockito.Mockito.{mock, when} import za.co.absa.atum.server.api.TestData import za.co.absa.atum.server.api.exception.{DatabaseError, ServiceError} import za.co.absa.atum.server.api.repository.PartitioningRepository -import za.co.absa.fadb.exceptions.ErrorInDataException -import za.co.absa.fadb.status.FunctionStatus import zio.test.Assertion.failsWithA import zio.test._ import zio._ @@ -30,15 +28,15 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { private val partitioningRepositoryMock = mock(classOf[PartitioningRepository]) - when(partitioningRepositoryMock.createPartitioningIfNotExists(partitioningSubmitDTO1)).thenReturn(ZIO.right(())) + when(partitioningRepositoryMock.createPartitioningIfNotExists(partitioningSubmitDTO1)).thenReturn(ZIO.succeed(())) when(partitioningRepositoryMock.createPartitioningIfNotExists(partitioningSubmitDTO2)) - .thenReturn(ZIO.left(ErrorInDataException(FunctionStatus(50, "error in data")))) + .thenReturn(ZIO.fail(DatabaseError("error in data"))) when(partitioningRepositoryMock.createPartitioningIfNotExists(partitioningSubmitDTO3)) .thenReturn(ZIO.fail(DatabaseError("boom!"))) - when(partitioningRepositoryMock.createOrUpdateAdditionalData(additionalDataSubmitDTO1)).thenReturn(ZIO.right(())) + when(partitioningRepositoryMock.createOrUpdateAdditionalData(additionalDataSubmitDTO1)).thenReturn(ZIO.succeed(())) when(partitioningRepositoryMock.createOrUpdateAdditionalData(additionalDataSubmitDTO2)) - .thenReturn(ZIO.left(ErrorInDataException(FunctionStatus(50, "error in AD data")))) + .thenReturn(ZIO.fail(DatabaseError("error in AD data"))) when(partitioningRepositoryMock.createOrUpdateAdditionalData(additionalDataSubmitDTO3)) .thenReturn(ZIO.fail(DatabaseError("boom!"))) @@ -66,12 +64,14 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Right with Unit") { for { result <- PartitioningService.createPartitioningIfNotExists(partitioningSubmitDTO1) - } yield assertTrue(result.isRight) + } yield assertTrue(result == ()) }, test("Returns expected Left with StatusException") { for { - result <- PartitioningService.createPartitioningIfNotExists(partitioningSubmitDTO2) - } yield assertTrue(result.isLeft) + result <- PartitioningService.createPartitioningIfNotExists(partitioningSubmitDTO2).exit + } yield assertTrue( + result == Exit.fail(ServiceError("Failed to perform 'createPartitioningIfNotExists': error in data")) + ) }, test("Returns expected ServiceError") { assertZIO(PartitioningService.createPartitioningIfNotExists(partitioningSubmitDTO3).exit)( @@ -83,12 +83,14 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Right with Unit") { for { result <- PartitioningService.createOrUpdateAdditionalData(additionalDataSubmitDTO1) - } yield assertTrue(result.isRight) + } yield assertTrue(result == ()) }, test("Returns expected Left with StatusException") { for { - result <- PartitioningService.createOrUpdateAdditionalData(additionalDataSubmitDTO2) - } yield assertTrue(result.isLeft) + result <- PartitioningService.createOrUpdateAdditionalData(additionalDataSubmitDTO2).exit + } yield assertTrue( + result == Exit.fail(ServiceError("Failed to perform 'createOrUpdateAdditionalData': error in AD data")) + ) }, test("Returns expected ServiceError") { assertZIO(PartitioningService.createOrUpdateAdditionalData(additionalDataSubmitDTO3).exit)( @@ -100,7 +102,7 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Right with Seq[MeasureDTO]") { for { result <- PartitioningService.getPartitioningMeasures(partitioningDTO1) - } yield assertTrue{ + } yield assertTrue { result == Seq(measureDTO1, measureDTO2) } }, @@ -114,7 +116,7 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Right with Seq[AdditionalDataDTO]") { for { result <- PartitioningService.getPartitioningAdditionalData(partitioningDTO1) - } yield assertTrue{result == additionalDataDTO1} + } yield assertTrue { result == additionalDataDTO1 } }, test("Returns expected ServiceError") { assertZIO(PartitioningService.getPartitioningAdditionalData(partitioningDTO2).exit)( @@ -126,7 +128,7 @@ object PartitioningServiceUnitTests extends ZIOSpecDefault with TestData { test("Returns expected Right with Seq[CheckpointDTO]") { for { result <- PartitioningService.getPartitioningCheckpoints(checkpointQueryDTO1) - } yield assertTrue{ + } yield assertTrue { result == Seq(checkpointDTO1, checkpointDTO2.copy(partitioning = checkpointDTO1.partitioning)) } }, diff --git a/server/src/test/scala/za/co/absa/atum/server/aws/AwsSecretsProviderUnitTests.scala b/server/src/test/scala/za/co/absa/atum/server/aws/AwsSecretsProviderUnitTests.scala index b53fdc2ef..49547e921 100644 --- a/server/src/test/scala/za/co/absa/atum/server/aws/AwsSecretsProviderUnitTests.scala +++ b/server/src/test/scala/za/co/absa/atum/server/aws/AwsSecretsProviderUnitTests.scala @@ -39,7 +39,7 @@ object AwsSecretsProviderUnitTests extends ConfigProviderTest { override def spec: Spec[TestEnvironment with Scope, Any] = { suite("AwsSecretsProviderSuite")( - test("GetSecretValue returns expected secret's value"){ + test("GetSecretValue returns expected secret's value") { for { awsConfig <- ZIO.config[AwsConfig](AwsConfig.config) awsSecretValue <- AwsSecretsProvider.getSecretValue(awsConfig.dbPasswordSecretName)