Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 64 additions & 33 deletions dao-api/src/main/java/com/linkedin/metadata/dao/BaseLocalDAO.java
Original file line number Diff line number Diff line change
Expand Up @@ -503,37 +503,22 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> addCommon(@Nonnull URN
@Nonnull AuditStamp auditStamp, @Nonnull EqualityTester<ASPECT> equalityTester,
@Nullable IngestionTrackingContext trackingContext, @Nonnull IngestionParams ingestionParams) {

final ASPECT oldValue = latest.getAspect() == null ? null : latest.getAspect();
final ASPECT oldValue = latest.getAspect();
final AuditStamp oldAuditStamp = latest.getExtraInfo() == null ? null : latest.getExtraInfo().getAudit();
final Long oldEmitTime = latest.getExtraInfo() == null ? null : latest.getExtraInfo().getEmitTime();

final boolean isBackfillEvent = trackingContext != null
&& trackingContext.hasBackfill() && trackingContext.isBackfill();
if (isBackfillEvent) {
boolean shouldBackfill =
// new value is being inserted. We should backfill
oldValue == null
|| (
// tracking context should ideally always have emitTime. If it's not present, we will skip backfilling
trackingContext.hasEmitTime()
&& (
// old emit time is available so we'll use it for comparison
// if new event emit time > old event emit time, we'll backfill
(oldEmitTime != null && trackingContext.getEmitTime() > oldEmitTime)
// old emit time is not available, so we'll fall back to comparing new emit time against old audit time
// old audit time represents the last modified time of the aspect
|| (oldEmitTime == null && oldAuditStamp != null && oldAuditStamp.hasTime() && trackingContext.getEmitTime() > oldAuditStamp.getTime())));

log.info("Encounter backfill event. Old value = null: {}. Tracking context: {}. Urn: {}. Aspect class: {}. Old audit stamp: {}. "
+ "Old emit time: {}. "
boolean shouldBackfill = shouldBackfill(urn, latest, aspectClass, trackingContext);
log.info("Encounter backfill event. Old value = null: {}. isSoftDeleted: {}. Tracking context: {}. Urn: {}. "
+ "Aspect class: {}. Old audit stamp: {}. Old emit time: {}. "
+ "Based on this information, shouldBackfill = {}.",
oldValue == null, trackingContext, urn, aspectClass, oldAuditStamp, oldEmitTime, shouldBackfill);

oldValue == null, latest.isSoftDeleted(), trackingContext, urn, aspectClass, oldAuditStamp, oldEmitTime, shouldBackfill);
if (!shouldBackfill) {
return new AddResult<>(oldValue, oldValue, aspectClass);
}
}

// TODO(yanyang) added for job-gms duplicity debug, throwaway afterwards
if (log.isDebugEnabled()) {
if ("AzkabanFlowInfo".equals(aspectClass.getSimpleName())) {
Expand All @@ -555,14 +540,60 @@ private <ASPECT extends RecordTemplate> AddResult<ASPECT> addCommon(@Nonnull URN
long largestVersion =
saveLatest(urn, aspectClass, oldValue,
optimisticLockAuditStamp != null ? optimisticLockAuditStamp : oldAuditStamp,
newValue, auditStamp, latest.isSoftDeleted, trackingContext, ingestionParams.isTestMode());
newValue, auditStamp, latest.isSoftDeleted(), trackingContext, ingestionParams.isTestMode());

// Apply retention policy
applyRetention(urn, aspectClass, getRetention(aspectClass), largestVersion);

return new AddResult<>(oldValue, newValue, aspectClass);
}

/**
 * Decides whether an incoming backfill event is allowed to be written over the currently stored state.
 *
 * <p>The decision is made in this order:
 * <ol>
 *   <li>No stored value and the entry is not soft-deleted: accept.</li>
 *   <li>The tracking context carries no emitTime: reject, since nothing can be compared.</li>
 *   <li>No stored value but the entry is soft-deleted: accept only if emitTime is strictly greater than the
 *       time on the stored audit stamp; reject (with a warning) if that stamp or its time is absent.</li>
 *   <li>A stored value exists and a stored emitTime is available: accept only if the incoming emitTime is
 *       strictly greater than the stored one.</li>
 *   <li>A stored value exists without a stored emitTime: accept only if the stored audit stamp has a time and
 *       the incoming emitTime is strictly greater than it.</li>
 * </ol>
 *
 * @param urn urn of the entity, used only for logging here
 * @param latest the latest stored entry for the aspect (value, soft-delete flag and extra info)
 * @param aspectClass class of the aspect, used only for logging here
 * @param trackingContext tracking context of the incoming backfill event
 * @param <ASPECT> aspect type
 * @return {@code true} if the backfill event should be applied, {@code false} if it should be dropped
 */
private <ASPECT extends RecordTemplate> boolean shouldBackfill(
    @Nonnull URN urn,
    @Nonnull AspectEntry<ASPECT> latest,
    @Nonnull Class<ASPECT> aspectClass,
    @Nonnull IngestionTrackingContext trackingContext) {

  final boolean hasStoredValue = latest.getAspect() != null;

  // Nothing stored and no soft-delete marker: this is the one path that needs no timestamp comparison.
  if (!hasStoredValue && !latest.isSoftDeleted()) {
    return true;
  }

  // Every path below compares against the incoming emitTime, so its absence means the event is rejected.
  if (!trackingContext.hasEmitTime()) {
    return false;
  }
  final long incomingEmitTime = trackingContext.getEmitTime();

  final AuditStamp storedAudit = latest.getExtraInfo() != null ? latest.getExtraInfo().getAudit() : null;

  if (!hasStoredValue) {
    // Reaching here with no value implies the entry is soft-deleted (the non-deleted case returned above).
    // NOTE(review): the stored audit stamp is presumed to hold the deletion timestamp in this state; confirm
    // against the DAO implementation. The comparison is strict, so an event stamped at exactly the same
    // millisecond as the stored time is rejected.
    if (storedAudit != null && storedAudit.hasTime()) {
      return incomingEmitTime > storedAudit.getTime();
    }
    log.warn("Soft-deleted entity has valid emitTime but missing/invalid deletion timestamp. Backfill will be rejected. Urn: {}. Aspect: {}. "
        + "emitTime: {}. oldAuditStamp: {}.", urn, aspectClass, incomingEmitTime, storedAudit);
    return false;
  }

  // A value is stored: prefer an emitTime-to-emitTime comparison when the stored emitTime is available.
  final Long storedEmitTime = latest.getExtraInfo() != null ? latest.getExtraInfo().getEmitTime() : null;
  if (storedEmitTime != null) {
    return incomingEmitTime > storedEmitTime;
  }

  // Otherwise fall back to the time on the stored audit stamp; with no such time the event is rejected.
  if (storedAudit == null || !storedAudit.hasTime()) {
    return false;
  }
  return incomingEmitTime > storedAudit.getTime();
}

/**
* Implemented in EbeanLocalDAO.
*/
Expand Down Expand Up @@ -629,7 +660,7 @@ public List<ASPECT_UNION> addMany(@Nonnull URN urn,
} else {
// no atomic multiple updates: run each in its own transaction. This is the same as repeated calls to add
results = aspectUpdateLambdas.stream().map(x -> runInTransactionWithRetry(() ->
aspectUpdateHelper(urn, x, auditStamp, trackingContext), maxTransactionRetry)).collect(Collectors.toList());
aspectUpdateHelper(urn, x, auditStamp, trackingContext), maxTransactionRetry)).collect(Collectors.toList());
}

