Skip to content

Commit

Permalink
[Issue #2405] Account for updates when last_upd_date is null (#2406)
Browse files Browse the repository at this point in the history
## Summary
Fixes #2405

### Time to review: __5 mins__

## Changes proposed
Fallback to using `created_date` when `last_upd_date` is null

## Context for reviewers
The legacy Oracle database has a last_upd_date timestamp that we use for
finding updates to our system.

This field is always null when a record is first inserted, and only on
receiving at least one update will it be populated.

This means that if a record is inserted, we copy over last_upd_date=null
and then an update happens, we try to do effectively: where null < {some
timestamp} which means we find nothing when trying to update. What we
instead should do is fallback to using the created_date timestamp if the
update date is null.

## Additional information
Found this when I realized there were cases where we just weren't
processing updates. Turns out, the ELT process doesn't handle updates
pretty much at all right now (this is the common case)
  • Loading branch information
chouinar authored Oct 8, 2024
1 parent 2a7f4c1 commit 12bf286
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 10 deletions.
10 changes: 9 additions & 1 deletion api/src/data_migration/load/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,15 @@ def build_select_updated_rows_sql(
== sqlalchemy.tuple_(*source_table.primary_key.columns),
)
# `WHERE ...`
.where(destination_table.c.last_upd_date < source_table.c.last_upd_date)
# NOTE: The legacy system doesn't populate the last_upd_date unless at least one update
# has occurred, so we need to fallback to the created_date otherwise it will always be
# null on the destination table side
.where(
sqlalchemy.func.coalesce(
destination_table.c.last_upd_date, destination_table.c.created_date
)
< source_table.c.last_upd_date
)
.order_by(*source_table.primary_key.columns)
)

Expand Down
15 changes: 11 additions & 4 deletions api/tests/src/data_migration/load/test_load_oracle_data_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ def test_load_data(self, db_session, foreign_tables, staging_tables, enable_fact
source_record4 = ForeignTopportunityFactory.create(
opportunity_id=4, oppnumber="A-4-update", cfdas=[], last_upd_date=time3
)
source_record5 = ForeignTopportunityFactory.create(
opportunity_id=6, oppnumber="A-6-update", cfdas=[], last_upd_date=time3
)

## Destination records
# unchanged:
Expand All @@ -95,6 +98,9 @@ def test_load_data(self, db_session, foreign_tables, staging_tables, enable_fact
StagingTopportunityFactory.create(
opportunity_id=4, oppnumber="A-4", cfdas=[], last_upd_date=time1
)
StagingTopportunityFactory.create(
opportunity_id=6, oppnumber="A-6", cfdas=[], last_upd_date=None
)
# delete:
StagingTopportunityFactory.create(
opportunity_id=5, oppnumber="A-5", cfdas=[], last_upd_date=time2
Expand All @@ -109,8 +115,8 @@ def test_load_data(self, db_session, foreign_tables, staging_tables, enable_fact
# this prevents some weirdness with the value comparison we'll do
db_session.expire_all()

assert db_session.query(source_table).count() == 4
assert db_session.query(destination_table).count() == 5
assert db_session.query(source_table).count() == 5
assert db_session.query(destination_table).count() == 6

destination_records = (
db_session.query(destination_table).order_by(destination_table.c.opportunity_id).all()
Expand All @@ -123,13 +129,14 @@ def test_load_data(self, db_session, foreign_tables, staging_tables, enable_fact
)
validate_copied_value(source_table, source_record4, destination_records[3])
validate_copied_value(source_table, None, destination_records[4], is_delete=True)
validate_copied_value(source_table, source_record5, destination_records[5])

assert task.metrics["count.delete.topportunity"] == 1
assert task.metrics["count.insert.topportunity"] == 2
assert task.metrics["count.update.topportunity"] == 1
assert task.metrics["count.update.topportunity"] == 2
assert task.metrics["count.delete.total"] == 1
assert task.metrics["count.insert.total"] == 2
assert task.metrics["count.update.total"] == 1
assert task.metrics["count.update.total"] == 2

def test_raises_if_table_dicts_different(self, db_session, foreign_tables, staging_tables):
with pytest.raises(
Expand Down
12 changes: 7 additions & 5 deletions api/tests/src/data_migration/load/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def source_table(sqlalchemy_metadata):
sqlalchemy.Column("id2", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("x", sqlalchemy.Text),
sqlalchemy.Column("last_upd_date", sqlalchemy.TIMESTAMP),
sqlalchemy.Column("created_date", sqlalchemy.TIMESTAMP),
)


Expand All @@ -35,6 +36,7 @@ def destination_table(sqlalchemy_metadata):
sqlalchemy.Column("x", sqlalchemy.Text),
sqlalchemy.Column("is_deleted", sqlalchemy.Boolean),
sqlalchemy.Column("last_upd_date", sqlalchemy.TIMESTAMP),
sqlalchemy.Column("created_date", sqlalchemy.TIMESTAMP),
)


Expand All @@ -59,17 +61,17 @@ def test_build_select_updated_rows_sql(source_table, destination_table):
"JOIN test_source_table ON "
"(test_destination_table.id1, test_destination_table.id2) = "
"(test_source_table.id1, test_source_table.id2) \n"
"WHERE test_destination_table.last_upd_date < test_source_table.last_upd_date "
"WHERE coalesce(test_destination_table.last_upd_date, test_destination_table.created_date) < test_source_table.last_upd_date "
"ORDER BY test_source_table.id1, test_source_table.id2"
)


def test_build_insert_select_sql(source_table, destination_table):
insert = sql.build_insert_select_sql(source_table, destination_table, [(1, 2), (3, 4), (5, 6)])
assert str(insert) == (
"INSERT INTO test_destination_table (id1, id2, x, last_upd_date, is_deleted) "
"INSERT INTO test_destination_table (id1, id2, x, last_upd_date, created_date, is_deleted) "
"SELECT test_source_table.id1, test_source_table.id2, test_source_table.x, "
"test_source_table.last_upd_date, FALSE AS is_deleted \n"
"test_source_table.last_upd_date, test_source_table.created_date, FALSE AS is_deleted \n"
"FROM test_source_table \n"
"WHERE (test_source_table.id1, test_source_table.id2) IN (__[POSTCOMPILE_param_1])"
)
Expand All @@ -80,8 +82,8 @@ def test_build_update_sql(source_table, destination_table):
assert str(update) == (
"UPDATE test_destination_table "
"SET id1=test_source_table.id1, id2=test_source_table.id2, x=test_source_table.x, "
"last_upd_date=test_source_table.last_upd_date FROM test_source_table "
"WHERE (test_destination_table.id1, test_destination_table.id2) = "
"last_upd_date=test_source_table.last_upd_date, created_date=test_source_table.created_date "
"FROM test_source_table WHERE (test_destination_table.id1, test_destination_table.id2) = "
"(test_source_table.id1, test_source_table.id2) AND "
"(test_source_table.id1, test_source_table.id2) "
"IN (__[POSTCOMPILE_param_1])"
Expand Down

0 comments on commit 12bf286

Please sign in to comment.