diff --git a/api/src/data_migration/load/sql.py b/api/src/data_migration/load/sql.py index d7c060eb2..6d4c182de 100644 --- a/api/src/data_migration/load/sql.py +++ b/api/src/data_migration/load/sql.py @@ -67,7 +67,15 @@ def build_select_updated_rows_sql( == sqlalchemy.tuple_(*source_table.primary_key.columns), ) # `WHERE ...` - .where(destination_table.c.last_upd_date < source_table.c.last_upd_date) + # NOTE: The legacy system doesn't populate the last_upd_date unless at least one update + # has occurred, so we need to fall back to the created_date otherwise it will always be + # null on the destination table side + .where( + sqlalchemy.func.coalesce( + destination_table.c.last_upd_date, destination_table.c.created_date + ) + < source_table.c.last_upd_date + ) .order_by(*source_table.primary_key.columns) ) diff --git a/api/tests/src/data_migration/load/test_load_oracle_data_task.py b/api/tests/src/data_migration/load/test_load_oracle_data_task.py index 2f6746124..ca8b0619a 100644 --- a/api/tests/src/data_migration/load/test_load_oracle_data_task.py +++ b/api/tests/src/data_migration/load/test_load_oracle_data_task.py @@ -85,6 +85,9 @@ def test_load_data(self, db_session, foreign_tables, staging_tables, enable_fact source_record4 = ForeignTopportunityFactory.create( opportunity_id=4, oppnumber="A-4-update", cfdas=[], last_upd_date=time3 ) + source_record5 = ForeignTopportunityFactory.create( + opportunity_id=6, oppnumber="A-6-update", cfdas=[], last_upd_date=time3 + ) ## Destination records # unchanged: @@ -95,6 +98,9 @@ def test_load_data(self, db_session, foreign_tables, staging_tables, enable_fact StagingTopportunityFactory.create( opportunity_id=4, oppnumber="A-4", cfdas=[], last_upd_date=time1 ) + StagingTopportunityFactory.create( + opportunity_id=6, oppnumber="A-6", cfdas=[], last_upd_date=None + ) # delete: StagingTopportunityFactory.create( opportunity_id=5, oppnumber="A-5", cfdas=[], last_upd_date=time2 @@ -109,8 +115,8 @@ def test_load_data(self, 
db_session, foreign_tables, staging_tables, enable_fact # this prevents some weirdness with the value comparison we'll do db_session.expire_all() - assert db_session.query(source_table).count() == 4 - assert db_session.query(destination_table).count() == 5 + assert db_session.query(source_table).count() == 5 + assert db_session.query(destination_table).count() == 6 destination_records = ( db_session.query(destination_table).order_by(destination_table.c.opportunity_id).all() @@ -123,13 +129,14 @@ def test_load_data(self, db_session, foreign_tables, staging_tables, enable_fact ) validate_copied_value(source_table, source_record4, destination_records[3]) validate_copied_value(source_table, None, destination_records[4], is_delete=True) + validate_copied_value(source_table, source_record5, destination_records[5]) assert task.metrics["count.delete.topportunity"] == 1 assert task.metrics["count.insert.topportunity"] == 2 - assert task.metrics["count.update.topportunity"] == 1 + assert task.metrics["count.update.topportunity"] == 2 assert task.metrics["count.delete.total"] == 1 assert task.metrics["count.insert.total"] == 2 - assert task.metrics["count.update.total"] == 1 + assert task.metrics["count.update.total"] == 2 def test_raises_if_table_dicts_different(self, db_session, foreign_tables, staging_tables): with pytest.raises( diff --git a/api/tests/src/data_migration/load/test_sql.py b/api/tests/src/data_migration/load/test_sql.py index ce6a156af..d980fe69c 100644 --- a/api/tests/src/data_migration/load/test_sql.py +++ b/api/tests/src/data_migration/load/test_sql.py @@ -22,6 +22,7 @@ def source_table(sqlalchemy_metadata): sqlalchemy.Column("id2", sqlalchemy.Integer, primary_key=True), sqlalchemy.Column("x", sqlalchemy.Text), sqlalchemy.Column("last_upd_date", sqlalchemy.TIMESTAMP), + sqlalchemy.Column("created_date", sqlalchemy.TIMESTAMP), ) @@ -35,6 +36,7 @@ def destination_table(sqlalchemy_metadata): sqlalchemy.Column("x", sqlalchemy.Text), 
sqlalchemy.Column("is_deleted", sqlalchemy.Boolean), sqlalchemy.Column("last_upd_date", sqlalchemy.TIMESTAMP), + sqlalchemy.Column("created_date", sqlalchemy.TIMESTAMP), ) @@ -59,7 +61,7 @@ def test_build_select_updated_rows_sql(source_table, destination_table): "JOIN test_source_table ON " "(test_destination_table.id1, test_destination_table.id2) = " "(test_source_table.id1, test_source_table.id2) \n" - "WHERE test_destination_table.last_upd_date < test_source_table.last_upd_date " + "WHERE coalesce(test_destination_table.last_upd_date, test_destination_table.created_date) < test_source_table.last_upd_date " "ORDER BY test_source_table.id1, test_source_table.id2" ) @@ -67,9 +69,9 @@ def test_build_select_updated_rows_sql(source_table, destination_table): def test_build_insert_select_sql(source_table, destination_table): insert = sql.build_insert_select_sql(source_table, destination_table, [(1, 2), (3, 4), (5, 6)]) assert str(insert) == ( - "INSERT INTO test_destination_table (id1, id2, x, last_upd_date, is_deleted) " + "INSERT INTO test_destination_table (id1, id2, x, last_upd_date, created_date, is_deleted) " "SELECT test_source_table.id1, test_source_table.id2, test_source_table.x, " - "test_source_table.last_upd_date, FALSE AS is_deleted \n" + "test_source_table.last_upd_date, test_source_table.created_date, FALSE AS is_deleted \n" "FROM test_source_table \n" "WHERE (test_source_table.id1, test_source_table.id2) IN (__[POSTCOMPILE_param_1])" ) @@ -80,8 +82,8 @@ def test_build_update_sql(source_table, destination_table): assert str(update) == ( "UPDATE test_destination_table " "SET id1=test_source_table.id1, id2=test_source_table.id2, x=test_source_table.x, " - "last_upd_date=test_source_table.last_upd_date FROM test_source_table " - "WHERE (test_destination_table.id1, test_destination_table.id2) = " + "last_upd_date=test_source_table.last_upd_date, created_date=test_source_table.created_date " + "FROM test_source_table WHERE (test_destination_table.id1, 
test_destination_table.id2) = " "(test_source_table.id1, test_source_table.id2) AND " "(test_source_table.id1, test_source_table.id2) " "IN (__[POSTCOMPILE_param_1])"