From 1bf3ea702e148e9f4bf9f5f605d622ab117ca678 Mon Sep 17 00:00:00 2001 From: Derek Pham Date: Fri, 1 Dec 2023 12:16:43 -0800 Subject: [PATCH 1/6] feat(backfill): Persist emitTime for the new schema as well as use the emitTime field when during backfill logic if available --- .../linkedin/metadata/dao/BaseLocalDAO.java | 26 +++--- .../metadata/aspect/AuditedAspect.pdl | 5 ++ .../metadata/query/ListResultMetadata.pdl | 5 ++ .../metadata/dao/BaseLocalDAOTest.java | 84 ++++++++++++++----- .../metadata/dao/EbeanLocalAccess.java | 16 +++- .../linkedin/metadata/dao/EbeanLocalDAO.java | 4 +- .../metadata/dao/IEbeanLocalAccess.java | 17 ++-- 7 files changed, 113 insertions(+), 44 deletions(-) diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java index f56a7388b..260dea375 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java @@ -388,6 +388,7 @@ private AddResult addCommon(@Nonnull URN final ASPECT oldValue = latest.getAspect() == null ? null : latest.getAspect(); final AuditStamp oldAuditStamp = latest.getExtraInfo() == null ? null : latest.getExtraInfo().getAudit(); + final Long oldEmitTime = latest.getExtraInfo() == null ? null : latest.getExtraInfo().getEmitTime(); boolean isBackfillEvent = trackingContext != null && trackingContext.hasBackfill() && trackingContext.isBackfill(); @@ -395,19 +396,22 @@ private AddResult addCommon(@Nonnull URN boolean shouldBackfill = // new value is being inserted. We should backfill oldValue == null - // the time in old audit stamp represents last modified time of the aspect - // if the record doesn't exist, it will be null, which means we should process the record as normal || ( - oldAuditStamp != null && oldAuditStamp.hasTime() - // ingestionTrackingContext if not null should always have emitTime. If emitTime doesn't exist within - // a non-null IngestionTrackingContext, it should be investigated. We'll also skip backfilling in this case - && trackingContext.hasEmitTime() - // we should only process this backfilling event if the emit time is greater than last modified time - && trackingContext.getEmitTime() > oldAuditStamp.getTime()); - - log.info("Encounter backfill event. Tracking context: {}. Urn: {}. Aspect class: {}. Old audit stamp: {}. " + // ingestionTrackingContext if not null should always have emitTime. If emitTime doesn't exist within + // a non-null IngestionTrackingContext, it should be investigated. We'll also skip backfilling in this case + trackingContext.hasEmitTime() + && ( + // old emit time is available so we'll use it for comparison + // if new event emit time > old event emit time, we'll backfill + (oldEmitTime != null && trackingContext.getEmitTime() > oldEmitTime) + // old emit time is not available, so we'll fall back to comparing new emit time against old audit time + // old audit time represents the last modified time of the aspect + || (oldEmitTime == null && oldAuditStamp != null && oldAuditStamp.hasTime() && trackingContext.getEmitTime() > oldAuditStamp.getTime()))); + + log.info("Encounter backfill event. Old value = null: {}. Tracking context: {}. Urn: {}. Aspect class: {}. Old audit stamp: {}. " + + "Old emit time: {}. " + "Based on this information, shouldBackfill = {}.", - trackingContext, urn, aspectClass, oldAuditStamp, shouldBackfill); + oldValue == null, trackingContext, urn, aspectClass, oldAuditStamp, oldEmitTime, shouldBackfill); if (!shouldBackfill) { return new AddResult<>(oldValue, oldValue, aspectClass); diff --git a/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl b/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl index b3e84df0d..19213b6d1 100644 --- a/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl +++ b/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl @@ -37,6 +37,11 @@ record AuditedAspect { * This value is different from lastmodifiedon / the timestamp in AuditStamp since auditStamp * is created when the restli resource receives the ingestion request. * This is set by the MCE producers (or MCE consumers if not set by producers) + * + * This will be null in the following scenarios: + * - The record is still on the old schema + * - The record inserted before we started persisting emitTime to the new schema + * - The record was inserted via ingest instead of ingestWithTracking */ emitTime: optional long } \ No newline at end of file diff --git a/dao-api/src/main/pegasus/com/linkedin/metadata/query/ListResultMetadata.pdl b/dao-api/src/main/pegasus/com/linkedin/metadata/query/ListResultMetadata.pdl index 5d9381539..ace76439c 100644 --- a/dao-api/src/main/pegasus/com/linkedin/metadata/query/ListResultMetadata.pdl +++ b/dao-api/src/main/pegasus/com/linkedin/metadata/query/ListResultMetadata.pdl @@ -40,6 +40,11 @@ record ListResultMetadata { * This value is different from lastmodifiedon / the timestamp in AuditStamp since auditStamp * is created when the restli resource receives the ingestion request. * This is set by the MCE producers (or MCE consumers if not set by producers) + * + * This will be null in the following scenarios: + * - The record is still on the old schema + * - The record inserted before we started persisting emitTime to the new schema + * - The record was inserted via ingest instead of ingestWithTracking */ emitTime: optional long }] diff --git a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java index 6ebcdf477..91c382671 100644 --- a/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java +++ b/dao-api/src/test/java/com/linkedin/metadata/dao/BaseLocalDAOTest.java @@ -2,6 +2,7 @@ import com.linkedin.common.AuditStamp; import com.linkedin.data.template.RecordTemplate; +import com.linkedin.data.template.SetMode; import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates; import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer; import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer; @@ -437,24 +438,38 @@ public Object[][] addBackfillForNoopCases() { AuditStamp oldAuditStamp = makeAuditStamp("susActor", 6L); // case 1 - emitTime doesn't exist - IngestionTrackingContext contextWithNoEmitTime = new IngestionTrackingContext(); - contextWithNoEmitTime.setBackfill(true); - - // case 2 - emitTime < old stamp - IngestionTrackingContext contextWithSmallEmitTime = new IngestionTrackingContext(); - contextWithSmallEmitTime.setBackfill(true); - contextWithSmallEmitTime.setEmitTime(5L); + IngestionTrackingContext context1 = new IngestionTrackingContext(); + context1.setBackfill(true); + + // case 2 - new emit time < old emit time + IngestionTrackingContext context2 = new IngestionTrackingContext(); + context2.setBackfill(true); + context2.setEmitTime(4L); + long oldEmitTime2 = 5L; + + // case 3 - new emit time < old emit time (same as case 2, but old stamp < new emit time) + IngestionTrackingContext context3 = new IngestionTrackingContext(); + context3.setBackfill(true); + context3.setEmitTime(10L); + long oldEmitTime3 = 11L; + + // case 4 - old emit time = null, new emit time < old audit stamp + IngestionTrackingContext context4 = new IngestionTrackingContext(); + context4.setBackfill(true); + context4.setEmitTime(3L); return new Object[][] { - { contextWithNoEmitTime, oldAuditStamp }, - { contextWithSmallEmitTime, oldAuditStamp } + { context1, oldAuditStamp, null }, + { context2, oldAuditStamp, oldEmitTime2 }, + { context3, oldAuditStamp, oldEmitTime3 }, + { context4, oldAuditStamp, null } }; } @Test(description = "Each test case represents a scenario where a backfill event should NOT be backfilled", dataProvider = "addBackfillForNoopCases") - public void testAddBackfillEmitTimeLargerThanOldAuditTime( - IngestionTrackingContext ingestionTrackingContext, AuditStamp oldAuditStamp + public void testAddForBackfillEventsWhenWeShouldNotDoBackfill( + IngestionTrackingContext ingestionTrackingContext, AuditStamp oldAuditStamp, Long oldEmitTime ) throws URISyntaxException { FooUrn urn = new FooUrn(1); AspectFoo oldFoo = new AspectFoo().setValue("oldFoo"); @@ -462,6 +477,7 @@ public void testAddBackfillEmitTimeLargerThanOldAuditTime( ExtraInfo extraInfo = new ExtraInfo(); extraInfo.setAudit(oldAuditStamp); + extraInfo.setEmitTime(oldEmitTime, SetMode.IGNORE_NULL); DummyLocalDAO dummyLocalDAO = new DummyLocalDAO(_mockGetLatestFunction, _mockTrackingEventProducer, _mockTrackingManager, _dummyLocalDAO._transactionRunner); @@ -469,7 +485,8 @@ public void testAddBackfillEmitTimeLargerThanOldAuditTime( dummyLocalDAO.setAlwaysEmitAuditEvent(true); dummyLocalDAO.setEmitAspectSpecificAuditEvent(true); dummyLocalDAO.setAlwaysEmitAspectSpecificAuditEvent(true); - expectGetLatest(urn, AspectFoo.class, Collections.singletonList(makeAspectEntry(oldFoo, oldAuditStamp))); + BaseLocalDAO.AspectEntry aspectEntry = new BaseLocalDAO.AspectEntry<>(oldFoo, extraInfo); + expectGetLatest(urn, AspectFoo.class, Collections.singletonList(aspectEntry)); dummyLocalDAO.add(urn, newFoo, _dummyAuditStamp, ingestionTrackingContext); @@ -479,15 +496,45 @@ public void testAddBackfillEmitTimeLargerThanOldAuditTime( verifyNoMoreInteractions(_mockTrackingEventProducer); } - @Test(description = "Event should be processed for backfill event") - public void testAddForBackfill() throws URISyntaxException { + @DataProvider(name = "addBackfillForCasesThatShouldBackfill") + public Object[][] addBackfillForCasesThatShouldBackfill() { + AuditStamp oldAuditStamp = makeAuditStamp("susActor", 6L); + + // case 1 - emitTime exists and is larger than old emit time + IngestionTrackingContext context1 = new IngestionTrackingContext(); + context1.setBackfill(true); + context1.setEmitTime(5L); + long oldEmitTime1 = 4L; + + // case 2 - emitTime exists and is larger than old emit time + IngestionTrackingContext context2 = new IngestionTrackingContext(); + context2.setBackfill(true); + context2.setEmitTime(10L); + long oldEmitTime2 = 4L; + + // case 3 - emitTime exists, old emitTime doesn't exist, emitTime > old audit stamp + IngestionTrackingContext context3 = new IngestionTrackingContext(); + context3.setBackfill(true); + context3.setEmitTime(7L); + + return new Object[][] { + { context1, oldAuditStamp, oldEmitTime1 }, + { context2, oldAuditStamp, oldEmitTime2 }, + { context3, oldAuditStamp, null } + }; + } + + @Test(description = "Event should be processed for backfill event", dataProvider = "addBackfillForCasesThatShouldBackfill") + public void testAddForBackfill( + IngestionTrackingContext ingestionTrackingContext, AuditStamp oldAuditStamp, Long oldEmitTime + ) throws URISyntaxException { FooUrn urn = new FooUrn(1); AspectFoo oldFoo = new AspectFoo().setValue("oldFoo"); AspectFoo newFoo = new AspectFoo().setValue("newFoo"); ExtraInfo extraInfo = new ExtraInfo(); - AuditStamp oldAuditStamp = makeAuditStamp("nonSusActor", 5L); extraInfo.setAudit(oldAuditStamp); + extraInfo.setEmitTime(oldEmitTime, SetMode.IGNORE_NULL); DummyLocalDAO dummyLocalDAO = new DummyLocalDAO(_mockGetLatestFunction, _mockTrackingEventProducer, _mockTrackingManager, _dummyLocalDAO._transactionRunner); @@ -495,11 +542,8 @@ public void testAddForBackfill() throws URISyntaxException { dummyLocalDAO.setAlwaysEmitAuditEvent(true); dummyLocalDAO.setEmitAspectSpecificAuditEvent(true); dummyLocalDAO.setAlwaysEmitAspectSpecificAuditEvent(true); - expectGetLatest(urn, AspectFoo.class, Collections.singletonList(makeAspectEntry(oldFoo, oldAuditStamp))); - - IngestionTrackingContext ingestionTrackingContext = new IngestionTrackingContext(); - ingestionTrackingContext.setBackfill(true); - ingestionTrackingContext.setEmitTime(6L); + BaseLocalDAO.AspectEntry aspectEntry = new BaseLocalDAO.AspectEntry<>(oldFoo, extraInfo); + expectGetLatest(urn, AspectFoo.class, Collections.singletonList(aspectEntry)); dummyLocalDAO.add(urn, newFoo, _dummyAuditStamp, ingestionTrackingContext); diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java index 1f234508d..05f4dc02d 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java @@ -15,6 +15,7 @@ import com.linkedin.metadata.dao.utils.RecordUtils; import com.linkedin.metadata.dao.utils.SQLSchemaUtils; import com.linkedin.metadata.dao.utils.SQLStatementUtils; +import com.linkedin.metadata.events.IngestionTrackingContext; import com.linkedin.metadata.query.ExtraInfo; import com.linkedin.metadata.query.ExtraInfoArray; import com.linkedin.metadata.query.IndexFilter; @@ -93,13 +94,17 @@ public void ensureSchemaUpToDate() { @Transactional public int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class aspectClass, @Nonnull AuditStamp auditStamp, @Nullable UUID messageId) { - return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, messageId); + return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, null); } @Override - public int addWithOptimisticLocking(@Nonnull URN urn, @Nullable ASPECT newValue, - @Nonnull Class aspectClass, @Nonnull AuditStamp auditStamp, @Nonnull Timestamp oldTimestamp, - @Nullable UUID messageId) { + public int addWithOptimisticLocking( + @Nonnull URN urn, + @Nullable ASPECT newValue, + @Nonnull Class aspectClass, + @Nonnull AuditStamp auditStamp, + @Nullable Timestamp oldTimestamp, + @Nullable IngestionTrackingContext ingestionTrackingContext) { final long timestamp = auditStamp.hasTime() ? auditStamp.getTime() : System.currentTimeMillis(); final String actor = auditStamp.hasActor() ? auditStamp.getActor().toString() : DEFAULT_ACTOR; @@ -151,6 +156,9 @@ public int addWithOptimisticLocking(@Nonnull URN .setLastmodifiedby(actor) .setLastmodifiedon(new Timestamp(timestamp).toString()) .setCreatedfor(impersonator, SetMode.IGNORE_NULL); + if (ingestionTrackingContext != null) { + auditedAspect.setEmitTime(ingestionTrackingContext.getEmitTime(), SetMode.IGNORE_NULL); + } final String metadata = toJsonString(auditedAspect); return sqlUpdate.setParameter("metadata", metadata).execute(); diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index 39ba6a28c..6962d72cb 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -760,11 +760,11 @@ protected void updateWithOptimisticLocking(@Nonn // aspect table will apply regular update over (urn, aspect, version) primary key combination. oldSchemaSqlUpdate = assembleOldSchemaSqlUpdate(aspect, null); numOfUpdatedRows = runInTransactionWithRetry(() -> { - UUID messageId = trackingContext != null ? trackingContext.getTrackingId() : null; // DUAL WRITE: 1) update aspect table, 2) update entity table. // Note: when cold-archive is enabled, this method: updateWithOptimisticLocking will not be called. _server.execute(oldSchemaSqlUpdate); - return _localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, oldTimestamp, messageId); + return _localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, oldTimestamp, + trackingContext); }, 1); } else { // In OLD_SCHEMA mode since aspect table is the SOT and the getLatest (oldTimestamp) is from the aspect table diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java index 8efda39b2..723b248f8 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java @@ -7,6 +7,7 @@ import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates; import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry; import com.linkedin.metadata.dao.scsi.UrnPathExtractor; +import com.linkedin.metadata.events.IngestionTrackingContext; import com.linkedin.metadata.query.IndexFilter; import com.linkedin.metadata.query.IndexGroupByCriterion; import com.linkedin.metadata.query.IndexSortCriterion; @@ -37,16 +38,18 @@ int add(@Nonnull URN urn, @Nullable ASPECT newVa /** * Update aspect on entity table with optimistic locking. (compare-and-update on oldTimestamp). - * @param urn entity urn - * @param newValue aspect value in {@link RecordTemplate} - * @param aspectClass class of the aspect - * @param auditStamp audit timestamp - * @param oldTimestamp old time stamp for optimistic lock checking - * @param metadata aspect value + * + * @param metadata aspect value + * @param urn entity urn + * @param newValue aspect value in {@link RecordTemplate} + * @param aspectClass class of the aspect + * @param auditStamp audit timestamp + * @param oldTimestamp old time stamp for optimistic lock checking + * @param ingestionTrackingContext the ingestionTrackingContext of the MCE responsible for calling this update * @return number of rows inserted or updated */ int addWithOptimisticLocking(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class aspectClass, - @Nonnull AuditStamp auditStamp, @Nullable Timestamp oldTimestamp, @Nullable UUID messageId); + @Nonnull AuditStamp auditStamp, @Nullable Timestamp oldTimestamp, @Nullable IngestionTrackingContext ingestionTrackingContext); /** * Upsert relationships to the local relationship table(s). From 4c5926f3f2e6441505c2ae010541a018c64a2d42 Mon Sep 17 00:00:00 2001 From: Derek Pham Date: Mon, 4 Dec 2023 17:27:06 -0800 Subject: [PATCH 2/6] update docs --- .../main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl | 2 +- .../pegasus/com/linkedin/metadata/query/ListResultMetadata.pdl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl b/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl index 19213b6d1..ae7d283e7 100644 --- a/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl +++ b/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl @@ -40,7 +40,7 @@ record AuditedAspect { * * This will be null in the following scenarios: * - The record is still on the old schema - * - The record inserted before we started persisting emitTime to the new schema + * - The record was inserted before we started persisting emitTime to the new schema * - The record was inserted via ingest instead of ingestWithTracking */ emitTime: optional long diff --git a/dao-api/src/main/pegasus/com/linkedin/metadata/query/ListResultMetadata.pdl b/dao-api/src/main/pegasus/com/linkedin/metadata/query/ListResultMetadata.pdl index ace76439c..482b3cd0a 100644 --- a/dao-api/src/main/pegasus/com/linkedin/metadata/query/ListResultMetadata.pdl +++ b/dao-api/src/main/pegasus/com/linkedin/metadata/query/ListResultMetadata.pdl @@ -43,7 +43,7 @@ record ListResultMetadata { * * This will be null in the following scenarios: * - The record is still on the old schema - * - The record inserted before we started persisting emitTime to the new schema + * - The record was inserted before we started persisting emitTime to the new schema * - The record was inserted via ingest instead of ingestWithTracking */ emitTime: optional long From dcf30a188b364767f052252a52be2095f38f6516 Mon Sep 17 00:00:00 2001 From: Derek Pham Date: Tue, 5 Dec 2023 12:28:32 -0800 Subject: [PATCH 3/6] Reset backfill to false when emitting MAE --- .../src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java index 260dea375..45d14c554 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java @@ -559,6 +559,10 @@ private ASPECT_UNION unwrapAddResultToUnion(URN private ASPECT unwrapAddResult(URN urn, AddResult result, @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext trackingContext) { + if (trackingContext != null) { + trackingContext.setBackfill(false); // reset backfill since MAE won't be a backfill event + } + Class aspectClass = result.getKlass(); final ASPECT oldValue = result.getOldValue(); final ASPECT newValue = result.getNewValue(); From 2080708e7531f6a880a36d832920623ea0a73020 Mon Sep 17 00:00:00 2001 From: Derek Pham Date: Wed, 6 Dec 2023 21:34:02 -0800 Subject: [PATCH 4/6] Address comments --- .../linkedin/metadata/dao/BaseLocalDAO.java | 5 +- .../metadata/aspect/AuditedAspect.pdl | 2 +- .../metadata/query/ListResultMetadata.pdl | 2 +- .../metadata/dao/EbeanLocalAccess.java | 5 +- .../linkedin/metadata/dao/EbeanLocalDAO.java | 4 +- .../metadata/dao/IEbeanLocalAccess.java | 15 +++-- .../metadata/dao/EbeanLocalAccessTest.java | 41 ++++++------ .../EbeanLocalRelationshipQueryDAOTest.java | 66 +++++++++---------- 8 files changed, 65 insertions(+), 75 deletions(-) diff --git a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java index 45d14c554..a8b78914f 100644 --- a/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java +++ b/dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java @@ -397,8 +397,7 @@ private AddResult addCommon(@Nonnull URN // new value is being inserted. We should backfill oldValue == null || ( - // ingestionTrackingContext if not null should always have emitTime. If emitTime doesn't exist within - // a non-null IngestionTrackingContext, it should be investigated. We'll also skip backfilling in this case + // tracking context should ideally always have emitTime. If it's not present, we will skip backfilling trackingContext.hasEmitTime() && ( // old emit time is available so we'll use it for comparison @@ -408,7 +407,7 @@ private AddResult addCommon(@Nonnull URN // old audit time represents the last modified time of the aspect || (oldEmitTime == null && oldAuditStamp != null && oldAuditStamp.hasTime() && trackingContext.getEmitTime() > oldAuditStamp.getTime()))); - log.info("Encounter backfill event. Old value = null: {}. Tracking context: {}. Urn: {}. Aspect class: {}. Old audit stamp: {}. " + log.info("Encounter backfill event. Old value = null: {}. Tracking context: {}. Urn: {}. Aspect class: {}. Old audit stamp: {}. " + "Old emit time: {}. " + "Based on this information, shouldBackfill = {}.", oldValue == null, trackingContext, urn, aspectClass, oldAuditStamp, oldEmitTime, shouldBackfill); diff --git a/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl b/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl index ae7d283e7..af8d29170 100644 --- a/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl +++ b/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl @@ -39,7 +39,7 @@ record AuditedAspect { * This is set by the MCE producers (or MCE consumers if not set by producers) * * This will be null in the following scenarios: - * - The record is still on the old schema + * - The record is still from the old schema * - The record was inserted before we started persisting emitTime to the new schema * - The record was inserted via ingest instead of ingestWithTracking */ diff --git a/dao-api/src/main/pegasus/com/linkedin/metadata/query/ListResultMetadata.pdl b/dao-api/src/main/pegasus/com/linkedin/metadata/query/ListResultMetadata.pdl index 482b3cd0a..1b993888f 100644 --- a/dao-api/src/main/pegasus/com/linkedin/metadata/query/ListResultMetadata.pdl +++ b/dao-api/src/main/pegasus/com/linkedin/metadata/query/ListResultMetadata.pdl @@ -42,7 +42,7 @@ record ListResultMetadata { * This is set by the MCE producers (or MCE consumers if not set by producers) * * This will be null in the following scenarios: - * - The record is still on the old schema + * - The record is from the old schema * - The record was inserted before we started persisting emitTime to the new schema * - The record was inserted via ingest instead of ingestWithTracking */ diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java index 05f4dc02d..77ac822b1 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java @@ -1,6 +1,5 @@ package com.linkedin.metadata.dao; -import com.linkedin.avro2pegasus.events.UUID; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; @@ -93,8 +92,8 @@ public void ensureSchemaUpToDate() { @Override @Transactional public int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class aspectClass, - @Nonnull AuditStamp auditStamp, @Nullable UUID messageId) { - return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, null); + @Nonnull AuditStamp auditStamp, IngestionTrackingContext ingestionTrackingContext) { + return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, ingestionTrackingContext); } @Override diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index 6962d72cb..a3410f398 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -1,7 +1,6 @@ package com.linkedin.metadata.dao; import com.google.common.annotations.VisibleForTesting; -import com.linkedin.avro2pegasus.events.UUID; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; import com.linkedin.data.schema.DataSchema; @@ -787,8 +786,7 @@ protected void insert(@Nonnull URN urn, @Nullabl if (_schemaConfig != SchemaConfig.OLD_SCHEMA_ONLY && version == LATEST_VERSION) { // insert() could be called when updating log table (moving current versions into new history version) // the metadata entity tables shouldn't been updated. - UUID messageId = trackingContext != null ? trackingContext.getTrackingId() : null; - _localAccess.add(urn, (ASPECT) value, aspectClass, auditStamp, messageId); + _localAccess.add(urn, (ASPECT) value, aspectClass, auditStamp, trackingContext); } if (_changeLogEnabled) { diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java index 723b248f8..37cf4374d 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java @@ -1,6 +1,5 @@ package com.linkedin.metadata.dao; -import com.linkedin.avro2pegasus.events.UUID; import com.linkedin.common.AuditStamp; import com.linkedin.common.urn.Urn; import com.linkedin.data.template.RecordTemplate; @@ -26,15 +25,17 @@ public interface IEbeanLocalAccess { /** * Upsert aspect into entity table. - * @param urn entity urn - * @param newValue aspect value in {@link RecordTemplate} - * @param aspectClass class of the aspect - * @param auditStamp audit timestamp - * @param metadata aspect value + * + * @param metadata aspect value + * @param urn entity urn + * @param newValue aspect value in {@link RecordTemplate} + * @param aspectClass class of the aspect + * @param auditStamp audit timestamp + * @param ingestionTrackingContext the ingestionTrackingContext of the MCE responsible for this update * @return number of rows inserted or updated */ int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class aspectClass, - @Nonnull AuditStamp auditStamp, @Nullable UUID messageId); + @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext); /** * Update aspect on entity table with optimistic locking. (compare-and-update on oldTimestamp). diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java index 66ca1eff0..cffade17d 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalAccessTest.java @@ -1,14 +1,12 @@ package com.linkedin.metadata.dao; import com.google.common.io.Resources; -import com.linkedin.avro2pegasus.events.UUID; import com.linkedin.common.AuditStamp; -import com.linkedin.data.ByteString; import com.linkedin.metadata.dao.localrelationship.SampleLocalRelationshipRegistryImpl; import com.linkedin.metadata.dao.scsi.EmptyPathExtractor; import com.linkedin.metadata.dao.utils.BarUrnPathExtractor; -import com.linkedin.metadata.dao.utils.FooUrnPathExtractor; import com.linkedin.metadata.dao.utils.EmbeddedMariaInstance; +import com.linkedin.metadata.dao.utils.FooUrnPathExtractor; import com.linkedin.metadata.dao.utils.RecordUtils; import com.linkedin.metadata.dao.utils.SQLIndexFilterUtils; import com.linkedin.metadata.query.Condition; @@ -48,7 +46,9 @@ import static com.linkedin.testing.TestUtils.*; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; -import static org.testng.AssertJUnit.*; +import static org.testng.AssertJUnit.assertFalse; +import static org.testng.AssertJUnit.assertNull; +import static org.testng.AssertJUnit.assertTrue; public class EbeanLocalAccessTest { private static EbeanServer _server; @@ -57,7 +57,6 @@ public class EbeanLocalAccessTest { private static IEbeanLocalAccess _ebeanLocalAccessBurger; private static long _now; private static final LocalRelationshipFilter EMPTY_FILTER = new LocalRelationshipFilter().setCriteria(new LocalRelationshipCriterionArray()); - private static final byte[] UUID = {0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1}; @BeforeClass public void init() { @@ -84,7 +83,7 @@ public void setupTest() throws IOException { AspectFoo aspectFoo = new AspectFoo(); aspectFoo.setValue(String.valueOf(i)); AuditStamp auditStamp = makeAuditStamp("foo", System.currentTimeMillis()); - _ebeanLocalAccessFoo.add(fooUrn, aspectFoo, AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID))); + _ebeanLocalAccessFoo.add(fooUrn, aspectFoo, AspectFoo.class, auditStamp, null); } } @@ -285,7 +284,7 @@ public void testCountAggregate() { AspectFoo aspectFoo = new AspectFoo(); aspectFoo.setValue(String.valueOf(25)); AuditStamp auditStamp = makeAuditStamp("foo", System.currentTimeMillis()); - _ebeanLocalAccessFoo.add(fooUrn, aspectFoo, AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID))); + _ebeanLocalAccessFoo.add(fooUrn, aspectFoo, AspectFoo.class, auditStamp, null); countMap = _ebeanLocalAccessFoo.countAggregate(indexFilter, indexGroupByCriterion); // Expect: there are 2 counts for value 25 @@ -299,7 +298,7 @@ public void testEscapeSpecialCharInUrn() { // Single quote is a special char in SQL. BurgerUrn johnsBurgerUrn1 = makeBurgerUrn("urn:li:burger:John's burger"); - _ebeanLocalAccessBurger.add(johnsBurgerUrn1, aspectFoo, AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID))); + _ebeanLocalAccessBurger.add(johnsBurgerUrn1, aspectFoo, AspectFoo.class, auditStamp, null); AspectKey aspectKey1 = new AspectKey(AspectFoo.class, johnsBurgerUrn1, 0L); List ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey1), 1, 0, false); @@ -308,7 +307,7 @@ public void testEscapeSpecialCharInUrn() { // Double quote is a special char in SQL. BurgerUrn johnsBurgerUrn2 = makeBurgerUrn("urn:li:burger:John\"s burger"); - _ebeanLocalAccessBurger.add(johnsBurgerUrn2, aspectFoo, AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID))); + _ebeanLocalAccessBurger.add(johnsBurgerUrn2, aspectFoo, AspectFoo.class, auditStamp, null); AspectKey aspectKey2 = new AspectKey(AspectFoo.class, johnsBurgerUrn2, 0L); ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey2), 1, 0, false); @@ -317,7 +316,7 @@ public void testEscapeSpecialCharInUrn() { // Backslash is a special char in SQL. BurgerUrn johnsBurgerUrn3 = makeBurgerUrn("urn:li:burger:John\\s burger"); - _ebeanLocalAccessBurger.add(johnsBurgerUrn3, aspectFoo, AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID))); + _ebeanLocalAccessBurger.add(johnsBurgerUrn3, aspectFoo, AspectFoo.class, auditStamp, null); AspectKey aspectKey3 = new AspectKey(AspectFoo.class, johnsBurgerUrn3, 0L); ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey3), 1, 0, false); @@ -334,10 +333,10 @@ public void testAddWithLocalRelationshipBuilder() throws URISyntaxException { AspectFooBar aspectFooBar = new AspectFooBar().setBars(new BarUrnArray(barUrn1, barUrn2, barUrn3)); AuditStamp auditStamp = makeAuditStamp("foo", System.currentTimeMillis()); - _ebeanLocalAccessFoo.add(fooUrn, aspectFooBar, AspectFooBar.class, auditStamp, new UUID(ByteString.copy(UUID))); - _ebeanLocalAccessBar.add(barUrn1, new AspectFoo().setValue("1"), AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID))); - _ebeanLocalAccessBar.add(barUrn2, new AspectFoo().setValue("2"), AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID))); - _ebeanLocalAccessBar.add(barUrn3, new AspectFoo().setValue("3"), AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID))); + _ebeanLocalAccessFoo.add(fooUrn, aspectFooBar, AspectFooBar.class, auditStamp, null); + _ebeanLocalAccessBar.add(barUrn1, new AspectFoo().setValue("1"), AspectFoo.class, auditStamp, null); + _ebeanLocalAccessBar.add(barUrn2, new AspectFoo().setValue("2"), AspectFoo.class, auditStamp, null); + _ebeanLocalAccessBar.add(barUrn3, new AspectFoo().setValue("3"), AspectFoo.class, auditStamp, null); // Verify local relationships and entity are added. EbeanLocalRelationshipQueryDAO ebeanLocalRelationshipQueryDAO = new EbeanLocalRelationshipQueryDAO(_server); @@ -364,7 +363,7 @@ public void testAtomicityWithLocalRelationshipBuilder() throws URISyntaxExceptio AuditStamp auditStamp = makeAuditStamp("foo", System.currentTimeMillis()); try { - _ebeanLocalAccessFoo.add(makeFooUrn(1), aspectFooBar, AspectFooBar.class, auditStamp, new UUID(ByteString.copy(UUID))); + _ebeanLocalAccessFoo.add(makeFooUrn(1), aspectFooBar, AspectFooBar.class, auditStamp, null); } catch (Exception exception) { // Verify no relationship is added. List relationships = _server.createSqlQuery("SELECT * FROM metadata_relationship_belongsto").findList(); @@ -376,7 +375,7 @@ public void testAtomicityWithLocalRelationshipBuilder() throws URISyntaxExceptio public void testUrnExtraction() { FooUrn urn1 = makeFooUrn(1); AspectFoo foo1 = new AspectFoo().setValue("foo"); - _ebeanLocalAccessFoo.add(urn1, foo1, AspectFoo.class, makeAuditStamp("actor", _now), new UUID(ByteString.copy(UUID))); + _ebeanLocalAccessFoo.add(urn1, foo1, AspectFoo.class, makeAuditStamp("actor", _now), null); // get content of virtual column List results = _server.createSqlQuery("SELECT i_urn$fooId as id FROM metadata_entity_foo").findList(); @@ -398,10 +397,10 @@ public void testAddRelationships() throws URISyntaxException { // Turn off local relationship ingestion first, to fill only the entity tables. _ebeanLocalAccessFoo.setLocalRelationshipBuilderRegistry(null); - _ebeanLocalAccessFoo.add(fooUrn, aspectFooBar, AspectFooBar.class, auditStamp, new UUID(ByteString.copy(UUID))); - _ebeanLocalAccessBar.add(barUrn1, new AspectFoo().setValue("1"), AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID))); - _ebeanLocalAccessBar.add(barUrn2, new AspectFoo().setValue("2"), AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID))); - _ebeanLocalAccessBar.add(barUrn3, new AspectFoo().setValue("3"), AspectFoo.class, auditStamp, new UUID(ByteString.copy(UUID))); + _ebeanLocalAccessFoo.add(fooUrn, aspectFooBar, AspectFooBar.class, auditStamp, null); + _ebeanLocalAccessBar.add(barUrn1, new AspectFoo().setValue("1"), AspectFoo.class, auditStamp, null); + _ebeanLocalAccessBar.add(barUrn2, new AspectFoo().setValue("2"), AspectFoo.class, auditStamp, null); + _ebeanLocalAccessBar.add(barUrn3, new AspectFoo().setValue("3"), AspectFoo.class, auditStamp, null); // Verify that NO local relationships were added EbeanLocalRelationshipQueryDAO ebeanLocalRelationshipQueryDAO = new EbeanLocalRelationshipQueryDAO(_server); @@ -459,7 +458,7 @@ public void testFindLatestMetadataAspect() throws URISyntaxException { @Test public void testGetAspectNoSoftDeleteCheck() { FooUrn fooUrn = makeFooUrn(0); - _ebeanLocalAccessFoo.add(fooUrn, null, AspectFoo.class, makeAuditStamp("foo", System.currentTimeMillis()), new UUID(ByteString.copy(UUID))); + _ebeanLocalAccessFoo.add(fooUrn, null, AspectFoo.class, makeAuditStamp("foo", System.currentTimeMillis()), null); AspectKey aspectKey = new AspectKey(AspectFoo.class, fooUrn, 0L); List ebeanMetadataAspectList = _ebeanLocalAccessFoo.batchGetUnion(Collections.singletonList(aspectKey), 1000, 0, false); diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipQueryDAOTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipQueryDAOTest.java index d05bbac06..99c0c1b63 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipQueryDAOTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/localrelationship/EbeanLocalRelationshipQueryDAOTest.java @@ -2,9 +2,7 @@ import com.google.common.collect.ImmutableSet; import com.google.common.io.Resources; -import com.linkedin.avro2pegasus.events.UUID; import com.linkedin.common.AuditStamp; -import com.linkedin.data.ByteString; import com.linkedin.data.template.RecordTemplate; import com.linkedin.data.template.StringArray; import com.linkedin.metadata.dao.EbeanLocalAccess; @@ -57,7 +55,6 @@ public class EbeanLocalRelationshipQueryDAOTest { private EbeanLocalRelationshipQueryDAO _localRelationshipQueryDAO; private IEbeanLocalAccess _fooUrnEBeanLocalAccess; private IEbeanLocalAccess _barUrnEBeanLocalAccess; - private static final byte[] UUID = {0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1, 0, 1}; @BeforeClass public void init() { @@ -79,7 +76,7 @@ public void recreateTables() throws IOException { @Test public void testFindOneEntity() throws URISyntaxException { // Ingest data - _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); + _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null); // Prepare filter AspectField aspectField = new AspectField().setAspect(AspectFoo.class.getCanonicalName()).setPath("/value"); @@ -102,8 +99,8 @@ public void testFindOneEntity() throws URISyntaxException { @Test public void testFindOneEntityTwoAspects() throws URISyntaxException { // Ingest data - _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); - _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectBar().setValue("bar"), AspectBar.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); + _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectBar().setValue("bar"), AspectBar.class, new AuditStamp(), null); // Prepare filter AspectField aspectField = new AspectField().setAspect(AspectFoo.class.getCanonicalName()).setPath("/value"); @@ -138,9 +135,9 @@ public void testFindOneRelationship() throws Exception { FooUrn jack = new FooUrn(3); // Add Alice, Bob and Jack into entity tables. - _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); - _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); - _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); + _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), null); // Add Bob reports-to ALice relationship ReportsTo bobReportsToAlice = new ReportsTo().setSource(bob).setDestination(alice); @@ -182,13 +179,14 @@ public void testFindOneRelationshipWithFilter() throws Exception { BarUrn samza = new BarUrn(2); // Add Kafka_Topic, HDFS_Dataset and Restli_Service into entity tables. - _fooUrnEBeanLocalAccess.add(kafka, new AspectFoo().setValue("Kafka_Topic"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); - _fooUrnEBeanLocalAccess.add(hdfs, new AspectFoo().setValue("HDFS_Dataset"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); - _fooUrnEBeanLocalAccess.add(restli, new AspectFoo().setValue("Restli_Service"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); + _fooUrnEBeanLocalAccess.add(kafka, new AspectFoo().setValue("Kafka_Topic"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(hdfs, new AspectFoo().setValue("HDFS_Dataset"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(restli, new AspectFoo().setValue("Restli_Service"), AspectFoo.class, new AuditStamp(), + null); // Add Spark and Samza into entity tables. - _barUrnEBeanLocalAccess.add(spark, new AspectFoo().setValue("Spark"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); - _barUrnEBeanLocalAccess.add(samza, new AspectFoo().setValue("Samza"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); + _barUrnEBeanLocalAccess.add(spark, new AspectFoo().setValue("Spark"), AspectFoo.class, new AuditStamp(), null); + _barUrnEBeanLocalAccess.add(samza, new AspectFoo().setValue("Samza"), AspectFoo.class, new AuditStamp(), null); // Add Spark consume-from hdfs relationship ConsumeFrom sparkConsumeFromHdfs = new ConsumeFrom().setSource(spark).setDestination(hdfs).setEnvironment(EnvorinmentType.OFFLINE); @@ -256,9 +254,9 @@ public void testFindEntitiesOneHopAwayIncomingDirection() throws Exception { FooUrn jack = new FooUrn(3); // Add Alice, Bob and Jack into entity tables. - _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); - _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); - _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); + _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), null); // Add Bob reports-to ALice relationship ReportsTo bobReportsToAlice = new ReportsTo().setSource(bob).setDestination(alice); @@ -309,12 +307,12 @@ public void testFindEntitiesOneHopAwayOutgoingDirection() throws Exception { BarUrn mit = new BarUrn(2); // Add Alice and Bob into entity tables. - _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); - _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); + _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null); // Add Stanford and MIT into entity tables. - _barUrnEBeanLocalAccess.add(stanford, new AspectFoo().setValue("Stanford"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); - _barUrnEBeanLocalAccess.add(mit, new AspectFoo().setValue("MIT"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); + _barUrnEBeanLocalAccess.add(stanford, new AspectFoo().setValue("Stanford"), AspectFoo.class, new AuditStamp(), null); + _barUrnEBeanLocalAccess.add(mit, new AspectFoo().setValue("MIT"), AspectFoo.class, new AuditStamp(), null); // Add Alice belongs to MIT and Stanford. BelongsTo aliceBelongsToMit = new BelongsTo().setSource(alice).setDestination(mit); @@ -369,22 +367,18 @@ public void testFindEntitiesOneHopAwayUndirected() throws Exception { FooUrn john = new FooUrn(4); // Add Alice, Bob, Jack and John into entity tables. - _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); - _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); - _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); - _fooUrnEBeanLocalAccess.add(john, new AspectFoo().setValue("John"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); + _fooUrnEBeanLocalAccess.add(alice, new AspectFoo().setValue("Alice"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(bob, new AspectFoo().setValue("Bob"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(jack, new AspectFoo().setValue("Jack"), AspectFoo.class, new AuditStamp(), null); + _fooUrnEBeanLocalAccess.add(john, new AspectFoo().setValue("John"), AspectFoo.class, new AuditStamp(), null); - _fooUrnEBeanLocalAccess.add(alice, new AspectBar().setValue("32"), AspectBar.class, new AuditStamp(), - new UUID(ByteString.copy(UUID))); // Alice 32 years old + _fooUrnEBeanLocalAccess.add(alice, new AspectBar().setValue("32"), AspectBar.class, new AuditStamp(), null); // Alice 32 years old - _fooUrnEBeanLocalAccess.add(bob, new AspectBar().setValue("52"), AspectBar.class, new AuditStamp(), - new UUID(ByteString.copy(UUID))); // Bob 52 years old + _fooUrnEBeanLocalAccess.add(bob, new AspectBar().setValue("52"), AspectBar.class, new AuditStamp(), null); // Bob 52 years old - _fooUrnEBeanLocalAccess.add(jack, new AspectBar().setValue("16"), AspectBar.class, new AuditStamp(), - new UUID(ByteString.copy(UUID))); // Jack 16 years old + _fooUrnEBeanLocalAccess.add(jack, new AspectBar().setValue("16"), AspectBar.class, new AuditStamp(), null); // Jack 16 years old - _fooUrnEBeanLocalAccess.add(john, new AspectBar().setValue("42"), AspectBar.class, new AuditStamp(), - new UUID(ByteString.copy(UUID))); // John 42 years old + _fooUrnEBeanLocalAccess.add(john, new AspectBar().setValue("42"), AspectBar.class, new AuditStamp(), null); // John 42 years old // Add Alice pair-with Jack relationships. Alice --> Jack. PairsWith alicePairsWithJack = new PairsWith().setSource(alice).setDestination(jack); @@ -449,7 +443,7 @@ public void testFindEntitiesOneHopAwayUndirected() throws Exception { @Test public void testFindOneEntityWithInCondition() throws URISyntaxException { // Ingest data - _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); + _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null); // Prepare filter AspectField aspectField = new AspectField().setAspect(AspectFoo.class.getCanonicalName()).setPath("/value"); @@ -472,7 +466,7 @@ public void testFindOneEntityWithInCondition() throws URISyntaxException { @Test public void testFindNoEntityWithInCondition() throws URISyntaxException { // Ingest data - _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); + _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null); // Prepare filter AspectField aspectField = new AspectField().setAspect(AspectFoo.class.getCanonicalName()).setPath("/value"); @@ -493,7 +487,7 @@ public void testFindNoEntityWithInCondition() throws URISyntaxException { @Test public void testFindEntitiesWithEmptyRelationshipFilter() throws URISyntaxException { // Ingest data - _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), new UUID(ByteString.copy(UUID))); + _fooUrnEBeanLocalAccess.add(new FooUrn(1), new AspectFoo().setValue("foo"), AspectFoo.class, new AuditStamp(), null); // Create empty filter LocalRelationshipFilter emptyFilter = new LocalRelationshipFilter(); From f64f476239ed406770fc2cdc51d2f0ed07933e9d Mon Sep 17 00:00:00 2001 From: Derek Pham Date: Wed, 6 Dec 2023 21:35:51 -0800 Subject: [PATCH 5/6] Comment --- .../main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl b/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl index af8d29170..54edf21e2 100644 --- a/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl +++ b/dao-api/src/main/pegasus/com/linkedin/metadata/aspect/AuditedAspect.pdl @@ -39,7 +39,7 @@ record AuditedAspect { * This is set by the MCE producers (or MCE consumers if not set by producers) * * This will be null in the following scenarios: - * - The record is still from the old schema + * - The record is from the old schema * - The record was inserted before we started persisting emitTime to the new schema * - The record was inserted via ingest instead of ingestWithTracking */ From 3c3adb6d90d58e5adbcd6e2bbd4858099da7e157 Mon Sep 17 00:00:00 2001 From: Derek Pham Date: Tue, 12 Dec 2023 10:18:46 -0800 Subject: [PATCH 6/6] Add nullable annotation --- .../main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java index 77ac822b1..265d1bbec 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java @@ -92,7 +92,7 @@ public void ensureSchemaUpToDate() { @Override @Transactional public int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class aspectClass, - @Nonnull AuditStamp auditStamp, IngestionTrackingContext ingestionTrackingContext) { + @Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext) { return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, ingestionTrackingContext); }