Skip to content

Commit

Permalink
Create DB function to get all checkpoint data of a partitioning :#189 (
Browse files Browse the repository at this point in the history
…#191)

* Fixes #189 - Define and implemented a db function to return the checkpoint

* Fixes #189 - Modifying a db function

* Fixes #189 - Modifying a db function

* Fixes #189 - Adding test cases

* Fixes #189 - Added two input fields to the db function

* Fixes #189 - Fixing type mismatch

* Testing #120 db integration

* Fixes #189 - Adding test cases

* Fixes #189 - Changing back the details of flows.flows

* Fixes #189 - Implementing GitHub comments

* Update database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoint.sql

Co-authored-by: David Benedeki <[email protected]>

* Update database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoint.sql

Co-authored-by: David Benedeki <[email protected]>

* Update database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoint.sql

Co-authored-by: David Benedeki <[email protected]>

* Update database/src/main/postgres/runs/V1.8.3__get_partitioning_checkpoint.sql

Co-authored-by: David Benedeki <[email protected]>

* Fixes #189 - Addressing GitHub comments

* Closes #189

* Closes #189 - Addressing GitHub comments

* Closes #189 - Addressing GitHub comments

---------

Co-authored-by: David Benedeki <[email protected]>
  • Loading branch information
TebaleloS and benedeki authored May 7, 2024
1 parent 4d83d79 commit f98828b
Show file tree
Hide file tree
Showing 3 changed files with 457 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ $$
-- Parameters:
-- i_partitioning_of_flow - partitioning to use for identifying the flow associate with checkpoints
-- that will be retrieved
-- i_limit - (optional) maximum number of checkpoints retrieved
-- i_limit - (optional) maximum number of checkpoint's measurements to return
-- if 0 specified, all data will be returned, i.e. no limit will be applied
-- i_checkpoint_name - (optional) if specified, returns data related to particular checkpoint's name
--
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright 2021 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

-- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT)
CREATE OR REPLACE FUNCTION runs.get_partitioning_checkpoints(
IN i_partitioning JSONB,
IN i_limit INT DEFAULT 5,
IN i_checkpoint_name TEXT DEFAULT NULL,
OUT status INTEGER,
OUT status_text TEXT,
OUT id_checkpoint UUID,
OUT checkpoint_name TEXT,
OUT measure_name TEXT,
OUT measure_columns TEXT[],
OUT measurement_value JSONB,
OUT checkpoint_start_time TIMESTAMP WITH TIME ZONE,
OUT checkpoint_end_time TIMESTAMP WITH TIME ZONE
)
RETURNS SETOF record AS
$$
-------------------------------------------------------------------------------
--
-- Function: runs.get_partitioning_checkpoints(JSONB, INT, TEXT)
-- Retrieves all checkpoints (measures and their measurement details) related to a
-- given partitioning (and checkpoint name, if specified).
--
-- Parameters:
-- i_partitioning - partitioning of requested checkpoints
-- i_limit - (optional) maximum number of checkpoint's measurements to return
-- if 0 specified, all data will be returned, i.e. no limit will be applied
-- i_checkpoint_name - (optional) if specified, returns data related to particular checkpoint's name
--
-- Returns:
-- status - Status code
-- status_text - Status message
-- id_checkpoint - ID of the checkpoint
-- checkpoint_name - Name of the checkpoint
-- measure_name - Name of the measure
-- measure_columns - Columns of the measure
-- measurement_value - Value of the measurement
-- checkpoint_start_time - Time of the checkpoint
-- checkpoint_end_time - End time of the checkpoint computation
--
-- Status codes:
-- 11 - OK
-- 41 - Partitioning not found
--
-------------------------------------------------------------------------------
DECLARE
_fk_partitioning BIGINT;
BEGIN
_fk_partitioning = runs._get_id_partitioning(i_partitioning);

IF _fk_partitioning IS NULL THEN
status := 41;
status_text := 'Partitioning not found';
RETURN NEXT;
RETURN;
END IF;

RETURN QUERY
SELECT
11 AS status,
'Ok' AS status_text,
c.id_checkpoint,
c.checkpoint_name,
md.measure_name,
md.measured_columns,
m.measurement_value,
c.process_start_time AS checkpoint_start_time,
c.process_end_time AS checkpoint_end_time
FROM
runs.checkpoints c
JOIN
runs.measurements m ON c.id_checkpoint = m.fk_checkpoint
JOIN
runs.measure_definitions md ON m.fk_measure_definition = md.id_measure_definition
WHERE
c.fk_partitioning = _fk_partitioning
AND
(i_checkpoint_name IS NULL OR c.checkpoint_name = i_checkpoint_name)
ORDER BY
c.process_start_time,
c.id_checkpoint
LIMIT nullif(i_limit, 0);

END;
$$

LANGUAGE plpgsql VOLATILE SECURITY DEFINER;

ALTER FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) OWNER TO atum_owner;

GRANT EXECUTE ON FUNCTION runs.get_partitioning_checkpoints(JSONB, INT, TEXT) TO atum_owner;

Loading

0 comments on commit f98828b

Please sign in to comment.