Doobie multiple results with aggregated status: 219 #220
…ithAggStatus # Conflicts: # project/Dependencies.scala # server/src/test/scala/za/co/absa/atum/server/api/controller/CheckpointControllerUnitTests.scala
Release notes:
serviceCallWithStatus not used
Okay, will remove the method
Most of these instances were placed in fa-db. Therefore, there is no need to keep them in Atum anymore.
import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get

import doobie.postgres.implicits._
import doobie.postgres.circe.jsonb.implicits.jsonbPut
import doobie.postgres.circe.json.implicits.jsonGet
You read jsonb, not json, so it is better to use jsonbGet.
import doobie.postgres.implicits._
import doobie.postgres.circe.jsonb.implicits.jsonbPut
import doobie.postgres.circe.json.implicits.jsonGet
import io.circe.syntax.EncoderOps
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling

class GetFlowCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
Tests for this class are missing.
Added now.
use the implicits from fa-db
Please make sure formatting is applied before pushing to GitHub.
DatabaseError(s"Operation '$operationName' failed with unexpected error: ${error.getMessage}")
}

private def dbCall[R](dbFuncCall: Task[R], operationName: String):
private def dbCall[R](dbFuncCall: Task[R], operationName: String): IO[DatabaseError, R] = {
  dbFuncCall
    .mapError(error => DatabaseError(error.getMessage))
    .tapBoth(
      error => ZIO.logError(s"Operation '$operationName' failed: ${error.getMessage}"),
      _ => ZIO.logDebug(s"Operation '$operationName' succeeded in database")
    )
}
dbSingleResultCall, dbMultipleResultCall not used
Removed
// Seq[FailedOrRow[R]] ~ Seq[Either[StatusException, Row[R]]] - dbMultipleResultCallWithStatus => IO[DatabaseError, Seq[R]]
// FailedOrRows[R] ~ Either[StatusException, Seq[Row[R]]] - dbMultipleResultCallWithAggregatedStatus => IO[DatabaseError, Seq[R]]

sealed trait PaginatedResult[R]
Why are these classes defined in this file?
Removed along with the pagination function
import zio._

// R - dbSingleResultCall[R] => IO[DatabaseError, R]
Are these comments still needed?
Nope.
}
}

private def defaultErrorHandler[R](operationName: String):
type parameter is not needed
dbFuncCall: Task[R],
operationName: String
): IO[DatabaseError, R] = {
private def logOperationResult[R](operationName: String, dbFuncCall: Task[R]):
I am not sure about this. You apply a partial function in the tap call, assuming that the result will be Either[StatusException, R]. But the signature doesn't guarantee that, so it's unsafe.
Okay, will fix that
.flatMap {
  case Left(statusException) => ZIO.fail(statusException)
  case Right(value) => ZIO.succeed(
    if (value.nonEmpty && value.head.functionStatus.statusCode == 11){
Are you sure this is the status code we want to use for "has more" cases? I think other codes were discussed in our Teams chat. Please check this with the others.
This operation is removed as it is not part of this PR. But I will still go back to the discussion, and also happy to confirm with others. Thank you.
@@ -24,5 +24,6 @@ import zio.macros.accessible

@accessible
trait FlowRepository {
  def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]]
  def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO):
Why can't this be on a single line?
Fixed
override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO): IO[DatabaseError, Seq[CheckpointFromDB]] = {
dbCall(getFlowCheckpointsFn(checkpointQueryDTO), "getFlowCheckpoints")
override def getFlowCheckpoints(checkpointQueryDTO: CheckpointQueryDTO):
IO[DatabaseError, Seq[CheckpointFromDB]] = {
formatting is off
Fixed
@@ -28,4 +27,7 @@ package object dto {
implicit val decodeAdditionalDataDTO: Decoder[AdditionalDataDTO] = Decoder.decodeMap[String, Option[String]]
implicit val encodeAdditionalDataDTO: Encoder[AdditionalDataDTO] = Encoder.encodeMap[String, Option[String]]

// Implicit encoders and decoders for PartitioningDTO
I think it's not the best practice to define such implicits in a package object: by doing that, you implicitly import them into every scope in the entire package, which might lead to unintended implicit conversions or resolutions. These types should be changed. AdditionalDataDTO will undergo some changes when we work on the endpoints related to AdditionalData. PartitioningDTO could be modeled as a case class with its JSON serde defined in its companion, or kept as is with the serde defined elsewhere.
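To illustrate the companion-object option mentioned above, here is a minimal sketch. It deliberately avoids circe so that it stays self-contained: `JsonEncoder` is a hand-rolled stand-in for circe's `Encoder`, and the shapes of `PartitionDTO` and `PartitioningDTO` are assumptions made for illustration, not the project's actual definitions.

```scala
// Hypothetical stand-in for a JSON encoder type class (circe's Encoder plays this role in the PR).
trait JsonEncoder[A] {
  def encode(value: A): String
}

object JsonEncoder {
  // Summoner: JsonEncoder[A] looks up the implicit instance for A.
  def apply[A](implicit enc: JsonEncoder[A]): JsonEncoder[A] = enc

  // Instances for general-purpose types live in the type class companion.
  implicit val stringEncoder: JsonEncoder[String] = new JsonEncoder[String] {
    def encode(value: String): String = "\"" + value + "\""
  }

  implicit def seqEncoder[A](implicit enc: JsonEncoder[A]): JsonEncoder[Seq[A]] =
    new JsonEncoder[Seq[A]] {
      def encode(value: Seq[A]): String = value.map(enc.encode).mkString("[", ",", "]")
    }
}

// Assumed shape; the real PartitionDTO may differ.
final case class PartitionDTO(key: String, value: String)

object PartitionDTO {
  // Found through the implicit scope of PartitionDTO, no import needed.
  implicit val encoder: JsonEncoder[PartitionDTO] = new JsonEncoder[PartitionDTO] {
    def encode(p: PartitionDTO): String = {
      val k = JsonEncoder[String].encode(p.key)
      val v = JsonEncoder[String].encode(p.value)
      "{\"key\":" + k + ",\"value\":" + v + "}"
    }
  }
}

// PartitioningDTO modeled as a case class instead of a type alias for Seq[PartitionDTO].
final case class PartitioningDTO(partitions: Seq[PartitionDTO])

object PartitioningDTO {
  // The serde sits next to the type it belongs to, not in a package object.
  implicit val encoder: JsonEncoder[PartitioningDTO] = new JsonEncoder[PartitioningDTO] {
    def encode(p: PartitioningDTO): String =
      JsonEncoder[Seq[PartitionDTO]].encode(p.partitions)
  }
}
```

With this layout, the instance is picked up wherever a `JsonEncoder[PartitioningDTO]` is required, but it is not injected into unrelated scopes of the package.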
for {
  getFlowCheckpoints <- ZIO.service[GetFlowCheckpoints]
  exit <- getFlowCheckpoints(partitioningQueryDTO).exit
} yield assert(exit)(failsWithA[doobie.util.invariant.NonNullableColumnRead])
The database call should never end up in a NonNullableColumnRead exception. That signals nothing but an incorrect implementation of the fa-db class. As of now (fa-db 0.5.0), all columns that can potentially be returned as NULL have to be modeled as Option.
import io.circe.syntax._

import doobie.postgres.implicits._
import doobie.postgres.circe.jsonb.implicits.jsonbPut
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut
Yep, let's use the implicits from fa-db. Btw, since doobie-postgres-circe has been made a dependency of the doobie module in fa-db, there is no need to explicitly build the project with that dependency.
In other words, the dependency below is not needed anymore, as it's brought in transitively by fa-db's doobie module. Please remove it.
lazy val pgCirceDoobie = "org.tpolecat" %% "doobie-postgres-circe" % "1.0.0-RC2"
import za.co.absa.fadb.doobie.DoobieFunction.DoobieMultipleResultFunction
import za.co.absa.db.fadb.DBSchema
import za.co.absa.db.fadb.doobie.DoobieEngine
import za.co.absa.db.fadb.doobie.DoobieFunction.DoobieMultipleResultFunctionWithAggStatus
import za.co.absa.atum.server.api.database.PostgresDatabaseProvider
import za.co.absa.atum.server.api.database.runs.Runs
import zio._
import zio.interop.catz._
I don't think this one is needed.
Missed that
)
case Right(_) => ZIO.logDebug(s"Operation '$operationName' succeeded in database")
case _ => ZIO.logError(s"Operation '$operationName' did not return an Either")
This does not make much sense to me. You should require the effect on the input to be Task[FailedOrRow[R]] or Task[FailedOrRows[R]] (the latter in another method).
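A minimal sketch of what requiring the `Either` in the signature buys, under simplifying assumptions: `scala.util.Try` stands in for ZIO's `Task`, `StatusException` and `DatabaseError` are hypothetical stand-ins for the fa-db and Atum types, and `FailedOrRow[R]` is simplified to `Either[StatusException, R]` (the PR's comments describe it as wrapping `Row[R]`). Once the input type is fixed, the match is exhaustive and the "did not return an Either" branch is no longer needed.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical stand-ins for the types named in the review.
final case class StatusException(statusCode: Int, statusText: String) extends Exception(statusText)
final case class DatabaseError(message: String)

object TypedDbCall {
  // Simplified alias; assumed for this sketch only.
  type FailedOrRow[R] = Either[StatusException, R]

  // The input type guarantees an Either, so every case is covered
  // without a catch-all branch.
  def dbCallWithStatus[R](
    dbFuncCall: Try[FailedOrRow[R]], // Try stands in for Task here
    operationName: String
  ): Either[DatabaseError, R] =
    dbFuncCall match {
      case Success(Right(row)) =>
        Right(row)
      case Success(Left(statusException)) =>
        Left(DatabaseError(
          s"Operation '$operationName' failed with status " +
            s"${statusException.statusCode}: ${statusException.statusText}"
        ))
      case Failure(error) =>
        Left(DatabaseError(
          s"Operation '$operationName' failed with unexpected error: ${error.getMessage}"
        ))
    }
}
```

Passing anything other than an effect of `FailedOrRow[R]` is then rejected at compile time instead of being logged at runtime.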
repositoryCall: IO[DatabaseError, Either[StatusException, R]],
operationName: String
): IO[ServiceError, Either[StatusException, R]] = {
def repositoryCallWithStatus[R](repositoryCall: IO[DatabaseError, Either[StatusException, R]], operationName: String
Not used, since we decided to handle StatusException(s) on the repository level. The open question is whether we will always propagate StatusExceptions to the upper layers (Controller) in cases where we really need to (for instance, when the resulting status code of a given REST call has to be decided based on the status of the database call), or whether we will create a hierarchy of ServiceError(s) indicating what happened on the repository level and propagate that information through ZIO's error channel.
import io.circe._

package object dto {
  type PartitioningDTO = Seq[PartitionDTO]
  type AdditionalDataDTO = Map[String, Option[String]]

  // Todo. This implicit definition should not be defined here, so it is to be addressed in PR#221
I think PRs can actually have different numbers than the issues. But anyway, I will be taking the 221 ticket as the very next one, so let's not worry about the correct wording here.
import zio.test._

import java.time.ZonedDateTime
import java.util.UUID

object WriteCheckpointIntegrationTests extends ConfigProviderTest {
object
remove the new line after object keyword
If the seemingly unused imports are actually needed, please add comments to them.
.../src/main/scala/za/co/absa/atum/server/api/database/flows/functions/GetFlowCheckpoints.scala
"status",
"status_text",
Better approach: while not incorrect, a better approach that also ensures better compatibility is to use super.fieldsToSelect ++ instead of explicitly naming "status", "status_text".
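A minimal sketch of the `super.fieldsToSelect ++` suggestion, assuming a hypothetical base class `FunctionWithStatus` that contributes the status columns (the real fa-db class hierarchy is not shown in this thread and may differ). The sketch declares `fieldsToSelect` as a `def` on both sides, because `super` can only refer to a method, not to a `val` member.

```scala
// Hypothetical base: a function wrapper that always selects the status columns.
abstract class FunctionWithStatus {
  def fieldsToSelect: Seq[String] = Seq("status", "status_text")
}

// The subclass appends its own columns instead of repeating the status ones,
// so a change to the status column names in the base class propagates automatically.
class GetPartitioningMeasuresSketch extends FunctionWithStatus {
  override def fieldsToSelect: Seq[String] =
    super.fieldsToSelect ++ Seq("measure_name", "measured_columns")
}
```

The subclass then only lists the columns that are specific to its own database function.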
$partitioningJson
) ${Fragment.const(alias)};"""
}
override val fieldsToSelect: Seq[String] = Seq("status", "status_text", "measure_name", "measured_columns")
Ditto with super.fieldsToSelect.
override val fieldsToSelect: Seq[String] = Seq(
  "status",
Ditto with super.fieldsToSelect.
extends DoobieMultipleResultFunctionWithAggStatus[PartitioningDTO, AdditionalDataFromDB, Task](
  values => Seq(fr"${PartitioningForDB.fromSeqPartitionDTO(values).asJson}"))
with StandardStatusHandling with ByFirstErrorStatusAggregator {
override val fieldsToSelect: Seq[String] = Seq("status", "status_text", "ad_name", "ad_value")
Ditto with super.fieldsToSelect.
...scala/za/co/absa/atum/server/api/database/runs/functions/CreatePartitioningIfNotExists.scala
...scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningAdditionalData.scala
import doobie.postgres.implicits._
import doobie.postgres.circe.jsonb.implicits.jsonbPut
import doobie.postgres.circe.json.implicits.jsonGet
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.{jsonbGet, jsonbPut}
Seems unused
jsonbGet is not needed; jsonbPut is needed.
.../main/scala/za/co/absa/atum/server/api/database/runs/functions/GetPartitioningMeasures.scala
import za.co.absa.atum.model.dto.MeasureResultDTO._
import za.co.absa.atum.server.api.database.DoobieImplicits.Sequence.get
import za.co.absa.atum.server.api.database.DoobieImplicits.Jsonb.jsonbArrayPut
import doobie.postgres.circe.jsonb.implicits.jsonbGet
import doobie.postgres.circe.jsonb.implicits.jsonbPut
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbArrayPut
import doobie.postgres.implicits._
Seems unused.
import za.co.absa.atum.model.dto.MeasureResultDTO._ is not needed, the rest is.
Implemented suggestions
LGTM