
tests
salamonpavel committed Sep 3, 2024
1 parent fd67127 commit 6537241
Showing 18 changed files with 541 additions and 177 deletions.
116 changes: 116 additions & 0 deletions database/src/main/postgres/runs/V1.9.7__create_partitioning.sql
@@ -0,0 +1,116 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

CREATE OR REPLACE FUNCTION runs.create_partitioning(
IN i_partitioning JSONB,
IN i_by_user TEXT,
IN i_parent_partitioning JSONB = NULL,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_partitioning BIGINT
) RETURNS record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.create_partitioning(3)
-- Creates a partitioning entry, optionally linking it to a parent partitioning
--
-- Parameters:
-- i_partitioning - partitioning to create
-- i_by_user - user behind the change
-- i_parent_partitioning - parent partitioning of the provided partitioning, optional
--
-- Returns:
-- status - Status code
-- status_text - Status text
-- id_partitioning - id of the partitioning
--
-- Status codes:
-- 11 - Partitioning created
-- 12 - Partitioning parent registered
-- 31 - Partitioning already present
--
-------------------------------------------------------------------------------
DECLARE
_fk_parent_partitioning BIGINT := NULL;
_create_partitioning BOOLEAN;
_status BIGINT;
BEGIN
-- SELECT
-- CASE
-- WHEN status = 14 THEN 31
-- ELSE status
-- END as o_status,
-- status_text as o_status_text,
-- id_partitioning as o_id_partitioning
-- FROM runs.create_partitioning_if_not_exists(i_partitioning, i_by_user, i_parent_partitioning);
-- RETURN;

id_partitioning := runs._get_id_partitioning(i_partitioning, true);

_create_partitioning := id_partitioning IS NULL;

IF i_parent_partitioning IS NOT NULL THEN
SELECT CPINE.id_partitioning
FROM runs.create_partitioning_if_not_exists(i_parent_partitioning, i_by_user, NULL) AS CPINE
INTO _fk_parent_partitioning;
END IF;


IF _create_partitioning THEN
INSERT INTO runs.partitionings (partitioning, created_by)
VALUES (i_partitioning, i_by_user)
RETURNING partitionings.id_partitioning
INTO create_partitioning.id_partitioning;

PERFORM 1
FROM flows._create_flow(id_partitioning, i_by_user);

status := 11;
status_text := 'Partitioning created';
ELSE
status := 31;
status_text := 'Partitioning already present';
RETURN;
END IF;

IF i_parent_partitioning IS NOT NULL THEN

SELECT ATPF.status
FROM flows._add_to_parent_flows(_fk_parent_partitioning, id_partitioning, i_by_user) AS ATPF
INTO _status;

IF _create_partitioning THEN
-- copying measure definitions to establish continuity
INSERT INTO runs.measure_definitions(fk_partitioning, measure_name, measured_columns, created_by, created_at)
SELECT id_partitioning, CMD.measure_name, CMD.measured_columns, CMD.created_by, CMD.created_at
FROM runs.measure_definitions CMD
WHERE CMD.fk_partitioning = _fk_parent_partitioning;

-- additional data are not copied; they are specific to the particular partitioning
ELSIF (_status = 11) THEN
status := 12;
status_text := 'Partitioning parent registered';
END IF;
END IF;

RETURN;
END;
$$
LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.create_partitioning(JSONB, TEXT, JSONB) OWNER TO atum_owner;
GRANT EXECUTE ON FUNCTION runs.create_partitioning(JSONB, TEXT, JSONB) TO atum_user;
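
For orientation, a minimal call sketch of the function above; the JSONB payload and the user name are illustrative assumptions, not values shipped with this migration:

-- hypothetical example call; i_parent_partitioning defaults to NULL
SELECT status, status_text, id_partitioning
FROM runs.create_partitioning(
    '{"version": 1, "keys": ["key1"], "keysToValues": {"key1": "valueX"}}'::JSONB,
    'example_user'
);
-- First call is expected to return status = 11 ('Partitioning created');
-- repeating the same call returns status = 31 ('Partitioning already present')
-- together with the existing id_partitioning.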
@@ -0,0 +1,191 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.atum.database.runs

import za.co.absa.balta.DBTestSuite
import za.co.absa.balta.classes.JsonBString

class CreatePartitioningIntegrationTests extends DBTestSuite {

private val fncCreatePartitioning = "runs.create_partitioning"

private val partitioning = JsonBString(
"""
|{
| "version": 1,
| "keys": ["key1", "key3", "key2", "key4"],
| "keysToValues": {
| "key1": "valueX",
| "key2": "valueY",
| "key3": "valueZ",
| "key4": "valueA"
| }
|}
|""".stripMargin
)

private val parentPartitioning = JsonBString(
"""
|{
| "version": 1,
| "keys": ["key1", "key3"],
| "keysToValues": {
| "key1": "valueX",
| "key3": "valueZ"
| }
|}
|""".stripMargin
)

test("Partitioning created") {
val partitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

table("runs.partitionings").where(add("id_partitioning", partitioningID)) {partitioningResult =>
val row = partitioningResult.next()
// assert(row.getJsonB("partitioning").contains(partitioning)) TODO keys are reordered in JsonB and whitespaces removed
assert(row.getString("created_by").contains("Fantômas"))
assert(row.getOffsetDateTime("created_at").contains(now()))
}

val idFlow = table("flows.partitioning_to_flow").where(add("fk_partitioning", partitioningID)) { partToFlowResult =>
assert(partToFlowResult.hasNext)
val partToFlowRow = partToFlowResult.next()
val result = partToFlowRow.getLong("fk_flow")
assert(partToFlowRow.getString("created_by").contains("Fantômas"))
assert(!partToFlowResult.hasNext)
result.get
}

table("flows.flows").where(add("id_flow", idFlow)) {flowsResult =>
assert(flowsResult.hasNext)
val flowRow = flowsResult.next()
assert(flowRow.getString("flow_name").exists(_.startsWith("Custom flow #")))
assert(flowRow.getString("flow_description").contains(""))
assert(flowRow.getBoolean("from_pattern").contains(false))
assert(flowRow.getString("created_by").contains("Fantômas"))
assert(flowRow.getOffsetDateTime("created_at").contains(now()))
assert(!flowsResult.hasNext)
}
}
test("Partitioning created with parent partitioning that already exists") {
val parentPartitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", parentPartitioning)
.setParam("i_by_user", "Albert Einstein")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1
)
val partitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParam("i_parent_partitioning", parentPartitioning)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", parentPartitioningID)) == 1
)
assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 2
)
}

test("Partitioning already exists") {
val partitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(31))
assert(row.getString("status_text").contains("Partitioning already present"))
assert(row.getLong("id_partitioning").contains(partitioningID))
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 1
)
}

test("Partitioning exists, parent is not added") {
val partitioningID = function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParamNull("i_parent_partitioning")
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(11))
assert(row.getString("status_text").contains("Partitioning created"))
row.getLong("id_partitioning").get
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 1
)

function(fncCreatePartitioning)
.setParam("i_partitioning", partitioning)
.setParam("i_by_user", "Fantômas")
.setParam("i_parent_partitioning", parentPartitioning)
.execute { queryResult =>
assert(queryResult.hasNext)
val row = queryResult.next()
assert(row.getInt("status").contains(31))
assert(row.getString("status_text").contains("Partitioning already present"))
}

assert(
table("flows.partitioning_to_flow").count(add("fk_partitioning", partitioningID)) == 1
)
}
}
2 changes: 1 addition & 1 deletion server/src/main/scala/za/co/absa/atum/server/Main.scala
@@ -51,7 +51,7 @@ object Main extends ZIOAppDefault with Server {
CheckpointRepositoryImpl.layer,
FlowRepositoryImpl.layer,
CreatePartitioningIfNotExists.layer,
- CreatePartitioningIfNotExistsV2.layer,
+ CreatePartitioning.layer,
GetPartitioningMeasures.layer,
GetPartitioningAdditionalData.layer,
GetPartitioningAdditionalDataV2.layer,
@@ -28,7 +28,7 @@ trait PartitioningController {
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, AtumContextDTO]

- def createPartitioningIfNotExistsV2(
+ def postPartitioning(
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, (SingleSuccessResponse[PartitioningWithIdDTO], String)]

@@ -50,17 +50,6 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
atumContextDTOEffect
}

- override def createOrUpdateAdditionalDataV2(
-   additionalData: AdditionalDataSubmitDTO
- ): IO[ErrorResponse, SingleSuccessResponse[AdditionalDataSubmitDTO]] = {
-   mapToSingleSuccessResponse(
-     serviceCall[Unit, AdditionalDataSubmitDTO](
-       partitioningService.createOrUpdateAdditionalData(additionalData),
-       _ => additionalData
-     )
-   )
- }

override def getPartitioningCheckpointsV2(
checkpointQueryDTO: CheckpointQueryDTO
): IO[ErrorResponse, MultiSuccessResponse[CheckpointDTO]] = {
@@ -90,13 +79,13 @@ class PartitioningControllerImpl(partitioningService: PartitioningService)
)
}

- override def createPartitioningIfNotExistsV2(
+ override def postPartitioning(
partitioningSubmitDTO: PartitioningSubmitDTO
): IO[ErrorResponse, (SingleSuccessResponse[PartitioningWithIdDTO], String)] = {
for {
response <- mapToSingleSuccessResponse(
serviceCall[PartitioningWithIdDTO, PartitioningWithIdDTO](
- partitioningService.createPartitioningIfNotExistsV2(partitioningSubmitDTO)
+ partitioningService.createPartitioning(partitioningSubmitDTO)
)
)
uri <- createV2RootAnchoredResourcePath(
