From 6bec33bd4881d565b89f04b7dec35c7b5c5b770c Mon Sep 17 00:00:00 2001 From: Pierre-Marie Petit Date: Fri, 29 Nov 2024 09:17:47 +0100 Subject: [PATCH] feat(anonymizer): base on pg_anonymizer, use detect to check columns. (#18) --- dblinter/default_config.yaml | 8 +++ .../rules/T012/TableWithSensibleColumn.py | 43 +++++++++++++ tests/conftest.py | 1 + .../T012/test_TableWithSensibleColumn.py | 61 +++++++++++++++++++ 4 files changed, 113 insertions(+) create mode 100644 dblinter/rules/T012/TableWithSensibleColumn.py create mode 100644 tests/rules/T012/test_TableWithSensibleColumn.py diff --git a/dblinter/default_config.yaml b/dblinter/default_config.yaml index ffa0330..66a3dd5 100644 --- a/dblinter/default_config.yaml +++ b/dblinter/default_config.yaml @@ -179,6 +179,14 @@ table: message: "Uppercase used on table {0}.{1}.{2}." fixes: - Do not use uppercase for any database objects + - name: TableWithSensibleColumn + ruleid: T012 + enabled: True + context: + desc: Base on the extension anon (https://postgresql-anonymizer.readthedocs.io/en/stable/detection), show sensitive column. + message: "{0} have column {1} (category {2}) that can be consider has sensitive. It should be masked for non data-operator users." + fixes: + - Install extension anon, and create some masking rules on. 
import logging
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Only needed for the type annotation on ``db``.
    from dblinter.database_connection import DatabaseConnection

LOGGER = logging.getLogger("dblinter")

# Dictionaries handed to anon.detect(); the findings of all of them are merged.
_DETECT_LOCALES = ("en_US", "fr_FR")


def _sql_literal(value) -> str:
    """Return *value* rendered as a single-quoted SQL string literal.

    Embedded single quotes are doubled so that an identifier such as
    ``o'brien`` cannot terminate the literal early.
    """
    return "'" + str(value).replace("'", "''") + "'"


def _sensitive_columns_query(schema, relname) -> str:
    """Build the query listing the columns anon.detect() flags for one table.

    The lookup is restricted to ``schema.relname`` (not just ``relname``) so
    that same-named tables living in other schemas are not reported, and the
    rows are explicitly ordered so the findings come back in a stable order.
    """
    schema_lit = _sql_literal(schema)
    relname_lit = _sql_literal(relname)
    per_locale = "\nunion\n".join(
        "select d.column_name, d.identifiers_category\n"
        f"from anon.detect({_sql_literal(locale)}) d\n"
        "join pg_class c on c.oid = d.table_name\n"
        "join pg_namespace n on n.oid = c.relnamespace\n"
        f"where n.nspname = {schema_lit} and c.relname = {relname_lit}"
        for locale in _DETECT_LOCALES
    )
    return (
        f"with coltable as (\n{per_locale}\n)\n"
        "select distinct column_name, identifiers_category\n"
        "from coltable\n"
        "order by column_name, identifiers_category"
    )


def table_with_sensible_column(
    self, db: "DatabaseConnection", _, context, table, sarif_document
):
    """Report every column of *table* that the ``anon`` extension flags.

    Args:
        self: object providing ``get_ruleid_from_function_name()``.
        db: connection exposing ``database`` and ``query(sql)``.
        _: unused positional slot.
        context: passed through unchanged to ``sarif_document.add_check``.
        table: sequence whose first two items are the schema name and the
            table name.
        sarif_document: collector; one ``add_check`` call is made per
            flagged column.

    Returns:
        None. When the ``anon`` extension is not installed the rule only
        logs an informational message and reports nothing.
    """
    LOGGER.debug(
        "table_with_sensible_column for %s.%s in db %s", table[0], table[1], db.database
    )
    CHECK_EXTENSION = "select count(*) as nb from pg_extension where extname='anon'"
    anon = db.query(CHECK_EXTENSION)[0][0]
    if anon == 0:
        LOGGER.info(
            "TableWithSensibleColumn is enabled, but anon extension not found. in db %s. see https://postgresql-anonymizer.readthedocs.io to install",
            db.database,
        )
        return

    uri = f"{db.database}.{table[0]}.{table[1]}"
    sensitive_cols = db.query(_sensitive_columns_query(table[0], table[1]))
    # ``or ()`` keeps the original tolerance for an empty/None result.
    for column_name, category in sensitive_cols or ():
        message_args = (uri, column_name, category)
        sarif_document.add_check(
            self.get_ruleid_from_function_name(), message_args, uri, context
        )
from dblinter.configuration_model import Context
from dblinter.database_connection import DatabaseConnection
from dblinter.function_library import FunctionLibrary
from dblinter.sarif_document import SarifDocument

CHECK_EXTENSION = "select count(*) as nb from pg_extension where extname='anon'"


def _anon_installed(db) -> bool:
    """Tell whether the ``anon`` extension exists in the connected database."""
    return db.query(CHECK_EXTENSION)[0][0] != 0


def _rule_context():
    """Build the rule context shared by both tests."""
    return Context(
        desc="Base on the extension anon (https://postgresql-anonymizer.readthedocs.io/en/stable/detection), show sensitive column.",
        fixes=[
            "Install extension anon, and create some masking rules on.",
        ],
        message="{0} have column {1} (category {2}) that can be consider has sensitive. It should be masked for non data-operator users.",
    )


def _run_rule(db, create_table_sql):
    """Initialise anon, create the test table, run the rule, return the SARIF doc."""
    context = _rule_context()
    function_library = FunctionLibrary()
    db.query("select anon.init()")
    db.query(create_table_sql)
    sarif_document = SarifDocument()
    function_library.get_function_by_function_name("table_with_sensible_column")(
        function_library, db, [], context, ("public", "test"), sarif_document
    )
    return sarif_document


def test_table_with_sensitive_column(postgres_instance_args) -> None:
    """Columns named like identifiers (``id``, ``creditcard``) must be reported."""
    db = DatabaseConnection(postgres_instance_args)
    if not _anon_installed(db):
        # NOTE(review): without the anon extension this test passes without
        # checking anything; it should be reported as skipped instead.
        return
    sarif_document = _run_rule(
        db, "CREATE TABLE test (id integer, creditcard text)"
    )
    # The order of the findings follows SQL row order, which is not something
    # the test should depend on: check membership instead of positions.
    messages = [
        result.message.text for result in sarif_document.sarif_doc.runs[0].results
    ]
    assert (
        "postgres.public.test have column creditcard (category creditcard) that can be consider has sensitive. It should be masked for non data-operator users."
        in messages
    )
    assert (
        "postgres.public.test have column id (category account_id) that can be consider has sensitive. It should be masked for non data-operator users."
        in messages
    )


def test_table_without_sensitive_column(postgres_instance_args) -> None:
    """A table with only neutral column names must produce no finding."""
    db = DatabaseConnection(postgres_instance_args)
    if not _anon_installed(db):
        # NOTE(review): same silent pass as above when anon is missing.
        return
    sarif_document = _run_rule(
        db, "CREATE TABLE test (test_id integer, description text)"
    )
    assert sarif_document.sarif_doc.runs[0].results == []