diff --git a/dao-api/src/main/pegasus/com/linkedin/metadata/internal/IngestionParams.pdl b/dao-api/src/main/pegasus/com/linkedin/metadata/internal/IngestionParams.pdl index 6b5676a4e..2231ae56f 100644 --- a/dao-api/src/main/pegasus/com/linkedin/metadata/internal/IngestionParams.pdl +++ b/dao-api/src/main/pegasus/com/linkedin/metadata/internal/IngestionParams.pdl @@ -2,7 +2,7 @@ namespace com.linkedin.metadata.internal import com.linkedin.metadata.events.IngestionMode import com.linkedin.metadata.events.IngestionTrackingContext -import pegasus.com.linkedin.metadata.events.IngestionAspectETag +import pegasus.com.linkedin.metadata.aspect.RequestMetadata /** * Record defining ingestion-related parameters that can be passed into DAO API calls. @@ -25,7 +25,7 @@ record IngestionParams { testMode: boolean = false /** - * eTag for aspect ingestion optimistic locking + * Metadata for API request */ - ingestionETags: optional array[IngestionAspectETag] = [ ] + request_metadata: optional RequestMetadata } \ No newline at end of file diff --git a/dao-api/src/main/pegasus/pegasus/com/linkedin/metadata/aspect/AspectMetadata.pdl b/dao-api/src/main/pegasus/pegasus/com/linkedin/metadata/aspect/AspectMetadata.pdl new file mode 100644 index 000000000..f032f7b27 --- /dev/null +++ b/dao-api/src/main/pegasus/pegasus/com/linkedin/metadata/aspect/AspectMetadata.pdl @@ -0,0 +1,18 @@ +namespace pegasus.com.linkedin.metadata.aspect + +/** + * * + * Metadata associated with an aspect + */ +record AspectMetadata { + + /** + * aspect alias name + */ + alias: optional string = "" + + /** + * used for optimistic locking in update API + */ + etag: optional string = "" +} \ No newline at end of file diff --git a/dao-api/src/main/pegasus/pegasus/com/linkedin/metadata/aspect/RequestMetadata.pdl b/dao-api/src/main/pegasus/pegasus/com/linkedin/metadata/aspect/RequestMetadata.pdl new file mode 100644 index 000000000..93389286f --- /dev/null +++ b/dao-api/src/main/pegasus/pegasus/com/linkedin/metadata/aspect/RequestMetadata.pdl @@ -0,0 +1,13 @@ +namespace pegasus.com.linkedin.metadata.aspect + +/** + * * + * Metadata for API request + */ +record RequestMetadata { + + /** + * * Aspect metadata map + */ + aspect_metadata: optional array[AspectMetadata] = [ ] +} \ No newline at end of file diff --git a/dao-api/src/main/pegasus/pegasus/com/linkedin/metadata/aspect/ResponseMetadata.pdl b/dao-api/src/main/pegasus/pegasus/com/linkedin/metadata/aspect/ResponseMetadata.pdl new file mode 100644 index 000000000..f46399f08 --- /dev/null +++ b/dao-api/src/main/pegasus/pegasus/com/linkedin/metadata/aspect/ResponseMetadata.pdl @@ -0,0 +1,13 @@ +namespace pegasus.com.linkedin.metadata.aspect + +/** + * * + * Metadata for API response + */ +record ResponseMetadata { + + /** + * * Aspect metadata map + */ + aspect_metadata: optional array[AspectMetadata] = [ ] +} \ No newline at end of file diff --git a/dao-api/src/main/pegasus/pegasus/com/linkedin/metadata/events/IngestionAspectETag.pdl b/dao-api/src/main/pegasus/pegasus/com/linkedin/metadata/events/IngestionAspectETag.pdl deleted file mode 100644 index a962b9ac3..000000000 --- a/dao-api/src/main/pegasus/pegasus/com/linkedin/metadata/events/IngestionAspectETag.pdl +++ /dev/null @@ -1,17 +0,0 @@ -namespace pegasus.com.linkedin.metadata.events - -/** - * eTag used for atomic aspect updating - */ -record IngestionAspectETag { - - /** - * aspect field name, e.g. "status" - */ - aspect_alias: optional string = "" - - /** - * e.g. used for optimistic locking when writing new aspect value - */ - etag: optional string = "" -} \ No newline at end of file diff --git a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java index 196e379ba..e2ac7328e 100644 --- a/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java +++ b/dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalDAO.java @@ -68,8 +68,8 @@ import javax.persistence.Table; import lombok.Value; import lombok.extern.slf4j.Slf4j; -import pegasus.com.linkedin.metadata.events.IngestionAspectETag; -import pegasus.com.linkedin.metadata.events.IngestionAspectETagArray; +import pegasus.com.linkedin.metadata.aspect.AspectMetadata; +import pegasus.com.linkedin.metadata.aspect.RequestMetadata; import static com.linkedin.metadata.dao.EbeanLocalAccess.*; import static com.linkedin.metadata.dao.EbeanMetadataAspect.*; @@ -619,10 +619,10 @@ public AuditStamp extractOptimisticLockForAspect AuditStamp optimisticLockAuditStamp = null; - final IngestionAspectETagArray ingestionAspectETags = ingestionParams.getIngestionETags(); + final RequestMetadata requestMetadata = ingestionParams.getRequest_metadata(); - if (ingestionAspectETags != null) { - for (IngestionAspectETag ingestionAspectETag: ingestionAspectETags) { + if (requestMetadata != null && requestMetadata.getAspect_metadata() != null) { + for (AspectMetadata aspectMetadata: requestMetadata.getAspect_metadata()) { final String aspectAlias; @@ -632,8 +632,8 @@ public AuditStamp extractOptimisticLockForAspect continue; } - if (aspectAlias != null && aspectAlias.equalsIgnoreCase(ingestionAspectETag.getAspect_alias())) { - Long decryptedETag = getDecryptedETag(ingestionAspectETag); + if (aspectAlias != null && aspectAlias.equalsIgnoreCase(aspectMetadata.getAlias())) { + Long decryptedETag = getDecryptedETag(aspectMetadata); if (decryptedETag != null) { optimisticLockAuditStamp = new AuditStamp(); optimisticLockAuditStamp.setTime(decryptedETag); @@ -649,12 +649,12 @@ public AuditStamp extractOptimisticLockForAspect * When eTag is null, it means this is a regular ingestion request, no read-modify-write consistency guarantee. */ @Nullable - private Long getDecryptedETag(@Nonnull IngestionAspectETag ingestionAspectETag) { + private Long getDecryptedETag(@Nonnull AspectMetadata aspectMetadata) { try { - if (ingestionAspectETag.getEtag() == null) { + if (aspectMetadata.getEtag() == null) { return null; } - return ETagUtils.decrypt(ingestionAspectETag.getEtag()); + return ETagUtils.decrypt(aspectMetadata.getEtag()); } catch (Exception e) { return null; } diff --git a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java index 48fa71b46..bf9c47205 100644 --- a/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java +++ b/dao-impl/ebean-dao/src/test/java/com/linkedin/metadata/dao/EbeanLocalDAOTest.java @@ -115,8 +115,9 @@ import org.testng.annotations.DataProvider; import org.testng.annotations.Factory; import org.testng.annotations.Test; -import pegasus.com.linkedin.metadata.events.IngestionAspectETag; -import pegasus.com.linkedin.metadata.events.IngestionAspectETagArray; +import pegasus.com.linkedin.metadata.aspect.AspectMetadata; +import pegasus.com.linkedin.metadata.aspect.AspectMetadataArray; +import pegasus.com.linkedin.metadata.aspect.RequestMetadata; import static com.linkedin.common.AuditStamps.*; import static com.linkedin.metadata.dao.internal.BaseGraphWriterDAO.RemovalOption.*; @@ -4190,13 +4191,17 @@ public void testExtractOptimisticLockForAspectFromIngestionParamsIfPossible() FooUrn urn = new FooUrn(1); - IngestionAspectETag ingestionAspectETag = new IngestionAspectETag(); - ingestionAspectETag.setAspect_alias("aspectFoo"); long timestamp = 1750796203701L; - ingestionAspectETag.setEtag("KsFkRXtjaBGQf37HjdEjDQ=="); + + AspectMetadata aspectMetadata = new AspectMetadata(); + aspectMetadata.setAlias("aspectFoo"); + aspectMetadata.setEtag("KsFkRXtjaBGQf37HjdEjDQ=="); + + RequestMetadata requestMetadata = new RequestMetadata(); + requestMetadata.setAspect_metadata(new AspectMetadataArray(aspectMetadata)); IngestionParams ingestionParams = new IngestionParams(); - ingestionParams.setIngestionETags(new IngestionAspectETagArray(ingestionAspectETag)); + ingestionParams.setRequest_metadata(requestMetadata); AuditStamp result = dao.extractOptimisticLockForAspectFromIngestionParamsIfPossible(ingestionParams, AspectFoo.class, urn); @@ -4224,12 +4229,15 @@ public void testExtractOptimisticLockForAspectFromIngestionParamsIfPossibleAspec FooUrn urn = new FooUrn(1); - IngestionAspectETag ingestionAspectETag = new IngestionAspectETag(); - ingestionAspectETag.setAspect_alias("aspectBar"); - ingestionAspectETag.setEtag("KsFkRXtjaBGQf37HjdEjDQ=="); + AspectMetadata aspectMetadata = new AspectMetadata(); + aspectMetadata.setAlias("aspectBar"); + aspectMetadata.setEtag("KsFkRXtjaBGQf37HjdEjDQ=="); + + RequestMetadata requestMetadata = new RequestMetadata(); + requestMetadata.setAspect_metadata(new AspectMetadataArray(aspectMetadata)); IngestionParams ingestionParams = new IngestionParams(); - ingestionParams.setIngestionETags(new IngestionAspectETagArray(ingestionAspectETag)); + ingestionParams.setRequest_metadata(requestMetadata); AuditStamp result = dao.extractOptimisticLockForAspectFromIngestionParamsIfPossible(ingestionParams, AspectFoo.class, urn);