Skip to content

Commit

Permalink
Optimize update using a MATERIALIZED cte
Browse files Browse the repository at this point in the history
  • Loading branch information
jamesbursa committed May 3, 2024
1 parent 7d6f5d3 commit 667b34e
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 6 deletions.
16 changes: 13 additions & 3 deletions api/src/data_migration/load/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,26 @@ def build_update_sql(
source_table: sqlalchemy.Table, destination_table: sqlalchemy.Table
) -> sqlalchemy.Update:
"""Build an `UPDATE ... SET ... WHERE ...` statement for updated rows."""
cte = (
sqlalchemy.select(*destination_table.primary_key.columns)
.join(
source_table,
sqlalchemy.tuple_(*destination_table.primary_key.columns)
== sqlalchemy.tuple_(*source_table.primary_key.columns),
)
.where(destination_table.c.last_upd_date < source_table.c.last_upd_date)
.cte("update_pks")
.prefix_with("MATERIALIZED")
)

return (
# `UPDATE <destination_table>`
sqlalchemy.update(destination_table)
# `SET col1=source_table.col1, col2=source_table.col2, ...`
.values(dict(source_table.columns))
# `WHERE ...`
.where(
sqlalchemy.tuple_(*destination_table.primary_key.columns)
== sqlalchemy.tuple_(*source_table.primary_key.columns),
destination_table.c.last_upd_date < source_table.c.last_upd_date,
sqlalchemy.tuple_(*destination_table.primary_key.columns).in_(sqlalchemy.select(cte.c))

Check warning on line 56 in api/src/data_migration/load/sql.py

View workflow job for this annotation

GitHub Actions / Run API Checks / API Lint, Format & Tests

No overload variant of "select" matches argument type "ReadOnlyColumnCollection[str, KeyedColumnElement[Any]]" [call-overload]
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ def destination_table(sqlalchemy_metadata):
def create_tables(db_client, sqlalchemy_metadata, source_table, destination_table):
with db_client.get_connection() as conn, conn.begin():
sqlalchemy_metadata.create_all(bind=conn)
yield
with db_client.get_connection() as conn, conn.begin():
sqlalchemy_metadata.drop_all(bind=conn)


@freezegun.freeze_time()
Expand Down
14 changes: 11 additions & 3 deletions api/tests/src/data_migration/load/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,12 +62,20 @@ def test_build_insert_select_sql(source_table, destination_table):
def test_build_update_sql(source_table, destination_table):
update = sql.build_update_sql(source_table, destination_table)
assert str(update) == (
"WITH update_pks AS MATERIALIZED \n"
"(SELECT test_destination_table.id1 AS id1, test_destination_table.id2 AS id2 \n"
"FROM test_destination_table "
"JOIN test_source_table "
"ON (test_destination_table.id1, test_destination_table.id2) = "
"(test_source_table.id1, test_source_table.id2) \n"
"WHERE test_destination_table.last_upd_date < "
"test_source_table.last_upd_date)\n "
"UPDATE test_destination_table "
"SET id1=test_source_table.id1, id2=test_source_table.id2, x=test_source_table.x, "
"last_upd_date=test_source_table.last_upd_date FROM test_source_table "
"WHERE (test_destination_table.id1, test_destination_table.id2) = "
"(test_source_table.id1, test_source_table.id2) "
"AND test_destination_table.last_upd_date < test_source_table.last_upd_date"
"WHERE (test_destination_table.id1, test_destination_table.id2) "
"IN (SELECT update_pks.id1, update_pks.id2 \n"
"FROM update_pks)"
)


Expand Down

0 comments on commit 667b34e

Please sign in to comment.