// send the audit events etc
Expand Down Expand Up @@ -960,8 +991,8 @@ public <ASPECT extends RecordTemplate> URN create(@Nonnull URN urn,

// create aspects and process callbacks in a single transaction
return runInTransactionWithRetry(() -> {
return createAspectsWithCallbacks(urn, aspectValues, aspectCreateLambdas, auditStamp, trackingContext);
}, maxTransactionRetry
return createAspectsWithCallbacks(urn, aspectValues, aspectCreateLambdas, auditStamp, trackingContext);
}, maxTransactionRetry
);
}

Expand Down Expand Up @@ -1037,18 +1068,18 @@ public Collection<ASPECT_UNION> deleteAll(@Nonnull URN urn,

final Map<Class<?>, RecordTemplate> results = new HashMap<>();
runInTransactionWithRetry(() -> {
aspectClasses.forEach(aspectClass -> {
try {
RecordTemplate deletedAspect = delete(urn, aspectClass, auditStamp, maxTransactionRetry, trackingContext);
results.put(aspectClass, deletedAspect);
} catch (NullPointerException e) {
log.warn("Aspect {} for urn {} does not exist", aspectClass.getName(), urn);
}
});
aspectClasses.forEach(aspectClass -> {
try {
RecordTemplate deletedAspect = delete(urn, aspectClass, auditStamp, maxTransactionRetry, trackingContext);
results.put(aspectClass, deletedAspect);
} catch (NullPointerException e) {
log.warn("Aspect {} for urn {} does not exist", aspectClass.getName(), urn);
}
});

permanentDelete(urn, nonNullIngestionParams.isTestMode());
return results;
}, maxTransactionRetry);
}, maxTransactionRetry);


Collection<RecordTemplate> deletedAspects = new ArrayList<>();
Expand Down
Loading
Loading