Skip to content

Commit

Permalink
#188: implementing decoding of SupportValues as well, in measurements…
Browse files Browse the repository at this point in the history
… (will need refactoring)
  • Loading branch information
lsulak committed May 23, 2024
1 parent a8c8ead commit 95b4ba6
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import cats.Show
import cats.data.NonEmptyList
import doobie.postgres.implicits._
import doobie.{Get, Put}
import io.circe.Decoder
import io.circe.{Decoder, Encoder}
import org.postgresql.jdbc.PgArray
import org.postgresql.util.PGobject
import za.co.absa.atum.model.dto.MeasureResultDTO
Expand Down Expand Up @@ -155,7 +155,14 @@ object DoobieImplicits {
}
)
}
}

implicit val encodeResultValueType: Encoder[MeasureResultDTO.ResultValueType] = Encoder.encodeString.contramap {
case MeasureResultDTO.ResultValueType.String => "String"
case MeasureResultDTO.ResultValueType.Long => "Long"
case MeasureResultDTO.ResultValueType.BigDecimal => "BigDecimal"
case MeasureResultDTO.ResultValueType.Double => "Double"
case _ => "QWEQWEQWE"
}

implicit val decodeResultValueType: Decoder[MeasureResultDTO.ResultValueType] = Decoder.decodeString.emap {
Expand All @@ -166,11 +173,12 @@ object DoobieImplicits {
case other => Left(s"Cannot decode $other as ResultValueType")
}

implicit val encodeTypedValue: Encoder[MeasureResultDTO.TypedValue] =
Encoder.forProduct2("value", "valueType")(tv => (tv.value, tv.valueType))

implicit val decodeTypedValue: Decoder[MeasureResultDTO.TypedValue] =
Decoder.forProduct2("value", "valueType")(MeasureResultDTO.TypedValue.apply)

implicit val decodeMeasureResultDTO: Decoder[MeasureResultDTO] =
Decoder.forProduct2("mainValue", "supportValues")(MeasureResultDTO.apply)


}
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,56 @@

package za.co.absa.atum.server.api.service

import za.co.absa.atum.model.dto.{CheckpointDTO, CheckpointQueryDTO, MeasurementDTO, MeasureDTO, MeasureResultDTO}
import za.co.absa.atum.model.dto._
import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.repository.FlowRepository
import zio._
import io.circe._
import io.circe.parser._
import io.circe.generic.auto._
import io.circe.syntax._
import za.co.absa.atum.server.api.database.DoobieImplicits.encodeTypedValue
import za.co.absa.atum.server.api.database.DoobieImplicits.decodeTypedValue
import za.co.absa.atum.server.api.database.DoobieImplicits.encodeResultValueType
import za.co.absa.atum.server.api.database.DoobieImplicits.decodeResultValueType

class FlowServiceImpl(flowRepository: FlowRepository)
extends FlowService with BaseService {

def extractMainValueFromMeasurementValue(json: Json): Either[Error, MeasureResultDTO.TypedValue] = {
json.as[MeasureResultDTO].map(_.mainValue)
}

def extractSupportValuesFromMeasurementValue(json: Json): Either[Error, Map[String, MeasureResultDTO.TypedValue]] = {
json.as[MeasureResultDTO].map(_.supportValues)
}

def parseCheckpointQueryResultDTO(measurementValue: Json): Either[Error, MeasureResultDTO] = {
for {
mainValue <- extractMainValueFromMeasurementValue(measurementValue)
supportValues <- extractSupportValuesFromMeasurementValue(measurementValue)
} yield MeasureResultDTO(mainValue, supportValues)
}

def extractSupportValuesFromMeasurementValue2(json: Json): Either[Error, Map[String, MeasureResultDTO.TypedValue]] = {
json.as[MeasureResultDTO].map(_.supportValues)
}

def parseCheckpointQueryResultDTO2(jsonString: String): Either[Error, Map[String, MeasureResultDTO.TypedValue]] = {
for {
parsedJson <- parse(jsonString)
checkpoint <- parsedJson.as[CheckpointQueryResultDTO]
supportValues <- extractSupportValuesFromMeasurementValue2(checkpoint.measurementValue)
} yield supportValues
}

override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[ServiceError, Seq[CheckpointDTO]] = {
repositoryCall(
flowRepository.getFlowCheckpoints(checkpointQueryDTO), "getFlowCheckpoints"
).map({
checkpointMeasurementsSeq =>
checkpointMeasurementsSeq.map { cm =>

CheckpointDTO(
id = cm.idCheckpoint,
name = cm.checkpointName,
Expand All @@ -44,19 +80,13 @@ class FlowServiceImpl(flowRepository: FlowRepository)
measureName = cm.measureName,
measuredColumns = cm.measuredColumns
),
result = MeasureResultDTO(
mainValue = MeasureResultDTO.TypedValue(
value = cm.measurementValue.hcursor.downField("value").as[String].getOrElse(""),
valueType = cm.measurementValue.hcursor.downField("valueType").as[String].getOrElse("") match {
case "String" => MeasureResultDTO.ResultValueType.String
case "Long" => MeasureResultDTO.ResultValueType.Long
case "BigDecimal" => MeasureResultDTO.ResultValueType.BigDecimal
case "Double" => MeasureResultDTO.ResultValueType.Double
case _ => MeasureResultDTO.ResultValueType.String
}
),
supportValues = Map.empty
)
result = parseCheckpointQueryResultDTO(cm.measurementValue)
.getOrElse( // todo no error silencing!
MeasureResultDTO(
mainValue = MeasureResultDTO.TypedValue("", MeasureResultDTO.ResultValueType.String),
supportValues = Map.empty
)
)
)
)
)
Expand Down

0 comments on commit 95b4ba6

Please sign in to comment.