Skip to content

Commit

Permalink
Fixing format comments
Browse files Browse the repository at this point in the history
Implemented suggestions
  • Loading branch information
TebaleloS committed Aug 4, 2024
1 parent 3005404 commit 94750ce
Show file tree
Hide file tree
Showing 46 changed files with 255 additions and 204 deletions.
17 changes: 11 additions & 6 deletions server/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,24 @@ To create a jar file that can be executed:
> java -jar server/target/jvm-2.13/*.jar
```

If you want to quickly build and run from sbt you can run using the command below (alternatively you can execute za.co.absa.atum.server.Main within your IDE). This deploys it to `localhost:8080`.
If you want to quickly build and run from sbt, you can use the command below (alternatively you can execute
za.co.absa.atum.server.Main within your IDE). This deploys the server to `localhost:8080`.

```shell
sbt "server/runMain za.co.absa.atum.server.Main"
```

### REST API Reference
### REST API Reference

The REST API exposes a Swagger Documentation UI which documents all the HTTP endpoints exposed.
It can be found at **{REST_API_HOST}/docs/** (e.g. `http://localhost:8080/docs/`)

### Observability metrics

Optionally you can run server with monitoring that collects metrics about http communication and/or jvm/zio runtime. `intervalInSeconds` parameter refers to frequency of data collection from its runtime environment.
Monitoring of http communication is based on intercepting of http calls therefore `intervalInSeconds` parameter does not apply.
Optionally you can run the server with monitoring that collects metrics about HTTP communication and/or the JVM/ZIO
runtime. The `intervalInSeconds` parameter refers to the frequency of data collection from its runtime environment.
Monitoring of HTTP communication is based on intercepting HTTP calls, therefore the `intervalInSeconds` parameter does
not apply to it.

```
{
Expand All @@ -41,5 +44,7 @@ Monitoring of http communication is based on intercepting of http calls therefor
}
```

When monitoring enabled, the application exposes `http://localhost:8080/metrics` and/or `http://localhost:8080/zio-metrics` endpoints which can be scraped by Prometheus.
For testing purposes there is [docker-compose.yml](./docker-compose.yml) file which can be used to start up dockerized Prometheus and Grafana instances. Prometheus scraping configs are defined in [prometheus.yml](./prometheus.yml) file.
When monitoring is enabled, the application exposes the `http://localhost:8080/metrics`
and/or `http://localhost:8080/zio-metrics` endpoints, which can be scraped by Prometheus.
For testing purposes there is a [docker-compose.yml](./docker-compose.yml) file which can be used to start up dockerized
Prometheus and Grafana instances. Prometheus scraping configs are defined in the [prometheus.yml](./prometheus.yml) file.
4 changes: 2 additions & 2 deletions server/prometheus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,14 @@ scrape_configs:
- job_name: 'atum_server_http4s'
metrics_path: /metrics
static_configs:
- targets: ['host.docker.internal:8080']
- targets: [ 'host.docker.internal:8080' ]
labels:
env: 'local'
app: 'atum'
- job_name: 'atum_server_zio_runtime'
metrics_path: /zio-metrics
static_configs:
- targets: ['host.docker.internal:8080']
- targets: [ 'host.docker.internal:8080' ]
labels:
env: 'local'
app: 'atum'
2 changes: 1 addition & 1 deletion server/src/main/scala/za/co/absa/atum/server/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ object Main extends ZIOAppDefault with Server {
// enabling conditionally collection of ZIO runtime metrics and default JVM metrics
if (jvmMonitoringConfig.enabled) {
ZLayer.succeed(MetricsConfig(Duration.ofSeconds(jvmMonitoringConfig.intervalInSeconds))) ++
Runtime.enableRuntimeMetrics.unit ++ DefaultJvmMetrics.live.unit
Runtime.enableRuntimeMetrics.unit ++ DefaultJvmMetrics.live.unit
} else {
ZLayer.succeed(MetricsConfig(Duration.ofSeconds(Long.MaxValue)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ trait BaseController {
.mapError { serviceError: ServiceError =>
InternalServerErrorResponse(serviceError.message)
}
.flatMap {
result => ZIO.succeed(onSuccessFnc(result))
.flatMap { result =>
ZIO.succeed(onSuccessFnc(result))
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,7 @@ import zio.macros.accessible

@accessible
trait FlowController {
def getFlowCheckpointsV2(checkpointQueryDTO: CheckpointQueryDTO): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]]
def getFlowCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]]
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package za.co.absa.atum.server.api.database


import doobie.postgres.implicits._
import doobie.{Get, Put}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ object TransactorProvider {
awsConfig <- ZIO.config[AwsConfig](AwsConfig.config)

awsSecretsProvider <- ZIO.service[AwsSecretsProvider]
password <- awsSecretsProvider.getSecretValue(awsConfig.dbPasswordSecretName)
password <- awsSecretsProvider
.getSecretValue(awsConfig.dbPasswordSecretName)
// fallback to password property's value from postgres section of reference.conf; useful for local testing
.orElse {
ZIO.logError("Credentials were not retrieved from AWS, falling back to config value.")
ZIO
.logError("Credentials were not retrieved from AWS, falling back to config value.")
.as(postgresConfig.password)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,17 @@ import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusA
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling

class GetFlowCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[CheckpointQueryDTO, CheckpointFromDB, Task](
values => Seq(
extends DoobieMultipleResultFunctionWithAggStatus[CheckpointQueryDTO, CheckpointFromDB, Task](values =>
Seq(
fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}",
fr"${values.limit}",
fr"${values.checkpointName}"
)
) with StandardStatusHandling with ByFirstErrorStatusAggregator {
override val fieldsToSelect: Seq[String] = Seq(
"status",
"status_text",
)
with StandardStatusHandling
with ByFirstErrorStatusAggregator {

override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq(
"id_checkpoint",
"checkpoint_name",
"author",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ import doobie.postgres.implicits._
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut

class CreateOrUpdateAdditionalData(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieSingleResultFunctionWithStatus[AdditionalDataSubmitDTO, Unit, Task](
values => Seq(
fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}",
fr"${values.additionalData.map{ case (k, v) => (k, v.orNull)}}",
fr"${values.author}"
extends DoobieSingleResultFunctionWithStatus[AdditionalDataSubmitDTO, Unit, Task](values =>
Seq(
fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}",
fr"${values.additionalData.map { case (k, v) => (k, v.orNull) }}",
fr"${values.author}"
)
)
) with StandardStatusHandling
with StandardStatusHandling

object CreateOrUpdateAdditionalData {
val layer: URLayer[PostgresDatabaseProvider, CreateOrUpdateAdditionalData] = ZLayer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,14 @@ import io.circe.syntax._
import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut

class CreatePartitioningIfNotExists(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieSingleResultFunctionWithStatus[PartitioningSubmitDTO, Unit, Task](
values => Seq(
fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}",
fr"${values.authorIfNew}",
fr"${values.parentPartitioning.map(PartitioningForDB.fromSeqPartitionDTO).map(_.asJson)}"
extends DoobieSingleResultFunctionWithStatus[PartitioningSubmitDTO, Unit, Task](values =>
Seq(
fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}",
fr"${values.authorIfNew}",
fr"${values.parentPartitioning.map(PartitioningForDB.fromSeqPartitionDTO).map(_.asJson)}"
)
)
) with StandardStatusHandling
with StandardStatusHandling

object CreatePartitioningIfNotExists {
val layer: URLayer[PostgresDatabaseProvider, CreatePartitioningIfNotExists] = ZLayer {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@ import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbPut
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling

class GetPartitioningAdditionalData (implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[PartitioningDTO, AdditionalDataFromDB, Task](
values => Seq(fr"${PartitioningForDB.fromSeqPartitionDTO(values).asJson}"))
with StandardStatusHandling with ByFirstErrorStatusAggregator {
override val fieldsToSelect: Seq[String] = Seq("status", "status_text", "ad_name", "ad_value")
class GetPartitioningAdditionalData(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[PartitioningDTO, AdditionalDataFromDB, Task](values =>
Seq(fr"${PartitioningForDB.fromSeqPartitionDTO(values).asJson}")
)
with StandardStatusHandling
with ByFirstErrorStatusAggregator {

override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("ad_name", "ad_value")
}

object GetPartitioningAdditionalData {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,17 +32,18 @@ import za.co.absa.db.fadb.doobie.postgres.circe.implicits.{jsonbGet, jsonbPut}
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling

class GetPartitioningCheckpoints (implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[CheckpointQueryDTO, CheckpointFromDB, Task] (
values => Seq(
class GetPartitioningCheckpoints(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[CheckpointQueryDTO, CheckpointFromDB, Task](values =>
Seq(
fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}",
fr"${values.limit}",
fr"${values.checkpointName}"
)
) with StandardStatusHandling with ByFirstErrorStatusAggregator {
override val fieldsToSelect: Seq[String] = Seq(
"status",
"status_text",
)
with StandardStatusHandling
with ByFirstErrorStatusAggregator {

override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq(
"id_checkpoint",
"checkpoint_name",
"author",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,14 @@ import za.co.absa.atum.server.model.MeasureFromDB
import za.co.absa.db.fadb.status.aggregation.implementations.ByFirstErrorStatusAggregator
import za.co.absa.db.fadb.status.handling.implementations.StandardStatusHandling

class GetPartitioningMeasures (implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[PartitioningDTO, MeasureFromDB, Task](
values => Seq(fr"${PartitioningForDB.fromSeqPartitionDTO(values).asJson}"))
with StandardStatusHandling with ByFirstErrorStatusAggregator {

override val fieldsToSelect: Seq[String] = Seq("status", "status_text", "measure_name", "measured_columns")
class GetPartitioningMeasures(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieMultipleResultFunctionWithAggStatus[PartitioningDTO, MeasureFromDB, Task](values =>
Seq(fr"${PartitioningForDB.fromSeqPartitionDTO(values).asJson}")
)
with StandardStatusHandling
with ByFirstErrorStatusAggregator {

override def fieldsToSelect: Seq[String] = super.fieldsToSelect ++ Seq("measure_name", "measured_columns")
}

object GetPartitioningMeasures {
Expand All @@ -48,4 +49,3 @@ object GetPartitioningMeasures {
} yield new GetPartitioningMeasures()(Runs, dbProvider.dbEngine)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ import za.co.absa.db.fadb.doobie.postgres.circe.implicits.jsonbArrayPut
import doobie.postgres.implicits._

class WriteCheckpoint(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
extends DoobieSingleResultFunctionWithStatus[CheckpointDTO, Unit, Task](
values => Seq(
extends DoobieSingleResultFunctionWithStatus[CheckpointDTO, Unit, Task](values =>
Seq(
fr"${PartitioningForDB.fromSeqPartitionDTO(values.partitioning).asJson}",
fr"${values.id}",
fr"${values.name}",
Expand All @@ -44,7 +44,8 @@ class WriteCheckpoint(implicit schema: DBSchema, dbEngine: DoobieEngine[Task])
fr"${values.measurements.toList.map(_.asJson)}",
fr"${values.measuredByAtumAgent}",
fr"${values.author}"
))
)
)
with StandardStatusHandling

object WriteCheckpoint {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,12 @@ package za.co.absa.atum.server.api.http
import sttp.model.StatusCode
import sttp.tapir.generic.auto.schemaForCaseClass
import sttp.tapir.json.circe.jsonBody
import za.co.absa.atum.server.model.{BadRequestResponse, ErrorResponse, GeneralErrorResponse, InternalServerErrorResponse}
import za.co.absa.atum.server.model.{
BadRequestResponse,
ErrorResponse,
GeneralErrorResponse,
InternalServerErrorResponse
}
import sttp.tapir.typelevel.MatchType
import sttp.tapir.ztapir._
import sttp.tapir.{EndpointOutput, PublicEndpoint}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package za.co.absa.atum.server.api.http


import sttp.model.StatusCode
import sttp.tapir.generic.auto.schemaForCaseClass
import sttp.tapir.ztapir._
Expand All @@ -27,7 +26,6 @@ import za.co.absa.atum.server.model.ErrorResponse
import za.co.absa.atum.server.model.SuccessResponse.{MultiSuccessResponse, SingleSuccessResponse}
import sttp.tapir.{PublicEndpoint, endpoint}


trait Endpoints extends BaseEndpoints {

protected val createCheckpointEndpointV1: PublicEndpoint[CheckpointDTO, ErrorResponse, CheckpointDTO, Any] = {
Expand All @@ -38,47 +36,53 @@ trait Endpoints extends BaseEndpoints {
.out(jsonBody[CheckpointDTO])
}

protected val createCheckpointEndpointV2: PublicEndpoint[CheckpointDTO, ErrorResponse, SingleSuccessResponse[CheckpointDTO], Any] = {
protected val createCheckpointEndpointV2
: PublicEndpoint[CheckpointDTO, ErrorResponse, SingleSuccessResponse[CheckpointDTO], Any] = {
apiV2.post
.in(CreateCheckpoint)
.in(jsonBody[CheckpointDTO])
.out(statusCode(StatusCode.Created))
.out(jsonBody[SingleSuccessResponse[CheckpointDTO]])
}

protected val createPartitioningEndpointV1: PublicEndpoint[PartitioningSubmitDTO, ErrorResponse, AtumContextDTO, Any] = {
protected val createPartitioningEndpointV1
: PublicEndpoint[PartitioningSubmitDTO, ErrorResponse, AtumContextDTO, Any] = {
apiV1.post
.in(pathToAPIv1CompatibleFormat(CreatePartitioning))
.in(jsonBody[PartitioningSubmitDTO])
.out(statusCode(StatusCode.Ok))
.out(jsonBody[AtumContextDTO])
}

protected val createPartitioningEndpointV2: PublicEndpoint[PartitioningSubmitDTO, ErrorResponse, SingleSuccessResponse[AtumContextDTO], Any] = {
protected val createPartitioningEndpointV2
: PublicEndpoint[PartitioningSubmitDTO, ErrorResponse, SingleSuccessResponse[AtumContextDTO], Any] = {
apiV2.post
.in(CreatePartitioning)
.in(jsonBody[PartitioningSubmitDTO])
.out(statusCode(StatusCode.Ok))
.out(jsonBody[SingleSuccessResponse[AtumContextDTO]])
}

protected val createOrUpdateAdditionalDataEndpointV2: PublicEndpoint[AdditionalDataSubmitDTO, ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO], Any] = {
protected val createOrUpdateAdditionalDataEndpointV2
: PublicEndpoint[AdditionalDataSubmitDTO, ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO], Any] = {
apiV2.post
.in(CreateOrUpdateAdditionalData)
.in(jsonBody[AdditionalDataSubmitDTO])
.out(statusCode(StatusCode.Ok))
.out(jsonBody[SingleSuccessResponse[AdditionalDataSubmitDTO]])
}

protected val getPartitioningCheckpointsEndpointV2: PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = {
protected val getPartitioningCheckpointsEndpointV2
: PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = {
apiV2.get
.in(GetPartitioningCheckpoints)
.in(jsonBody[CheckpointQueryDTO])
.out(statusCode(StatusCode.Ok))
.out(jsonBody[MultiSuccessResponse[CheckpointDTO]])
}

protected val getFlowCheckpointsEndpointV2: PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = {
protected val getFlowCheckpointsEndpointV2
: PublicEndpoint[CheckpointQueryDTO, ErrorResponse, MultiSuccessResponse[CheckpointDTO], Any] = {
apiV2.post
.in(GetFlowCheckpoints)
.in(jsonBody[CheckpointQueryDTO])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,13 @@ trait Routes extends Endpoints with ServerOptions {
createServerEndpoint(createCheckpointEndpointV2, CheckpointController.createCheckpointV2),
createServerEndpoint(createPartitioningEndpointV1, PartitioningController.createPartitioningIfNotExistsV1),
createServerEndpoint(createPartitioningEndpointV2, PartitioningController.createPartitioningIfNotExistsV2),
createServerEndpoint(createOrUpdateAdditionalDataEndpointV2, PartitioningController.createOrUpdateAdditionalDataV2),
createServerEndpoint(
createOrUpdateAdditionalDataEndpointV2,
PartitioningController.createOrUpdateAdditionalDataV2
),
createServerEndpoint(getPartitioningCheckpointsEndpointV2, PartitioningController.getPartitioningCheckpointsV2),
createServerEndpoint(getFlowCheckpointsEndpointV2, FlowController.getFlowCheckpointsV2),
createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit),
createServerEndpoint(healthEndpoint, (_: Unit) => ZIO.unit)
)
ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(metricsInterceptorOption)).from(endpoints).toRoutes
}
Expand All @@ -61,7 +64,7 @@ trait Routes extends Endpoints with ServerOptions {
createPartitioningEndpointV2,
createOrUpdateAdditionalDataEndpointV2,
getPartitioningCheckpointsEndpointV2,
getFlowCheckpointsEndpointV2,
getFlowCheckpointsEndpointV2
)
ZHttp4sServerInterpreter[HttpEnv.Env](http4sServerOptions(None))
.from(SwaggerInterpreter().fromEndpoints[HttpEnv.F](endpoints, SwaggerApiName, SwaggerApiVersion))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ import za.co.absa.atum.server.api.exception.ServiceError
import za.co.absa.atum.server.api.repository.CheckpointRepository
import zio._

class CheckpointServiceImpl(checkpointRepository: CheckpointRepository)
extends CheckpointService with BaseService {
class CheckpointServiceImpl(checkpointRepository: CheckpointRepository) extends CheckpointService with BaseService {

override def saveCheckpoint(checkpointDTO: CheckpointDTO): IO[ServiceError, Unit] = {
repositoryCall(
checkpointRepository.writeCheckpoint(checkpointDTO), "saveCheckpoint"
checkpointRepository.writeCheckpoint(checkpointDTO),
"saveCheckpoint"
)
}

Expand Down
Loading

0 comments on commit 94750ce

Please sign in to comment.