Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support common table expressions (CTEs) #21

Merged
merged 5 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "sinker"
version = "0.1.2"
version = "0.1.3"
description = "Synchronize Postgres to Elasticsearch"
authors = ["Loren Siebert <[email protected]>"]
license = "MIT/Apache-2.0"
Expand All @@ -15,6 +15,7 @@ elasticsearch = "^8.17.0"
environs = ">=9.5,<15.0"
psycopg = "^3.1.8"
pytest-mock = "^3.10.0"
sqlglot = "^26.2.1"

[tool.poetry.group.dev.dependencies]
flake8 = ">=6,<8"
Expand Down
26 changes: 13 additions & 13 deletions src/sinker/sinker.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
PGCHUNK_SIZE,
SCHEMA_TABLE_DELIMITER,
)
from .utils import generate_schema_tables
from .utils import parse_schema_tables

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -107,23 +107,23 @@ def setup_pg(self) -> None:
plpgsql: str = f"{schema_view_name}_fn"
create_function: str = q.CREATE_FUNCTION.format(plpgsql, SINKER_SCHEMA, SINKER_TODO_TABLE, schema_view_name)
ddl_list.append(create_function)
for schema_table in generate_schema_tables(view_select_query):
# The last table is the top-level table that gets DELETE events with an ID in the replication slot.
# The materialized views do not contain the ID of the doc being deleted,
# so we'll use this table's delete events as a proxy.
# lsn,xid,data
# 0/24EDA4D8,17393,BEGIN 17393
# 0/24EDA4D8,17393,"table public.""Foo"": DELETE: id[text]:'91754ea9-2983-4cf7-bdf9-fc23d2386d90'"
# 0/24EDC1B0,17393,COMMIT 17393
# 0/24EDC228,17394,BEGIN 17394
# 0/24EF0D60,17394,table sinker.foo_mv: DELETE: (no-tuple-data)
# 0/24EF4718,17394,COMMIT 17394
self.parent_table, schema_tables = parse_schema_tables(view_select_query)
for schema_table in schema_tables:
schema, _, table = schema_table.rpartition(SCHEMA_TABLE_DELIMITER)
schema = schema or DEFAULT_SCHEMA
trigger_name: str = f"{SINKER_SCHEMA}_{self.view}_{schema}_{table}"
create_trigger: str = q.CREATE_TRIGGER.format(trigger_name, schema, table, plpgsql)
ddl_list.append(create_trigger)
# The last table is the top-level table that gets DELETE events with an ID in the replication slot.
# The materialized views do not contain the ID of the doc being deleted,
# so we'll use this table's delete events as a proxy.
# lsn,xid,data
# 0/24EDA4D8,17393,BEGIN 17393
# 0/24EDA4D8,17393,"table public.""Foo"": DELETE: id[text]:'91754ea9-2983-4cf7-bdf9-fc23d2386d90'"
# 0/24EDC1B0,17393,COMMIT 17393
# 0/24EDC228,17394,BEGIN 17394
# 0/24EF0D60,17394,table sinker.foo_mv: DELETE: (no-tuple-data)
# 0/24EF4718,17394,COMMIT 17394
self.parent_table = schema_table
create_todo_entry: str = q.CREATE_TODO_ENTRY.format(SINKER_SCHEMA, SINKER_TODO_TABLE, schema_view_name)
ddl_list.append(create_todo_entry)
psycopg.connect(autocommit=True).execute("; ".join(ddl_list))
Expand Down
28 changes: 14 additions & 14 deletions src/sinker/utils.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import re
from typing import Set, Tuple

from typing import Iterable
import sqlglot
from sqlglot.expressions import Table, CTE

TABLE_RE = re.compile(r"from\s\"?(\S+)\b", re.I)


