Skip to content

Commit

Permalink
feat(anonymizer): base on pg_anonymizer, use detect to check columns. (
Browse files Browse the repository at this point in the history
  • Loading branch information
pmpetit authored Nov 29, 2024
1 parent 7374c20 commit 6bec33b
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 0 deletions.
8 changes: 8 additions & 0 deletions dblinter/default_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,14 @@ table:
message: "Uppercase used on table {0}.{1}.{2}."
fixes:
- Do not use uppercase for any database objects
- name: TableWithSensibleColumn
ruleid: T012
enabled: True
context:
desc: Based on the anon extension (https://postgresql-anonymizer.readthedocs.io/en/stable/detection), show sensitive columns.
message: "{0} has column {1} (category {2}) that can be considered sensitive. It should be masked for non data-operator users."
fixes:
- Install the anon extension and create masking rules on the sensitive columns.
schema:
- name: SchemaWithDefaultRoleNotGranted
ruleid: S001
Expand Down
43 changes: 43 additions & 0 deletions dblinter/rules/T012/TableWithSensibleColumn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import logging

from dblinter.database_connection import DatabaseConnection

LOGGER = logging.getLogger("dblinter")


def table_with_sensible_column(
    self, db: DatabaseConnection, _, context, table, sarif_document
):
    """Report the columns of one table that the ``anon`` extension flags as sensitive.

    The rule runs ``anon.detect`` with the ``en_US`` and ``fr_FR`` dictionaries
    and adds one check to ``sarif_document`` for every distinct
    (column, category) pair found on the table. When the ``anon`` extension is
    not installed in the database, an informational message is logged and
    nothing is reported.

    Args:
        self: Object providing ``get_ruleid_from_function_name()``.
        db: Database connection; ``db.query`` and ``db.database`` are used.
        _: Unused positional argument.
        context: Rule context, passed through to ``sarif_document.add_check``.
        table: Sequence whose first two items are the schema name and the
            table name (they are combined as ``database.schema.table`` in the
            reported URI).
        sarif_document: Collector that receives the findings via ``add_check``.

    Returns:
        None. Findings are recorded on ``sarif_document``.
    """
    LOGGER.debug(
        "table_with_sensible_column for %s.%s in db %s", table[0], table[1], db.database
    )
    check_extension = "select count(*) as nb from pg_extension where extname='anon'"
    if db.query(check_extension)[0][0] == 0:
        LOGGER.info(
            "TableWithSensibleColumn is enabled, but anon extension not found. in db %s. see https://postgresql-anonymizer.readthedocs.io to install",
            db.database,
        )
        return

    # The identifiers are embedded as SQL string literals, so single quotes are
    # doubled to keep a quote in a name from terminating the literal.
    # NOTE(review): bound parameters would be preferable if db.query supports
    # them -- not visible from this function.
    schema_literal = str(table[0]).replace("'", "''")
    table_literal = str(table[1]).replace("'", "''")

    # Filter on the schema as well as the relation name: relname alone also
    # matches same-named tables living in other schemas. The ORDER BY makes the
    # order of the reported findings deterministic.
    sensitive_cols_query = f"""with coltable as (SELECT column_name,
                identifiers_category from
                anon.detect('en_US')
                join pg_class c on c.oid=table_name
                join pg_namespace n on n.oid=c.relnamespace
                where c.relname='{table_literal}'
                and n.nspname='{schema_literal}'
            union
            SELECT column_name,
                identifiers_category from
                anon.detect('fr_FR')
                join pg_class c on c.oid=table_name
                join pg_namespace n on n.oid=c.relnamespace
                where c.relname='{table_literal}'
                and n.nspname='{schema_literal}')
        select distinct column_name,identifiers_category from coltable
        order by column_name,identifiers_category
    """

    uri = f"{db.database}.{table[0]}.{table[1]}"
    # "or ()" keeps the loop safe when the query yields no result object.
    for row in db.query(sensitive_cols_query) or ():
        message_args = (uri, row[0], row[1])
        sarif_document.add_check(
            self.get_ruleid_from_function_name(), message_args, uri, context
        )
1 change: 1 addition & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from testcontainers.core.waiting_utils import wait_container_is_ready
from testcontainers.postgres import PostgresContainer

# PG_IMAGE = "registry.gitlab.com/dalibo/postgresql_anonymizer:latest"
PG_IMAGE = "postgres:14"
PG_PORT = 5432
PG_USER = "postgres"
Expand Down
61 changes: 61 additions & 0 deletions tests/rules/T012/test_TableWithSensibleColumn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
from dblinter.configuration_model import Context
from dblinter.database_connection import DatabaseConnection
from dblinter.function_library import FunctionLibrary
from dblinter.sarif_document import SarifDocument


def test_table_with_sensitive_column(postgres_instance_args) -> None:
    """Check that columns flagged by ``anon.detect`` are reported by rule T012.

    The test is a no-op when the ``anon`` extension is not installed in the
    test database, because the rule cannot run without it.
    """
    args = postgres_instance_args
    db = DatabaseConnection(args)
    check_extension = "select count(*) as nb from pg_extension where extname='anon'"
    if db.query(check_extension)[0][0] == 0:
        # Nothing to verify without the extension.
        return
    context = Context(
        desc="Base on the extension anon (https://postgresql-anonymizer.readthedocs.io/en/stable/detection), show sensitive column.",
        fixes=[
            "Install extension anon, and create some masking rules on.",
        ],
        message="{0} have column {1} (category {2}) that can be consider has sensitive. It should be masked for non data-operator users.",
    )
    function_library = FunctionLibrary()
    db.query("select anon.init()")
    # Another test may have left a "test" table behind in the same database;
    # drop it so the CREATE below cannot fail or pick up foreign columns.
    db.query("DROP TABLE IF EXISTS test")
    db.query("CREATE TABLE test (id integer, creditcard text)")
    sarif_document = SarifDocument()
    function_library.get_function_by_function_name("table_with_sensible_column")(
        function_library, db, [], context, ("public", "test"), sarif_document
    )
    results = sarif_document.sarif_doc.runs[0].results
    assert (
        results[0].message.text
        == "postgres.public.test have column creditcard (category creditcard) that can be consider has sensitive. It should be masked for non data-operator users."
    )
    assert (
        results[1].message.text
        == "postgres.public.test have column id (category account_id) that can be consider has sensitive. It should be masked for non data-operator users."
    )


def test_table_without_sensitive_column(postgres_instance_args) -> None:
    """Check that rule T012 reports nothing for a table without sensitive columns.

    The test is a no-op when the ``anon`` extension is not installed in the
    test database, because the rule cannot run without it.
    """
    args = postgres_instance_args
    db = DatabaseConnection(args)
    check_extension = "select count(*) as nb from pg_extension where extname='anon'"
    if db.query(check_extension)[0][0] == 0:
        # Nothing to verify without the extension.
        return
    context = Context(
        desc="Base on the extension anon (https://postgresql-anonymizer.readthedocs.io/en/stable/detection), show sensitive column.",
        fixes=[
            "Install extension anon, and create some masking rules on.",
        ],
        message="{0} have column {1} (category {2}) that can be consider has sensitive. It should be masked for non data-operator users.",
    )
    function_library = FunctionLibrary()
    db.query("select anon.init()")
    # A "test" table with sensitive columns may survive from another test in
    # the same database; drop it so this test inspects only its own table.
    db.query("DROP TABLE IF EXISTS test")
    db.query("CREATE TABLE test (test_id integer, description text)")
    sarif_document = SarifDocument()
    function_library.get_function_by_function_name("table_with_sensible_column")(
        function_library, db, [], context, ("public", "test"), sarif_document
    )
    assert sarif_document.sarif_doc.runs[0].results == []

0 comments on commit 6bec33b

Please sign in to comment.