Skip to content

Commit

Permalink
feat(search): include timestamp for entity metadata change (#12567)
Browse files Browse the repository at this point in the history
  • Loading branch information
deepgarg-visa authored Feb 13, 2025
1 parent 7326bb9 commit 2d762f0
Show file tree
Hide file tree
Showing 14 changed files with 162 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,9 @@ private void extractSearchableAnnotation(
annotation.getNumValuesFieldName(),
annotation.getWeightsPerFieldValue(),
annotation.getFieldNameAliases(),
annotation.isIncludeQueryEmptyAggregation());
annotation.isIncludeQueryEmptyAggregation(),
annotation.isIncludeSystemModifiedAt(),
annotation.getSystemModifiedAtFieldName());
}
}
log.debug("Searchable annotation for field: {} : {}", schemaPathSpec, annotation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ public class SearchableAnnotation {
// only adds to query time not mapping
boolean includeQueryEmptyAggregation;

boolean includeSystemModifiedAt;

Optional<String> systemModifiedAtFieldName;

public enum FieldType {
KEYWORD,
TEXT,
Expand Down Expand Up @@ -125,6 +129,10 @@ public static SearchableAnnotation fromPegasusAnnotationObject(
final List<String> fieldNameAliases = getFieldNameAliases(map);

final FieldType resolvedFieldType = getFieldType(fieldType, schemaDataType);
final Optional<Boolean> includeSystemModifiedAt =
AnnotationUtils.getField(map, "includeSystemModifiedAt", Boolean.class);
final Optional<String> systemModifiedAtFieldName =
AnnotationUtils.getField(map, "systemModifiedAtFieldName", String.class);
return new SearchableAnnotation(
fieldName.orElse(schemaFieldName),
resolvedFieldType,
Expand All @@ -139,7 +147,9 @@ public static SearchableAnnotation fromPegasusAnnotationObject(
numValuesFieldName,
weightsPerFieldValueMap.orElse(ImmutableMap.of()),
fieldNameAliases,
includeQueryEmptyAggregation.orElse(false));
includeQueryEmptyAggregation.orElse(false),
includeSystemModifiedAt.orElse(false),
systemModifiedAtFieldName);
}

private static FieldType getFieldType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,12 @@ private static Map<String, Object> getMappingsForField(
.getNumValuesFieldName()
.ifPresent(
fieldName -> mappings.put(fieldName, ImmutableMap.of(TYPE, ESUtils.LONG_FIELD_TYPE)));

if (ESUtils.getSystemModifiedAtFieldName(searchableFieldSpec).isPresent()) {
String modifiedAtFieldName = ESUtils.getSystemModifiedAtFieldName(searchableFieldSpec).get();
mappings.put(modifiedAtFieldName, ImmutableMap.of(TYPE, ESUtils.DATE_FIELD_TYPE));
}

mappings.putAll(getMappingsForFieldNameAliases(searchableFieldSpec));

return mappings;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.linkedin.metadata.models.annotation.SearchableAnnotation.FieldType;
import com.linkedin.metadata.models.extractor.FieldExtractor;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.utils.ESUtils;
import com.linkedin.metadata.utils.AuditStampUtils;
import com.linkedin.r2.RemoteInvocationException;
import com.linkedin.structured.StructuredProperties;
import com.linkedin.structured.StructuredPropertyDefinition;
Expand Down Expand Up @@ -92,7 +94,9 @@ public Optional<String> transformSnapshot(
final ObjectNode searchDocument = JsonNodeFactory.instance.objectNode();
searchDocument.put("urn", snapshot.data().get("urn").toString());
extractedSearchableFields.forEach(
(key, value) -> setSearchableValue(key, value, searchDocument, forDelete));
(key, value) ->
setSearchableValue(
key, value, searchDocument, forDelete, AuditStampUtils.createDefaultAuditStamp()));
extractedSearchScoreFields.forEach(
(key, values) -> setSearchScoreValue(key, values, searchDocument, forDelete));
return Optional.of(searchDocument.toString());
Expand Down Expand Up @@ -149,7 +153,8 @@ public Optional<ObjectNode> transformAspect(
final @Nonnull Urn urn,
final @Nullable RecordTemplate aspect,
final @Nonnull AspectSpec aspectSpec,
final Boolean forDelete)
final Boolean forDelete,
final AuditStamp mclCreateAuditStamp)
throws RemoteInvocationException, URISyntaxException {
final Map<SearchableFieldSpec, List<Object>> extractedSearchableFields =
FieldExtractor.extractFields(aspect, aspectSpec.getSearchableFieldSpecs(), maxValueLength);
Expand All @@ -168,10 +173,12 @@ public Optional<ObjectNode> transformAspect(
searchDocument.put("urn", urn.toString());

extractedSearchableFields.forEach(
(key, values) -> setSearchableValue(key, values, searchDocument, forDelete));
(key, values) ->
setSearchableValue(key, values, searchDocument, forDelete, mclCreateAuditStamp));
extractedSearchRefFields.forEach(
(key, values) ->
setSearchableRefValue(opContext, key, values, searchDocument, forDelete));
setSearchableRefValue(
opContext, key, values, searchDocument, forDelete, mclCreateAuditStamp));
extractedSearchScoreFields.forEach(
(key, values) -> setSearchScoreValue(key, values, searchDocument, forDelete));
result = Optional.of(searchDocument);
Expand All @@ -190,7 +197,8 @@ public void setSearchableValue(
final SearchableFieldSpec fieldSpec,
final List<Object> fieldValues,
final ObjectNode searchDocument,
final Boolean forDelete) {
final Boolean forDelete,
final AuditStamp mclCreatedAuditStamp) {
DataSchema.Type valueType = fieldSpec.getPegasusSchema().getType();
Optional<Object> firstValue = fieldValues.stream().findFirst();
boolean isArray = fieldSpec.isArray();
Expand Down Expand Up @@ -255,6 +263,13 @@ public void setSearchableValue(
return;
}

if (ESUtils.getSystemModifiedAtFieldName(fieldSpec).isPresent()) {
String modifiedAtFieldName = ESUtils.getSystemModifiedAtFieldName(fieldSpec).get();
searchDocument.set(
modifiedAtFieldName,
JsonNodeFactory.instance.numberNode((Long) mclCreatedAuditStamp.getTime()));
}

if (isArray || (valueType == DataSchema.Type.MAP && !OBJECT_FIELD_TYPES.contains(fieldType))) {
if (fieldType == FieldType.BROWSE_PATH_V2) {
String browsePathV2Value = getBrowsePathV2Value(fieldValues);
Expand Down Expand Up @@ -525,7 +540,8 @@ public void setSearchableRefValue(
final SearchableRefFieldSpec searchableRefFieldSpec,
final List<Object> fieldValues,
final ObjectNode searchDocument,
final Boolean forDelete) {
final Boolean forDelete,
final AuditStamp mclCreatedAuditStamp) {
String fieldName = searchableRefFieldSpec.getSearchableRefAnnotation().getFieldName();
FieldType fieldType = searchableRefFieldSpec.getSearchableRefAnnotation().getFieldType();
boolean isArray = searchableRefFieldSpec.isArray();
Expand All @@ -540,11 +556,13 @@ public void setSearchableRefValue(
fieldValues
.subList(0, Math.min(fieldValues.size(), maxArrayLength))
.forEach(
value -> getNodeForRef(opContext, depth, value, fieldType).ifPresent(arrayNode::add));
value ->
getNodeForRef(opContext, depth, value, fieldType, mclCreatedAuditStamp)
.ifPresent(arrayNode::add));
searchDocument.set(fieldName, arrayNode);
} else if (!fieldValues.isEmpty()) {
String finalFieldName = fieldName;
getNodeForRef(opContext, depth, fieldValues.get(0), fieldType)
getNodeForRef(opContext, depth, fieldValues.get(0), fieldType, mclCreatedAuditStamp)
.ifPresent(node -> searchDocument.set(finalFieldName, node));
} else {
searchDocument.set(fieldName, JsonNodeFactory.instance.nullNode());
Expand All @@ -555,7 +573,8 @@ private Optional<JsonNode> getNodeForRef(
@Nonnull OperationContext opContext,
final int depth,
final Object fieldValue,
final FieldType fieldType) {
final FieldType fieldType,
final AuditStamp auditStamp) {
EntityRegistry entityRegistry = opContext.getEntityRegistry();
AspectRetriever aspectRetriever = opContext.getAspectRetriever();

Expand Down Expand Up @@ -598,7 +617,7 @@ private Optional<JsonNode> getNodeForRef(
SearchableFieldSpec spec = entry.getKey();
List<Object> value = entry.getValue();
if (!value.isEmpty()) {
setSearchableValue(spec, value, resultNode, false);
setSearchableValue(spec, value, resultNode, false, auditStamp);
}
}

Expand All @@ -624,7 +643,8 @@ private Optional<JsonNode> getNodeForRef(
opContext,
newDepth,
val,
spec.getSearchableRefAnnotation().getFieldType())
spec.getSearchableRefAnnotation().getFieldType(),
auditStamp)
.ifPresent(arrayNode::add));
resultNode.set(fieldName, arrayNode);
} else {
Expand All @@ -633,7 +653,8 @@ private Optional<JsonNode> getNodeForRef(
opContext,
newDepth,
value.get(0),
spec.getSearchableRefAnnotation().getFieldType());
spec.getSearchableRefAnnotation().getFieldType(),
auditStamp);
if (node.isPresent()) {
resultNode.set(fieldName, node.get());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -907,4 +907,15 @@ public static BoolQueryBuilder buildFilterNonLatestEntities(
queryFilterRewriteChain);
return QueryBuilders.boolQuery().should(isLatest).should(isNotVersioned).minimumShouldMatch(1);
}

public static Optional<String> getSystemModifiedAtFieldName(
@Nonnull SearchableFieldSpec searchableFieldSpec) {
final String fieldName = searchableFieldSpec.getSearchableAnnotation().getFieldName();
return searchableFieldSpec.getSearchableAnnotation().isIncludeSystemModifiedAt()
? searchableFieldSpec
.getSearchableAnnotation()
.getSystemModifiedAtFieldName()
.or(() -> Optional.of(String.format("%sSystemModifiedAt", fieldName)))
: Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.linkedin.common.AuditStamp;
import com.linkedin.common.Status;
import com.linkedin.common.UrnArray;
import com.linkedin.common.urn.Urn;
Expand Down Expand Up @@ -293,7 +294,8 @@ private void handleNonSystemMetadataDeleteChangeEvent(
specPair.getFirst().getName(),
specPair.getSecond(),
event.getRecordTemplate(),
isDeletingKey);
isDeletingKey,
event.getAuditStamp());
}
}

Expand Down Expand Up @@ -325,7 +327,7 @@ private void updateSearchService(@Nonnull OperationContext opContext, MCLItem ev
try {
searchDocument =
searchDocumentTransformer
.transformAspect(opContext, urn, aspect, aspectSpec, false)
.transformAspect(opContext, urn, aspect, aspectSpec, false, event.getAuditStamp())
.map(
objectNode ->
withSystemCreated(
Expand Down Expand Up @@ -356,7 +358,7 @@ private void updateSearchService(@Nonnull OperationContext opContext, MCLItem ev
try {
previousSearchDocument =
searchDocumentTransformer.transformAspect(
opContext, urn, previousAspect, aspectSpec, false);
opContext, urn, previousAspect, aspectSpec, false, event.getAuditStamp());
} catch (Exception e) {
log.error(
"Error in getting documents from previous aspect state for urn: {} for aspect {}, continuing without diffing.",
Expand Down Expand Up @@ -445,7 +447,8 @@ private void deleteSearchData(
String entityName,
AspectSpec aspectSpec,
@Nullable RecordTemplate aspect,
Boolean isKeyAspect) {
Boolean isKeyAspect,
AuditStamp auditStamp) {
String docId;
try {
docId = URLEncoder.encode(urn.toString(), "UTF-8");
Expand All @@ -463,7 +466,7 @@ private void deleteSearchData(
try {
searchDocument =
searchDocumentTransformer
.transformAspect(opContext, urn, aspect, aspectSpec, true)
.transformAspect(opContext, urn, aspect, aspectSpec, true, auditStamp)
.map(Objects::toString); // TODO
} catch (Exception e) {
log.error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,9 @@ public void testGetDefaultAggregationsHasFields() {
Optional.empty(),
Collections.emptyMap(),
Collections.emptyList(),
false);
false,
false,
Optional.empty());

SearchConfiguration config = new SearchConfiguration();
config.setMaxTermBucketSize(25);
Expand Down Expand Up @@ -203,7 +205,9 @@ public void testGetDefaultAggregationsFields() {
Optional.empty(),
Collections.emptyMap(),
Collections.emptyList(),
false);
false,
false,
Optional.empty());

SearchConfiguration config = new SearchConfiguration();
config.setMaxTermBucketSize(25);
Expand Down Expand Up @@ -235,7 +239,9 @@ public void testGetSpecificAggregationsHasFields() {
Optional.empty(),
Collections.emptyMap(),
Collections.emptyList(),
false);
false,
false,
Optional.empty());

SearchableAnnotation annotation2 =
new SearchableAnnotation(
Expand All @@ -252,7 +258,9 @@ public void testGetSpecificAggregationsHasFields() {
Optional.empty(),
Collections.emptyMap(),
Collections.emptyList(),
false);
false,
false,
Optional.empty());

SearchConfiguration config = new SearchConfiguration();
config.setMaxTermBucketSize(25);
Expand Down Expand Up @@ -462,7 +470,9 @@ public void testAggregateOverFieldsAndStructProp() {
Optional.empty(),
Collections.emptyMap(),
Collections.emptyList(),
false);
false,
false,
Optional.empty());

SearchableAnnotation annotation2 =
new SearchableAnnotation(
Expand All @@ -479,7 +489,9 @@ public void testAggregateOverFieldsAndStructProp() {
Optional.empty(),
Collections.emptyMap(),
Collections.emptyList(),
false);
false,
false,
Optional.empty());

SearchConfiguration config = new SearchConfiguration();
config.setMaxTermBucketSize(25);
Expand Down Expand Up @@ -532,7 +544,9 @@ public void testAggregateOverFieldsAndStructPropV1() {
Optional.empty(),
Collections.emptyMap(),
Collections.emptyList(),
false);
false,
false,
Optional.empty());

SearchableAnnotation annotation2 =
new SearchableAnnotation(
Expand All @@ -549,7 +563,9 @@ public void testAggregateOverFieldsAndStructPropV1() {
Optional.empty(),
Collections.emptyMap(),
Collections.emptyList(),
false);
false,
false,
Optional.empty());

SearchConfiguration config = new SearchConfiguration();
config.setMaxTermBucketSize(25);
Expand Down Expand Up @@ -606,7 +622,9 @@ public void testMissingAggregation() {
Optional.empty(),
Collections.emptyMap(),
Collections.emptyList(),
true);
true,
false,
Optional.empty());

SearchConfiguration config = new SearchConfiguration();
config.setMaxTermBucketSize(25);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,9 @@ public void testGetStandardFields() {
Optional.empty(),
Map.of(),
List.of(),
false),
false,
false,
Optional.empty()),
mock(DataSchema.class)),
new SearchableFieldSpec(
mock(PathSpec.class),
Expand All @@ -472,7 +474,9 @@ public void testGetStandardFields() {
Optional.empty(),
Map.of(),
List.of(),
false),
false,
false,
Optional.empty()),
mock(DataSchema.class)),
new SearchableFieldSpec(
mock(PathSpec.class),
Expand All @@ -490,7 +494,9 @@ public void testGetStandardFields() {
Optional.empty(),
Map.of(),
List.of(),
false),
false,
false,
Optional.empty()),
mock(DataSchema.class))));

fieldConfigs =
Expand Down
Loading

0 comments on commit 2d762f0

Please sign in to comment.