diff --git a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java index b896fcb4cf1019..b7bdd372cfded0 100644 --- a/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java +++ b/metadata-io/src/main/java/com/linkedin/metadata/entity/EntityServiceImpl.java @@ -1031,7 +1031,17 @@ This condition is specifically for an older conditional write ingestAspectIfNotP if (!upsertResults.isEmpty()) { // commit upserts prior to retention or kafka send, if supported by impl if (txContext != null) { - txContext.commitAndContinue(); + try { + txContext.commitAndContinue(); + } catch (EntityNotFoundException e) { + if (e.getMessage() != null + && e.getMessage().contains("No rows updated")) { + log.debug("Ignoring no rows updated condition for metadata update", e); + MetricUtils.counter(EntityServiceImpl.class, "no_rows_updated").inc(); + return TransactionResult.rollback(); + } + throw e; + } } // Retention optimization and tx diff --git a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java index 88f84ee94c8ee7..3620eb0c85f923 100644 --- a/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java +++ b/metadata-io/src/test/java/com/linkedin/metadata/entity/EbeanEntityServiceTest.java @@ -3,8 +3,21 @@ import static com.linkedin.metadata.Constants.CORP_USER_ENTITY_NAME; import static com.linkedin.metadata.Constants.STATUS_ASPECT_NAME; import static com.linkedin.metadata.entity.ebean.EbeanAspectDao.TX_ISOLATION; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.verify; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertTrue; @@ -41,6 +54,7 @@ import io.ebean.Database; import io.ebean.Transaction; import io.ebean.TxScope; +import jakarta.persistence.EntityNotFoundException; import java.net.URISyntaxException; import java.sql.Timestamp; import java.time.Instant; @@ -50,10 +64,11 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.commons.lang3.tuple.Triple; -import org.testng.Assert; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; @@ -112,14 +127,98 @@ public void setupTest() { null); } - /** - * Ideally, all tests would be in the base class, so they're reused between all implementations. - * When that's the case - test runner will ignore this class (and its base!) so we keep this dummy - * test to make sure this class will always be discovered. - */ @Test - public void obligatoryTest() throws AssertionError { - Assert.assertTrue(true); + public void testNoRowsUpdatedErrorHandling() throws Exception { + // Setup test data + Urn entityUrn = UrnUtils.getUrn("urn:li:corpuser:testUser"); + SystemMetadata systemMetadata = AspectGenerationUtils.createSystemMetadata(); + CorpUserInfo writeAspect = AspectGenerationUtils.createCorpUserInfo("email@test.com"); + String aspectName = PegasusUtils.getAspectNameFromSchema(writeAspect.schema()); + + // Create database and spy on aspectDao + Database server = EbeanTestUtils.createTestServer(EbeanEntityServiceTest.class.getSimpleName()); + EbeanAspectDao aspectDao = spy(new EbeanAspectDao(server, EbeanConfiguration.testDefault)); + + // Prevent actual saves + doNothing().when(aspectDao).saveAspect(any(), any(), anyBoolean()); + doReturn(0L) + .when(aspectDao) + .saveLatestAspect( + any(), + anyString(), + anyString(), + any(), + any(), + any(), + any(), + any(), + anyString(), + anyString(), + any(), + any(), + any(), + anyLong()); + + // Create spied transaction context that throws on commitAndContinue + AtomicReference capturedTxContext = new AtomicReference<>(); + AtomicReference> capturedResult = new AtomicReference<>(); + + doAnswer( + invocation -> { + Function> block = invocation.getArgument(0); + Integer maxTransactionRetry = invocation.getArgument(2); + + TransactionContext txContext = spy(TransactionContext.empty(maxTransactionRetry)); + capturedTxContext.set(txContext); + + doThrow(new EntityNotFoundException("No rows updated")) + .when(txContext) + .commitAndContinue(); + + TransactionResult result = block.apply(txContext); + capturedResult.set(result); + return result.getResults(); + }) + .when(aspectDao) + .runInTransactionWithRetry(any(), any(), anyInt()); + + // Create the service with our spied dao + PreProcessHooks preProcessHooks = new PreProcessHooks(); + preProcessHooks.setUiEnabled(false); + EntityServiceImpl entityService = + new EntityServiceImpl(aspectDao, _mockProducer, false, preProcessHooks, true); + + // Create the test batch + List items = + List.of( + ChangeItemImpl.builder() + .urn(entityUrn) + .aspectName(aspectName) + .recordTemplate(writeAspect) + .systemMetadata(systemMetadata) + .auditStamp(TEST_AUDIT_STAMP) + .build(TestOperationContexts.emptyActiveUsersAspectRetriever(null))); + + AspectsBatchImpl batch = + AspectsBatchImpl.builder() + .retrieverContext(opContext.getRetrieverContext()) + .items(items) + .build(); + + // Execute the test + List results = entityService.ingestAspects(opContext, batch, false, true); + + // Verify results + assertEquals(results.size(), 0, "Expected no results for rolled back transaction"); + + // Verify transaction behavior + verify(aspectDao).runInTransactionWithRetry(any(), eq(batch), anyInt()); + verify(capturedTxContext.get()).commitAndContinue(); + + // Verify the transaction result was a rollback + TransactionResult result = capturedResult.get(); + assertNotNull(result, "Expected a transaction result"); + assertFalse(result.isCommitOrRollback(), "Expected a rollback result"); } @Override