Skip to content

Commit a08aa98

Browse files
authored
feat(ebean-dao): add batch SQL layer for multi-aspect upsert and batch deletion (#607)
1 parent fb9fa85 commit a08aa98

5 files changed

Lines changed: 399 additions & 88 deletions

File tree

dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/EbeanLocalAccess.java

Lines changed: 220 additions & 87 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import java.sql.ResultSet;
3333
import java.sql.SQLException;
3434
import java.sql.Timestamp;
35+
import java.util.ArrayList;
3536
import java.util.Collections;
3637
import java.util.HashMap;
3738
import java.util.HashSet;
@@ -191,100 +192,50 @@ public <ASPECT_UNION extends RecordTemplate> int create(
191192
@Nullable IngestionTrackingContext ingestionTrackingContext,
192193
boolean isTestMode) {
193194

194-
aspectValues.forEach(aspectValue -> {
195-
if (aspectValue == null) {
196-
throw new IllegalArgumentException("Aspect value cannot be null");
197-
}
198-
});
199-
200-
final long timestamp = auditStamp.hasTime() ? auditStamp.getTime() : System.currentTimeMillis();
201-
final String actor = auditStamp.hasActor() ? auditStamp.getActor().toString() : DEFAULT_ACTOR;
202-
final String impersonator = auditStamp.hasImpersonator() ? auditStamp.getImpersonator().toString() : null;
203-
final boolean urnExtraction = _urnPathExtractor != null && !(_urnPathExtractor instanceof EmptyPathExtractor);
204-
205-
final SqlUpdate sqlUpdate;
206-
207-
List<String> classNames = aspectCreateLambdas.stream()
208-
.map(aspectCreateLamdba -> aspectCreateLamdba.getAspectClass().getCanonicalName())
209-
.collect(Collectors.toList());
210-
211-
// Create insert statement with variable number of aspect columns
212-
// For example: INSERT INTO <table_name> (<columns>)
213-
StringBuilder insertIntoSql = new StringBuilder();
214-
// Create part of insert statement with variable number of aspect values
215-
// For example: VALUES (<values>);
216-
StringBuilder insertSqlValues = new StringBuilder();
217-
218-
if (urnExtraction) {
219-
insertIntoSql.append(SQL_INSERT_INTO_ASSET_WITH_URN);
220-
insertSqlValues.append(SQL_INSERT_ASSET_VALUES_WITH_URN);
221-
} else {
222-
insertIntoSql.append(SQL_INSERT_INTO_ASSET);
223-
insertSqlValues.append(SQL_INSERT_ASSET_VALUES);
224-
}
195+
// Build ON DUPLICATE KEY UPDATE clause for create semantics (throws exception on duplicate)
196+
String onDuplicateKeyClause = buildOnDuplicateKeyForCreate(urn, aspectCreateLambdas);
225197

226-
for (int i = 0; i < classNames.size(); i++) {
227-
insertIntoSql.append(getAspectColumnName(urn.getEntityType(), classNames.get(i)));
228-
// Add parameterization for aspect values
229-
insertSqlValues.append(":aspect").append(i);
230-
// Add comma if not the last column
231-
if (i != classNames.size() - 1) {
232-
insertIntoSql.append(", ");
233-
insertSqlValues.append(", ");
234-
}
235-
}
236-
insertIntoSql.append(CLOSING_BRACKET);
237-
insertSqlValues.append(CLOSING_BRACKET);
238-
239-
// Construct DELETED_TS_CHECK_FOR_CREATE String
240-
StringBuilder deletedTsCheckForCreate = new StringBuilder();
241-
deletedTsCheckForCreate.append(DELETED_TS_DUPLICATE_KEY_CHECK);
242-
for (int i = 0; i < classNames.size(); i++) {
243-
deletedTsCheckForCreate.append(getAspectColumnName(urn.getEntityType(), classNames.get(i)));
244-
deletedTsCheckForCreate.append(" = :aspect").append(i);
245-
if (i != classNames.size() - 1) {
246-
deletedTsCheckForCreate.append(", ");
247-
}
248-
}
249-
deletedTsCheckForCreate.append(DELETED_TS_SET_VALUE_CONDITIONALLY);
198+
// Use comprehensive helper to prepare SqlUpdate with all common logic
199+
SqlUpdate sqlUpdate = prepareMultiColumnInsert(urn, aspectValues, aspectCreateLambdas,
200+
auditStamp, ingestionTrackingContext, onDuplicateKeyClause);
250201

251-
// Build the final insert statement as follows:
252-
// INSERT INTO <table_name> (<columns>) VALUES (<values>)
253-
// ON DUPLICATE KEY UPDATE aspectclass1 = aspect1, ...,
254-
// deleted_ts = IF(deleted_ts IS NULL, CAST('DuplicateKeyException' AS UNSIGNED), NULL);
255-
String insertStatement = insertIntoSql.toString() + insertSqlValues.toString() + deletedTsCheckForCreate.toString();
256-
257-
insertStatement = String.format(insertStatement, getTableName(urn));
202+
return sqlUpdate.execute();
203+
}
258204

259-
sqlUpdate = _server.createSqlUpdate(insertStatement);
205+
/**
206+
* Batch upsert multiple aspects for a single URN using multi-column UPDATE.
207+
* This method generates a single SQL statement that updates all aspect columns at once.
208+
* Unlike create(), this does UPSERT (always updates if exists, no duplicate key check).
209+
*
210+
* @param urn entity URN
211+
* @param updateContexts list of aspect update contexts containing values and lambdas
212+
* @param auditStamp audit stamp for tracking
213+
* @param ingestionTrackingContext tracking context for ingestion
214+
* @param isTestMode whether this is a test mode operation
215+
* @return number of rows affected
216+
*/
217+
public <ASPECT_UNION extends RecordTemplate> int batchUpsert(
218+
@Nonnull URN urn,
219+
@Nonnull List<BaseLocalDAO.AspectUpdateContext<RecordTemplate>> updateContexts,
220+
@Nonnull AuditStamp auditStamp,
221+
@Nullable IngestionTrackingContext ingestionTrackingContext,
222+
boolean isTestMode) {
260223

261-
String utcTimestamp = Instant.ofEpochMilli(timestamp)
262-
.atZone(ZoneOffset.UTC)
263-
.format(DateTimeFormatter.ofPattern(DATE_TIME_FORMAT));
264-
// Set parameters for each aspect value
265-
for (int i = 0; i < aspectValues.size(); i++) {
266-
AuditedAspect auditedAspect = new AuditedAspect()
267-
.setAspect(RecordUtils.toJsonString(aspectValues.get(i)))
268-
.setCanonicalName(aspectCreateLambdas.get(i).getAspectClass().getCanonicalName())
269-
.setLastmodifiedby(actor)
270-
.setLastmodifiedon(utcTimestamp)
271-
.setCreatedfor(impersonator, SetMode.IGNORE_NULL);
272-
if (ingestionTrackingContext != null) {
273-
auditedAspect.setEmitTime(ingestionTrackingContext.getEmitTime(), SetMode.IGNORE_NULL);
274-
auditedAspect.setEmitter(ingestionTrackingContext.getEmitter(), SetMode.IGNORE_NULL);
275-
}
276-
sqlUpdate.setParameter("aspect" + i, toJsonString(auditedAspect));
224+
// Extract parallel lists from contexts for prepareMultiColumnInsert
225+
List<RecordTemplate> aspectValues = new ArrayList<>();
226+
List<BaseLocalDAO.AspectUpdateLambda<? extends RecordTemplate>> aspectUpdateLambdas = new ArrayList<>();
227+
228+
for (BaseLocalDAO.AspectUpdateContext<RecordTemplate> ctx : updateContexts) {
229+
aspectValues.add(ctx.getNewValue());
230+
aspectUpdateLambdas.add(ctx.getLambda());
277231
}
278232

233+
// Build ON DUPLICATE KEY UPDATE clause for upsert semantics (always updates and clears deleted_ts)
234+
String onDuplicateKeyClause = buildOnDuplicateKeyForUpsert(urn, aspectUpdateLambdas);
279235

280-
// If a non-default UrnPathExtractor is provided, the user MUST specify in their schema generation scripts
281-
// 'ALTER TABLE <table> ADD COLUMN a_urn JSON'.
282-
if (urnExtraction) {
283-
sqlUpdate.setParameter("a_urn", toJsonString(urn));
284-
}
285-
sqlUpdate.setParameter("urn", urn.toString())
286-
.setParameter("lastmodifiedon", utcTimestamp)
287-
.setParameter("lastmodifiedby", actor);
236+
// Use comprehensive helper to prepare SqlUpdate with all common logic
237+
SqlUpdate sqlUpdate = prepareMultiColumnInsert(urn, aspectValues, aspectUpdateLambdas,
238+
auditStamp, ingestionTrackingContext, onDuplicateKeyClause);
288239

289240
return sqlUpdate.execute();
290241
}
@@ -784,4 +735,186 @@ static <ASPECT extends RecordTemplate, URN> EbeanMetadataAspect findLatestMetada
784735
return findLatestMetadataAspect(ebeanServer.currentTransaction().getConnection(), urn, aspectClass);
785736
}
786737
}
738+
739+
/**
740+
* Comprehensive helper method that prepares a SqlUpdate with all common logic up to the ON DUPLICATE KEY clause.
741+
* This consolidates audit extraction, SQL building, and parameter setting for both create() and batchUpsert().
742+
*
743+
* <p>Returns a SqlUpdate object with:</p>
744+
* <ul>
745+
* <li>INSERT INTO and VALUES clauses built</li>
746+
* <li>All aspect parameters set</li>
747+
* <li>URN parameter set (if urnExtraction enabled)</li>
748+
* <li>lastmodifiedon and lastmodifiedby parameters set</li>
749+
* </ul>
750+
*
751+
* <p>The caller only needs to append the ON DUPLICATE KEY clause and execute.</p>
752+
*
753+
* <p>TODO: Refactor to accept List&lt;AspectUpdateContext&gt; instead of parallel lists.
754+
* This would eliminate the positional contract between aspectValues and aspectLambdas,
755+
* making it impossible to misalign them and improving type safety. The create() pathway
756+
* would need to wrap values in AspectUpdateContext with null oldValue. This change would
757+
* complete the AspectUpdateContext refactoring throughout the entire call chain.</p>
758+
*
759+
* @param urn entity URN
760+
* @param aspectValues list of aspect values
761+
* @param aspectLambdas list of aspect lambdas (AspectUpdateLambda or AspectCreateLambda)
762+
* @param auditStamp audit stamp for tracking
763+
* @param ingestionTrackingContext tracking context for ingestion
764+
* @param onDuplicateKeyClause the ON DUPLICATE KEY UPDATE clause to append
765+
* @return SqlUpdate object ready for execution
766+
*/
767+
private SqlUpdate prepareMultiColumnInsert(
768+
@Nonnull URN urn,
769+
@Nonnull List<? extends RecordTemplate> aspectValues,
770+
@Nonnull List<? extends BaseLocalDAO.AspectUpdateLambda<? extends RecordTemplate>> aspectLambdas,
771+
@Nonnull AuditStamp auditStamp,
772+
@Nullable IngestionTrackingContext ingestionTrackingContext,
773+
@Nonnull String onDuplicateKeyClause) {
774+
775+
// Validate that aspectValues and aspectLambdas have the same size
776+
if (aspectValues.size() != aspectLambdas.size()) {
777+
throw new IllegalArgumentException(
778+
String.format("Aspect values size (%d) must match aspect lambdas size (%d)",
779+
aspectValues.size(), aspectLambdas.size()));
780+
}
781+
782+
// Validate that no aspect values are null
783+
aspectValues.forEach(aspectValue -> {
784+
if (aspectValue == null) {
785+
throw new IllegalArgumentException("Aspect value cannot be null");
786+
}
787+
});
788+
789+
// Extract audit information
790+
final long timestamp = auditStamp.hasTime() ? auditStamp.getTime() : System.currentTimeMillis();
791+
final String actor = auditStamp.hasActor() ? auditStamp.getActor().toString() : DEFAULT_ACTOR;
792+
final String impersonator = auditStamp.hasImpersonator() ? auditStamp.getImpersonator().toString() : null;
793+
final boolean urnExtraction = _urnPathExtractor != null && !(_urnPathExtractor instanceof EmptyPathExtractor);
794+
795+
// Extract class names from lambdas
796+
List<String> classNames = aspectLambdas.stream()
797+
.map(lambda -> lambda.getAspectClass().getCanonicalName())
798+
.collect(Collectors.toList());
799+
800+
// Build INSERT INTO and VALUES clauses
801+
StringBuilder insertIntoSql = new StringBuilder();
802+
StringBuilder insertSqlValues = new StringBuilder();
803+
804+
if (urnExtraction) {
805+
insertIntoSql.append(SQL_INSERT_INTO_ASSET_WITH_URN);
806+
insertSqlValues.append(SQL_INSERT_ASSET_VALUES_WITH_URN);
807+
} else {
808+
insertIntoSql.append(SQL_INSERT_INTO_ASSET);
809+
insertSqlValues.append(SQL_INSERT_ASSET_VALUES);
810+
}
811+
812+
// Build column list and value placeholders
813+
for (int i = 0; i < classNames.size(); i++) {
814+
insertIntoSql.append(getAspectColumnName(urn.getEntityType(), classNames.get(i)));
815+
insertSqlValues.append(":aspect").append(i);
816+
if (i != classNames.size() - 1) {
817+
insertIntoSql.append(", ");
818+
insertSqlValues.append(", ");
819+
}
820+
}
821+
insertIntoSql.append(CLOSING_BRACKET);
822+
insertSqlValues.append(CLOSING_BRACKET);
823+
824+
// Build complete SQL statement with ON DUPLICATE KEY clause
825+
String insertStatement = insertIntoSql.toString() + insertSqlValues.toString() + onDuplicateKeyClause;
826+
insertStatement = String.format(insertStatement, getTableName(urn));
827+
828+
// Create SqlUpdate and set all parameters
829+
SqlUpdate sqlUpdate = _server.createSqlUpdate(insertStatement);
830+
831+
String utcTimestamp = Instant.ofEpochMilli(timestamp)
832+
.atZone(ZoneOffset.UTC)
833+
.format(DateTimeFormatter.ofPattern(DATE_TIME_FORMAT));
834+
835+
// Set aspect parameters
836+
for (int i = 0; i < aspectValues.size(); i++) {
837+
AuditedAspect auditedAspect = new AuditedAspect()
838+
.setAspect(RecordUtils.toJsonString(aspectValues.get(i)))
839+
.setCanonicalName(classNames.get(i))
840+
.setLastmodifiedby(actor)
841+
.setLastmodifiedon(utcTimestamp)
842+
.setCreatedfor(impersonator, SetMode.IGNORE_NULL);
843+
if (ingestionTrackingContext != null) {
844+
auditedAspect.setEmitTime(ingestionTrackingContext.getEmitTime(), SetMode.IGNORE_NULL);
845+
auditedAspect.setEmitter(ingestionTrackingContext.getEmitter(), SetMode.IGNORE_NULL);
846+
}
847+
sqlUpdate.setParameter("aspect" + i, toJsonString(auditedAspect));
848+
}
849+
850+
// Set URN parameter if extraction is enabled
851+
if (urnExtraction) {
852+
sqlUpdate.setParameter("a_urn", toJsonString(urn));
853+
}
854+
855+
// Set common parameters
856+
sqlUpdate.setParameter("urn", urn.toString())
857+
.setParameter("lastmodifiedon", utcTimestamp)
858+
.setParameter("lastmodifiedby", actor);
859+
860+
return sqlUpdate;
861+
}
862+
863+
/**
864+
* Helper method to build the ON DUPLICATE KEY UPDATE clause for create() method.
865+
* This clause throws a DuplicateKeyException if the row already exists and is not soft-deleted.
866+
*
867+
* @param urn entity URN
868+
* @param aspectLambdas list of aspect lambdas
869+
* @return the ON DUPLICATE KEY UPDATE clause string
870+
*/
871+
private String buildOnDuplicateKeyForCreate(
872+
@Nonnull URN urn,
873+
@Nonnull List<? extends BaseLocalDAO.AspectUpdateLambda<? extends RecordTemplate>> aspectLambdas) {
874+
875+
List<String> classNames = aspectLambdas.stream()
876+
.map(lambda -> lambda.getAspectClass().getCanonicalName())
877+
.collect(Collectors.toList());
878+
879+
StringBuilder onDuplicateKey = new StringBuilder();
880+
onDuplicateKey.append(ON_DUPLICATE_KEY_UPDATE);
881+
for (int i = 0; i < classNames.size(); i++) {
882+
onDuplicateKey.append(getAspectColumnName(urn.getEntityType(), classNames.get(i)));
883+
onDuplicateKey.append(" = :aspect").append(i);
884+
if (i != classNames.size() - 1) {
885+
onDuplicateKey.append(", ");
886+
}
887+
}
888+
onDuplicateKey.append(DELETED_TS_SET_VALUE_CONDITIONALLY);
889+
return onDuplicateKey.toString();
890+
}
891+
892+
/**
893+
* Helper method to build the ON DUPLICATE KEY UPDATE clause for batchUpsert() method.
894+
* This clause always updates the row and clears deleted_ts (UPSERT semantics).
895+
*
896+
* @param urn entity URN
897+
* @param aspectLambdas list of aspect lambdas
898+
* @return the ON DUPLICATE KEY UPDATE clause string
899+
*/
900+
private String buildOnDuplicateKeyForUpsert(
901+
@Nonnull URN urn,
902+
@Nonnull List<? extends BaseLocalDAO.AspectUpdateLambda<? extends RecordTemplate>> aspectLambdas) {
903+
904+
List<String> classNames = aspectLambdas.stream()
905+
.map(lambda -> lambda.getAspectClass().getCanonicalName())
906+
.collect(Collectors.toList());
907+
908+
StringBuilder onDuplicateKey = new StringBuilder();
909+
onDuplicateKey.append(ON_DUPLICATE_KEY_UPDATE);
910+
for (int i = 0; i < classNames.size(); i++) {
911+
String columnName = getAspectColumnName(urn.getEntityType(), classNames.get(i));
912+
onDuplicateKey.append(columnName).append(" = :aspect").append(i);
913+
if (i != classNames.size() - 1) {
914+
onDuplicateKey.append(", ");
915+
}
916+
}
917+
onDuplicateKey.append(", lastmodifiedon = :lastmodifiedon, deleted_ts = NULL;");
918+
return onDuplicateKey.toString();
919+
}
787920
}

dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/IEbeanLocalAccess.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,22 @@ <ASPECT_UNION extends RecordTemplate> int create(@Nonnull URN urn,
7474
@Nonnull AuditStamp auditStamp,
7575
@Nullable IngestionTrackingContext ingestionTrackingContext, boolean isTestMode);
7676

77+
/**
78+
* Batch upsert multiple aspects for a single URN using multi-column UPDATE.
79+
* This method generates a single SQL statement that updates all aspect columns at once.
80+
*
81+
* @param urn entity URN
82+
* @param updateContexts list of aspect update contexts containing values and lambdas
83+
* @param auditStamp audit stamp for tracking
84+
* @param ingestionTrackingContext tracking context for ingestion
85+
* @param isTestMode whether the test mode is enabled or not
86+
* @return number of rows inserted or updated
87+
*/
88+
<ASPECT_UNION extends RecordTemplate> int batchUpsert(@Nonnull URN urn,
89+
@Nonnull List<BaseLocalDAO.AspectUpdateContext<RecordTemplate>> updateContexts,
90+
@Nonnull AuditStamp auditStamp,
91+
@Nullable IngestionTrackingContext ingestionTrackingContext, boolean isTestMode);
92+
7793
/**
7894
* Get read aspects from entity table. This a new schema implementation for batchGetUnion() in {@link EbeanLocalDAO}
7995
* @param keys {@link AspectKey} to retrieve aspect metadata

dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/InstrumentedEbeanLocalAccess.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,15 @@ public <ASPECT_UNION extends RecordTemplate> int create(@Nonnull URN urn,
8585
auditStamp, ingestionTrackingContext, isTestMode));
8686
}
8787

88+
@Override
89+
public <ASPECT_UNION extends RecordTemplate> int batchUpsert(@Nonnull URN urn,
90+
@Nonnull List<BaseLocalDAO.AspectUpdateContext<RecordTemplate>> updateContexts,
91+
@Nonnull AuditStamp auditStamp, @Nullable IngestionTrackingContext ingestionTrackingContext,
92+
boolean isTestMode) {
93+
return instrument("batchUpsert.aspects_" + updateContexts.size(),
94+
() -> _delegate.batchUpsert(urn, updateContexts, auditStamp, ingestionTrackingContext, isTestMode));
95+
}
96+
8897
@Nonnull
8998
@Override
9099
public <ASPECT extends RecordTemplate> List<EbeanMetadataAspect> batchGetUnion(

dao-impl/ebean-dao/src/main/java/com/linkedin/metadata/dao/utils/SQLStatementUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ public class SQLStatementUtils {
9999
public static final String CLOSING_BRACKET = ") ";
100100
// deleted_ts check on create Statement SQL. This is used to set the deleted_ts to a non-null value
101101
// If a record that is NOT marked for deletion is attempted to be created again, an exception will be thrown
102-
public static final String DELETED_TS_DUPLICATE_KEY_CHECK = "ON DUPLICATE KEY UPDATE ";
102+
public static final String ON_DUPLICATE_KEY_UPDATE = "ON DUPLICATE KEY UPDATE ";
103103
public static final String DELETED_TS_SET_VALUE_CONDITIONALLY = ", deleted_ts = IF(deleted_ts IS NULL, CAST('DuplicateKeyException' AS UNSIGNED), NULL);";
104104
// "JSON_EXTRACT(%s, '$.gma_deleted') IS NOT NULL" is used to exclude soft-deleted entity which has no lastmodifiedon.
105105
// for details, see the known limitations on https://github.com/linkedin/datahub-gma/pull/311. Same reason for

0 commit comments

Comments
 (0)