Persist emitTime from IngestionTrackingContext to the new entity tables and use emitTime during backfill #329

Merged 6 commits on Dec 12, 2023
29 changes: 18 additions & 11 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
@@ -388,26 +388,29 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> addCommon(@Nonnull URN

final ASPECT oldValue = latest.getAspect() == null ? null : latest.getAspect();
final AuditStamp oldAuditStamp = latest.getExtraInfo() == null ? null : latest.getExtraInfo().getAudit();
final Long oldEmitTime = latest.getExtraInfo() == null ? null : latest.getExtraInfo().getEmitTime();

boolean isBackfillEvent = trackingContext != null
&& trackingContext.hasBackfill() && trackingContext.isBackfill();
if (isBackfillEvent) {
boolean shouldBackfill =
// new value is being inserted. We should backfill
oldValue == null
// the time in old audit stamp represents last modified time of the aspect
// if the record doesn't exist, it will be null, which means we should process the record as normal
|| (
oldAuditStamp != null && oldAuditStamp.hasTime()
// ingestionTrackingContext if not null should always have emitTime. If emitTime doesn't exist within
// a non-null IngestionTrackingContext, it should be investigated. We'll also skip backfilling in this case
&& trackingContext.hasEmitTime()
// we should only process this backfilling event if the emit time is greater than last modified time
&& trackingContext.getEmitTime() > oldAuditStamp.getTime());

log.info("Encounter backfill event. Tracking context: {}. Urn: {}. Aspect class: {}. Old audit stamp: {}. "
// tracking context should ideally always have emitTime. If it's not present, we will skip backfilling
trackingContext.hasEmitTime()
&& (
// old emit time is available so we'll use it for comparison
// if new event emit time > old event emit time, we'll backfill
(oldEmitTime != null && trackingContext.getEmitTime() > oldEmitTime)
// old emit time is not available, so we'll fall back to comparing new emit time against old audit time
// old audit time represents the last modified time of the aspect
|| (oldEmitTime == null && oldAuditStamp != null && oldAuditStamp.hasTime() && trackingContext.getEmitTime() > oldAuditStamp.getTime())));

log.info("Encounter backfill event. Old value = null: {}. Tracking context: {}. Urn: {}. Aspect class: {}. Old audit stamp: {}. "
+ "Old emit time: {}. "
+ "Based on this information, shouldBackfill = {}.",
trackingContext, urn, aspectClass, oldAuditStamp, shouldBackfill);
oldValue == null, trackingContext, urn, aspectClass, oldAuditStamp, oldEmitTime, shouldBackfill);

if (!shouldBackfill) {
return new AddResult<>(oldValue, oldValue, aspectClass);
@@ -555,6 +558,10 @@ private <ASPECT extends RecordTemplate> ASPECT_UNION unwrapAddResultToUnion(URN

private <ASPECT extends RecordTemplate> ASPECT unwrapAddResult(URN urn, AddResult<ASPECT> result, @Nonnull AuditStamp auditStamp,
@Nullable IngestionTrackingContext trackingContext) {
if (trackingContext != null) {
trackingContext.setBackfill(false); // reset backfill since MAE won't be a backfill event
Contributor

In what scenario will trackingContext be null, and in what scenario do we want to force its backfill flag to false?

Can we log this enforcement so that unexpected cases are captured?

Contributor Author

It's null when someone calls ingest instead of ingestWithTracking. I can't think of a case where we don't want to set backfill to false.

Contributor

^^ The isBackfill field in the ingestionTrackingContext is used ONLY for MCE backfills, right? I believe Jinxin added a separate field for MAE backfill. If that's the case, then yes, we always want to set it to false during MAE emission. But I hope this is clear enough for ourselves and future devs; it might be a little confusing to have two backfill-related fields in an MAE.

}

Class<ASPECT> aspectClass = result.getKlass();
final ASPECT oldValue = result.getOldValue();
final ASPECT newValue = result.getNewValue();
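The backfill check in addCommon can be read as a small decision function. The following is a minimal sketch rather than the project's code: the class name BackfillDecision, the method shouldBackfill, and the use of nullable Long values in place of the IngestionTrackingContext, ExtraInfo, and AuditStamp accessors are all assumptions made for illustration. It also assumes the `oldValue == null ||` short-circuit is retained in the new version, which the closing parentheses in the diff suggest.

```java
// Hypothetical stand-in for the backfill check in BaseLocalDAO.addCommon.
// Nullable Long values replace the real tracking-context and audit-stamp accessors.
final class BackfillDecision {

    private BackfillDecision() { }

    /**
     * @param oldValueExists whether an aspect value is already stored
     * @param newEmitTime    emitTime of the incoming backfill event, or null if absent
     * @param oldEmitTime    emitTime persisted with the stored value, or null if absent
     * @param oldAuditTime   last-modified time from the stored audit stamp, or null if absent
     */
    static boolean shouldBackfill(boolean oldValueExists, Long newEmitTime, Long oldEmitTime, Long oldAuditTime) {
        if (!oldValueExists) {
            return true; // nothing stored yet, so the backfill event is processed
        }
        if (newEmitTime == null) {
            return false; // without an emitTime there is nothing to compare, so skip
        }
        if (oldEmitTime != null) {
            return newEmitTime > oldEmitTime; // preferred comparison: emit time against emit time
        }
        // Fallback: compare against the last-modified time when no old emitTime was persisted.
        return oldAuditTime != null && newEmitTime > oldAuditTime;
    }

    public static void main(String[] args) {
        System.out.println(shouldBackfill(true, 5L, 4L, 6L));     // true
        System.out.println(shouldBackfill(true, 10L, 11L, 6L));   // false
        System.out.println(shouldBackfill(true, 7L, null, 6L));   // true
        System.out.println(shouldBackfill(true, 3L, null, 6L));   // false
        System.out.println(shouldBackfill(true, null, null, 6L)); // false
    }
}
```

The inputs in main mirror the emit times and the audit time of 6 used by the data providers in this PR's tests. The first call shows the behavioral change: an event emitted at 5 is backfilled even though the audit time is 6, because the stored emitTime of 4 takes precedence.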
@@ -37,6 +37,11 @@ record AuditedAspect {
* This value is different from lastmodifiedon / the timestamp in AuditStamp since auditStamp
* is created when the restli resource receives the ingestion request.
* This is set by the MCE producers (or MCE consumers if not set by producers)
*
* This will be null in the following scenarios:
* - The record is from the old schema
* - The record was inserted before we started persisting emitTime to the new schema
* - The record was inserted via ingest instead of ingestWithTracking
*/
emitTime: optional long
}
@@ -40,6 +40,11 @@ record ListResultMetadata {
* This value is different from lastmodifiedon / the timestamp in AuditStamp since auditStamp
* is created when the restli resource receives the ingestion request.
* This is set by the MCE producers (or MCE consumers if not set by producers)
*
* This will be null in the following scenarios:
* - The record is from the old schema
* - The record was inserted before we started persisting emitTime to the new schema
* - The record was inserted via ingest instead of ingestWithTracking
*/
emitTime: optional long
}]
@@ -2,6 +2,7 @@

import com.linkedin.common.AuditStamp;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.data.template.SetMode;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.producer.BaseMetadataEventProducer;
import com.linkedin.metadata.dao.producer.BaseTrackingMetadataEventProducer;
@@ -437,39 +438,55 @@ public Object[][] addBackfillForNoopCases() {
AuditStamp oldAuditStamp = makeAuditStamp("susActor", 6L);

// case 1 - emitTime doesn't exist
IngestionTrackingContext contextWithNoEmitTime = new IngestionTrackingContext();
contextWithNoEmitTime.setBackfill(true);

// case 2 - emitTime < old stamp
IngestionTrackingContext contextWithSmallEmitTime = new IngestionTrackingContext();
contextWithSmallEmitTime.setBackfill(true);
contextWithSmallEmitTime.setEmitTime(5L);
IngestionTrackingContext context1 = new IngestionTrackingContext();
context1.setBackfill(true);

// case 2 - new emit time < old emit time
IngestionTrackingContext context2 = new IngestionTrackingContext();
context2.setBackfill(true);
context2.setEmitTime(4L);
long oldEmitTime2 = 5L;

// case 3 - new emit time < old emit time (same as case 2, but old stamp < new emit time)
IngestionTrackingContext context3 = new IngestionTrackingContext();
context3.setBackfill(true);
context3.setEmitTime(10L);
long oldEmitTime3 = 11L;

// case 4 - old emit time = null, new emit time < old audit stamp
IngestionTrackingContext context4 = new IngestionTrackingContext();
context4.setBackfill(true);
context4.setEmitTime(3L);

return new Object[][] {
{ contextWithNoEmitTime, oldAuditStamp },
{ contextWithSmallEmitTime, oldAuditStamp }
{ context1, oldAuditStamp, null },
{ context2, oldAuditStamp, oldEmitTime2 },
{ context3, oldAuditStamp, oldEmitTime3 },
{ context4, oldAuditStamp, null }
};
}

@Test(description = "Each test case represents a scenario where a backfill event should NOT be backfilled",
dataProvider = "addBackfillForNoopCases")
public void testAddBackfillEmitTimeLargerThanOldAuditTime(
IngestionTrackingContext ingestionTrackingContext, AuditStamp oldAuditStamp
public void testAddForBackfillEventsWhenWeShouldNotDoBackfill(
IngestionTrackingContext ingestionTrackingContext, AuditStamp oldAuditStamp, Long oldEmitTime
) throws URISyntaxException {
FooUrn urn = new FooUrn(1);
AspectFoo oldFoo = new AspectFoo().setValue("oldFoo");
AspectFoo newFoo = new AspectFoo().setValue("newFoo");

ExtraInfo extraInfo = new ExtraInfo();
extraInfo.setAudit(oldAuditStamp);
extraInfo.setEmitTime(oldEmitTime, SetMode.IGNORE_NULL);

DummyLocalDAO dummyLocalDAO = new DummyLocalDAO(_mockGetLatestFunction, _mockTrackingEventProducer, _mockTrackingManager,
_dummyLocalDAO._transactionRunner);
dummyLocalDAO.setEmitAuditEvent(true);
dummyLocalDAO.setAlwaysEmitAuditEvent(true);
dummyLocalDAO.setEmitAspectSpecificAuditEvent(true);
dummyLocalDAO.setAlwaysEmitAspectSpecificAuditEvent(true);
expectGetLatest(urn, AspectFoo.class, Collections.singletonList(makeAspectEntry(oldFoo, oldAuditStamp)));
BaseLocalDAO.AspectEntry<AspectFoo> aspectEntry = new BaseLocalDAO.AspectEntry<>(oldFoo, extraInfo);
expectGetLatest(urn, AspectFoo.class, Collections.singletonList(aspectEntry));

dummyLocalDAO.add(urn, newFoo, _dummyAuditStamp, ingestionTrackingContext);

@@ -479,27 +496,54 @@ public void testAddBackfillEmitTimeLargerThanOldAuditTime(
verifyNoMoreInteractions(_mockTrackingEventProducer);
}

@Test(description = "Event should be processed for backfill event")
public void testAddForBackfill() throws URISyntaxException {
@DataProvider(name = "addBackfillForCasesThatShouldBackfill")
public Object[][] addBackfillForCasesThatShouldBackfill() {
AuditStamp oldAuditStamp = makeAuditStamp("susActor", 6L);

// case 1 - emitTime exists and is larger than old emit time
IngestionTrackingContext context1 = new IngestionTrackingContext();
context1.setBackfill(true);
context1.setEmitTime(5L);
long oldEmitTime1 = 4L;

// case 2 - emitTime exists and is larger than old emit time
IngestionTrackingContext context2 = new IngestionTrackingContext();
context2.setBackfill(true);
context2.setEmitTime(10L);
long oldEmitTime2 = 4L;

// case 3 - emitTime exists, old emitTime doesn't exist, emitTime > old audit stamp
IngestionTrackingContext context3 = new IngestionTrackingContext();
context3.setBackfill(true);
context3.setEmitTime(7L);

return new Object[][] {
{ context1, oldAuditStamp, oldEmitTime1 },
{ context2, oldAuditStamp, oldEmitTime2 },
{ context3, oldAuditStamp, null }
};
}

@Test(description = "Event should be processed for backfill event", dataProvider = "addBackfillForCasesThatShouldBackfill")
public void testAddForBackfill(
IngestionTrackingContext ingestionTrackingContext, AuditStamp oldAuditStamp, Long oldEmitTime
) throws URISyntaxException {
FooUrn urn = new FooUrn(1);
AspectFoo oldFoo = new AspectFoo().setValue("oldFoo");
AspectFoo newFoo = new AspectFoo().setValue("newFoo");

ExtraInfo extraInfo = new ExtraInfo();
AuditStamp oldAuditStamp = makeAuditStamp("nonSusActor", 5L);
extraInfo.setAudit(oldAuditStamp);
extraInfo.setEmitTime(oldEmitTime, SetMode.IGNORE_NULL);

DummyLocalDAO dummyLocalDAO = new DummyLocalDAO(_mockGetLatestFunction, _mockTrackingEventProducer, _mockTrackingManager,
_dummyLocalDAO._transactionRunner);
dummyLocalDAO.setEmitAuditEvent(true);
dummyLocalDAO.setAlwaysEmitAuditEvent(true);
dummyLocalDAO.setEmitAspectSpecificAuditEvent(true);
dummyLocalDAO.setAlwaysEmitAspectSpecificAuditEvent(true);
expectGetLatest(urn, AspectFoo.class, Collections.singletonList(makeAspectEntry(oldFoo, oldAuditStamp)));

IngestionTrackingContext ingestionTrackingContext = new IngestionTrackingContext();
ingestionTrackingContext.setBackfill(true);
ingestionTrackingContext.setEmitTime(6L);
BaseLocalDAO.AspectEntry<AspectFoo> aspectEntry = new BaseLocalDAO.AspectEntry<>(oldFoo, extraInfo);
expectGetLatest(urn, AspectFoo.class, Collections.singletonList(aspectEntry));

dummyLocalDAO.add(urn, newFoo, _dummyAuditStamp, ingestionTrackingContext);

@@ -1,6 +1,5 @@
package com.linkedin.metadata.dao;

import com.linkedin.avro2pegasus.events.UUID;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
@@ -15,6 +14,7 @@
import com.linkedin.metadata.dao.utils.RecordUtils;
import com.linkedin.metadata.dao.utils.SQLSchemaUtils;
import com.linkedin.metadata.dao.utils.SQLStatementUtils;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.query.ExtraInfo;
import com.linkedin.metadata.query.ExtraInfoArray;
import com.linkedin.metadata.query.IndexFilter;
@@ -92,14 +92,18 @@ public void ensureSchemaUpToDate() {
@Override
@Transactional
public <ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp, @Nullable UUID messageId) {
return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, messageId);
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext) {
return addWithOptimisticLocking(urn, newValue, aspectClass, auditStamp, null, ingestionTrackingContext);
}

@Override
public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(@Nonnull URN urn, @Nullable ASPECT newValue,
@Nonnull Class<ASPECT> aspectClass, @Nonnull AuditStamp auditStamp, @Nonnull Timestamp oldTimestamp,
@Nullable UUID messageId) {
public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(
@Nonnull URN urn,
@Nullable ASPECT newValue,
@Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp,
@Nullable Timestamp oldTimestamp,
@Nullable IngestionTrackingContext ingestionTrackingContext) {
Contributor

Was messageId never used before?

Contributor Author

Nope. IntelliJ didn't have much trouble removing that param.

Contributor

Does this mean we'll have a new messageId during the backfill, or is the backfill's messageId always null?

Contributor Author

The param existed before I added the backfill feature. It was an unused param.

Contributor Author

Talked offline with @yangyangv2. For the add method, I will replace the messageId param with the ingestionTrackingContext.


final long timestamp = auditStamp.hasTime() ? auditStamp.getTime() : System.currentTimeMillis();
final String actor = auditStamp.hasActor() ? auditStamp.getActor().toString() : DEFAULT_ACTOR;
@@ -151,6 +155,9 @@ public <ASPECT extends RecordTemplate> int addWithOptimisticLocking(@Nonnull URN
.setLastmodifiedby(actor)
.setLastmodifiedon(new Timestamp(timestamp).toString())
.setCreatedfor(impersonator, SetMode.IGNORE_NULL);
if (ingestionTrackingContext != null) {
Contributor

It would be safer and clearer to still create the IngestionTrackingContext in the input, and here we check


} ```

Contributor Author

I think your comment is missing some code?

auditedAspect.setEmitTime(ingestionTrackingContext.getEmitTime(), SetMode.IGNORE_NULL);
}

final String metadata = toJsonString(auditedAspect);
return sqlUpdate.setParameter("metadata", metadata).execute();
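The null guard in addWithOptimisticLocking, combined with SetMode.IGNORE_NULL, means emitTime is written only when a tracking context is supplied and actually carries a value. Below is a minimal sketch of that behavior under stated assumptions: TrackingContext, AuditedAspectSketch, setEmitTimeIgnoringNull, and buildAuditedAspect are hypothetical stand-ins written for this example, not the generated Pegasus classes, and the null-skipping setter only imitates what IGNORE_NULL does for a null argument.

```java
// Hypothetical stand-ins; the real AuditedAspect and IngestionTrackingContext are generated record templates.
final class EmitTimePersistence {

    private EmitTimePersistence() { }

    static final class TrackingContext {
        private final Long emitTime; // null models a context without emitTime

        TrackingContext(Long emitTime) {
            this.emitTime = emitTime;
        }

        Long getEmitTime() {
            return emitTime;
        }
    }

    static final class AuditedAspectSketch {
        private Long emitTime; // stays null unless explicitly set

        // Imitates a setter called with SetMode.IGNORE_NULL: a null argument leaves the field unset.
        void setEmitTimeIgnoringNull(Long value) {
            if (value != null) {
                this.emitTime = value;
            }
        }

        Long getEmitTime() {
            return emitTime;
        }
    }

    static AuditedAspectSketch buildAuditedAspect(TrackingContext context) {
        AuditedAspectSketch aspect = new AuditedAspectSketch();
        if (context != null) { // null when the caller used ingest rather than ingestWithTracking
            aspect.setEmitTimeIgnoringNull(context.getEmitTime());
        }
        return aspect;
    }

    public static void main(String[] args) {
        System.out.println(buildAuditedAspect(null).getEmitTime());                      // null
        System.out.println(buildAuditedAspect(new TrackingContext(null)).getEmitTime()); // null
        System.out.println(buildAuditedAspect(new TrackingContext(42L)).getEmitTime());  // 42
    }
}
```

The first two calls leave emitTime unset, which is why the backfill check needs the audit-stamp fallback for records written without a usable tracking context.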
@@ -1,7 +1,6 @@
package com.linkedin.metadata.dao;

import com.google.common.annotations.VisibleForTesting;
import com.linkedin.avro2pegasus.events.UUID;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.schema.DataSchema;
@@ -760,11 +759,11 @@ protected <ASPECT extends RecordTemplate> void updateWithOptimisticLocking(@Nonn
// aspect table will apply regular update over (urn, aspect, version) primary key combination.
oldSchemaSqlUpdate = assembleOldSchemaSqlUpdate(aspect, null);
numOfUpdatedRows = runInTransactionWithRetry(() -> {
UUID messageId = trackingContext != null ? trackingContext.getTrackingId() : null;
// DUAL WRITE: 1) update aspect table, 2) update entity table.
// Note: when cold-archive is enabled, this method: updateWithOptimisticLocking will not be called.
_server.execute(oldSchemaSqlUpdate);
return _localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, oldTimestamp, messageId);
return _localAccess.addWithOptimisticLocking(urn, (ASPECT) value, aspectClass, newAuditStamp, oldTimestamp,
trackingContext);
}, 1);
} else {
// In OLD_SCHEMA mode since aspect table is the SOT and the getLatest (oldTimestamp) is from the aspect table
@@ -787,8 +786,7 @@ protected <ASPECT extends RecordTemplate> void insert(@Nonnull URN urn, @Nullabl
if (_schemaConfig != SchemaConfig.OLD_SCHEMA_ONLY && version == LATEST_VERSION) {
// insert() could be called when updating log table (moving current versions into new history version)
// the metadata entity tables shouldn't been updated.
UUID messageId = trackingContext != null ? trackingContext.getTrackingId() : null;
_localAccess.add(urn, (ASPECT) value, aspectClass, auditStamp, messageId);
_localAccess.add(urn, (ASPECT) value, aspectClass, auditStamp, trackingContext);
}

if (_changeLogEnabled) {
@@ -1,12 +1,12 @@
package com.linkedin.metadata.dao;

import com.linkedin.avro2pegasus.events.UUID;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.metadata.dao.builder.BaseLocalRelationshipBuilder.LocalRelationshipUpdates;
import com.linkedin.metadata.dao.builder.LocalRelationshipBuilderRegistry;
import com.linkedin.metadata.dao.scsi.UrnPathExtractor;
import com.linkedin.metadata.events.IngestionTrackingContext;
import com.linkedin.metadata.query.IndexFilter;
import com.linkedin.metadata.query.IndexGroupByCriterion;
import com.linkedin.metadata.query.IndexSortCriterion;
@@ -25,28 +25,32 @@ public interface IEbeanLocalAccess<URN extends Urn> {

/**
* Upsert aspect into entity table.
* @param urn entity urn
* @param newValue aspect value in {@link RecordTemplate}
* @param aspectClass class of the aspect
* @param auditStamp audit timestamp
* @param <ASPECT> metadata aspect value
*
* @param <ASPECT> metadata aspect value
* @param urn entity urn
* @param newValue aspect value in {@link RecordTemplate}
* @param aspectClass class of the aspect
* @param auditStamp audit timestamp
* @param ingestionTrackingContext the ingestionTrackingContext of the MCE responsible for this update
* @return number of rows inserted or updated
*/
<ASPECT extends RecordTemplate> int add(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp, @Nullable UUID messageId);
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext);

/**
* Update aspect on entity table with optimistic locking. (compare-and-update on oldTimestamp).
* @param urn entity urn
* @param newValue aspect value in {@link RecordTemplate}
* @param aspectClass class of the aspect
* @param auditStamp audit timestamp
* @param oldTimestamp old time stamp for optimistic lock checking
* @param <ASPECT> metadata aspect value
*
* @param <ASPECT> metadata aspect value
* @param urn entity urn
* @param newValue aspect value in {@link RecordTemplate}
* @param aspectClass class of the aspect
* @param auditStamp audit timestamp
* @param oldTimestamp old time stamp for optimistic lock checking
* @param ingestionTrackingContext the ingestionTrackingContext of the MCE responsible for calling this update
* @return number of rows inserted or updated
*/
<ASPECT extends RecordTemplate> int addWithOptimisticLocking(@Nonnull URN urn, @Nullable ASPECT newValue, @Nonnull Class<ASPECT> aspectClass,
@Nonnull AuditStamp auditStamp, @Nullable Timestamp oldTimestamp, @Nullable UUID messageId);
@Nonnull AuditStamp auditStamp, @Nullable Timestamp oldTimestamp, @Nullable IngestionTrackingContext ingestionTrackingContext);

/**
* Upsert relationships to the local relationship table(s).