Skip to content

Commit

Permalink
Persist emitTime from IngestionTrackingContext to the new entity tabl…
Browse files Browse the repository at this point in the history
…es and use emitTime during backfill (#329)

* feat(backfill): Persist emitTime for the new schema and, when it is available, use the emitTime field in the backfill logic

* update docs

* Reset backfill to false when emitting MAE

* Address comments

* Comment

* Add nullable annotation

---------

Co-authored-by: Derek Pham <[email protected]>
  • Loading branch information
derekpham and Derek Pham authored Dec 12, 2023
1 parent 1472141 commit 5874c4f
Show file tree
Hide file tree
Showing 9 changed files with 176 additions and 113 deletions.
29 changes: 18 additions & 11 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -411,26 +411,29 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> addCommon(@Nonnull URN

final ASPECT oldValue = latest.getAspect() == null ? null : latest.getAspect();
final AuditStamp oldAuditStamp = latest.getExtraInfo() == null ? null : latest.getExtraInfo().getAudit();
final Long oldEmitTime = latest.getExtraInfo() == null ? null : latest.getExtraInfo().getEmitTime();

boolean isBackfillEvent = trackingContext != null
&& trackingContext.hasBackfill() && trackingContext.isBackfill();
if (isBackfillEvent) {
boolean shouldBackfill =
// new value is being inserted. We should backfill
oldValue == null
// the time in old audit stamp represents last modified time of the aspect
// if the record doesn't exist, it will be null, which means we should process the record as normal
|| (
oldAuditStamp != null && oldAuditStamp.hasTime()
// ingestionTrackingContext if not null should always have emitTime. If emitTime doesn't exist within
// a non-null IngestionTrackingContext, it should be investigated. We'll also skip backfilling in this case
&& trackingContext.hasEmitTime()
// we should only process this backfilling event if the emit time is greater than last modified time
&& trackingContext.getEmitTime() > oldAuditStamp.getTime());

log.info("Encounter backfill event. Tracking context: {}. Urn: {}. Aspect class: {}. Old audit stamp: {}. "
// tracking context should ideally always have emitTime. If it's not present, we will skip backfilling
trackingContext.hasEmitTime()
&& (
// old emit time is available so we'll use it for comparison
// if new event emit time > old event emit time, we'll backfill
(oldEmitTime != null && trackingContext.getEmitTime() > oldEmitTime)
// old emit time is not available, so we'll fall back to comparing new emit time against old audit time
// old audit time represents the last modified time of the aspect
|| (oldEmitTime == null && oldAuditStamp != null && oldAuditStamp.hasTime() && trackingContext.getEmitTime() > oldAuditStamp.getTime())));

log.info("Encounter backfill event. Old value = null: {}. Tracking context: {}. Urn: {}. Aspect class: {}. Old audit stamp: {}. "
+ "Old emit time: {}. "
+ "Based on this information, shouldBackfill = {}.",
trackingContext, urn, aspectClass, oldAuditStamp, shouldBackfill);
oldValue == null, trackingContext, urn, aspectClass, oldAuditStamp, oldEmitTime, shouldBackfill);

if (!shouldBackfill) {
return new AddResult<>(oldValue, oldValue, aspectClass);
Expand Down Expand Up @@ -578,6 +581,10 @@ private <ASPECT extends RecordTemplate> ASPECT_UNION unwrapAddResultToUnion(URN

private <ASPECT extends RecordTemplate> ASPECT unwrapAddResult(URN urn, AddResult<ASPECT> result, @Nonnull AuditStamp auditStamp,
@Nullable IngestionTrackingContext trackingContext) {
if (trackingContext != null) {
trackingContext.setBackfill(false); // reset backfill since MAE won't be a backfill event
}

Class<ASPECT> aspectClass = result.getKlass();
final ASPECT oldValue = result.getOldValue();
final ASPECT newValue = result.getNewValue();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ record AuditedAspect {
* This value is different from lastmodifiedon / the timestamp in AuditStamp since auditStamp
* is created when the restli resource receives the ingestion request.
* This is set by the MCE producers (or MCE consumers if not set by producers)
*
* This will be null in the following scenarios:
* - The record is from the old schema
* - The record was inserted before we started persisting emitTime to the new schema
* - The record was inserted via ingest instead of ingestWithTracking
*/
emitTime: optional long
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ record ListResultMetadata {
* This value is different from lastmodifiedon / the timestamp in AuditStamp since auditStamp
* is created when the restli resource receives the ingestion request.
* This is set by the MCE producers (or MCE consumers if not set by producers)
*
* This will be null in the following scenarios:
* - The record is from the old schema
* - The record was inserted before we started persisting emitTime to the new schema
* - The record was inserted via ingest instead of ingestWithTracking
*/
emitTime: optional long
}]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.linkedin.common.AuditStamp;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
Expand Down Expand Up @@ -437,39 +438,55 @@ public Object[][] addBackfillForNoopCases() {
AuditStamp oldAuditStamp = makeAuditStamp("susActor", 6L);

// case 1 - emitTime doesn't exist
IngestionTrackingContext contextWithNoEmitTime = new IngestionTrackingContext();
contextWithNoEmitTime.setBackfill(true);

// case 2 - emitTime < old stamp
IngestionTrackingContext contextWithSmallEmitTime = new IngestionTrackingContext();
contextWithSmallEmitTime.setBackfill(true);
contextWithSmallEmitTime.setEmitTime(5L);
IngestionTrackingContext context1 = new IngestionTrackingContext();
context1.setBackfill(true);

// case 2 - new emit time < old emit time
IngestionTrackingContext context2 = new IngestionTrackingContext();
context2.setBackfill(true);
context2.setEmitTime(4L);
long oldEmitTime2 = 5L;

// case 3 - new emit time < old emit time (same as case 2, but old stamp < new emit time)
IngestionTrackingContext context3 = new IngestionTrackingContext();
context3.setBackfill(true);
context3.setEmitTime(10L);
long oldEmitTime3 = 11L;

// case 4 - old emit time = null, new emit time < old audit stamp
IngestionTrackingContext context4 = new IngestionTrackingContext();
context4.setBackfill(true);
context4.setEmitTime(3L);

return new Object[][] {
{ contextWithNoEmitTime, oldAuditStamp },
{ contextWithSmallEmitTime, oldAuditStamp }
{ context1, oldAuditStamp, null },
{ context2, oldAuditStamp, oldEmitTime2 },
{ context3, oldAuditStamp, oldEmitTime3 },
{ context4, oldAuditStamp, null }
};
}

@Test(description = "Each test case represents a scenario where a backfill event should NOT be backfilled",
dataProvider = "addBackfillForNoopCases")
public void testAddBackfillEmitTimeLargerThanOldAuditTime(
IngestionTrackingContext ingestionTrackingContext, AuditStamp oldAuditStamp
public void testAddForBackfillEventsWhenWeShouldNotDoBackfill(
IngestionTrackingContext ingestionTrackingContext, AuditStamp oldAuditStamp, Long oldEmitTime
) throws URISyntaxException {
FooUrn urn = new FooUrn(1);
AspectFoo oldFoo = new AspectFoo().setValue("oldFoo");
AspectFoo newFoo = new AspectFoo().setValue("newFoo");

ExtraInfo extraInfo = new ExtraInfo();
extraInfo.setAudit(oldAuditStamp);
extraInfo.setEmitTime(oldEmitTime, SetMode.IGNORE_NULL);

DummyLocalDAO dummyLocalDAO = new DummyLocalDAO(_mockGetLatestFunction, _mockTrackingEventProducer, _mockTrackingManager,
_dummyLocalDAO._transactionRunner);
dummyLocalDAO.setEmitAuditEvent(true);
dummyLocalDAO.setAlwaysEmitAuditEvent(true);
dummyLocalDAO.setEmitAspectSpecificAuditEvent(true);
dummyLocalDAO.setAlwaysEmitAspectSpecificAuditEvent(true);
expectGetLatest(urn, AspectFoo.class, Collections.singletonList(makeAspectEntry(oldFoo, oldAuditStamp)));
BaseLocalDAO.AspectEntry<AspectFoo> aspectEntry = new BaseLocalDAO.AspectEntry<>(oldFoo, extraInfo);
expectGetLatest(urn, AspectFoo.class, Collections.singletonList(aspectEntry));

dummyLocalDAO.add(urn, newFoo, _dummyAuditStamp, ingestionTrackingContext);

Expand All @@ -479,27 +496,54 @@ public void testAddBackfillEmitTimeLargerThanOldAuditTime(
verifyNoMoreInteractions(_mockTrackingEventProducer);
}

@Test(description = "Event should be processed for backfill event")
public void testAddForBackfill() throws URISyntaxException {
/**
 * Supplies the scenarios in which a backfill event is expected to be processed.
 *
 * <p>Each row is {@code {tracking context, old audit stamp, old emit time}}. A {@code null}
 * old emit time stands for an existing record that has no emit time.
 *
 * @return one row per scenario
 */
@DataProvider(name = "addBackfillForCasesThatShouldBackfill")
public Object[][] addBackfillForCasesThatShouldBackfill() {
  // All scenarios share the same existing audit stamp (time = 6).
  final AuditStamp existingAuditStamp = makeAuditStamp("susActor", 6L);

  // Scenario 1: new emit time (5) is larger than the old emit time (4),
  // even though it is smaller than the old audit time (6).
  final long priorEmitTimeA = 4L;
  final IngestionTrackingContext aboveEmitBelowAudit = new IngestionTrackingContext();
  aboveEmitBelowAudit.setEmitTime(5L);
  aboveEmitBelowAudit.setBackfill(true);

  // Scenario 2: new emit time (10) is larger than both the old emit time (4)
  // and the old audit time (6).
  final long priorEmitTimeB = 4L;
  final IngestionTrackingContext aboveEmitAndAudit = new IngestionTrackingContext();
  aboveEmitAndAudit.setEmitTime(10L);
  aboveEmitAndAudit.setBackfill(true);

  // Scenario 3: no old emit time is available, and the new emit time (7)
  // is larger than the old audit time (6).
  final IngestionTrackingContext aboveAuditNoPriorEmit = new IngestionTrackingContext();
  aboveAuditNoPriorEmit.setEmitTime(7L);
  aboveAuditNoPriorEmit.setBackfill(true);

  final Object[][] scenarios = {
      { aboveEmitBelowAudit, existingAuditStamp, priorEmitTimeA },
      { aboveEmitAndAudit, existingAuditStamp, priorEmitTimeB },
      { aboveAuditNoPriorEmit, existingAuditStamp, null }
  };
  return scenarios;
}

@Test(description = "Event should be processed for backfill event", dataProvider = "addBackfillForCasesThatShouldBackfill")
public void testAddForBackfill(
IngestionTrackingContext ingestionTrackingContext, AuditStamp oldAuditStamp, Long oldEmitTime
) throws URISyntaxException {
FooUrn urn = new FooUrn(1);
AspectFoo oldFoo = new AspectFoo().setValue("oldFoo");
AspectFoo newFoo = new AspectFoo().setValue("newFoo");

ExtraInfo extraInfo = new ExtraInfo();
AuditStamp oldAuditStamp = makeAuditStamp("nonSusActor", 5L);
extraInfo.setAudit(oldAuditStamp);
extraInfo.setEmitTime(oldEmitTime, SetMode.IGNORE_NULL);

DummyLocalDAO dummyLocalDAO = new DummyLocalDAO(_mockGetLatestFunction, _mockTrackingEventProducer, _mockTrackingManager,
_dummyLocalDAO._transactionRunner);
dummyLocalDAO.setEmitAuditEvent(true);
dummyLocalDAO.setAlwaysEmitAuditEvent(true);
dummyLocalDAO.setEmitAspectSpecificAuditEvent(true);
dummyLocalDAO.setAlwaysEmitAspectSpecificAuditEvent(true);
expectGetLatest(urn, AspectFoo.class, Collections.singletonList(makeAspectEntry(oldFoo, oldAuditStamp)));

IngestionTrackingContext ingestionTrackingContext = new IngestionTrackingContext();
ingestionTrackingContext.setBackfill(true);
ingestionTrackingContext.setEmitTime(6L);
BaseLocalDAO.AspectEntry<AspectFoo> aspectEntry = new BaseLocalDAO.AspectEntry<>(oldFoo, extraInfo);
expectGetLatest(urn, AspectFoo.class, Collections.singletonList(aspectEntry));

dummyLocalDAO.add(urn, newFoo, _dummyAuditStamp, ingestionTrackingContext);

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package com.linkedin.metadata.dao;

import com.linkedin.avro2pegasus.events.UUID;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
Expand All @@ -15,6 +14,7 @@
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.dao.utils.SQLSchemaUtils;
import com.linkedin.metadata.dao.utils.SQLStatementUtils;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.IndexFilter;
Expand Down Expand Up @@ -92,14 +92,18 @@ public void ensureSchemaUpToDate() {
@Override
@Transactional
public <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp, @Nullable UUID messageId) {
return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, messageId);
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext) {
return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, ingestionTrackingContext);
}

@Override
public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(@Nonnull URN urn, @Nullable ASPECT newValue,
@Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp auditStamp, @Nonnull Timestamp oldTimestamp,
@Nullable UUID messageId) {
public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(
@Nonnull URN urn,
@Nullable ASPECT newValue,
@Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp,
@Nullable Timestamp oldTimestamp,
@Nullable IngestionTrackingContext ingestionTrackingContext) {

final long timestamp = auditStamp.hasTime() ? auditStamp.getTime() : System.currentTimeMillis();
final String actor = auditStamp.hasActor() ? auditStamp.getActor().toString() : DEFAULT_ACTOR;
Expand Down Expand Up @@ -151,6 +155,9 @@ public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(@Nonnull URN
.setLastmodifiedby(actor)
.setLastmodifiedon(new Timestamp(timestamp).toString())
.setCreatedfor(impersonator, SetMode.IGNORE_NULL);
if (ingestionTrackingContext != null) {
auditedAspect.setEmitTime(ingestionTrackingContext.getEmitTime(), SetMode.IGNORE_NULL);
}

final String metadata = toJsonString(auditedAspect);
return sqlUpdate.setParameter("metadata", metadata).execute();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package com.linkedin.metadata.dao;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.avro2pegasus.events.UUID;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.schema.DataSchema;
Expand Down Expand Up @@ -760,11 +759,11 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn
// aspect table will apply regular update over (urn, aspect, version) primary key combination.
oldSchemaSqlUpdate = assembleOldSchemaSqlUpdate(aspect, null);
numOfUpdatedRows = runInTransactionWithRetry(() -> {
UUID messageId = trackingContext != null ? trackingContext.getTrackingId() : null;
// DUAL WRITE: 1) update aspect table, 2) update entity table.
// Note: when cold-archive is enabled, this method: updateWithOptimisticLocking will not be called.
_server.execute(oldSchemaSqlUpdate);
return _localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, oldTimestamp, messageId);
return _localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, oldTimestamp,
trackingContext);
}, 1);
} else {
// In OLD_SCHEMA mode since aspect table is the SOT and the getLatest (oldTimestamp) is from the aspect table
Expand All @@ -787,8 +786,7 @@ protected <ASPECT extends RecordTemplate> void insert(@Nonnull URN urn, @Nullabl
if (_schemaConfig != SchemaConfig.OLD_SCHEMA_ONLY && version == LATEST_VERSION) {
// insert() could be called when updating log table (moving current versions into new history version)
// the metadata entity tables shouldn't be updated.
UUID messageId = trackingContext != null ? trackingContext.getTrackingId() : null;
_localAccess.add(urn, (ASPECT) value, aspectClass, auditStamp, messageId);
_localAccess.add(urn, (ASPECT) value, aspectClass, auditStamp, trackingContext);
}

if (_changeLogEnabled) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.linkedin.metadata.dao;

import com.linkedin.avro2pegasus.events.UUID;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry;
import com.linkedin.metadata.dao.scsi.UrnPathExtractor;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.query.IndexFilter;
import com.linkedin.metadata.query.IndexGroupByCriterion;
import com.linkedin.metadata.query.IndexSortCriterion;
Expand All @@ -25,28 +25,32 @@ public interface IEbeanLocalAccess<URN extends Urn> {

/**
* Upsert aspect into entity table.
* @param urn entity urn
* @param newValue aspect value in {@link RecordTemplate}
* @param aspectClass class of the aspect
* @param auditStamp audit timestamp
* @param <ASPECT> metadata aspect value
*
* @param <ASPECT> metadata aspect value
* @param urn entity urn
* @param newValue aspect value in {@link RecordTemplate}
* @param aspectClass class of the aspect
* @param auditStamp audit timestamp
* @param ingestionTrackingContext the ingestionTrackingContext of the MCE responsible for this update
* @return number of rows inserted or updated
*/
<ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp, @Nullable UUID messageId);
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext);

/**
* Update aspect on entity table with optimistic locking. (compare-and-update on oldTimestamp).
* @param urn entity urn
* @param newValue aspect value in {@link RecordTemplate}
* @param aspectClass class of the aspect
* @param auditStamp audit timestamp
* @param oldTimestamp old time stamp for optimistic lock checking
* @param <ASPECT> metadata aspect value
*
* @param <ASPECT> metadata aspect value
* @param urn entity urn
* @param newValue aspect value in {@link RecordTemplate}
* @param aspectClass class of the aspect
* @param auditStamp audit timestamp
* @param oldTimestamp old time stamp for optimistic lock checking
* @param ingestionTrackingContext the ingestionTrackingContext of the MCE responsible for calling this update
* @return number of rows inserted or updated
*/
<ASPECT extends RecordTemplate> int addWithOptimisticLocking(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp, @Nullable Timestamp oldTimestamp, @Nullable UUID messageId);
@Nonnull AuditStamp auditStamp, @Nullable Timestamp oldTimestamp, @Nullable IngestionTrackingContext ingestionTrackingContext);

/**
* Upsert relationships to the local relationship table(s).
Expand Down
Loading

0 comments on commit 5874c4f

Please sign in to comment.