def generate_schema_tables(view_select_query: str) -> Iterable[str]:
def parse_schema_tables(view_select_query: str) -> Tuple[str, Set[str]]:
"""
Given a view select query, return a list of unique tables that are referenced in the query
in the order they were encountered.
Skip anything that looks like a function call.
Given a view select query, return a primary parent table and the set of unique tables that are referenced in the
query. Skip anything that looks like a function call.
:param view_select_query: The select query from the view
"""
seen: set = set()
for table_candidate in TABLE_RE.findall(view_select_query):
if "(" not in table_candidate:
if table_candidate not in seen:
seen.add(table_candidate)
yield table_candidate
parsed = sqlglot.parse_one(view_select_query)
parent_table = parsed.find(Table)
if parent_table is None:
raise ValueError("No table found in the query")
tables = {table.name for table in parsed.find_all(Table)}
ctes = {cte.alias for cte in parsed.find_all(CTE)}
schema_tables = tables - ctes
return parent_table.name, schema_tables
15 changes: 0 additions & 15 deletions tests/test_generate_schema_tables.py

This file was deleted.

72 changes: 72 additions & 0 deletions tests/test_parse_schema_tables.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
from sinker.utils import parse_schema_tables


def test_parse_schema_tables():
    """The first table in the outermost FROM is the parent; all referenced
    physical tables are returned as a deduplicated set.

    The fixture exercises three things at once:
    * subquery tables ("EmailAddress") are collected,
    * duplicate references are reported only once,
    * function calls such as unnest(emails) are not mistaken for tables.
    """
    # NOTE(fix): the original fixture had a trailing comma after the
    # 'emailAddresses' entry, which is invalid PostgreSQL inside
    # json_build_object(...); the comma has been removed so the fixture
    # is valid SQL.
    view_select_query = """select id,
    json_build_object(
        'name', "name",
        'otherEmailDomains',(select array_agg(split_part(email, '@', 2)) FROM unnest(emails) as email),
        'emailDomains', (select array_agg(split_part(value, '@', 2))
            from "EmailAddress" EA where "personId"="Person".id),
        'emailAddresses', (select array_agg(value) from "EmailAddress" EA where "personId"="Person".id)
    ) as "person"
    from "person"
    """
    parent_table, schema_tables = parse_schema_tables(view_select_query)
    assert parent_table == "person"
    assert schema_tables == {"EmailAddress", "person"}


def test_parse_schema_tables_with_cte():
    """CTE aliases must not be reported as schema tables.

    The ``attendees`` CTE is referenced three times in the query body, but it
    is not a physical Postgres table, so it must be excluded from
    ``schema_tables``; only the real tables read by the CTE and the outer
    query should be returned.
    """
    view_select_query = """
    WITH
    attendees AS (
        SELECT DISTINCT ON (a."personId", a."hostedEventId")
            a."hostedEventId",
            a.status,
            e.value as email,
            p."primaryOrganizationId"
        FROM "HostedEventAttendance" a
        JOIN "Person" p ON a."personId" = p.id
        JOIN "EmailAddress" e ON p.id = e."personId"
        GROUP BY
            a."personId",
            a."hostedEventId",
            a.status,
            e.value,
            p."primaryOrganizationId"
    )
    SELECT
        id,
        json_build_object(
            'summary', "name",
            'startTime', "timestamp",
            'attendees', (
                SELECT json_agg(json_build_object('email', attendees.email, 'eventResponse', attendees.status))
                AS formatted_attendees
                FROM attendees
                WHERE attendees."hostedEventId" = "HostedEvent".id
            ),
            'organizationIds',
            (
                SELECT array_agg(attendees."primaryOrganizationId")
                FROM attendees
                WHERE attendees."hostedEventId" = "HostedEvent".id
            )
        ) AS "hosted_events"
    FROM
        "HostedEvent"
    """
    parent_table, schema_tables = parse_schema_tables(view_select_query)
    # The outermost FROM references "HostedEvent", so it is the parent table.
    assert parent_table == "HostedEvent"
    # "attendees" (the CTE alias) is filtered out; only physical tables remain.
    assert schema_tables == {"EmailAddress", "HostedEvent", "HostedEventAttendance", "Person"}


def test_error_handling_on_query_with_no_table():
    """A query that references no table must raise ValueError with the
    expected message."""
    view_select_query = """select 1"""
    raised = False
    try:
        parse_schema_tables(view_select_query)
    except ValueError as exc:
        raised = True
        assert str(exc) == "No table found in the query"
    assert raised, "Expected ValueError"
Loading